You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/08/07 17:45:35 UTC

[02/15] cassandra git commit: Separate pool for hint delivery tasks in HintedHandoffManager

Separate pool for hint delivery tasks in HintedHandoffManager


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c9587cd2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c9587cd2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c9587cd2

Branch: refs/heads/cassandra-2.1
Commit: c9587cd2bd1ac60d8cd8552592ac16f9c7ddd3b2
Parents: 45bd07f
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Tue Aug 4 17:46:15 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Fri Aug 7 15:36:40 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../JMXEnabledScheduledThreadPoolExecutor.java  | 137 -------------------
 ...EnabledScheduledThreadPoolExecutorMBean.java |  26 ----
 .../cassandra/db/HintedHandOffManager.java      |  25 +++-
 4 files changed, 20 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9587cd2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a8cf796..fe060af 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.17
+ * Remove erroneous pending HH tasks from tpstats/jmx (CASSANDRA-9129)
  * Don't cast expected bf size to an int (CASSANDRA-9959)
  * Log when messages are dropped due to cross_node_timeout (CASSANDRA-9793)
  * checkForEndpointCollision fails for legitimate collisions (CASSANDRA-9765)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9587cd2/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
deleted file mode 100644
index 64d9267..0000000
--- a/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.concurrent;
-
-import java.lang.management.ManagementFactory;
-import java.util.List;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-import org.apache.cassandra.metrics.ThreadPoolMetrics;
-
-/**
- * A JMX enabled wrapper for DebuggableScheduledThreadPoolExecutor.
- */
-public class JMXEnabledScheduledThreadPoolExecutor extends DebuggableScheduledThreadPoolExecutor implements JMXEnabledScheduledThreadPoolExecutorMBean
-{
-    private final String mbeanName;
-    private final ThreadPoolMetrics metrics;
-
-    public JMXEnabledScheduledThreadPoolExecutor(int corePoolSize, NamedThreadFactory threadFactory, String jmxPath)
-    {
-        super(corePoolSize, threadFactory);
-
-        metrics = new ThreadPoolMetrics(this, jmxPath, threadFactory.id);
-
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + threadFactory.id;
-
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(mbeanName));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private void unregisterMBean()
-    {
-        try
-        {
-            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(mbeanName));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-
-        // release metrics
-        metrics.release();
-    }
-
-    @Override
-    public synchronized void shutdown()
-    {
-        // synchronized, because there is no way to access super.mainLock, which would be
-        // the preferred way to make this threadsafe
-        if (!isShutdown())
-            unregisterMBean();
-
-        super.shutdown();
-    }
-
-    @Override
-    public synchronized List<Runnable> shutdownNow()
-    {
-        // synchronized, because there is no way to access super.mainLock, which would be
-        // the preferred way to make this threadsafe
-        if (!isShutdown())
-            unregisterMBean();
-
-        return super.shutdownNow();
-    }
-
-    /**
-     * Get the number of completed tasks
-     */
-    public long getCompletedTasks()
-    {
-        return getCompletedTaskCount();
-    }
-
-    /**
-     * Get the number of tasks waiting to be executed
-     */
-    public long getPendingTasks()
-    {
-        return getTaskCount() - getCompletedTaskCount();
-    }
-
-    public int getTotalBlockedTasks()
-    {
-        return (int) metrics.totalBlocked.count();
-    }
-
-    public int getCurrentlyBlockedTasks()
-    {
-        return (int) metrics.currentBlocked.count();
-    }
-
-    public int getCoreThreads()
-    {
-        return getCorePoolSize();
-    }
-
-    public void setCoreThreads(int number)
-    {
-        setCorePoolSize(number);
-    }
-
-    public int getMaximumThreads()
-    {
-        return getMaximumPoolSize();
-    }
-
-    public void setMaximumThreads(int number)
-    {
-        setMaximumPoolSize(number);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9587cd2/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java
deleted file mode 100644
index d9c45e3..0000000
--- a/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.concurrent;
-
-/**
- * @see org.apache.cassandra.metrics.ThreadPoolMetrics
- */
-@Deprecated
-public interface JMXEnabledScheduledThreadPoolExecutorMBean extends JMXEnabledThreadPoolExecutorMBean
-{
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c9587cd2/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index c8c3845..dab0f75 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -37,7 +37,8 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
@@ -56,7 +57,9 @@ import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.metrics.HintedHandoffMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.*;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
@@ -102,9 +105,20 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
     private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
 
-    private final JMXEnabledScheduledThreadPoolExecutor executor =
-        new JMXEnabledScheduledThreadPoolExecutor(
+    // To keep metrics consistent with earlier versions, where periodic tasks were run on a shared executor,
+    // we run them on this executor and so keep counts separate from those for hint delivery tasks. See CASSANDRA-9129
+    private final DebuggableScheduledThreadPoolExecutor executor =
+        new DebuggableScheduledThreadPoolExecutor(1, new NamedThreadFactory("HintedHandoffManager", Thread.MIN_PRIORITY));
+
+    // Non-scheduled executor to run the actual hint delivery tasks.
+    // Per CASSANDRA-9129, this is where the values displayed in nodetool tpstats
+    // and via the HintedHandoff mbean are obtained.
+    private final ThreadPoolExecutor hintDeliveryExecutor =
+        new JMXEnabledThreadPoolExecutor(
             DatabaseDescriptor.getMaxHintsThread(),
+            Integer.MAX_VALUE,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(),
             new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY),
             "internal");
 
@@ -242,7 +256,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
             }
         };
         executor.submit(runnable).get();
-
     }
 
     @VisibleForTesting
@@ -534,7 +547,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
         logger.debug("Scheduling delivery of Hints to {}", to);
 
-        executor.execute(new Runnable()
+        hintDeliveryExecutor.execute(new Runnable()
         {
             public void run()
             {