You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/11/16 18:42:47 UTC
[3/3] cassandra git commit: Introduce in-jvm distributed tests
Introduce in-jvm distributed tests
Patch by Alex Petrov and Benedict Elliott Smith; reviewed by Benedict Elliott Smith and Dinesh Joshi for CASSANDRA-14821.
Co-authored-by: Benedict Elliott Smith <be...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f22fec92
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f22fec92
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f22fec92
Branch: refs/heads/trunk
Commit: f22fec927de7ac291266660c2f34de5b8cc1c695
Parents: 7877035
Author: Alex Petrov <ol...@gmail.com>
Authored: Fri Nov 16 19:41:58 2018 +0100
Committer: Alex Petrov <ol...@gmail.com>
Committed: Fri Nov 16 19:41:58 2018 +0100
----------------------------------------------------------------------
.circleci/config.yml | 14 +-
build.xml | 10 +-
ide/idea-iml-file.xml | 1 +
.../org/apache/cassandra/auth/AuthCache.java | 33 +-
.../cassandra/batchlog/BatchlogManager.java | 48 ++-
.../concurrent/InfiniteLoopExecutor.java | 83 ++++
.../JMXEnabledThreadPoolExecutor.java | 24 +-
.../concurrent/ScheduledExecutors.java | 15 +
.../concurrent/SharedExecutorPool.java | 15 +-
.../cassandra/concurrent/StageManager.java | 10 +
.../cassandra/config/DatabaseDescriptor.java | 8 +-
.../apache/cassandra/cql3/QueryProcessor.java | 6 +-
.../cassandra/db/BlacklistedDirectories.java | 17 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 48 ++-
.../cassandra/db/HintedHandOffManager.java | 15 +-
.../cassandra/db/commitlog/CommitLog.java | 15 +-
.../db/compaction/CompactionManager.java | 18 +-
.../cassandra/diag/DiagnosticEventService.java | 13 +-
.../cassandra/diag/LastEventIdBroadcaster.java | 16 +-
.../apache/cassandra/gms/FailureDetector.java | 14 +-
src/java/org/apache/cassandra/gms/Gossiper.java | 14 +-
.../apache/cassandra/hints/HintsService.java | 15 +-
.../cassandra/index/SecondaryIndexManager.java | 10 +
.../io/sstable/IndexSummaryManager.java | 30 +-
.../cassandra/io/util/DataInputBuffer.java | 8 +-
.../locator/DynamicEndpointSnitch.java | 24 +-
.../cassandra/locator/EndpointSnitchInfo.java | 16 +-
.../metrics/CassandraMetricsRegistry.java | 30 +-
.../cassandra/net/ForwardToContainer.java | 3 +-
.../org/apache/cassandra/net/MessageIn.java | 12 +-
.../apache/cassandra/net/MessagingService.java | 28 +-
.../net/async/ByteBufDataInputPlus.java | 8 +
.../cassandra/net/async/MessageInHandler.java | 67 +++-
.../net/async/MessageInHandlerPre40.java | 50 ++-
.../cassandra/net/async/NettyFactory.java | 12 +-
.../apache/cassandra/schema/SchemaEvent.java | 5 +-
.../cassandra/service/ActiveRepairService.java | 15 +-
.../apache/cassandra/service/CacheService.java | 16 +-
.../cassandra/service/CassandraDaemon.java | 11 +-
.../service/PendingRangeCalculatorService.java | 9 +
.../apache/cassandra/service/StorageProxy.java | 15 +-
.../cassandra/service/StorageService.java | 16 +-
.../apache/cassandra/utils/ByteBufferUtil.java | 26 ++
.../apache/cassandra/utils/MBeanWrapper.java | 179 +++++++++
.../org/apache/cassandra/utils/Mx4jTool.java | 4 +-
.../apache/cassandra/utils/concurrent/Ref.java | 35 +-
.../cassandra/utils/memory/BufferPool.java | 43 +-
.../utils/memory/MemtableCleanerThread.java | 77 ++--
.../cassandra/utils/memory/MemtablePool.java | 9 +
test/conf/logback-dtest.xml | 79 ++++
.../cassandra/distributed/Coordinator.java | 80 ++++
.../DistributedReadWritePathTest.java | 348 ++++++++++++++++
.../distributed/DistributedTestBase.java | 86 ++++
.../apache/cassandra/distributed/Instance.java | 399 +++++++++++++++++++
.../distributed/InstanceClassLoader.java | 101 +++++
.../cassandra/distributed/InstanceConfig.java | 87 ++++
.../distributed/InstanceIDDefiner.java | 38 ++
.../distributed/InvokableInstance.java | 133 +++++++
.../apache/cassandra/distributed/Message.java | 41 ++
.../cassandra/distributed/MessageFilters.java | 175 ++++++++
.../apache/cassandra/distributed/RowUtil.java | 47 +++
.../cassandra/distributed/TestCluster.java | 308 ++++++++++++++
.../cassandra/net/async/NettyFactoryTest.java | 2 +-
.../async/StreamCompressionSerializerTest.java | 4 -
64 files changed, 2661 insertions(+), 477 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/.circleci/config.yml
----------------------------------------------------------------------
diff --git a/.circleci/config.yml b/.circleci/config.yml
index 430354a..3b2b978 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -164,12 +164,20 @@ jobs:
# get all of our unit test filenames
set -eo pipefail && circleci tests glob "$HOME/cassandra/test/unit/**/*.java" > /tmp/all_java_unit_tests.txt
+ # append distributed tests
+ set -eo pipefail && circleci tests glob "$HOME/cassandra/test/distributed/**/*.java" > /tmp/all_java_distributed_tests.txt
# split up the unit tests into groups based on the number of containers we have
set -eo pipefail && circleci tests split --split-by=timings --timings-type=filename --index=${CIRCLE_NODE_INDEX} --total=${CIRCLE_NODE_TOTAL} /tmp/all_java_unit_tests.txt > /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt
set -eo pipefail && cat /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt | cut -c 37-1000000 | grep "Test\.java$" > /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
echo "** /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt"
cat /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+
+ set -eo pipefail && circleci tests split --split-by=timings --timings-type=filename --index=${CIRCLE_NODE_INDEX} --total=${CIRCLE_NODE_TOTAL} /tmp/all_java_distributed_tests.txt > /tmp/java_dtests_${CIRCLE_NODE_INDEX}.txt
+ set +eo pipefail && cat /tmp/java_dtests_${CIRCLE_NODE_INDEX}.txt | cut -c 44-1000000 | grep "Test\.java$" > /tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt
+ echo "** /tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt"
+ cat /tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt
+
- run:
name: Run Unit Tests
command: |
@@ -181,7 +189,11 @@ jobs:
time mv ~/cassandra /tmp
cd /tmp/cassandra
- ant testclasslist -Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+ ant testclasslist -Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt -Dtest.classlistprefix=unit
+
+ if [ -s "/tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt" ]; then
+ ant testclasslist -Dtest.classlistfile=/tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt -Dtest.classlistprefix=distributed
+ fi
no_output_timeout: 15m
- store_test_results:
path: /tmp/cassandra/build/test/output/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 3d3014c..f24647e 100644
--- a/build.xml
+++ b/build.xml
@@ -56,12 +56,14 @@
<property name="test.data" value="${test.dir}/data"/>
<property name="test.name" value="*Test"/>
<property name="test.classlistfile" value="testlist.txt"/>
+ <property name="test.classlistprefix" value="unit"/>
<property name="benchmark.name" value=""/>
<property name="test.methods" value=""/>
<property name="test.unit.src" value="${test.dir}/unit"/>
<property name="test.long.src" value="${test.dir}/long"/>
<property name="test.burn.src" value="${test.dir}/burn"/>
<property name="test.microbench.src" value="${test.dir}/microbench"/>
+ <property name="test.distributed.src" value="${test.dir}/distributed"/>
<property name="dist.dir" value="${build.dir}/dist"/>
<property name="tmp.dir" value="${java.io.tmpdir}"/>
@@ -103,6 +105,7 @@
<property name="test.timeout" value="240000" />
<property name="test.long.timeout" value="600000" />
<property name="test.burn.timeout" value="60000000" />
+ <property name="test.distributed.timeout" value="600000" />
<!-- default for cql tests. Can be override by -Dcassandra.test.use_prepared=false -->
<property name="cassandra.test.use_prepared" value="true" />
@@ -1253,6 +1256,7 @@
<src path="${test.long.src}"/>
<src path="${test.burn.src}"/>
<src path="${test.microbench.src}"/>
+ <src path="${test.distributed.src}"/>
</javac>
<!-- Non-java resources needed by the test suite -->
@@ -1364,7 +1368,7 @@
<attribute name="test.file.list"/>
<attribute name="testlist.offset"/>
<sequential>
- <testmacrohelper inputdir="${test.unit.src}" filelist="@{test.file.list}" poffset="@{testlist.offset}" exclude="**/*.java" timeout="${test.timeout}">
+ <testmacrohelper inputdir="${test.dir}/${test.classlistprefix}" filelist="@{test.file.list}" poffset="@{testlist.offset}" exclude="**/*.java" timeout="${test.timeout}">
<jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
<jvmarg value="-Dinvalid-legacy-sstable-root=${test.data}/invalid-legacy-sstables"/>
<jvmarg value="-Dcassandra.ring_delay_ms=1000"/>
@@ -1468,6 +1472,7 @@
</concat>
<path id="all-test-classes-path">
<fileset dir="${test.unit.src}" includes="**/${test.name}.java" />
+ <fileset dir="${test.distributed.src}" includes="**/${test.name}.java" />
</path>
<property name="all-test-classes" refid="all-test-classes-path"/>
<testparallel testdelegate="testlist-compression" />
@@ -1844,7 +1849,7 @@
e.g. org/apache/cassandra/hints/HintMessageTest.java -->
<target name="testclasslist" depends="build-test" description="Parallel-run tests given in file -Dtest.classlistfile (one-class-per-line, e.g. org/apache/cassandra/db/SomeTest.java)">
<path id="all-test-classes-path">
- <fileset dir="${test.unit.src}" includesfile="${test.classlistfile}"/>
+ <fileset dir="${test.dir}/${test.classlistprefix}" includesfile="${test.classlistfile}"/>
</path>
<property name="all-test-classes" refid="all-test-classes-path"/>
<testparallel testdelegate="testlist"/>
@@ -1939,6 +1944,7 @@
<classpathentry kind="src" path="conf" including="hotspot_compiler"/>
<classpathentry kind="src" output="build/test/classes" path="test/unit"/>
<classpathentry kind="src" output="build/test/classes" path="test/long"/>
+ <classpathentry kind="src" output="build/test/classes" path="test/distributed"/>
<classpathentry kind="src" output="build/test/classes" path="test/resources" />
<classpathentry kind="src" path="tools/stress/src"/>
<classpathentry kind="src" path="tools/fqltool/src"/>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/ide/idea-iml-file.xml
----------------------------------------------------------------------
diff --git a/ide/idea-iml-file.xml b/ide/idea-iml-file.xml
index b83abfa..0045ae6 100644
--- a/ide/idea-iml-file.xml
+++ b/ide/idea-iml-file.xml
@@ -35,6 +35,7 @@
<sourceFolder url="file://$MODULE_DIR$/test/long" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/microbench" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/burn" isTestSource="true" />
+ <sourceFolder url="file://$MODULE_DIR$/test/distributed" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/resources" type="java-test-resource" />
<sourceFolder url="file://$MODULE_DIR$/test/conf" type="java-test-resource" />
<excludeFolder url="file://$MODULE_DIR$/.idea" />
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/auth/AuthCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AuthCache.java b/src/java/org/apache/cassandra/auth/AuthCache.java
index 3adf914..4bf15c1 100644
--- a/src/java/org/apache/cassandra/auth/AuthCache.java
+++ b/src/java/org/apache/cassandra/auth/AuthCache.java
@@ -18,22 +18,19 @@
package org.apache.cassandra.auth;
-import java.lang.management.ManagementFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.cassandra.utils.MBeanWrapper;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -97,33 +94,17 @@ public class AuthCache<K, V> implements AuthCacheMBean
protected void init()
{
cache = initCache(null);
- try
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(this, getObjectName());
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, getObjectName());
}
protected void unregisterMBean()
{
- try
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.unregisterMBean(getObjectName());
- }
- catch (Exception e)
- {
- logger.warn("Error unregistering {} cache mbean", name, e);
- }
+ MBeanWrapper.instance.unregisterMBean(getObjectName(), MBeanWrapper.OnException.LOG);
}
- protected ObjectName getObjectName() throws MalformedObjectNameException
+ protected String getObjectName()
{
- return new ObjectName(MBEAN_NAME_BASE + name);
+ return MBEAN_NAME_BASE + name;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 91129ed..b2b851d 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -18,28 +18,37 @@
package org.apache.cassandra.batchlog;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.Replicas;
-import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -51,15 +60,20 @@ import org.apache.cassandra.hints.Hint;
import org.apache.cassandra.hints.HintsService;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.UUIDGen;
import static com.google.common.collect.Iterables.transform;
@@ -93,15 +107,7 @@ public class BatchlogManager implements BatchlogManagerMBean
public void start()
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
batchlogTasks.scheduleWithFixedDelay(this::replayFailedBatches,
StorageService.RING_DELAY,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
new file mode 100644
index 0000000..1b8173e
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+public class InfiniteLoopExecutor
+{
+ private static final Logger logger = LoggerFactory.getLogger(InfiniteLoopExecutor.class);
+
+ public interface InterruptibleRunnable
+ {
+ void run() throws InterruptedException;
+ }
+
+ private final Thread thread;
+ private final InterruptibleRunnable runnable;
+ private volatile boolean isShutdown = false;
+
+ public InfiniteLoopExecutor(String name, InterruptibleRunnable runnable)
+ {
+ this.runnable = runnable;
+ this.thread = new Thread(this::loop, name);
+ this.thread.setDaemon(true);
+ }
+
+ private void loop()
+ {
+ while (!isShutdown)
+ {
+ try
+ {
+ runnable.run();
+ }
+ catch (InterruptedException ie)
+ {
+ if (isShutdown)
+ return;
+ logger.error("Interrupted while executing {}, but not shutdown; continuing with loop", runnable, ie);
+ }
+ catch (Throwable t)
+ {
+ logger.error("Exception thrown by runnable, continuing with loop", t);
+ }
+ }
+ }
+
+ public InfiniteLoopExecutor start()
+ {
+ thread.start();
+ return this;
+ }
+
+ public void shutdown()
+ {
+ isShutdown = true;
+ thread.interrupt();
+ }
+
+ public void awaitTermination(long time, TimeUnit unit) throws InterruptedException
+ {
+ thread.join(unit.toMillis(time));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
index 278b399..0e61de9 100644
--- a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
@@ -17,16 +17,14 @@
*/
package org.apache.cassandra.concurrent;
-import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import org.apache.cassandra.metrics.ThreadPoolMetrics;
+import org.apache.cassandra.utils.MBeanWrapper;
/**
* This is a wrapper class for the <i>ScheduledThreadPoolExecutor</i>. It provides an implementation
@@ -81,17 +79,8 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i
super.prestartAllCoreThreads();
metrics = new ThreadPoolMetrics(this, jmxPath, threadFactory.id).register();
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + threadFactory.id;
-
- try
- {
- mbs.registerMBean(this, new ObjectName(mbeanName));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, mbeanName);
}
public JMXEnabledThreadPoolExecutor(int corePoolSize,
@@ -114,14 +103,7 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i
private void unregisterMBean()
{
- try
- {
- ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(mbeanName));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.unregisterMBean(mbeanName);
// release metrics
metrics.release();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
index 22dc769..e51e4c2 100644
--- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
+++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
@@ -17,6 +17,11 @@
*/
package org.apache.cassandra.concurrent;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Centralized location for shared executors
*/
@@ -41,4 +46,14 @@ public class ScheduledExecutors
* This executor is used for tasks that do not need to be waited for on shutdown/drain.
*/
public static final DebuggableScheduledThreadPoolExecutor optionalTasks = new DebuggableScheduledThreadPoolExecutor("OptionalTasks");
+
+ @VisibleForTesting
+ public static void shutdownAndWait() throws InterruptedException
+ {
+ ExecutorService[] executors = new ExecutorService[] { scheduledFastTasks, scheduledTasks, nonPeriodicTasks, optionalTasks };
+ for (ExecutorService executor : executors)
+ executor.shutdown();
+ for (ExecutorService executor : executors)
+ executor.awaitTermination(60, TimeUnit.SECONDS);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
index 3b0600f..5352ad7 100644
--- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@ -21,9 +21,12 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
+
import static org.apache.cassandra.concurrent.SEPWorker.Work;
/**
@@ -61,7 +64,7 @@ public class SharedExecutorPool
final AtomicLong workerId = new AtomicLong();
// the collection of executors serviced by this pool; periodically ordered by traffic volume
- final List<SEPExecutor> executors = new CopyOnWriteArrayList<>();
+ public final List<SEPExecutor> executors = new CopyOnWriteArrayList<>();
// the number of workers currently in a spinning state
final AtomicInteger spinningCount = new AtomicInteger();
@@ -109,4 +112,14 @@ public class SharedExecutorPool
executors.add(executor);
return executor;
}
+
+ @VisibleForTesting
+ public static void shutdownSharedPool() throws InterruptedException
+ {
+ for (SEPExecutor executor : SHARED.executors)
+ executor.shutdown();
+
+ for (SEPExecutor executor : SHARED.executors)
+ executor.awaitTermination(60, TimeUnit.SECONDS);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index c102042..608a005 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.concurrent;
import java.util.EnumMap;
import java.util.concurrent.*;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,6 +113,15 @@ public class StageManager
}
}
+ @VisibleForTesting
+ public static void shutdownAndWait() throws InterruptedException
+ {
+ for (Stage stage : Stage.values())
+ StageManager.stages.get(stage).shutdown();
+ for (Stage stage : Stage.values())
+ StageManager.stages.get(stage).awaitTermination(60, TimeUnit.SECONDS);
+ }
+
/**
* The executor used for tracing.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index bc1e5a2..2f5f49f 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -27,6 +27,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -142,6 +143,11 @@ public class DatabaseDescriptor
public static void daemonInitialization() throws ConfigurationException
{
+ daemonInitialization(DatabaseDescriptor::loadConfig);
+ }
+
+ public static void daemonInitialization(Supplier<Config> config) throws ConfigurationException
+ {
if (toolInitialized)
throw new AssertionError("toolInitialization() already called");
if (clientInitialized)
@@ -152,7 +158,7 @@ public class DatabaseDescriptor
return;
daemonInitialized = true;
- setConfig(loadConfig());
+ setConfig(config.get());
applyAll();
AuthConfig.applyAuth();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 45db947..b8ec648 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -162,7 +162,8 @@ public class QueryProcessor implements QueryHandler
SystemKeyspace.resetPreparedStatements();
}
- private static QueryState internalQueryState()
+ @VisibleForTesting
+ public static QueryState internalQueryState()
{
return new QueryState(InternalStateInstance.INSTANCE.clientState);
}
@@ -265,7 +266,8 @@ public class QueryProcessor implements QueryHandler
return null;
}
- private static QueryOptions makeInternalOptions(CQLStatement prepared, Object[] values)
+ @VisibleForTesting
+ public static QueryOptions makeInternalOptions(CQLStatement prepared, Object[] values)
{
return makeInternalOptions(prepared, values, ConsistencyLevel.ONE);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
index f090013..cff9a78 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
@@ -21,18 +21,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.MBeanWrapper;
public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
{
@@ -48,17 +45,7 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
private BlacklistedDirectories()
{
// Register this instance with JMX
- try
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- JVMStabilityInspector.inspectThrowable(e);
- logger.error("error registering MBean {}", MBEAN_NAME, e);
- //Allow the server to start even if the bean can't be registered
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME, MBeanWrapper.OnException.LOG);
}
public Set<File> getUnreadableDirectories()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 877a3c5..c5149cf 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
-import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
@@ -41,7 +40,6 @@ import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.codahale.metrics.Snapshot;
import org.apache.cassandra.cache.*;
import org.apache.cassandra.concurrent.*;
import org.apache.cassandra.config.*;
@@ -72,7 +70,6 @@ import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.*;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.metrics.Sampler;
import org.apache.cassandra.metrics.Sampler.Sample;
import org.apache.cassandra.metrics.Sampler.SamplerType;
@@ -220,12 +217,33 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private volatile boolean neverPurgeTombstones = false;
+ public static void shutdownFlushExecutor() throws InterruptedException
+ {
+ flushExecutor.shutdown();
+ flushExecutor.awaitTermination(60, TimeUnit.SECONDS);
+ }
+
+
public static void shutdownPostFlushExecutor() throws InterruptedException
{
postFlushExecutor.shutdown();
postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS);
}
+ public static void shutdownReclaimExecutor() throws InterruptedException
+ {
+ reclaimExecutor.shutdown();
+ reclaimExecutor.awaitTermination(60, TimeUnit.SECONDS);
+ }
+
+ public static void shutdownPerDiskFlushExecutors() throws InterruptedException
+ {
+ for (ExecutorService executorService : perDiskflushExecutors)
+ executorService.shutdown();
+ for (ExecutorService executorService : perDiskflushExecutors)
+ executorService.awaitTermination(60, TimeUnit.SECONDS);
+ }
+
public void reload()
{
// metadata object has been mutated directly. make all the members jibe with new settings.
@@ -425,19 +443,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
oldMBeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,columnfamily=%s",
isIndex() ? "IndexColumnFamilies" : "ColumnFamilies",
keyspace.getName(), name);
- try
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- ObjectName[] objectNames = {new ObjectName(mbeanName), new ObjectName(oldMBeanName)};
- for (ObjectName objectName : objectNames)
- {
- mbs.registerMBean(this, objectName);
- }
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+
+ String[] objectNames = {mbeanName, oldMBeanName};
+ for (String objectName : objectNames)
+ MBeanWrapper.instance.registerMBean(this, objectName);
}
else
{
@@ -548,14 +557,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
data.removeUnreadableSSTables(directory);
}
- void unregisterMBean() throws MalformedObjectNameException, InstanceNotFoundException, MBeanRegistrationException
+ void unregisterMBean() throws MalformedObjectNameException
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName[] objectNames = {new ObjectName(mbeanName), new ObjectName(oldMBeanName)};
for (ObjectName objectName : objectNames)
{
- if (mbs.isRegistered(objectName))
- mbs.unregisterMBean(objectName);
+ if (MBeanWrapper.instance.isRegistered(objectName))
+ MBeanWrapper.instance.unregisterMBean(objectName);
}
// unregister metrics
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 3279acf..e26f658 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -17,13 +17,10 @@
*/
package org.apache.cassandra.db;
-import java.lang.management.ManagementFactory;
import java.util.List;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.utils.MBeanWrapper;
/**
* A proxy class that implement the deprecated legacy HintedHandoffManagerMBean interface.
@@ -44,15 +41,7 @@ public final class HintedHandOffManager implements HintedHandOffManagerMBean
public void registerMBean()
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
}
public void deleteHintsForEndpoint(String host)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 6537adc..9d2a369 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -18,19 +18,15 @@
package org.apache.cassandra.db.commitlog;
import java.io.*;
-import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.zip.CRC32;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.*;
@@ -49,6 +45,7 @@ import org.apache.cassandra.security.EncryptionContext;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.MBeanWrapper;
import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
import static org.apache.cassandra.db.commitlog.CommitLogSegment.CommitLogSegmentFileComparator;
@@ -82,15 +79,7 @@ public class CommitLog implements CommitLogMBean
{
CommitLog log = new CommitLog(CommitLogArchiver.construct());
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(log, new ObjectName("org.apache.cassandra.db:type=Commitlog"));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(log, "org.apache.cassandra.db:type=Commitlog");
return log.start();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index e56ed60..bc5a883 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -19,14 +19,11 @@ package org.apache.cassandra.db.compaction;
import java.io.File;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
@@ -36,7 +33,6 @@ import com.google.common.collect.*;
import com.google.common.util.concurrent.*;
import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.locator.Replica;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,15 +115,8 @@ public class CompactionManager implements CompactionManagerMBean
static
{
instance = new CompactionManager();
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(instance, new ObjectName(MBEAN_OBJECT_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+
+ MBeanWrapper.instance.registerMBean(instance, MBEAN_OBJECT_NAME);
}
private final CompactionExecutor executor = new CompactionExecutor();
@@ -232,6 +221,7 @@ public class CompactionManager implements CompactionManagerMBean
executor.shutdown();
validationExecutor.shutdown();
viewBuildExecutor.shutdown();
+ cacheCleanupExecutor.shutdown();
// interrupt compactions and validations
for (Holder compactionHolder : CompactionMetrics.getCompactions())
@@ -242,7 +232,7 @@ public class CompactionManager implements CompactionManagerMBean
// wait for tasks to terminate
// compaction tasks are interrupted above, so it shuold be fairy quick
// until not interrupted tasks to complete.
- for (ExecutorService exec : Arrays.asList(executor, validationExecutor, viewBuildExecutor))
+ for (ExecutorService exec : Arrays.asList(executor, validationExecutor, viewBuildExecutor, cacheCleanupExecutor))
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/diag/DiagnosticEventService.java b/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
index 3f3de7c..5953a1d 100644
--- a/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
+++ b/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
@@ -41,6 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.MBeanWrapper;
/**
* Service for publishing and consuming {@link DiagnosticEvent}s.
@@ -62,17 +63,7 @@ public final class DiagnosticEventService implements DiagnosticEventServiceMBean
private DiagnosticEventService()
{
-
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- ObjectName jmxObjectName = new ObjectName("org.apache.cassandra.diag:type=DiagnosticEventService");
- mbs.registerMBean(this, jmxObjectName);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this,"org.apache.cassandra.diag:type=DiagnosticEventService");
// register broadcasters for JMX events
DiagnosticEventPersistence.start();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java b/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
index 9fe5c48..8e991e6 100644
--- a/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
+++ b/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
@@ -18,21 +18,19 @@
package org.apache.cassandra.diag;
-import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import javax.management.MBeanServer;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
-import javax.management.ObjectName;
import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor;
/**
@@ -61,16 +59,8 @@ final class LastEventIdBroadcaster extends NotificationBroadcasterSupport implem
super(JMXBroadcastExecutor.executor);
summary.put("last_updated_at", 0L);
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- ObjectName jmxObjectName = new ObjectName("org.apache.cassandra.diag:type=LastEventIdBroadcaster");
- mbs.registerMBean(this, jmxObjectName);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+
+ MBeanWrapper.instance.registerMBean(this, "org.apache.cassandra.diag:type=LastEventIdBroadcaster");
}
public static LastEventIdBroadcaster instance()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index d7f73ab..4a16f2a 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -21,15 +21,12 @@ import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.nio.file.Path;
import java.io.*;
-import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.*;
@@ -42,6 +39,7 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
/**
* This FailureDetector is an implementation of the paper titled
@@ -88,15 +86,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
public FailureDetector()
{
// Register this instance with JMX
- try
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
}
private static long getInitialValue()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index aedcb04..b789fe7 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.gms;
-import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.util.*;
import java.util.Map.Entry;
@@ -28,8 +27,6 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
@@ -41,6 +38,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.CassandraVersion;
+import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -248,15 +246,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
// Register this instance with JMX
if (registerJmx)
{
- try
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 73840d3..1a352c2 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.hints;
import java.io.File;
-import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
@@ -29,9 +28,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.db.Keyspace;
@@ -51,6 +47,7 @@ import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.MBeanWrapper;
import static com.google.common.collect.Iterables.transform;
@@ -138,15 +135,7 @@ public final class HintsService implements HintsServiceMBean
public void registerMBean()
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index c9a7cc6..ec54a65 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -1485,4 +1485,14 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
false);
}
}
+
+ @VisibleForTesting
+ public static void shutdownExecutors() throws InterruptedException
+ {
+ ExecutorService[] executors = new ExecutorService[]{ asyncExecutor, blockingExecutor };
+ for (ExecutorService executor : executors)
+ executor.shutdown();
+ for (ExecutorService executor : executors)
+ executor.awaitTermination(60, TimeUnit.SECONDS);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index b8d236a..3630c2a 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -18,29 +18,32 @@
package org.apache.cassandra.io.sstable;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -65,16 +68,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
static
{
instance = new IndexSummaryManager();
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-
- try
- {
- mbs.registerMBean(instance, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(instance, MBEAN_NAME);
}
private IndexSummaryManager()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataInputBuffer.java b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
index a68dcc2..9df9861 100644
--- a/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.io.util;
-import java.io.IOException;
import java.nio.ByteBuffer;
/**
@@ -57,14 +56,17 @@ public class DataInputBuffer extends RebufferingInputStream
}
@Override
- protected void reBuffer() throws IOException
+ protected void reBuffer()
{
//nope, we don't rebuffer, we are done!
}
@Override
- public int available() throws IOException
+ public int available()
{
return buffer.remaining();
}
+
+ @Override
+ public void close() {}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index d35f1fb..ddc8fba 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.locator;
-import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
@@ -28,8 +27,6 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import com.codahale.metrics.Snapshot;
import org.apache.cassandra.concurrent.ScheduledExecutors;
@@ -41,6 +38,7 @@ import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
/**
* A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
@@ -141,15 +139,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
private void registerMBean()
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(this, new ObjectName(mbeanName));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, mbeanName);
}
public void close()
@@ -157,15 +147,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
updateSchedular.cancel(false);
resetSchedular.cancel(false);
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.unregisterMBean(new ObjectName(mbeanName));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.unregisterMBean(mbeanName);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
index da90a79..d836cd1 100644
--- a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
+++ b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
@@ -17,28 +17,16 @@
*/
package org.apache.cassandra.locator;
-
-import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
public class EndpointSnitchInfo implements EndpointSnitchInfoMBean
{
public static void create()
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(new EndpointSnitchInfo(), new ObjectName("org.apache.cassandra.db:type=EndpointSnitchInfo"));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(new EndpointSnitchInfo(), "org.apache.cassandra.db:type=EndpointSnitchInfo");
}
public String getDatacenter(String host) throws UnknownHostException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index 43d6609..74c3367 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.metrics;
-import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Collections;
@@ -26,14 +25,14 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-
-import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-import com.codahale.metrics.*;
import com.google.common.annotations.VisibleForTesting;
+import com.codahale.metrics.*;
+import org.apache.cassandra.utils.MBeanWrapper;
+
/**
* Makes integrating 3.0 metrics API with 2.0.
* <p>
@@ -45,7 +44,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
public static final CassandraMetricsRegistry Metrics = new CassandraMetricsRegistry();
private final Map<String, ThreadPoolMetrics> threadPoolMetrics = new ConcurrentHashMap<>();
- private final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+ private final MBeanWrapper mBeanServer = MBeanWrapper.instance;
private CassandraMetricsRegistry()
{
@@ -159,11 +158,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
{
boolean removed = remove(name.getMetricName());
- try
- {
- mBeanServer.unregisterMBean(name.getMBeanName());
- } catch (Exception ignore) {}
-
+ mBeanServer.unregisterMBean(name.getMBeanName(), MBeanWrapper.OnException.IGNORE);
return removed;
}
@@ -194,13 +189,8 @@ public class CassandraMetricsRegistry extends MetricRegistry
else
throw new IllegalArgumentException("Unknown metric type: " + metric.getClass());
- try
- {
- mBeanServer.registerMBean(mbean, name);
- }
- catch (Exception ignored)
- {
- }
+ if (!mBeanServer.isRegistered(name))
+ mBeanServer.registerMBean(mbean, name, MBeanWrapper.OnException.LOG);
}
private void registerAlias(MetricName existingName, MetricName aliasName)
@@ -213,10 +203,8 @@ public class CassandraMetricsRegistry extends MetricRegistry
private void removeAlias(MetricName name)
{
- try
- {
- mBeanServer.unregisterMBean(name.getMBeanName());
- } catch (Exception ignore) {}
+ if (mBeanServer.isRegistered(name.getMBeanName()))
+ MBeanWrapper.instance.unregisterMBean(name.getMBeanName(), MBeanWrapper.OnException.IGNORE);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/ForwardToContainer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ForwardToContainer.java b/src/java/org/apache/cassandra/net/ForwardToContainer.java
index ac9e725..b22eed6 100644
--- a/src/java/org/apache/cassandra/net/ForwardToContainer.java
+++ b/src/java/org/apache/cassandra/net/ForwardToContainer.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.net;
+import java.io.Serializable;
import java.util.Collection;
import com.google.common.base.Preconditions;
@@ -28,7 +29,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
* Contains forward to information until it can be serialized as part of a message using a version
* specific serialization
*/
-public class ForwardToContainer
+public class ForwardToContainer implements Serializable
{
public final Collection<InetAddressAndPort> targets;
public final int[] messageIds;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index 1cd39f3..c8f4bfc 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -49,12 +49,12 @@ public class MessageIn<T>
public final int version;
public final long constructionTime;
- private MessageIn(InetAddressAndPort from,
- T payload,
- Map<ParameterType, Object> parameters,
- Verb verb,
- int version,
- long constructionTime)
+ public MessageIn(InetAddressAndPort from,
+ T payload,
+ Map<ParameterType, Object> parameters,
+ Verb verb,
+ int version,
+ long constructionTime)
{
this.from = from;
this.payload = payload;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index c6e8496..761e210 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.net;
import java.io.IOError;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
@@ -35,8 +34,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
@@ -112,6 +109,7 @@ import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.BooleanSerializer;
import org.apache.cassandra.utils.ExpiringMap;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.NativeLibrary;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.StatusLogger;
@@ -464,6 +462,20 @@ public final class MessagingService implements MessagingServiceMBean
}
}
+ public static IVersionedSerializer<?> getVerbSerializer(Verb verb, int id)
+ {
+ IVersionedSerializer serializer = verbSerializers.get(verb);
+ if (serializer instanceof MessagingService.CallbackDeterminedSerializer)
+ {
+ CallbackInfo callback = MessagingService.instance().getRegisteredCallback(id);
+ if (callback == null)
+ return null;
+
+ serializer = callback.serializer;
+ }
+ return serializer;
+ }
+
/* Lookup table for registering message handlers based on the verb. */
private final Map<Verb, IVerbHandler> verbHandlers;
@@ -618,15 +630,7 @@ public final class MessagingService implements MessagingServiceMBean
if (!testOnly)
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
index 23e532c..e0be715 100644
--- a/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
+++ b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
@@ -22,6 +22,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import org.apache.cassandra.io.util.DataInputPlus;
+import java.io.IOException;
+
public class ByteBufDataInputPlus extends ByteBufInputStream implements DataInputPlus
{
/**
@@ -40,4 +42,10 @@ public class ByteBufDataInputPlus extends ByteBufInputStream implements DataInpu
{
return buf;
}
+
+ @Override
+ public String readUTF() throws IOException
+ {
+ return DataInputStreamPlus.readUTF(this);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/MessageInHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/MessageInHandler.java b/src/java/org/apache/cassandra/net/async/MessageInHandler.java
index 0a194d4..dafa993 100644
--- a/src/java/org/apache/cassandra/net/async/MessageInHandler.java
+++ b/src/java/org/apache/cassandra/net/async/MessageInHandler.java
@@ -18,15 +18,17 @@
package org.apache.cassandra.net.async;
-import java.io.DataInputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
import com.google.common.primitives.Ints;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,7 +104,7 @@ public class MessageInHandler extends BaseMessageInHandler
{
if (in.readableBytes() < messageHeader.parameterLength)
return;
- readParameters(in, inputPlus, messageHeader.parameterLength, messageHeader.parameters);
+ readParameters(in, inputPlus, messagingVersion, messageHeader.parameterLength, messageHeader.parameters);
}
state = State.READ_PAYLOAD_SIZE;
// fall-through
@@ -134,17 +136,17 @@ public class MessageInHandler extends BaseMessageInHandler
}
}
- private void readParameters(ByteBuf in, ByteBufDataInputPlus inputPlus, int parameterLength, Map<ParameterType, Object> parameters) throws IOException
+ private static void readParameters(ByteBuf buf, DataInputPlus in, int messagingVersion, int parameterLength, Map<ParameterType, Object> parameters) throws IOException
{
// makes the assumption we have all the bytes required to read the headers
- final int endIndex = in.readerIndex() + parameterLength;
- while (in.readerIndex() < endIndex)
+ final int endIndex = buf.readerIndex() + parameterLength;
+ while (buf.readerIndex() < endIndex)
{
- String key = DataInputStream.readUTF(inputPlus);
+ String key = in.readUTF();
ParameterType parameterType = ParameterType.byName.get(key);
- long valueLength = VIntCoding.readUnsignedVInt(in);
+ long valueLength = in.readUnsignedVInt();
byte[] value = new byte[Ints.checkedCast(valueLength)];
- in.readBytes(value);
+ in.readFully(value);
try (DataInputBuffer buffer = new DataInputBuffer(value))
{
parameters.put(parameterType, parameterType.serializer.deserialize(buffer, messagingVersion));
@@ -152,6 +154,55 @@ public class MessageInHandler extends BaseMessageInHandler
}
}
+ private static void readParameters(BooleanSupplier isDone, DataInputPlus in, int messagingVersion, Map<ParameterType, Object> parameters) throws IOException
+ {
+ // makes the assumption we have all the bytes required to read the headers
+ while (!isDone.getAsBoolean())
+ {
+ String key = in.readUTF();
+ ParameterType parameterType = ParameterType.byName.get(key);
+ in.readUnsignedVInt();
+ parameters.put(parameterType, parameterType.serializer.deserialize(in, messagingVersion));
+ }
+ }
+
+ public static MessageIn<?> deserialize(DataInputPlus in, int id, int version, InetAddressAndPort from) throws IOException
+ {
+ if (version >= MessagingService.VERSION_40)
+ return deserialize40(in, id, version, from);
+ else
+ return MessageInHandlerPre40.deserializePre40(in, id, version, from);
+ }
+
+ private static MessageIn<?> deserialize40(DataInputPlus in, int id, int version, InetAddressAndPort from) throws IOException
+ {
+ MessagingService.Verb verb = MessagingService.Verb.fromId(in.readInt());
+
+ Map<ParameterType, Object> parameters = Collections.emptyMap();
+ int parameterLength = (int) in.readUnsignedVInt();
+ if (parameterLength != 0)
+ {
+ parameters = new EnumMap<>(ParameterType.class);
+ byte[] bytes = new byte[parameterLength];
+ in.readFully(bytes);
+ try (DataInputBuffer buffer = new DataInputBuffer(bytes))
+ {
+ readParameters(() -> buffer.available() == 0, buffer, version, parameters);
+ }
+ }
+
+ Object payload = null;
+ int payloadSize = (int) in.readUnsignedVInt();
+ if (payloadSize > 0)
+ {
+ IVersionedSerializer serializer = MessagingService.getVerbSerializer(verb, id);
+ if (serializer == null) in.skipBytesFully(payloadSize);
+ else payload = serializer.deserialize(in, version);
+ }
+
+ return new MessageIn<>(from, payload, parameters, verb, version, System.nanoTime());
+ }
+
@Override
MessageHeader getMessageHeader()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java b/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java
index f5b6fc4..6eeeea7 100644
--- a/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java
+++ b/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java
@@ -25,8 +25,11 @@ import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -155,10 +158,10 @@ public class MessageInHandlerPre40 extends BaseMessageInHandler
if (!canReadNextParam(in))
return false;
- String key = DataInputStream.readUTF(inputPlus);
+ String key = inputPlus.readUTF();
ParameterType parameterType = ParameterType.byName.get(key);
- byte[] value = new byte[in.readInt()];
- in.readBytes(value);
+ byte[] value = new byte[inputPlus.readInt()];
+ inputPlus.readFully(value);
try (DataInputBuffer buffer = new DataInputBuffer(value))
{
parameters.put(parameterType, parameterType.serializer.deserialize(buffer, messagingVersion));
@@ -168,6 +171,47 @@ public class MessageInHandlerPre40 extends BaseMessageInHandler
return true;
}
+ private static boolean readParameters(DataInputPlus in, int messagingVersion, int parameterCount, Map<ParameterType, Object> parameters) throws IOException
+ {
+ // makes the assumption that map.size() is a constant time function (HashMap.size() is)
+ while (parameters.size() < parameterCount)
+ {
+ String key = in.readUTF();
+ ParameterType parameterType = ParameterType.byName.get(key);
+ in.readInt();
+ parameters.put(parameterType, parameterType.serializer.deserialize(in, messagingVersion));
+ }
+
+ return true;
+ }
+
+ static MessageIn<?> deserializePre40(DataInputPlus in, int id, int version, InetAddressAndPort from) throws IOException
+ {
+ assert from.equals(CompactEndpointSerializationHelper.instance.deserialize(in, version));
+ MessagingService.Verb verb = MessagingService.Verb.fromId(in.readInt());
+
+ Map<ParameterType, Object> parameters = Collections.emptyMap();
+ int parameterCount = in.readInt();
+ if (parameterCount != 0)
+ {
+ parameters = new EnumMap<>(ParameterType.class);
+ readParameters(in, version, parameterCount, parameters);
+ }
+
+ Object payload = null;
+ int payloadSize = in.readInt();
+ if (payloadSize > 0)
+ {
+ IVersionedSerializer serializer = MessagingService.getVerbSerializer(verb, id);
+ if (serializer == null) in.skipBytesFully(payloadSize);
+ else payload = serializer.deserialize(in, version);
+ }
+
+ return new MessageIn<>(from, payload, parameters, verb, version, System.nanoTime());
+ }
+
+
+
/**
* Determine if we can read the next parameter from the {@link ByteBuf}. This method will *always* set the {@code in}
* readIndex back to where it was when this method was invoked.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/NettyFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java b/src/java/org/apache/cassandra/net/async/NettyFactory.java
index 989e33c..2366722 100644
--- a/src/java/org/apache/cassandra/net/async/NettyFactory.java
+++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java
@@ -2,6 +2,7 @@ package org.apache.cassandra.net.async;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
import java.util.zip.Checksum;
import javax.annotation.Nullable;
@@ -384,12 +385,13 @@ public final class NettyFactory
}
}
- public void close()
+ public void close() throws InterruptedException
{
- acceptGroup.shutdownGracefully();
- outboundGroup.shutdownGracefully();
- inboundGroup.shutdownGracefully();
- streamingGroup.shutdownGracefully();
+ EventLoopGroup[] groups = new EventLoopGroup[] { acceptGroup, outboundGroup, inboundGroup, streamingGroup };
+ for (EventLoopGroup group : groups)
+ group.shutdownGracefully();
+ for (EventLoopGroup group : groups)
+ group.awaitTermination(60, TimeUnit.SECONDS);
}
static Lz4FrameEncoder createLz4Encoder(int protocolVersion)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/schema/SchemaEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaEvent.java b/src/java/org/apache/cassandra/schema/SchemaEvent.java
index e26cee5..00c8136 100644
--- a/src/java/org/apache/cassandra/schema/SchemaEvent.java
+++ b/src/java/org/apache/cassandra/schema/SchemaEvent.java
@@ -29,13 +29,14 @@ import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.MapDifference;
import org.apache.cassandra.diag.DiagnosticEvent;
import org.apache.cassandra.utils.Pair;
-final class SchemaEvent extends DiagnosticEvent
+public final class SchemaEvent extends DiagnosticEvent
{
private final SchemaEventType type;
@@ -62,7 +63,7 @@ final class SchemaEvent extends DiagnosticEvent
@Nullable
private final MapDifference<String,TableMetadata> indexesDiff;
- enum SchemaEventType
+ public enum SchemaEventType
{
KS_METADATA_LOADED,
KS_METADATA_RELOADED,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index b32f67e..1a54e75 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -18,15 +18,11 @@
package org.apache.cassandra.service;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@@ -78,6 +74,7 @@ import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
@@ -169,15 +166,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
.maximumSize(Long.getLong("cassandra.parent_repair_status_cache_size", 100_000))
.build();
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- try
- {
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
}
public void start()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 479470c..5eeaf20 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.service;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
@@ -27,9 +26,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
import com.google.common.util.concurrent.Futures;
import org.slf4j.Logger;
@@ -53,6 +49,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.Pair;
public class CacheService implements CacheServiceMBean
@@ -88,16 +85,7 @@ public class CacheService implements CacheServiceMBean
private CacheService()
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-
- try
- {
- mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
keyCache = initKeyCache();
rowCache = initRowCache();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index f0b2dc1..592419a 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -569,16 +569,7 @@ public class CassandraDaemon
{
applyConfig();
- try
- {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(new StandardMBean(new NativeAccess(), NativeAccessMBean.class), new ObjectName(MBEAN_NAME));
- }
- catch (Exception e)
- {
- logger.error("error registering MBean {}", MBEAN_NAME, e);
- //Allow the server to start even if the bean can't be registered
- }
+ MBeanWrapper.instance.registerMBean(new StandardMBean(new NativeAccess(), NativeAccessMBean.class), MBEAN_NAME, MBeanWrapper.OnException.LOG);
if (FBUtilities.isWindows)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 7b6bd58..a3f6b52 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -30,6 +30,8 @@ import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.annotations.VisibleForTesting;
+
public class PendingRangeCalculatorService
{
public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService();
@@ -117,4 +119,11 @@ public class PendingRangeCalculatorService
{
StorageService.instance.getTokenMetadata().calculatePendingRanges(strategy, keyspaceName);
}
+
+ @VisibleForTesting
+ public void shutdownExecutor() throws InterruptedException
+ {
+ executor.shutdown();
+ executor.awaitTermination(60, TimeUnit.SECONDS);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org