You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/12/19 18:28:17 UTC
cassandra git commit: Move all hints related tasks to hints internal executor
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 621899355 -> 351e35b64
Move all hints related tasks to hints internal executor
patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for
CASSANDRA-8285
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/351e35b6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/351e35b6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/351e35b6
Branch: refs/heads/cassandra-2.0
Commit: 351e35b64ac7f364bd4149da399f86b7f36f5ae9
Parents: 6218993
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Dec 19 20:27:32 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Dec 19 20:27:32 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../DebuggableScheduledThreadPoolExecutor.java | 5 +
.../JMXEnabledScheduledThreadPoolExecutor.java | 137 +++++++++++++++++++
...EnabledScheduledThreadPoolExecutorMBean.java | 26 ++++
.../cassandra/db/HintedHandOffManager.java | 25 ++--
5 files changed, 179 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 516b4a2..1ad2de5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.12:
+ * Move all hints related tasks to hints internal executor (CASSANDRA-8285)
* Fix paging for multi-partition IN queries (CASSANDRA-8408)
* Fix MOVED_NODE topology event never being emitted when a node
moves its token (CASSANDRA-8373)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
index a41df54..1699c0f 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
@@ -33,6 +33,11 @@ public class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolEx
super(corePoolSize, new NamedThreadFactory(threadPoolName, priority));
}
+ public DebuggableScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) // overload for callers that supply their own ThreadFactory instead of a pool name and priority
+ {
+ super(corePoolSize, threadFactory); // both arguments are passed straight through to the superclass constructor
+ }
+
public DebuggableScheduledThreadPoolExecutor(String threadPoolName)
{
this(1, threadPoolName, Thread.NORM_PRIORITY);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
new file mode 100644
index 0000000..64d9267
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import java.lang.management.ManagementFactory;
+import java.util.List;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.cassandra.metrics.ThreadPoolMetrics;
+
+/**
+ * A JMX enabled DebuggableScheduledThreadPoolExecutor subclass: it registers itself as an MBean on construction and unregisters itself on shutdown.
+ */
+public class JMXEnabledScheduledThreadPoolExecutor extends DebuggableScheduledThreadPoolExecutor implements JMXEnabledScheduledThreadPoolExecutorMBean
+{
+ private final String mbeanName; // JMX object name built in the constructor; reused by unregisterMBean()
+ private final ThreadPoolMetrics metrics; // created in the constructor, released after the MBean is unregistered
+
+ public JMXEnabledScheduledThreadPoolExecutor(int corePoolSize, NamedThreadFactory threadFactory, String jmxPath) // jmxPath and threadFactory.id together form the MBean name and the metrics identity
+ {
+ super(corePoolSize, threadFactory);
+
+ metrics = new ThreadPoolMetrics(this, jmxPath, threadFactory.id);
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + threadFactory.id; // i.e. domain "org.apache.cassandra.<jmxPath>", key "type" = factory id
+
+ try
+ {
+ mbs.registerMBean(this, new ObjectName(mbeanName)); // this executor is itself the MBean
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e); // any registration failure is rethrown unchecked, so construction fails
+ }
+ }
+
+ private void unregisterMBean() // removes the MBean, then releases the metrics; called from shutdown() and shutdownNow()
+ {
+ try
+ {
+ ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(mbeanName));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e); // NOTE(review): when this throws, metrics.release() below is skipped and the calling shutdown method never reaches its super call
+ }
+
+ // release metrics (only reached when unregistration succeeded)
+ metrics.release();
+ }
+
+ @Override
+ public synchronized void shutdown()
+ {
+ // synchronized, because there is no way to access super.mainLock, which would be
+ // the preferred way to make this threadsafe
+ if (!isShutdown()) // skip unregistering when the executor is already shut down
+ unregisterMBean();
+
+ super.shutdown();
+ }
+
+ @Override
+ public synchronized List<Runnable> shutdownNow()
+ {
+ // synchronized, because there is no way to access super.mainLock, which would be
+ // the preferred way to make this threadsafe
+ if (!isShutdown()) // same guard as in shutdown()
+ unregisterMBean();
+
+ return super.shutdownNow();
+ }
+
+ /**
+ * Get the number of completed tasks (delegates to getCompletedTaskCount())
+ */
+ public long getCompletedTasks()
+ {
+ return getCompletedTaskCount();
+ }
+
+ /**
+ * Get the number of tasks not yet completed: getTaskCount() minus getCompletedTaskCount(), so tasks currently running are counted along with those still waiting
+ */
+ public long getPendingTasks()
+ {
+ return getTaskCount() - getCompletedTaskCount(); // the JDK documents both counts as approximate
+ }
+
+ public int getTotalBlockedTasks()
+ {
+ return (int) metrics.totalBlocked.count(); // counter value is cast to int
+ }
+
+ public int getCurrentlyBlockedTasks()
+ {
+ return (int) metrics.currentBlocked.count(); // counter value is cast to int
+ }
+
+ public int getCoreThreads()
+ {
+ return getCorePoolSize();
+ }
+
+ public void setCoreThreads(int number)
+ {
+ setCorePoolSize(number); // forwarded unchanged; no validation is done here
+ }
+
+ public int getMaximumThreads()
+ {
+ return getMaximumPoolSize();
+ }
+
+ public void setMaximumThreads(int number)
+ {
+ setMaximumPoolSize(number); // NOTE(review): JDK docs state maximumPoolSize has no useful effect on a ScheduledThreadPoolExecutor
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java
new file mode 100644
index 0000000..d9c45e3
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+/** JMX management interface implemented by JMXEnabledScheduledThreadPoolExecutor; it declares no members beyond those inherited from JMXEnabledThreadPoolExecutorMBean.
+ * @see org.apache.cassandra.metrics.ThreadPoolMetrics
+ */
+@Deprecated // NOTE(review): presumably deprecated in favour of ThreadPoolMetrics (per the @see tag) — confirm
+public interface JMXEnabledScheduledThreadPoolExecutorMBean extends JMXEnabledThreadPoolExecutorMBean
+{
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 87260b2..c8c3845 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -34,20 +34,16 @@ import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -106,12 +102,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
- private final ThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getMaxHintsThread(),
- Integer.MAX_VALUE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY),
- "internal");
+ private final JMXEnabledScheduledThreadPoolExecutor executor =
+ new JMXEnabledScheduledThreadPoolExecutor(
+ DatabaseDescriptor.getMaxHintsThread(),
+ new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY),
+ "internal");
private final ColumnFamilyStore hintStore = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.HINTS_CF);
@@ -174,7 +169,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
metrics.log();
}
};
- StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
+ executor.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
}
private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp)
@@ -225,7 +220,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
}
}
};
- StorageService.optionalTasks.submit(runnable);
+ executor.submit(runnable);
}
//foobar
@@ -246,7 +241,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
}
}
};
- StorageService.optionalTasks.submit(runnable).get();
+ executor.submit(runnable).get();
}