You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/11/16 18:42:47 UTC

[3/3] cassandra git commit: Introduce in-jvm distributed tests

Introduce in-jvm distributed tests

Patch by Alex Petrov and Benedict Elliott Smith; reviewed by Benedict Elliott Smith and Dinesh Joshi for CASSANDRA-14821.

Co-authored-by: Benedict Elliott Smith <be...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f22fec92
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f22fec92
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f22fec92

Branch: refs/heads/trunk
Commit: f22fec927de7ac291266660c2f34de5b8cc1c695
Parents: 7877035
Author: Alex Petrov <ol...@gmail.com>
Authored: Fri Nov 16 19:41:58 2018 +0100
Committer: Alex Petrov <ol...@gmail.com>
Committed: Fri Nov 16 19:41:58 2018 +0100

----------------------------------------------------------------------
 .circleci/config.yml                            |  14 +-
 build.xml                                       |  10 +-
 ide/idea-iml-file.xml                           |   1 +
 .../org/apache/cassandra/auth/AuthCache.java    |  33 +-
 .../cassandra/batchlog/BatchlogManager.java     |  48 ++-
 .../concurrent/InfiniteLoopExecutor.java        |  83 ++++
 .../JMXEnabledThreadPoolExecutor.java           |  24 +-
 .../concurrent/ScheduledExecutors.java          |  15 +
 .../concurrent/SharedExecutorPool.java          |  15 +-
 .../cassandra/concurrent/StageManager.java      |  10 +
 .../cassandra/config/DatabaseDescriptor.java    |   8 +-
 .../apache/cassandra/cql3/QueryProcessor.java   |   6 +-
 .../cassandra/db/BlacklistedDirectories.java    |  17 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  48 ++-
 .../cassandra/db/HintedHandOffManager.java      |  15 +-
 .../cassandra/db/commitlog/CommitLog.java       |  15 +-
 .../db/compaction/CompactionManager.java        |  18 +-
 .../cassandra/diag/DiagnosticEventService.java  |  13 +-
 .../cassandra/diag/LastEventIdBroadcaster.java  |  16 +-
 .../apache/cassandra/gms/FailureDetector.java   |  14 +-
 src/java/org/apache/cassandra/gms/Gossiper.java |  14 +-
 .../apache/cassandra/hints/HintsService.java    |  15 +-
 .../cassandra/index/SecondaryIndexManager.java  |  10 +
 .../io/sstable/IndexSummaryManager.java         |  30 +-
 .../cassandra/io/util/DataInputBuffer.java      |   8 +-
 .../locator/DynamicEndpointSnitch.java          |  24 +-
 .../cassandra/locator/EndpointSnitchInfo.java   |  16 +-
 .../metrics/CassandraMetricsRegistry.java       |  30 +-
 .../cassandra/net/ForwardToContainer.java       |   3 +-
 .../org/apache/cassandra/net/MessageIn.java     |  12 +-
 .../apache/cassandra/net/MessagingService.java  |  28 +-
 .../net/async/ByteBufDataInputPlus.java         |   8 +
 .../cassandra/net/async/MessageInHandler.java   |  67 +++-
 .../net/async/MessageInHandlerPre40.java        |  50 ++-
 .../cassandra/net/async/NettyFactory.java       |  12 +-
 .../apache/cassandra/schema/SchemaEvent.java    |   5 +-
 .../cassandra/service/ActiveRepairService.java  |  15 +-
 .../apache/cassandra/service/CacheService.java  |  16 +-
 .../cassandra/service/CassandraDaemon.java      |  11 +-
 .../service/PendingRangeCalculatorService.java  |   9 +
 .../apache/cassandra/service/StorageProxy.java  |  15 +-
 .../cassandra/service/StorageService.java       |  16 +-
 .../apache/cassandra/utils/ByteBufferUtil.java  |  26 ++
 .../apache/cassandra/utils/MBeanWrapper.java    | 179 +++++++++
 .../org/apache/cassandra/utils/Mx4jTool.java    |   4 +-
 .../apache/cassandra/utils/concurrent/Ref.java  |  35 +-
 .../cassandra/utils/memory/BufferPool.java      |  43 +-
 .../utils/memory/MemtableCleanerThread.java     |  77 ++--
 .../cassandra/utils/memory/MemtablePool.java    |   9 +
 test/conf/logback-dtest.xml                     |  79 ++++
 .../cassandra/distributed/Coordinator.java      |  80 ++++
 .../DistributedReadWritePathTest.java           | 348 ++++++++++++++++
 .../distributed/DistributedTestBase.java        |  86 ++++
 .../apache/cassandra/distributed/Instance.java  | 399 +++++++++++++++++++
 .../distributed/InstanceClassLoader.java        | 101 +++++
 .../cassandra/distributed/InstanceConfig.java   |  87 ++++
 .../distributed/InstanceIDDefiner.java          |  38 ++
 .../distributed/InvokableInstance.java          | 133 +++++++
 .../apache/cassandra/distributed/Message.java   |  41 ++
 .../cassandra/distributed/MessageFilters.java   | 175 ++++++++
 .../apache/cassandra/distributed/RowUtil.java   |  47 +++
 .../cassandra/distributed/TestCluster.java      | 308 ++++++++++++++
 .../cassandra/net/async/NettyFactoryTest.java   |   2 +-
 .../async/StreamCompressionSerializerTest.java  |   4 -
 64 files changed, 2661 insertions(+), 477 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/.circleci/config.yml
----------------------------------------------------------------------
diff --git a/.circleci/config.yml b/.circleci/config.yml
index 430354a..3b2b978 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -164,12 +164,20 @@ jobs:
 
             # get all of our unit test filenames
             set -eo pipefail && circleci tests glob "$HOME/cassandra/test/unit/**/*.java" > /tmp/all_java_unit_tests.txt
+            # append distributed tests
+            set -eo pipefail && circleci tests glob "$HOME/cassandra/test/distributed/**/*.java" > /tmp/all_java_distributed_tests.txt
 
             # split up the unit tests into groups based on the number of containers we have
             set -eo pipefail && circleci tests split --split-by=timings --timings-type=filename --index=${CIRCLE_NODE_INDEX} --total=${CIRCLE_NODE_TOTAL} /tmp/all_java_unit_tests.txt > /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt
             set -eo pipefail && cat /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt | cut -c 37-1000000 | grep "Test\.java$" > /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
             echo "** /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt"
             cat /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+
+            set -eo pipefail && circleci tests split --split-by=timings --timings-type=filename --index=${CIRCLE_NODE_INDEX} --total=${CIRCLE_NODE_TOTAL} /tmp/all_java_distributed_tests.txt > /tmp/java_dtests_${CIRCLE_NODE_INDEX}.txt
+            set +eo pipefail && cat /tmp/java_dtests_${CIRCLE_NODE_INDEX}.txt | cut -c 44-1000000 | grep "Test\.java$" > /tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt
+            echo "** /tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt"
+            cat /tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt
+
       - run:
          name: Run Unit Tests
          command: |
@@ -181,7 +189,11 @@ jobs:
 
             time mv ~/cassandra /tmp
             cd /tmp/cassandra
-            ant testclasslist -Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+            ant testclasslist -Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt -Dtest.classlistprefix=unit
+
+            if [ -s "/tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt" ]; then
+              ant testclasslist -Dtest.classlistfile=/tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt -Dtest.classlistprefix=distributed
+            fi
          no_output_timeout: 15m
       - store_test_results:
             path: /tmp/cassandra/build/test/output/

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 3d3014c..f24647e 100644
--- a/build.xml
+++ b/build.xml
@@ -56,12 +56,14 @@
     <property name="test.data" value="${test.dir}/data"/>
     <property name="test.name" value="*Test"/>
     <property name="test.classlistfile" value="testlist.txt"/>
+    <property name="test.classlistprefix" value="unit"/>
     <property name="benchmark.name" value=""/>
     <property name="test.methods" value=""/>
     <property name="test.unit.src" value="${test.dir}/unit"/>
     <property name="test.long.src" value="${test.dir}/long"/>
     <property name="test.burn.src" value="${test.dir}/burn"/>
     <property name="test.microbench.src" value="${test.dir}/microbench"/>
+    <property name="test.distributed.src" value="${test.dir}/distributed"/>
     <property name="dist.dir" value="${build.dir}/dist"/>
     <property name="tmp.dir" value="${java.io.tmpdir}"/>
 
@@ -103,6 +105,7 @@
     <property name="test.timeout" value="240000" />
     <property name="test.long.timeout" value="600000" />
     <property name="test.burn.timeout" value="60000000" />
+    <property name="test.distributed.timeout" value="600000" />
 
     <!-- default for cql tests. Can be override by -Dcassandra.test.use_prepared=false -->
     <property name="cassandra.test.use_prepared" value="true" />
@@ -1253,6 +1256,7 @@
      <src path="${test.long.src}"/>
      <src path="${test.burn.src}"/>
      <src path="${test.microbench.src}"/>
+     <src path="${test.distributed.src}"/>
     </javac>
 
     <!-- Non-java resources needed by the test suite -->
@@ -1364,7 +1368,7 @@
     <attribute name="test.file.list"/>
     <attribute name="testlist.offset"/>
     <sequential>
-      <testmacrohelper inputdir="${test.unit.src}" filelist="@{test.file.list}" poffset="@{testlist.offset}" exclude="**/*.java" timeout="${test.timeout}">
+      <testmacrohelper inputdir="${test.dir}/${test.classlistprefix}" filelist="@{test.file.list}" poffset="@{testlist.offset}" exclude="**/*.java" timeout="${test.timeout}">
         <jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
         <jvmarg value="-Dinvalid-legacy-sstable-root=${test.data}/invalid-legacy-sstables"/>
         <jvmarg value="-Dcassandra.ring_delay_ms=1000"/>
@@ -1468,6 +1472,7 @@
     </concat>
     <path id="all-test-classes-path">
       <fileset dir="${test.unit.src}" includes="**/${test.name}.java" />
+      <fileset dir="${test.distributed.src}" includes="**/${test.name}.java" />
     </path>
     <property name="all-test-classes" refid="all-test-classes-path"/>
     <testparallel testdelegate="testlist-compression" />
@@ -1844,7 +1849,7 @@
   e.g. org/apache/cassandra/hints/HintMessageTest.java -->
   <target name="testclasslist" depends="build-test" description="Parallel-run tests given in file -Dtest.classlistfile (one-class-per-line, e.g. org/apache/cassandra/db/SomeTest.java)">
     <path id="all-test-classes-path">
-      <fileset dir="${test.unit.src}" includesfile="${test.classlistfile}"/>
+      <fileset dir="${test.dir}/${test.classlistprefix}" includesfile="${test.classlistfile}"/>
     </path>
     <property name="all-test-classes" refid="all-test-classes-path"/>
     <testparallel testdelegate="testlist"/>
@@ -1939,6 +1944,7 @@
   <classpathentry kind="src" path="conf" including="hotspot_compiler"/>
   <classpathentry kind="src" output="build/test/classes" path="test/unit"/>
   <classpathentry kind="src" output="build/test/classes" path="test/long"/>
+  <classpathentry kind="src" output="build/test/classes" path="test/distributed"/>
   <classpathentry kind="src" output="build/test/classes" path="test/resources" />
   <classpathentry kind="src" path="tools/stress/src"/>
   <classpathentry kind="src" path="tools/fqltool/src"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/ide/idea-iml-file.xml
----------------------------------------------------------------------
diff --git a/ide/idea-iml-file.xml b/ide/idea-iml-file.xml
index b83abfa..0045ae6 100644
--- a/ide/idea-iml-file.xml
+++ b/ide/idea-iml-file.xml
@@ -35,6 +35,7 @@
             <sourceFolder url="file://$MODULE_DIR$/test/long" isTestSource="true" />
             <sourceFolder url="file://$MODULE_DIR$/test/microbench" isTestSource="true" />
             <sourceFolder url="file://$MODULE_DIR$/test/burn" isTestSource="true" />
+            <sourceFolder url="file://$MODULE_DIR$/test/distributed" isTestSource="true" />
             <sourceFolder url="file://$MODULE_DIR$/test/resources" type="java-test-resource" />
             <sourceFolder url="file://$MODULE_DIR$/test/conf" type="java-test-resource" />
             <excludeFolder url="file://$MODULE_DIR$/.idea" />

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/auth/AuthCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AuthCache.java b/src/java/org/apache/cassandra/auth/AuthCache.java
index 3adf914..4bf15c1 100644
--- a/src/java/org/apache/cassandra/auth/AuthCache.java
+++ b/src/java/org/apache/cassandra/auth/AuthCache.java
@@ -18,22 +18,19 @@
 
 package org.apache.cassandra.auth;
 
-import java.lang.management.ManagementFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
 import java.util.function.Function;
 import java.util.function.IntConsumer;
 import java.util.function.IntSupplier;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
 
 import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -97,33 +94,17 @@ public class AuthCache<K, V> implements AuthCacheMBean
     protected void init()
     {
         cache = initCache(null);
-        try
-        {
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            mbs.registerMBean(this, getObjectName());
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, getObjectName());
     }
 
     protected void unregisterMBean()
     {
-        try
-        {
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            mbs.unregisterMBean(getObjectName());
-        }
-        catch (Exception e)
-        {
-            logger.warn("Error unregistering {} cache mbean", name, e);
-        }
+        MBeanWrapper.instance.unregisterMBean(getObjectName(), MBeanWrapper.OnException.LOG);
     }
 
-    protected ObjectName getObjectName() throws MalformedObjectNameException
+    protected String getObjectName()
     {
-        return new ObjectName(MBEAN_NAME_BASE + name);
+        return MBEAN_NAME_BASE + name;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 91129ed..b2b851d 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -18,28 +18,37 @@
 package org.apache.cassandra.batchlog;
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.RateLimiter;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.Replicas;
-import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -51,15 +60,20 @@ import org.apache.cassandra.hints.Hint;
 import org.apache.cassandra.hints.HintsService;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.Replicas;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.UUIDGen;
 
 import static com.google.common.collect.Iterables.transform;
@@ -93,15 +107,7 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     public void start()
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
 
         batchlogTasks.scheduleWithFixedDelay(this::replayFailedBatches,
                                              StorageService.RING_DELAY,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
new file mode 100644
index 0000000..1b8173e
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+public class InfiniteLoopExecutor
+{
+    private static final Logger logger = LoggerFactory.getLogger(InfiniteLoopExecutor.class);
+
+    public interface InterruptibleRunnable
+    {
+        void run() throws InterruptedException;
+    }
+
+    private final Thread thread;
+    private final InterruptibleRunnable runnable;
+    private volatile boolean isShutdown = false;
+
+    public InfiniteLoopExecutor(String name, InterruptibleRunnable runnable)
+    {
+        this.runnable = runnable;
+        this.thread = new Thread(this::loop, name);
+        this.thread.setDaemon(true);
+    }
+
+    private void loop()
+    {
+        while (!isShutdown)
+        {
+            try
+            {
+                runnable.run();
+            }
+            catch (InterruptedException ie)
+            {
+                if (isShutdown)
+                    return;
+                logger.error("Interrupted while executing {}, but not shutdown; continuing with loop", runnable, ie);
+            }
+            catch (Throwable t)
+            {
+                logger.error("Exception thrown by runnable, continuing with loop", t);
+            }
+        }
+    }
+
+    public InfiniteLoopExecutor start()
+    {
+        thread.start();
+        return this;
+    }
+
+    public void shutdown()
+    {
+        isShutdown = true;
+        thread.interrupt();
+    }
+
+    public void awaitTermination(long time, TimeUnit unit) throws InterruptedException
+    {
+        thread.join(unit.toMillis(time));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
index 278b399..0e61de9 100644
--- a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
@@ -17,16 +17,14 @@
  */
 package org.apache.cassandra.concurrent;
 
-import java.lang.management.ManagementFactory;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.TimeUnit;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 import org.apache.cassandra.metrics.ThreadPoolMetrics;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 /**
  * This is a wrapper class for the <i>ScheduledThreadPoolExecutor</i>. It provides an implementation
@@ -81,17 +79,8 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i
         super.prestartAllCoreThreads();
         metrics = new ThreadPoolMetrics(this, jmxPath, threadFactory.id).register();
 
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + threadFactory.id;
-
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(mbeanName));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, mbeanName);
     }
 
     public JMXEnabledThreadPoolExecutor(int corePoolSize,
@@ -114,14 +103,7 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i
 
     private void unregisterMBean()
     {
-        try
-        {
-            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(mbeanName));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.unregisterMBean(mbeanName);
 
         // release metrics
         metrics.release();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
index 22dc769..e51e4c2 100644
--- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
+++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
@@ -17,6 +17,11 @@
  */
 package org.apache.cassandra.concurrent;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Centralized location for shared executors
  */
@@ -41,4 +46,14 @@ public class ScheduledExecutors
      * This executor is used for tasks that do not need to be waited for on shutdown/drain.
      */
     public static final DebuggableScheduledThreadPoolExecutor optionalTasks = new DebuggableScheduledThreadPoolExecutor("OptionalTasks");
+
+    @VisibleForTesting
+    public static void shutdownAndWait() throws InterruptedException
+    {
+        ExecutorService[] executors = new ExecutorService[] { scheduledFastTasks, scheduledTasks, nonPeriodicTasks, optionalTasks };
+        for (ExecutorService executor : executors)
+            executor.shutdown();
+        for (ExecutorService executor : executors)
+            executor.awaitTermination(60, TimeUnit.SECONDS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
index 3b0600f..5352ad7 100644
--- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@ -21,9 +21,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import static org.apache.cassandra.concurrent.SEPWorker.Work;
 
 /**
@@ -61,7 +64,7 @@ public class SharedExecutorPool
     final AtomicLong workerId = new AtomicLong();
 
     // the collection of executors serviced by this pool; periodically ordered by traffic volume
-    final List<SEPExecutor> executors = new CopyOnWriteArrayList<>();
+    public final List<SEPExecutor> executors = new CopyOnWriteArrayList<>();
 
     // the number of workers currently in a spinning state
     final AtomicInteger spinningCount = new AtomicInteger();
@@ -109,4 +112,14 @@ public class SharedExecutorPool
         executors.add(executor);
         return executor;
     }
+
+    @VisibleForTesting
+    public static void shutdownSharedPool() throws InterruptedException
+    {
+        for (SEPExecutor executor : SHARED.executors)
+            executor.shutdown();
+
+        for (SEPExecutor executor : SHARED.executors)
+            executor.awaitTermination(60, TimeUnit.SECONDS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index c102042..608a005 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.concurrent;
 import java.util.EnumMap;
 import java.util.concurrent.*;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,6 +113,15 @@ public class StageManager
         }
     }
 
+    @VisibleForTesting
+    public static void shutdownAndWait() throws InterruptedException
+    {
+        for (Stage stage : Stage.values())
+            StageManager.stages.get(stage).shutdown();
+        for (Stage stage : Stage.values())
+            StageManager.stages.get(stage).awaitTermination(60, TimeUnit.SECONDS);
+    }
+
     /**
      * The executor used for tracing.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index bc1e5a2..2f5f49f 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -27,6 +27,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -142,6 +143,11 @@ public class DatabaseDescriptor
 
     public static void daemonInitialization() throws ConfigurationException
     {
+        daemonInitialization(DatabaseDescriptor::loadConfig);
+    }
+
+    public static void daemonInitialization(Supplier<Config> config) throws ConfigurationException
+    {
         if (toolInitialized)
             throw new AssertionError("toolInitialization() already called");
         if (clientInitialized)
@@ -152,7 +158,7 @@ public class DatabaseDescriptor
             return;
         daemonInitialized = true;
 
-        setConfig(loadConfig());
+        setConfig(config.get());
         applyAll();
         AuthConfig.applyAuth();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 45db947..b8ec648 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -162,7 +162,8 @@ public class QueryProcessor implements QueryHandler
             SystemKeyspace.resetPreparedStatements();
     }
 
-    private static QueryState internalQueryState()
+    @VisibleForTesting
+    public static QueryState internalQueryState()
     {
         return new QueryState(InternalStateInstance.INSTANCE.clientState);
     }
@@ -265,7 +266,8 @@ public class QueryProcessor implements QueryHandler
             return null;
     }
 
-    private static QueryOptions makeInternalOptions(CQLStatement prepared, Object[] values)
+    @VisibleForTesting
+    public static QueryOptions makeInternalOptions(CQLStatement prepared, Object[] values)
     {
         return makeInternalOptions(prepared, values, ConsistencyLevel.ONE);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
index f090013..cff9a78 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
@@ -21,18 +21,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.lang.management.ManagementFactory;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
 {
@@ -48,17 +45,7 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
     private BlacklistedDirectories()
     {
         // Register this instance with JMX
-        try
-        {
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            JVMStabilityInspector.inspectThrowable(e);
-            logger.error("error registering MBean {}", MBEAN_NAME, e);
-            //Allow the server to start even if the bean can't be registered
-        }
+        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME, MBeanWrapper.OnException.LOG);
     }
 
     public Set<File> getUnreadableDirectories()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 877a3c5..c5149cf 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
@@ -41,7 +40,6 @@ import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.codahale.metrics.Snapshot;
 import org.apache.cassandra.cache.*;
 import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.*;
@@ -72,7 +70,6 @@ import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.metrics.Sampler;
 import org.apache.cassandra.metrics.Sampler.Sample;
 import org.apache.cassandra.metrics.Sampler.SamplerType;
@@ -220,12 +217,33 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private volatile boolean neverPurgeTombstones = false;
 
+    public static void shutdownFlushExecutor() throws InterruptedException
+    {
+        flushExecutor.shutdown();
+        flushExecutor.awaitTermination(60, TimeUnit.SECONDS);
+    }
+
+
     public static void shutdownPostFlushExecutor() throws InterruptedException
     {
         postFlushExecutor.shutdown();
         postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS);
     }
 
+    public static void shutdownReclaimExecutor() throws InterruptedException
+    {
+        reclaimExecutor.shutdown();
+        reclaimExecutor.awaitTermination(60, TimeUnit.SECONDS);
+    }
+
+    public static void shutdownPerDiskFlushExecutors() throws InterruptedException
+    {
+        for (ExecutorService executorService : perDiskflushExecutors)
+            executorService.shutdown();
+        for (ExecutorService executorService : perDiskflushExecutors)
+            executorService.awaitTermination(60, TimeUnit.SECONDS);
+    }
+
     public void reload()
     {
         // metadata object has been mutated directly. make all the members jibe with new settings.
@@ -425,19 +443,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             oldMBeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,columnfamily=%s",
                                          isIndex() ? "IndexColumnFamilies" : "ColumnFamilies",
                                          keyspace.getName(), name);
-            try
-            {
-                MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-                ObjectName[] objectNames = {new ObjectName(mbeanName), new ObjectName(oldMBeanName)};
-                for (ObjectName objectName : objectNames)
-                {
-                    mbs.registerMBean(this, objectName);
-                }
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
+
+            String[] objectNames = {mbeanName, oldMBeanName};
+            for (String objectName : objectNames)
+                MBeanWrapper.instance.registerMBean(this, objectName);
         }
         else
         {
@@ -548,14 +557,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         data.removeUnreadableSSTables(directory);
     }
 
-    void unregisterMBean() throws MalformedObjectNameException, InstanceNotFoundException, MBeanRegistrationException
+    void unregisterMBean() throws MalformedObjectNameException
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         ObjectName[] objectNames = {new ObjectName(mbeanName), new ObjectName(oldMBeanName)};
         for (ObjectName objectName : objectNames)
         {
-            if (mbs.isRegistered(objectName))
-                mbs.unregisterMBean(objectName);
+            if (MBeanWrapper.instance.isRegistered(objectName))
+                MBeanWrapper.instance.unregisterMBean(objectName);
         }
 
         // unregister metrics

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 3279acf..e26f658 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -17,13 +17,10 @@
  */
 package org.apache.cassandra.db;
 
-import java.lang.management.ManagementFactory;
 import java.util.List;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
 import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 /**
  * A proxy class that implement the deprecated legacy HintedHandoffManagerMBean interface.
@@ -44,15 +41,7 @@ public final class HintedHandOffManager implements HintedHandOffManagerMBean
 
     public void registerMBean()
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
     }
 
     public void deleteHintsForEndpoint(String host)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 6537adc..9d2a369 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -18,19 +18,15 @@
 package org.apache.cassandra.db.commitlog;
 
 import java.io.*;
-import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.zip.CRC32;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.*;
@@ -49,6 +45,7 @@ import org.apache.cassandra.security.EncryptionContext;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
 import static org.apache.cassandra.db.commitlog.CommitLogSegment.CommitLogSegmentFileComparator;
@@ -82,15 +79,7 @@ public class CommitLog implements CommitLogMBean
     {
         CommitLog log = new CommitLog(CommitLogArchiver.construct());
 
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(log, new ObjectName("org.apache.cassandra.db:type=Commitlog"));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(log, "org.apache.cassandra.db:type=Commitlog");
         return log.start();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index e56ed60..bc5a883 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -19,14 +19,11 @@ package org.apache.cassandra.db.compaction;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
@@ -36,7 +33,6 @@ import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 
 import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.locator.Replica;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -119,15 +115,8 @@ public class CompactionManager implements CompactionManagerMBean
     static
     {
         instance = new CompactionManager();
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(instance, new ObjectName(MBEAN_OBJECT_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+
+        MBeanWrapper.instance.registerMBean(instance, MBEAN_OBJECT_NAME);
     }
 
     private final CompactionExecutor executor = new CompactionExecutor();
@@ -232,6 +221,7 @@ public class CompactionManager implements CompactionManagerMBean
         executor.shutdown();
         validationExecutor.shutdown();
         viewBuildExecutor.shutdown();
+        cacheCleanupExecutor.shutdown();
 
         // interrupt compactions and validations
         for (Holder compactionHolder : CompactionMetrics.getCompactions())
@@ -242,7 +232,7 @@ public class CompactionManager implements CompactionManagerMBean
         // wait for tasks to terminate
         // compaction tasks are interrupted above, so it shuold be fairy quick
         // until not interrupted tasks to complete.
-        for (ExecutorService exec : Arrays.asList(executor, validationExecutor, viewBuildExecutor))
+        for (ExecutorService exec : Arrays.asList(executor, validationExecutor, viewBuildExecutor, cacheCleanupExecutor))
         {
             try
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/diag/DiagnosticEventService.java b/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
index 3f3de7c..5953a1d 100644
--- a/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
+++ b/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
@@ -41,6 +41,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 /**
  * Service for publishing and consuming {@link DiagnosticEvent}s.
@@ -62,17 +63,7 @@ public final class DiagnosticEventService implements DiagnosticEventServiceMBean
 
     private DiagnosticEventService()
     {
-
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            ObjectName jmxObjectName = new ObjectName("org.apache.cassandra.diag:type=DiagnosticEventService");
-            mbs.registerMBean(this, jmxObjectName);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this,"org.apache.cassandra.diag:type=DiagnosticEventService");
 
         // register broadcasters for JMX events
         DiagnosticEventPersistence.start();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java b/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
index 9fe5c48..8e991e6 100644
--- a/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
+++ b/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
@@ -18,21 +18,19 @@
 
 package org.apache.cassandra.diag;
 
-import java.lang.management.ManagementFactory;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import javax.management.MBeanServer;
 import javax.management.Notification;
 import javax.management.NotificationBroadcasterSupport;
 import javax.management.NotificationFilter;
 import javax.management.NotificationListener;
-import javax.management.ObjectName;
 
 import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor;
 
 /**
@@ -61,16 +59,8 @@ final class LastEventIdBroadcaster extends NotificationBroadcasterSupport implem
         super(JMXBroadcastExecutor.executor);
 
         summary.put("last_updated_at", 0L);
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            ObjectName jmxObjectName = new ObjectName("org.apache.cassandra.diag:type=LastEventIdBroadcaster");
-            mbs.registerMBean(this, jmxObjectName);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+
+        MBeanWrapper.instance.registerMBean(this, "org.apache.cassandra.diag:type=LastEventIdBroadcaster");
     }
 
     public static LastEventIdBroadcaster instance()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index d7f73ab..4a16f2a 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -21,15 +21,12 @@ import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
 import java.nio.file.Path;
 import java.io.*;
-import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.*;
 
@@ -42,6 +39,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 /**
  * This FailureDetector is an implementation of the paper titled
@@ -88,15 +86,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
     public FailureDetector()
     {
         // Register this instance with JMX
-        try
-        {
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
     }
 
     private static long getInitialValue()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index aedcb04..b789fe7 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.gms;
 
-import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.Map.Entry;
@@ -28,8 +27,6 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Suppliers;
@@ -41,6 +38,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.CassandraVersion;
+import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -248,15 +246,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         // Register this instance with JMX
         if (registerJmx)
         {
-            try
-            {
-                MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-                mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
+            MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 73840d3..1a352c2 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.hints;
 
 import java.io.File;
-import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Collections;
@@ -29,9 +28,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import org.apache.cassandra.db.Keyspace;
@@ -51,6 +47,7 @@ import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 import static com.google.common.collect.Iterables.transform;
 
@@ -138,15 +135,7 @@ public final class HintsService implements HintsServiceMBean
 
     public void registerMBean()
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index c9a7cc6..ec54a65 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -1485,4 +1485,14 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
                                      false);
         }
     }
+
+    @VisibleForTesting
+    public static void shutdownExecutors() throws InterruptedException
+    {
+        ExecutorService[] executors = new ExecutorService[]{ asyncExecutor, blockingExecutor };
+        for (ExecutorService executor : executors)
+            executor.shutdown();
+        for (ExecutorService executor : executors)
+            executor.awaitTermination(60, TimeUnit.SECONDS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index b8d236a..3630c2a 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -18,29 +18,32 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
@@ -65,16 +68,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
     static
     {
         instance = new IndexSummaryManager();
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-
-        try
-        {
-            mbs.registerMBean(instance, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(instance, MBEAN_NAME);
     }
 
     private IndexSummaryManager()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataInputBuffer.java b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
index a68dcc2..9df9861 100644
--- a/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /**
@@ -57,14 +56,17 @@ public class DataInputBuffer extends RebufferingInputStream
     }
 
     @Override
-    protected void reBuffer() throws IOException
+    protected void reBuffer()
     {
         //nope, we don't rebuffer, we are done!
     }
 
     @Override
-    public int available() throws IOException
+    public int available()
     {
         return buffer.remaining();
     }
+
+    @Override
+    public void close() {}
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index d35f1fb..ddc8fba 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.locator;
 
-import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
@@ -28,8 +27,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import com.codahale.metrics.ExponentiallyDecayingReservoir;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 import com.codahale.metrics.Snapshot;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
@@ -41,6 +38,7 @@ import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 /**
  * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
@@ -141,15 +139,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
 
     private void registerMBean()
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(mbeanName));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, mbeanName);
     }
 
     public void close()
@@ -157,15 +147,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
         updateSchedular.cancel(false);
         resetSchedular.cancel(false);
 
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.unregisterMBean(new ObjectName(mbeanName));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.unregisterMBean(mbeanName);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
index da90a79..d836cd1 100644
--- a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
+++ b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
@@ -17,28 +17,16 @@
  */
 package org.apache.cassandra.locator;
 
-
-import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 public class EndpointSnitchInfo implements EndpointSnitchInfoMBean
 {
     public static void create()
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(new EndpointSnitchInfo(), new ObjectName("org.apache.cassandra.db:type=EndpointSnitchInfo"));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(new EndpointSnitchInfo(), "org.apache.cassandra.db:type=EndpointSnitchInfo");
     }
 
     public String getDatacenter(String host) throws UnknownHostException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index 43d6609..74c3367 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.metrics;
 
-import java.lang.management.ManagementFactory;
 import java.lang.reflect.Method;
 import java.util.Collection;
 import java.util.Collections;
@@ -26,14 +25,14 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-
-import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
-import com.codahale.metrics.*;
 import com.google.common.annotations.VisibleForTesting;
 
+import com.codahale.metrics.*;
+import org.apache.cassandra.utils.MBeanWrapper;
+
 /**
  * Makes integrating 3.0 metrics API with 2.0.
  * <p>
@@ -45,7 +44,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
     public static final CassandraMetricsRegistry Metrics = new CassandraMetricsRegistry();
     private final Map<String, ThreadPoolMetrics> threadPoolMetrics = new ConcurrentHashMap<>();
 
-    private final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+    private final MBeanWrapper mBeanServer = MBeanWrapper.instance;
 
     private CassandraMetricsRegistry()
     {
@@ -159,11 +158,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
     {
         boolean removed = remove(name.getMetricName());
 
-        try
-        {
-            mBeanServer.unregisterMBean(name.getMBeanName());
-        } catch (Exception ignore) {}
-
+        mBeanServer.unregisterMBean(name.getMBeanName(), MBeanWrapper.OnException.IGNORE);
         return removed;
     }
 
@@ -194,13 +189,8 @@ public class CassandraMetricsRegistry extends MetricRegistry
         else
             throw new IllegalArgumentException("Unknown metric type: " + metric.getClass());
 
-        try
-        {
-            mBeanServer.registerMBean(mbean, name);
-        }
-        catch (Exception ignored)
-        {
-        }
+        if (!mBeanServer.isRegistered(name))
+            mBeanServer.registerMBean(mbean, name, MBeanWrapper.OnException.LOG);
     }
 
     private void registerAlias(MetricName existingName, MetricName aliasName)
@@ -213,10 +203,8 @@ public class CassandraMetricsRegistry extends MetricRegistry
 
     private void removeAlias(MetricName name)
     {
-        try
-        {
-            mBeanServer.unregisterMBean(name.getMBeanName());
-        } catch (Exception ignore) {}
+        if (mBeanServer.isRegistered(name.getMBeanName()))
+            MBeanWrapper.instance.unregisterMBean(name.getMBeanName(), MBeanWrapper.OnException.IGNORE);
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/ForwardToContainer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ForwardToContainer.java b/src/java/org/apache/cassandra/net/ForwardToContainer.java
index ac9e725..b22eed6 100644
--- a/src/java/org/apache/cassandra/net/ForwardToContainer.java
+++ b/src/java/org/apache/cassandra/net/ForwardToContainer.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.net;
 
+import java.io.Serializable;
 import java.util.Collection;
 
 import com.google.common.base.Preconditions;
@@ -28,7 +29,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
  * Contains forward to information until it can be serialized as part of a message using a version
  * specific serialization
  */
-public class ForwardToContainer
+public class ForwardToContainer implements Serializable
 {
     public final Collection<InetAddressAndPort> targets;
     public final int[] messageIds;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index 1cd39f3..c8f4bfc 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -49,12 +49,12 @@ public class MessageIn<T>
     public final int version;
     public final long constructionTime;
 
-    private MessageIn(InetAddressAndPort from,
-                      T payload,
-                      Map<ParameterType, Object> parameters,
-                      Verb verb,
-                      int version,
-                      long constructionTime)
+    public MessageIn(InetAddressAndPort from,
+                     T payload,
+                     Map<ParameterType, Object> parameters,
+                     Verb verb,
+                     int version,
+                     long constructionTime)
     {
         this.from = from;
         this.payload = payload;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index c6e8496..761e210 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.net;
 
 import java.io.IOError;
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -35,8 +34,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
@@ -112,6 +109,7 @@ import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.BooleanSerializer;
 import org.apache.cassandra.utils.ExpiringMap;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.NativeLibrary;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.StatusLogger;
@@ -464,6 +462,20 @@ public final class MessagingService implements MessagingServiceMBean
         }
     }
 
+    public static IVersionedSerializer<?> getVerbSerializer(Verb verb, int id)
+    {
+        IVersionedSerializer serializer = verbSerializers.get(verb);
+        if (serializer instanceof MessagingService.CallbackDeterminedSerializer)
+        {
+            CallbackInfo callback = MessagingService.instance().getRegisteredCallback(id);
+            if (callback == null)
+                return null;
+
+            serializer = callback.serializer;
+        }
+        return serializer;
+    }
+
     /* Lookup table for registering message handlers based on the verb. */
     private final Map<Verb, IVerbHandler> verbHandlers;
 
@@ -618,15 +630,7 @@ public final class MessagingService implements MessagingServiceMBean
 
         if (!testOnly)
         {
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            try
-            {
-                mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
+            MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
index 23e532c..e0be715 100644
--- a/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
+++ b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
@@ -22,6 +22,8 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
 import org.apache.cassandra.io.util.DataInputPlus;
 
+import java.io.IOException;
+
 public class ByteBufDataInputPlus extends ByteBufInputStream implements DataInputPlus
 {
     /**
@@ -40,4 +42,10 @@ public class ByteBufDataInputPlus extends ByteBufInputStream implements DataInpu
     {
         return buf;
     }
+
+    @Override
+    public String readUTF() throws IOException
+    {
+        return DataInputStreamPlus.readUTF(this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/MessageInHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/MessageInHandler.java b/src/java/org/apache/cassandra/net/async/MessageInHandler.java
index 0a194d4..dafa993 100644
--- a/src/java/org/apache/cassandra/net/async/MessageInHandler.java
+++ b/src/java/org/apache/cassandra/net/async/MessageInHandler.java
@@ -18,15 +18,17 @@
 
 package org.apache.cassandra.net.async;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
 
 import com.google.common.primitives.Ints;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,7 +104,7 @@ public class MessageInHandler extends BaseMessageInHandler
                     {
                         if (in.readableBytes() < messageHeader.parameterLength)
                             return;
-                        readParameters(in, inputPlus, messageHeader.parameterLength, messageHeader.parameters);
+                        readParameters(in, inputPlus, messagingVersion, messageHeader.parameterLength, messageHeader.parameters);
                     }
                     state = State.READ_PAYLOAD_SIZE;
                     // fall-through
@@ -134,17 +136,17 @@ public class MessageInHandler extends BaseMessageInHandler
         }
     }
 
-    private void readParameters(ByteBuf in, ByteBufDataInputPlus inputPlus, int parameterLength, Map<ParameterType, Object> parameters) throws IOException
+    private static void readParameters(ByteBuf buf, DataInputPlus in, int messagingVersion, int parameterLength, Map<ParameterType, Object> parameters) throws IOException
     {
         // makes the assumption we have all the bytes required to read the headers
-        final int endIndex = in.readerIndex() + parameterLength;
-        while (in.readerIndex() < endIndex)
+        final int endIndex = buf.readerIndex() + parameterLength;
+        while (buf.readerIndex() < endIndex)
         {
-            String key = DataInputStream.readUTF(inputPlus);
+            String key = in.readUTF();
             ParameterType parameterType = ParameterType.byName.get(key);
-            long valueLength =  VIntCoding.readUnsignedVInt(in);
+            long valueLength =  in.readUnsignedVInt();
             byte[] value = new byte[Ints.checkedCast(valueLength)];
-            in.readBytes(value);
+            in.readFully(value);
             try (DataInputBuffer buffer = new DataInputBuffer(value))
             {
                 parameters.put(parameterType, parameterType.serializer.deserialize(buffer, messagingVersion));
@@ -152,6 +154,55 @@ public class MessageInHandler extends BaseMessageInHandler
         }
     }
 
+    private static void readParameters(BooleanSupplier isDone, DataInputPlus in, int messagingVersion, Map<ParameterType, Object> parameters) throws IOException
+    {
+        // makes the assumption we have all the bytes required to read the headers
+        while (!isDone.getAsBoolean())
+        {
+            String key = in.readUTF();
+            ParameterType parameterType = ParameterType.byName.get(key);
+            in.readUnsignedVInt();
+            parameters.put(parameterType, parameterType.serializer.deserialize(in, messagingVersion));
+        }
+    }
+
+    public static MessageIn<?> deserialize(DataInputPlus in, int id, int version, InetAddressAndPort from) throws IOException
+    {
+        if (version >= MessagingService.VERSION_40)
+            return deserialize40(in, id, version, from);
+        else
+            return MessageInHandlerPre40.deserializePre40(in, id, version, from);
+    }
+
+    private static MessageIn<?> deserialize40(DataInputPlus in, int id, int version, InetAddressAndPort from) throws IOException
+    {
+        MessagingService.Verb verb = MessagingService.Verb.fromId(in.readInt());
+
+        Map<ParameterType, Object> parameters = Collections.emptyMap();
+        int parameterLength = (int) in.readUnsignedVInt();
+        if (parameterLength != 0)
+        {
+            parameters = new EnumMap<>(ParameterType.class);
+            byte[] bytes = new byte[parameterLength];
+            in.readFully(bytes);
+            try (DataInputBuffer buffer = new DataInputBuffer(bytes))
+            {
+                readParameters(() -> buffer.available() == 0, buffer, version, parameters);
+            }
+        }
+
+        Object payload = null;
+        int payloadSize = (int) in.readUnsignedVInt();
+        if (payloadSize > 0)
+        {
+            IVersionedSerializer serializer = MessagingService.getVerbSerializer(verb, id);
+            if (serializer == null) in.skipBytesFully(payloadSize);
+            else payload = serializer.deserialize(in, version);
+        }
+
+        return new MessageIn<>(from, payload, parameters, verb, version, System.nanoTime());
+    }
+
     @Override
     MessageHeader getMessageHeader()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java b/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java
index f5b6fc4..6eeeea7 100644
--- a/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java
+++ b/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java
@@ -25,8 +25,11 @@ import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -155,10 +158,10 @@ public class MessageInHandlerPre40 extends BaseMessageInHandler
             if (!canReadNextParam(in))
                 return false;
 
-            String key = DataInputStream.readUTF(inputPlus);
+            String key = inputPlus.readUTF();
             ParameterType parameterType = ParameterType.byName.get(key);
-            byte[] value = new byte[in.readInt()];
-            in.readBytes(value);
+            byte[] value = new byte[inputPlus.readInt()];
+            inputPlus.readFully(value);
             try (DataInputBuffer buffer = new DataInputBuffer(value))
             {
                 parameters.put(parameterType, parameterType.serializer.deserialize(buffer, messagingVersion));
@@ -168,6 +171,47 @@ public class MessageInHandlerPre40 extends BaseMessageInHandler
         return true;
     }
 
+    private static boolean readParameters(DataInputPlus in, int messagingVersion, int parameterCount, Map<ParameterType, Object> parameters) throws IOException
+    {
+        // makes the assumption that map.size() is a constant time function (HashMap.size() is)
+        while (parameters.size() < parameterCount)
+        {
+            String key = in.readUTF();
+            ParameterType parameterType = ParameterType.byName.get(key);
+            in.readInt();
+            parameters.put(parameterType, parameterType.serializer.deserialize(in, messagingVersion));
+        }
+
+        return true;
+    }
+
+    static MessageIn<?> deserializePre40(DataInputPlus in, int id, int version, InetAddressAndPort from) throws IOException
+    {
+        assert from.equals(CompactEndpointSerializationHelper.instance.deserialize(in, version));
+        MessagingService.Verb verb = MessagingService.Verb.fromId(in.readInt());
+
+        Map<ParameterType, Object> parameters = Collections.emptyMap();
+        int parameterCount = in.readInt();
+        if (parameterCount != 0)
+        {
+            parameters = new EnumMap<>(ParameterType.class);
+            readParameters(in, version, parameterCount, parameters);
+        }
+
+        Object payload = null;
+        int payloadSize = in.readInt();
+        if (payloadSize > 0)
+        {
+            IVersionedSerializer serializer = MessagingService.getVerbSerializer(verb, id);
+            if (serializer == null) in.skipBytesFully(payloadSize);
+            else payload = serializer.deserialize(in, version);
+        }
+
+        return new MessageIn<>(from, payload, parameters, verb, version, System.nanoTime());
+    }
+
+
+
     /**
      * Determine if we can read the next parameter from the {@link ByteBuf}. This method will *always* set the {@code in}
      * readIndex back to where it was when this method was invoked.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/NettyFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java b/src/java/org/apache/cassandra/net/async/NettyFactory.java
index 989e33c..2366722 100644
--- a/src/java/org/apache/cassandra/net/async/NettyFactory.java
+++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java
@@ -2,6 +2,7 @@ package org.apache.cassandra.net.async;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.Checksum;
 
 import javax.annotation.Nullable;
@@ -384,12 +385,13 @@ public final class NettyFactory
         }
     }
 
-    public void close()
+    public void close() throws InterruptedException
     {
-        acceptGroup.shutdownGracefully();
-        outboundGroup.shutdownGracefully();
-        inboundGroup.shutdownGracefully();
-        streamingGroup.shutdownGracefully();
+        EventLoopGroup[] groups = new EventLoopGroup[] { acceptGroup, outboundGroup, inboundGroup, streamingGroup };
+        for (EventLoopGroup group : groups)
+            group.shutdownGracefully();
+        for (EventLoopGroup group : groups)
+            group.awaitTermination(60, TimeUnit.SECONDS);
     }
 
     static Lz4FrameEncoder createLz4Encoder(int protocolVersion)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/schema/SchemaEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaEvent.java b/src/java/org/apache/cassandra/schema/SchemaEvent.java
index e26cee5..00c8136 100644
--- a/src/java/org/apache/cassandra/schema/SchemaEvent.java
+++ b/src/java/org/apache/cassandra/schema/SchemaEvent.java
@@ -29,13 +29,14 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapDifference;
 
 import org.apache.cassandra.diag.DiagnosticEvent;
 import org.apache.cassandra.utils.Pair;
 
-final class SchemaEvent extends DiagnosticEvent
+public final class SchemaEvent extends DiagnosticEvent
 {
     private final SchemaEventType type;
 
@@ -62,7 +63,7 @@ final class SchemaEvent extends DiagnosticEvent
     @Nullable
     private final MapDifference<String,TableMetadata> indexesDiff;
 
-    enum SchemaEventType
+    public enum SchemaEventType
     {
         KS_METADATA_LOADED,
         KS_METADATA_RELOADED,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index b32f67e..1a54e75 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -18,15 +18,11 @@
 package org.apache.cassandra.service;
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
 import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -78,6 +74,7 @@ import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -169,15 +166,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
                                              .maximumSize(Long.getLong("cassandra.parent_repair_status_cache_size", 100_000))
                                              .build();
 
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
     }
 
     public void start()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 479470c..5eeaf20 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.service;
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -27,9 +26,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
 import com.google.common.util.concurrent.Futures;
 
 import org.slf4j.Logger;
@@ -53,6 +49,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.Pair;
 
 public class CacheService implements CacheServiceMBean
@@ -88,16 +85,7 @@ public class CacheService implements CacheServiceMBean
 
     private CacheService()
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
 
         keyCache = initKeyCache();
         rowCache = initRowCache();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index f0b2dc1..592419a 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -569,16 +569,7 @@ public class CassandraDaemon
         {
             applyConfig();
 
-            try
-            {
-                MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-                mbs.registerMBean(new StandardMBean(new NativeAccess(), NativeAccessMBean.class), new ObjectName(MBEAN_NAME));
-            }
-            catch (Exception e)
-            {
-                logger.error("error registering MBean {}", MBEAN_NAME, e);
-                //Allow the server to start even if the bean can't be registered
-            }
+            MBeanWrapper.instance.registerMBean(new StandardMBean(new NativeAccess(), NativeAccessMBean.class), MBEAN_NAME, MBeanWrapper.OnException.LOG);
 
             if (FBUtilities.isWindows)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 7b6bd58..a3f6b52 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -30,6 +30,8 @@ import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class PendingRangeCalculatorService
 {
     public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService();
@@ -117,4 +119,11 @@ public class PendingRangeCalculatorService
     {
         StorageService.instance.getTokenMetadata().calculatePendingRanges(strategy, keyspaceName);
     }
+
+    @VisibleForTesting
+    public void shutdownExecutor() throws InterruptedException
+    {
+        executor.shutdown();
+        executor.awaitTermination(60, TimeUnit.SECONDS);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org