You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/12/19 18:28:17 UTC

cassandra git commit: Move all hints related tasks to hints internal executor

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 621899355 -> 351e35b64


Move all hints related tasks to hints internal executor

patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for
CASSANDRA-8285


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/351e35b6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/351e35b6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/351e35b6

Branch: refs/heads/cassandra-2.0
Commit: 351e35b64ac7f364bd4149da399f86b7f36f5ae9
Parents: 6218993
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Dec 19 20:27:32 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Dec 19 20:27:32 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../DebuggableScheduledThreadPoolExecutor.java  |   5 +
 .../JMXEnabledScheduledThreadPoolExecutor.java  | 137 +++++++++++++++++++
 ...EnabledScheduledThreadPoolExecutorMBean.java |  26 ++++
 .../cassandra/db/HintedHandOffManager.java      |  25 ++--
 5 files changed, 179 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 516b4a2..1ad2de5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.12:
+ * Move all hints related tasks to hints internal executor (CASSANDRA-8285)
  * Fix paging for multi-partition IN queries (CASSANDRA-8408)
  * Fix MOVED_NODE topology event never being emitted when a node
    moves its token (CASSANDRA-8373)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
index a41df54..1699c0f 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
@@ -33,6 +33,11 @@ public class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolEx
         super(corePoolSize, new NamedThreadFactory(threadPoolName, priority));
     }
 
+    /**
+     * Creates a scheduled pool whose threads are produced by the supplied factory.
+     *
+     * @param corePoolSize  passed through unchanged to the superclass constructor
+     * @param threadFactory passed through unchanged to the superclass constructor; unlike the
+     *                      name/priority constructor, no NamedThreadFactory is created here
+     */
+    public DebuggableScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)
+    {
+        super(corePoolSize, threadFactory);
+    }
+
     public DebuggableScheduledThreadPoolExecutor(String threadPoolName)
     {
         this(1, threadPoolName, Thread.NORM_PRIORITY);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
new file mode 100644
index 0000000..64d9267
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import java.lang.management.ManagementFactory;
+import java.util.List;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.cassandra.metrics.ThreadPoolMetrics;
+
+/**
+ * A {@link DebuggableScheduledThreadPoolExecutor} that exposes itself over JMX.
+ *
+ * On construction the pool creates its {@link ThreadPoolMetrics} and registers itself with the
+ * platform MBean server; both are undone the first time the pool is shut down.
+ */
+public class JMXEnabledScheduledThreadPoolExecutor extends DebuggableScheduledThreadPoolExecutor implements JMXEnabledScheduledThreadPoolExecutorMBean
+{
+    private final String mbeanName;
+    private final ThreadPoolMetrics metrics;
+
+    /**
+     * @param corePoolSize  number of core threads, passed to the superclass
+     * @param threadFactory factory used to create the pool threads; its id becomes the "type" key of the MBean name
+     * @param jmxPath       suffix appended to "org.apache.cassandra." to form the MBean domain
+     * @throws RuntimeException if the MBean cannot be registered
+     */
+    public JMXEnabledScheduledThreadPoolExecutor(int corePoolSize, NamedThreadFactory threadFactory, String jmxPath)
+    {
+        super(corePoolSize, threadFactory);
+
+        metrics = new ThreadPoolMetrics(this, jmxPath, threadFactory.id);
+        mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + threadFactory.id;
+
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new ObjectName(mbeanName));
+        }
+        catch (Exception e)
+        {
+            // do not leave metrics behind for a pool that failed to construct
+            metrics.release();
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Removes the MBean registration and releases the metrics.
+     *
+     * @throws RuntimeException if the MBean cannot be unregistered; the metrics are released regardless
+     */
+    private void unregisterMBean()
+    {
+        try
+        {
+            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(mbeanName));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+        finally
+        {
+            // release metrics even when unregistration failed, so they never outlive the pool
+            metrics.release();
+        }
+    }
+
+    /**
+     * Unregisters the MBean (first call only) and then shuts the pool down.
+     * The pool is shut down even if unregistering throws; the exception still propagates.
+     */
+    @Override
+    public synchronized void shutdown()
+    {
+        // synchronized, because there is no way to access super.mainLock, which would be
+        // the preferred way to make this threadsafe
+        try
+        {
+            if (!isShutdown())
+                unregisterMBean();
+        }
+        finally
+        {
+            super.shutdown();
+        }
+    }
+
+    /**
+     * Unregisters the MBean (first call only) and then shuts the pool down immediately.
+     * The pool is shut down even if unregistering throws; the exception still propagates.
+     *
+     * @return the tasks returned by the superclass shutdownNow()
+     */
+    @Override
+    public synchronized List<Runnable> shutdownNow()
+    {
+        // synchronized, because there is no way to access super.mainLock, which would be
+        // the preferred way to make this threadsafe
+        List<Runnable> neverRun;
+        try
+        {
+            if (!isShutdown())
+                unregisterMBean();
+        }
+        finally
+        {
+            neverRun = super.shutdownNow();
+        }
+        return neverRun;
+    }
+
+    /**
+     * Get the number of completed tasks
+     */
+    public long getCompletedTasks()
+    {
+        return getCompletedTaskCount();
+    }
+
+    /**
+     * Get the number of tasks that have been scheduled but have not completed yet.
+     * This is the executor's task count minus its completed task count, so it also
+     * covers tasks that are currently running, not only those still waiting.
+     */
+    public long getPendingTasks()
+    {
+        return getTaskCount() - getCompletedTaskCount();
+    }
+
+    /**
+     * Get the metrics' total-blocked counter, narrowed to int
+     */
+    public int getTotalBlockedTasks()
+    {
+        return (int) metrics.totalBlocked.count();
+    }
+
+    /**
+     * Get the metrics' currently-blocked counter, narrowed to int
+     */
+    public int getCurrentlyBlockedTasks()
+    {
+        return (int) metrics.currentBlocked.count();
+    }
+
+    /**
+     * Get the core pool size of this executor
+     */
+    public int getCoreThreads()
+    {
+        return getCorePoolSize();
+    }
+
+    /**
+     * Set the core pool size of this executor
+     */
+    public void setCoreThreads(int number)
+    {
+        setCorePoolSize(number);
+    }
+
+    /**
+     * Get the maximum pool size of this executor
+     */
+    public int getMaximumThreads()
+    {
+        return getMaximumPoolSize();
+    }
+
+    /**
+     * Set the maximum pool size of this executor
+     */
+    public void setMaximumThreads(int number)
+    {
+        setMaximumPoolSize(number);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java
new file mode 100644
index 0000000..d9c45e3
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+/**
+ * JMX management interface for JMXEnabledScheduledThreadPoolExecutor. It declares no operations
+ * of its own; everything it exposes is inherited from JMXEnabledThreadPoolExecutorMBean.
+ *
+ * @see org.apache.cassandra.metrics.ThreadPoolMetrics
+ * @deprecated NOTE(review): presumably superseded by ThreadPoolMetrics (referenced above) - confirm
+ */
+@Deprecated
+public interface JMXEnabledScheduledThreadPoolExecutorMBean extends JMXEnabledThreadPoolExecutorMBean
+{
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 87260b2..c8c3845 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -34,20 +34,16 @@ import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.Uninterruptibles;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -106,12 +102,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
     private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
 
-    private final ThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getMaxHintsThread(),
-                                                                                 Integer.MAX_VALUE,
-                                                                                 TimeUnit.SECONDS,
-                                                                                 new LinkedBlockingQueue<Runnable>(),
-                                                                                 new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY),
-                                                                                 "internal");
+    // Executor used by this class for both one-off (submit) and fixed-delay tasks. With the
+    // "internal" jmxPath its MBean is registered as org.apache.cassandra.internal:type=<thread factory id>.
+    private final JMXEnabledScheduledThreadPoolExecutor executor =
+        new JMXEnabledScheduledThreadPoolExecutor(
+            DatabaseDescriptor.getMaxHintsThread(),
+            new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY),
+            "internal");
 
     private final ColumnFamilyStore hintStore = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.HINTS_CF);
 
@@ -174,7 +169,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 metrics.log();
             }
         };
-        StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
+        executor.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
     }
 
     private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp)
@@ -225,7 +220,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 }
             }
         };
-        StorageService.optionalTasks.submit(runnable);
+        executor.submit(runnable);
     }
 
     //foobar
@@ -246,7 +241,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 }
             }
         };
-        StorageService.optionalTasks.submit(runnable).get();
+        executor.submit(runnable).get();
 
     }