You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/11/16 18:42:45 UTC

[1/3] cassandra git commit: Introduce in-jvm distributed tests

Repository: cassandra
Updated Branches:
  refs/heads/trunk 787703508 -> f22fec927


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/MessageFilters.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/MessageFilters.java b/test/distributed/org/apache/cassandra/distributed/MessageFilters.java
new file mode 100644
index 0000000..f488e08
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/MessageFilters.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.function.BiConsumer;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessagingService;
+
+public class MessageFilters
+{
+    private final TestCluster cluster;
+    private final Set<Filter> filters = new CopyOnWriteArraySet<>();
+
+    public MessageFilters(TestCluster cluster)
+    {
+        this.cluster = cluster;
+    }
+
+    BiConsumer<InetAddressAndPort, Message> filter(BiConsumer<InetAddressAndPort, Message> applyIfNotFiltered)
+    {
+        return (toAddress, message) ->
+        {
+            int from = cluster.get(message.from).config.num;
+            int to = cluster.get(toAddress).config.num;
+            int verb = message.verb;
+            for (Filter filter : filters)
+            {
+                if (filter.matches(from, to, verb))
+                    return;
+            }
+
+            applyIfNotFiltered.accept(toAddress, message);
+        };
+    }
+
+    public class Filter
+    {
+        final int[] from;
+        final int[] to;
+        final int[] verbs;
+
+        Filter(int[] from, int[] to, int[] verbs)
+        {
+            if (from != null)
+            {
+                from = from.clone();
+                Arrays.sort(from);
+            }
+            if (to != null)
+            {
+                to = to.clone();
+                Arrays.sort(to);
+            }
+            if (verbs != null)
+            {
+                verbs = verbs.clone();
+                Arrays.sort(verbs);
+            }
+            this.from = from;
+            this.to = to;
+            this.verbs = verbs;
+        }
+
+        public int hashCode()
+        {
+            return (from == null ? 0 : Arrays.hashCode(from))
+                    + (to == null ? 0 : Arrays.hashCode(to))
+                    + (verbs == null ? 0 : Arrays.hashCode(verbs));
+        }
+
+        public boolean equals(Object that)
+        {
+            return that instanceof Filter && equals((Filter) that);
+        }
+
+        public boolean equals(Filter that)
+        {
+            return Arrays.equals(from, that.from)
+                    && Arrays.equals(to, that.to)
+                    && Arrays.equals(verbs, that.verbs);
+        }
+
+        public boolean matches(int from, int to, int verb)
+        {
+            return (this.from == null || Arrays.binarySearch(this.from, from) >= 0)
+                    && (this.to == null || Arrays.binarySearch(this.to, to) >= 0)
+                    && (this.verbs == null || Arrays.binarySearch(this.verbs, verb) >= 0);
+        }
+
+        public Filter restore()
+        {
+            filters.remove(this);
+            return this;
+        }
+
+        public Filter drop()
+        {
+            filters.add(this);
+            return this;
+        }
+    }
+
+    public class Builder
+    {
+        int[] from;
+        int[] to;
+        int[] verbs;
+
+        private Builder(int[] verbs)
+        {
+            this.verbs = verbs;
+        }
+
+        public Builder from(int ... nums)
+        {
+            from = nums;
+            return this;
+        }
+
+        public Builder to(int ... nums)
+        {
+            to = nums;
+            return this;
+        }
+
+        public Filter ready()
+        {
+            return new Filter(from, to, verbs);
+        }
+
+        public Filter drop()
+        {
+            return ready().drop();
+        }
+    }
+
+    public Builder verbs(MessagingService.Verb ... verbs)
+    {
+        int[] ids = new int[verbs.length];
+        for (int i = 0 ; i < verbs.length ; ++i)
+            ids[i] = verbs[i].getId();
+        return new Builder(ids);
+    }
+
+    public Builder allVerbs()
+    {
+        return new Builder(null);
+    }
+
+    public void reset()
+    {
+        filters.clear();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/RowUtil.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/RowUtil.java b/test/distributed/org/apache/cassandra/distributed/RowUtil.java
new file mode 100644
index 0000000..bce896d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/RowUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+public class RowUtil
+{
+    public static Object[][] toObjects(ResultMessage.Rows rows)
+    {
+        Object[][] result = new Object[rows.result.rows.size()][];
+        List<ColumnSpecification> specs = rows.result.metadata.names;
+        for (int i = 0; i < rows.result.rows.size(); i++)
+        {
+            List<ByteBuffer> row = rows.result.rows.get(i);
+            result[i] = new Object[row.size()];
+            for (int j = 0; j < row.size(); j++)
+            {
+                ByteBuffer bb = row.get(j);
+
+                if (bb != null)
+                    result[i][j] = specs.get(j).type.getSerializer().deserialize(bb);
+            }
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/TestCluster.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/TestCluster.java b/test/distributed/org/apache/cassandra/distributed/TestCluster.java
new file mode 100644
index 0000000..2b979ee
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/TestCluster.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.diag.DiagnosticEventService;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.SchemaEvent;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+
+/**
+ * TestCluster creates, initializes and manages Cassandra instances ({@link Instance}.
+ *
+ * All instances created under the same cluster will have a shared ClassLoader that'll preload
+ * common classes required for configuration and communication (byte buffers, primitives, config
+ * objects etc). Shared classes are listed in {@link InstanceClassLoader#commonClasses}.
+ *
+ * Each instance has its own class loader that will load logging, yaml libraries and all non-shared
+ * Cassandra package classes. The rule of thumb is that we'd like to have all Cassandra-specific things
+ * (unless explitily shared through the common classloader) on a per-classloader basis in order to
+ * allow creating more than one instance of DatabaseDescriptor and other Cassandra singletones.
+ *
+ * All actions (reading, writing, schema changes, etc) are executed by serializing lambda/runnables,
+ * transferring them to instance-specific classloaders, deserializing and running them there. Most of
+ * the things can be simply captured in closure or passed through `apply` method of the wrapped serializable
+ * function/callable. You can use {@link InvokableInstance#{applies|runs|consumes}OnInstance} for executing
+ * code on specific instance.
+ *
+ * Each instance has its own logger. Each instance log line will contain INSTANCE{instance_id}.
+ *
+ * As of today, messaging is faked by hooking into MessagingService, so we're not using usual Cassandra
+ * handlers for internode to have more control over it. Messaging is wired by passing verbs manually.
+ * coordinator-handling code and hooks to the callbacks can be found in {@link Coordinator}.
+ */
+public class TestCluster implements AutoCloseable
+{
+    private static ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("cluster-async-tasks"));
+
+    private final File root;
+    private final List<Instance> instances;
+    private final Coordinator coordinator;
+    private final Map<InetAddressAndPort, Instance> instanceMap;
+    private final MessageFilters filters;
+
+    private TestCluster(File root, List<Instance> instances)
+    {
+        this.root = root;
+        this.instances = instances;
+        this.instanceMap = new HashMap<>();
+        this.coordinator = new Coordinator(instances.get(0));
+        this.filters = new MessageFilters(this);
+    }
+
+    void launch()
+    {
+        FBUtilities.waitOnFutures(instances.stream()
+                .map(i -> exec.submit(() -> i.launch(this)))
+                .collect(Collectors.toList())
+        );
+        for (Instance instance : instances)
+            instanceMap.put(instance.getBroadcastAddress(), instance);
+    }
+
+    public int size()
+    {
+        return instances.size();
+    }
+
+    public Coordinator coordinator()
+    {
+        return coordinator;
+    }
+
+    /**
+     * WARNING: we index from 1 here, for consistency with inet address!
+     */
+    public Instance get(int idx)
+    {
+        return instances.get(idx - 1);
+    }
+
+    public Instance get(InetAddressAndPort addr)
+    {
+        return instanceMap.get(addr);
+    }
+
+    MessageFilters filters()
+    {
+        return filters;
+    }
+
+    MessageFilters.Builder verbs(MessagingService.Verb ... verbs)
+    {
+        return filters.verbs(verbs);
+    }
+
+    public void disableAutoCompaction(String keyspace)
+    {
+        for (Instance instance : instances)
+        {
+            instance.runOnInstance(() -> {
+                for (ColumnFamilyStore cs : Keyspace.open(keyspace).getColumnFamilyStores())
+                    cs.disableAutoCompaction();
+            });
+        }
+    }
+
+    public void schemaChange(String query)
+    {
+        try (SchemaChangeMonitor monitor = new SchemaChangeMonitor())
+        {
+            // execute the schema change
+            coordinator().execute(query, ConsistencyLevel.ALL);
+            monitor.waitForAgreement();
+        }
+    }
+
+    /**
+     * Will wait for a schema change AND agreement that occurs after it is created
+     * (and precedes the invocation to waitForAgreement)
+     *
+     * Works by simply checking if all UUIDs agree after any schema version change event,
+     * so long as the waitForAgreement method has been entered (indicating the change has
+     * taken place on the coordinator)
+     *
+     * This could perhaps be made a little more robust, but this should more than suffice.
+     */
+    public class SchemaChangeMonitor implements AutoCloseable
+    {
+        final List<Runnable> cleanup;
+        volatile boolean schemaHasChanged;
+        final SimpleCondition agreement = new SimpleCondition();
+
+        public SchemaChangeMonitor()
+        {
+            this.cleanup = new ArrayList<>(instances.size());
+            for (Instance instance : instances)
+            {
+                cleanup.add(
+                        instance.appliesOnInstance(
+                                (Runnable runnable) -> {
+                                    Consumer<SchemaEvent> consumer = event -> runnable.run();
+                                    DiagnosticEventService.instance().subscribe(SchemaEvent.class, SchemaEvent.SchemaEventType.VERSION_UPDATED, consumer);
+                                    return (Runnable) () -> DiagnosticEventService.instance().unsubscribe(SchemaEvent.class, consumer);
+                                }
+                        ).apply(this::signal)
+                );
+            }
+        }
+
+        private void signal()
+        {
+            if (schemaHasChanged && 1 == instances.stream().map(Instance::getSchemaVersion).distinct().count())
+                agreement.signalAll();
+        }
+
+        @Override
+        public void close()
+        {
+            for (Runnable runnable : cleanup)
+                runnable.run();
+        }
+
+        public void waitForAgreement()
+        {
+            schemaHasChanged = true;
+            signal();
+            try
+            {
+                agreement.await(1L, TimeUnit.MINUTES);
+            } catch (InterruptedException e)
+            {
+                throw new IllegalStateException("Schema agreement not reached");
+            }
+        }
+    }
+
+    public void schemaChange(String statement, int instance)
+    {
+        get(instance).schemaChange(statement);
+    }
+
+    public static TestCluster create(int nodeCount) throws Throwable
+    {
+        return create(nodeCount, Files.createTempDirectory("dtests").toFile());
+    }
+
+    public static TestCluster create(int nodeCount, File root)
+    {
+        root.mkdirs();
+        setupLogging(root);
+
+        IntFunction<ClassLoader> classLoaderFactory = InstanceClassLoader.createFactory(
+                (URLClassLoader) Thread.currentThread().getContextClassLoader());
+        List<Instance> instances = new ArrayList<>();
+        long token = Long.MIN_VALUE + 1, increment = 2 * (Long.MAX_VALUE / nodeCount);
+        for (int i = 0 ; i < nodeCount ; ++i)
+        {
+            InstanceConfig instanceConfig = InstanceConfig.generate(i + 1, root, String.valueOf(token));
+            instances.add(new Instance(instanceConfig, classLoaderFactory.apply(i + 1)));
+            token += increment;
+        }
+
+        TestCluster cluster = new TestCluster(root, instances);
+        cluster.launch();
+        return cluster;
+    }
+
+    private static void setupLogging(File root)
+    {
+        try
+        {
+            String testConfPath = "test/conf/logback-dtest.xml";
+            Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml");
+            if (!logConfPath.toFile().exists())
+            {
+                Files.copy(new File(testConfPath).toPath(),
+                           logConfPath);
+            }
+            System.setProperty("logback.configurationFile", "file://" + logConfPath);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        List<Future<?>> futures = instances.stream()
+                .map(i -> exec.submit(i::shutdown))
+                .collect(Collectors.toList());
+
+//        withThreadLeakCheck(futures);
+
+        // Make sure to only delete directory when threads are stopped
+        exec.submit(() -> {
+            FBUtilities.waitOnFutures(futures);
+            FileUtils.deleteRecursive(root);
+        });
+    }
+
+    // We do not want this check to run every time until we fix problems with tread stops
+    private void withThreadLeakCheck(List<Future<?>> futures)
+    {
+        FBUtilities.waitOnFutures(futures);
+
+        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
+        threadSet = Sets.difference(threadSet, Collections.singletonMap(Thread.currentThread(), null).keySet());
+        if (!threadSet.isEmpty())
+        {
+            for (Thread thread : threadSet)
+            {
+                System.out.println(thread);
+                System.out.println(Arrays.toString(thread.getStackTrace()));
+            }
+            throw new RuntimeException(String.format("Not all threads have shut down. %d threads are still running: %s", threadSet.size(), threadSet));
+        }
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
index 4607d5c..18d17e8 100644
--- a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
+++ b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
@@ -78,7 +78,7 @@ public class NettyFactoryTest
     }
 
     @After
-    public void tearDown()
+    public void tearDown() throws Exception
     {
         if (factory != null)
             factory.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java b/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
index fff7b17..e274f27 100644
--- a/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
+++ b/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
@@ -127,9 +127,5 @@ public class StreamCompressionSerializerTest
         {
             return true;
         }
-
-        @Override
-        public void close()
-        {   }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/3] cassandra git commit: Introduce in-jvm distributed tests

Posted by if...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 630dc5d..439d8cf 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.service;
 
 import java.io.File;
-import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.nio.file.Paths;
 import java.util.*;
@@ -26,9 +25,6 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.cache.CacheLoader;
@@ -123,16 +119,7 @@ public class StorageProxy implements StorageProxyMBean
 
     static
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(instance, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-
+        MBeanWrapper.instance.registerMBean(instance, MBEAN_NAME);
         HintsService.instance.registerMBean();
         HintedHandOffManager.instance.registerMBean();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e7ca4be..68dba93 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -231,7 +231,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     private final List<IEndpointLifecycleSubscriber> lifecycleSubscribers = new CopyOnWriteArrayList<>();
 
-    private final ObjectName jmxObjectName;
+    private final String jmxObjectName;
 
     private Collection<Token> bootstrapTokens = null;
 
@@ -270,17 +270,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         // use dedicated executor for handling JMX notifications
         super(JMXBroadcastExecutor.executor);
 
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            jmxObjectName = new ObjectName("org.apache.cassandra.db:type=StorageService");
-            mbs.registerMBean(this, jmxObjectName);
-            mbs.registerMBean(StreamManager.instance, new ObjectName(StreamManager.OBJECT_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        jmxObjectName = "org.apache.cassandra.db:type=StorageService";
+        MBeanWrapper.instance.registerMBean(this, jmxObjectName);
+        MBeanWrapper.instance.registerMBean(StreamManager.instance, StreamManager.OBJECT_NAME);
 
         ReadCommandVerbHandler readHandler = new ReadCommandVerbHandler();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 151eb18..50d35bc 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -471,6 +471,32 @@ public class ByteBufferUtil
         return bytes.getDouble(bytes.position());
     }
 
+    public static ByteBuffer objectToBytes(Object obj)
+    {
+        if (obj instanceof Integer)
+            return ByteBufferUtil.bytes((int) obj);
+        else if (obj instanceof Byte)
+            return ByteBufferUtil.bytes((byte) obj);
+        else if (obj instanceof Short)
+            return ByteBufferUtil.bytes((short) obj);
+        else if (obj instanceof Long)
+            return ByteBufferUtil.bytes((long) obj);
+        else if (obj instanceof Float)
+            return ByteBufferUtil.bytes((float) obj);
+        else if (obj instanceof Double)
+            return ByteBufferUtil.bytes((double) obj);
+        else if (obj instanceof UUID)
+            return ByteBufferUtil.bytes((UUID) obj);
+        else if (obj instanceof InetAddress)
+            return ByteBufferUtil.bytes((InetAddress) obj);
+        else if (obj instanceof String)
+            return ByteBufferUtil.bytes((String) obj);
+        else
+            throw new IllegalArgumentException(String.format("Cannot convert value %s of type %s",
+                                                             obj,
+                                                             obj.getClass()));
+    }
+
     public static ByteBuffer bytes(byte b)
     {
         return ByteBuffer.allocate(1).put(0, b);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/MBeanWrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MBeanWrapper.java b/src/java/org/apache/cassandra/utils/MBeanWrapper.java
new file mode 100644
index 0000000..edee6af
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/MBeanWrapper.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.lang.management.ManagementFactory;
+import java.util.function.Consumer;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class to avoid catching and rethrowing checked exceptions on MBean and
+ * allow turning of MBean registration for test purposes.
+ */
+public interface MBeanWrapper
+{
+    static final Logger logger = LoggerFactory.getLogger(MBeanWrapper.class);
+
+    static final MBeanWrapper instance = Boolean.getBoolean("org.apache.cassandra.disable_mbean_registration") ?
+                                         new NoOpMBeanWrapper() :
+                                         new PlatformMBeanWrapper();
+
+    // Passing true for graceful will log exceptions instead of rethrowing them
+    public void registerMBean(Object obj, ObjectName mbeanName, OnException onException);
+    default void registerMBean(Object obj, ObjectName mbeanName)
+    {
+        registerMBean(obj, mbeanName, OnException.THROW);
+    }
+
+    public void registerMBean(Object obj, String mbeanName, OnException onException);
+    default void registerMBean(Object obj, String mbeanName)
+    {
+        registerMBean(obj, mbeanName, OnException.THROW);
+    }
+
+    public boolean isRegistered(ObjectName mbeanName, OnException onException);
+    default boolean isRegistered(ObjectName mbeanName)
+    {
+        return isRegistered(mbeanName, OnException.THROW);
+    }
+
+    public boolean isRegistered(String mbeanName, OnException onException);
+    default boolean isRegistered(String mbeanName)
+    {
+        return isRegistered(mbeanName, OnException.THROW);
+    }
+
+    public void unregisterMBean(ObjectName mbeanName, OnException onException);
+    default void unregisterMBean(ObjectName mbeanName)
+    {
+        unregisterMBean(mbeanName, OnException.THROW);
+    }
+
+    public void unregisterMBean(String mbeanName, OnException onException);
+    default void unregisterMBean(String mbeanName)
+    {
+        unregisterMBean(mbeanName, OnException.THROW);
+    }
+
+    static class NoOpMBeanWrapper implements MBeanWrapper
+    {
+        public void registerMBean(Object obj, ObjectName mbeanName, OnException onException) {}
+        public void registerMBean(Object obj, String mbeanName, OnException onException) {}
+        public boolean isRegistered(ObjectName mbeanName, OnException onException) { return false; }
+        public boolean isRegistered(String mbeanName, OnException onException) { return false; }
+        public void unregisterMBean(ObjectName mbeanName, OnException onException) {}
+        public void unregisterMBean(String mbeanName, OnException onException) {}
+    }
+
+    static class PlatformMBeanWrapper implements MBeanWrapper
+    {
+        private final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        public void registerMBean(Object obj, ObjectName mbeanName, OnException onException)
+        {
+            try
+            {
+                mbs.registerMBean(obj, mbeanName);
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+        }
+
+        public void registerMBean(Object obj, String mbeanName, OnException onException)
+        {
+            try
+            {
+                mbs.registerMBean(obj, new ObjectName(mbeanName));
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+        }
+
+        public boolean isRegistered(ObjectName mbeanName, OnException onException)
+        {
+            try
+            {
+                return mbs.isRegistered(mbeanName);
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+            return false;
+        }
+
+        public boolean isRegistered(String mbeanName, OnException onException)
+        {
+            try
+            {
+                return mbs.isRegistered(new ObjectName(mbeanName));
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+            return false;
+        }
+
+        public void unregisterMBean(ObjectName mbeanName, OnException onException)
+        {
+            try
+            {
+                mbs.unregisterMBean(mbeanName);
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+        }
+
+        public void unregisterMBean(String mbeanName, OnException onException)
+        {
+            try
+            {
+                mbs.unregisterMBean(new ObjectName(mbeanName));
+            }
+            catch (Exception e)
+            {
+                onException.handler.accept(e);
+            }
+        }
+    }
+
+    public enum OnException
+    {
+        THROW(e -> { throw new RuntimeException(e); }),
+        LOG(e -> { logger.error("Error in MBean wrapper: ", e); }),
+        IGNORE(e -> {});
+
+        private Consumer<Exception> handler;
+        OnException(Consumer<Exception> handler)
+        {
+            this.handler = handler;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/Mx4jTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Mx4jTool.java b/src/java/org/apache/cassandra/utils/Mx4jTool.java
index 5baaea2..eda6354 100644
--- a/src/java/org/apache/cassandra/utils/Mx4jTool.java
+++ b/src/java/org/apache/cassandra/utils/Mx4jTool.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.utils;
 
-import java.lang.management.ManagementFactory;
-import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.commons.lang3.StringUtils;
@@ -44,7 +42,7 @@ public class Mx4jTool
         try
         {
             logger.trace("Will try to load mx4j now, if it's in the classpath");
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            MBeanWrapper mbs = MBeanWrapper.instance;
             ObjectName processorName = new ObjectName("Server:name=XSLTProcessor");
 
             Class<?> httpAdaptorClass = Class.forName("mx4j.tools.adaptor.http.HttpAdaptor");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index b69e6bf..1a17a1f 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -31,6 +31,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -354,11 +355,10 @@ public final class Ref<T> implements RefCounted<T>
     static final Set<Class<?>> concurrentIterables = Collections.newSetFromMap(new IdentityHashMap<>());
     private static final Set<GlobalState> globallyExtant = Collections.newSetFromMap(new ConcurrentHashMap<>());
     static final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
-    private static final ExecutorService EXEC = Executors.newFixedThreadPool(1, new NamedThreadFactory("Reference-Reaper"));
+    private static final InfiniteLoopExecutor EXEC = new InfiniteLoopExecutor("Reference-Reaper", Ref::reapOneReference).start();
     static final ScheduledExecutorService STRONG_LEAK_DETECTOR = !DEBUG_ENABLED ? null : Executors.newScheduledThreadPool(1, new NamedThreadFactory("Strong-Reference-Leak-Detector"));
     static
     {
-        EXEC.execute(new ReferenceReaper());
         if (DEBUG_ENABLED)
         {
             STRONG_LEAK_DETECTOR.scheduleAtFixedRate(new Visitor(), 1, 15, TimeUnit.MINUTES);
@@ -367,28 +367,12 @@ public final class Ref<T> implements RefCounted<T>
         concurrentIterables.addAll(Arrays.asList(concurrentIterableClasses));
     }
 
-    static final class ReferenceReaper implements Runnable
+    private static void reapOneReference() throws InterruptedException
     {
-        public void run()
+        Object obj = referenceQueue.remove(100);
+        if (obj instanceof Ref.State)
         {
-            try
-            {
-                while (true)
-                {
-                    Object obj = referenceQueue.remove();
-                    if (obj instanceof Ref.State)
-                    {
-                        ((Ref.State) obj).release(true);
-                    }
-                }
-            }
-            catch (InterruptedException e)
-            {
-            }
-            finally
-            {
-                EXEC.execute(this);
-            }
+            ((Ref.State) obj).release(true);
         }
     }
 
@@ -719,4 +703,11 @@ public final class Ref<T> implements RefCounted<T>
             }
         }
     }
+
+    @VisibleForTesting
+    public static void shutdownReferenceReaper() throws InterruptedException
+    {
+        EXEC.shutdown();
+        EXEC.awaitTermination(60, TimeUnit.SECONDS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/memory/BufferPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index f9ec40c..c8ad078 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -27,11 +27,11 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.netty.util.concurrent.FastThreadLocal;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.util.FileUtils;
@@ -493,34 +493,16 @@ public class BufferPool
     private static final ConcurrentLinkedQueue<LocalPoolRef> localPoolReferences = new ConcurrentLinkedQueue<>();
 
     private static final ReferenceQueue<Object> localPoolRefQueue = new ReferenceQueue<>();
-    private static final ExecutorService EXEC = Executors.newFixedThreadPool(1, new NamedThreadFactory("LocalPool-Cleaner"));
-    static
+    private static final InfiniteLoopExecutor EXEC = new InfiniteLoopExecutor("LocalPool-Cleaner", BufferPool::cleanupOneReference).start();
+
+    private static void cleanupOneReference() throws InterruptedException
     {
-        EXEC.execute(new Runnable()
+        Object obj = localPoolRefQueue.remove(100);
+        if (obj instanceof LocalPoolRef)
         {
-            public void run()
-            {
-                try
-                {
-                    while (true)
-                    {
-                        Object obj = localPoolRefQueue.remove();
-                        if (obj instanceof LocalPoolRef)
-                        {
-                            ((LocalPoolRef) obj).release();
-                            localPoolReferences.remove(obj);
-                        }
-                    }
-                }
-                catch (InterruptedException e)
-                {
-                }
-                finally
-                {
-                    EXEC.execute(this);
-                }
-            }
-        });
+            ((LocalPoolRef) obj).release();
+            localPoolReferences.remove(obj);
+        }
     }
 
     private static ByteBuffer allocateDirectAligned(int capacity)
@@ -872,4 +854,11 @@ public class BufferPool
         int mask = unit - 1;
         return (size + mask) & ~mask;
     }
+
+    @VisibleForTesting
+    public static void shutdownLocalCleaner() throws InterruptedException
+    {
+        EXEC.shutdown();
+        EXEC.awaitTermination(60, TimeUnit.SECONDS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
index 5a90463..b905d2c 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
@@ -18,57 +18,70 @@
  */
 package org.apache.cassandra.utils.memory;
 
+import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 
 /**
  * A thread that reclaims memory from a MemtablePool on demand.  The actual reclaiming work is delegated to the
  * cleaner Runnable, e.g., FlushLargestColumnFamily
  */
-class MemtableCleanerThread<P extends MemtablePool> extends Thread
+public class MemtableCleanerThread<P extends MemtablePool> extends InfiniteLoopExecutor
 {
-    /** The pool we're cleaning */
-    final P pool;
-
-    /** should ensure that at least some memory has been marked reclaiming after completion */
-    final Runnable cleaner;
+    private static class Clean<P extends MemtablePool> implements InterruptibleRunnable
+    {
+        /** The pool we're cleaning */
+        final P pool;
 
-    /** signalled whenever needsCleaning() may return true */
-    final WaitQueue wait = new WaitQueue();
+        /** should ensure that at least some memory has been marked reclaiming after completion */
+        final Runnable cleaner;
 
-    MemtableCleanerThread(P pool, Runnable cleaner)
-    {
-        super(pool.getClass().getSimpleName() + "Cleaner");
-        this.pool = pool;
-        this.cleaner = cleaner;
-        setDaemon(true);
-    }
+        /** signalled whenever needsCleaning() may return true */
+        final WaitQueue wait = new WaitQueue();
 
-    boolean needsCleaning()
-    {
-        return pool.onHeap.needsCleaning() || pool.offHeap.needsCleaning();
-    }
+        private Clean(P pool, Runnable cleaner)
+        {
+            this.pool = pool;
+            this.cleaner = cleaner;
+        }
 
-    // should ONLY be called when we really think it already needs cleaning
-    void trigger()
-    {
-        wait.signal();
-    }
+        boolean needsCleaning()
+        {
+            return pool.onHeap.needsCleaning() || pool.offHeap.needsCleaning();
+        }
 
-    @Override
-    public void run()
-    {
-        while (true)
+        @Override
+        public void run() throws InterruptedException
         {
-            while (!needsCleaning())
+            if (needsCleaning())
+            {
+                cleaner.run();
+            }
+            else
             {
                 final WaitQueue.Signal signal = wait.register();
                 if (!needsCleaning())
-                    signal.awaitUninterruptibly();
+                    signal.await();
                 else
                     signal.cancel();
             }
-
-            cleaner.run();
         }
     }
+
+    private final Runnable trigger;
+    private MemtableCleanerThread(Clean<P> clean)
+    {
+        super(clean.pool.getClass().getSimpleName() + "Cleaner", clean);
+        this.trigger = clean.wait::signal;
+    }
+
+    MemtableCleanerThread(P pool, Runnable cleaner)
+    {
+        this(new Clean<>(pool, cleaner));
+    }
+
+    // should ONLY be called when we really think it already needs cleaning
+    public void trigger()
+    {
+        trigger.run();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index c082856..684db93 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@ -18,8 +18,11 @@
  */
 package org.apache.cassandra.utils.memory;
 
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import com.codahale.metrics.Timer;
 import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.metrics.DefaultNameFactory;
@@ -63,6 +66,12 @@ public abstract class MemtablePool
         return cleaner == null ? null : new MemtableCleanerThread<>(this, cleaner);
     }
 
+    public void shutdown() throws InterruptedException
+    {
+        cleaner.shutdown();
+        cleaner.awaitTermination(60, TimeUnit.SECONDS);
+    }
+
     public abstract MemtableAllocator newAllocator();
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/conf/logback-dtest.xml
----------------------------------------------------------------------
diff --git a/test/conf/logback-dtest.xml b/test/conf/logback-dtest.xml
new file mode 100644
index 0000000..b8019f6
--- /dev/null
+++ b/test/conf/logback-dtest.xml
@@ -0,0 +1,79 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<configuration debug="false" scan="true" scanPeriod="60 seconds">
+  <define name="instance_id" class="org.apache.cassandra.distributed.InstanceIDDefiner" />
+
+  <!-- Shutdown hook ensures that async appender flushes -->
+  <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
+
+  <!-- Status listener is used to wrap stdout/stderr and tee to log file -->
+  <statusListener class="org.apache.cassandra.LogbackStatusListener" />
+
+  <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+
+    <file>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log</file>
+    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+      <fileNamePattern>./build/test/logs/${cassandra.testtag}/TEST-${suitename}.log.%i.gz</fileNamePattern>
+      <minIndex>1</minIndex>
+      <maxIndex>20</maxIndex>
+    </rollingPolicy>
+
+    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+      <maxFileSize>20MB</maxFileSize>
+    </triggeringPolicy>
+
+    <encoder>
+      <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern>
+      <immediateFlush>false</immediateFlush>
+    </encoder>
+  </appender>
+
+  <appender name="STDOUT" target="System.out" class="org.apache.cassandra.ConsoleAppender">
+    <encoder>
+      <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern>
+    </encoder>
+    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+      <level>DEBUG</level>
+    </filter>
+  </appender>
+
+  <appender name="TEE" class="org.apache.cassandra.TeeingAppender">
+      <appender-ref ref="FILE"/>
+      <appender-ref ref="STDOUT"/>
+  </appender>
+
+  <logger name="org.apache.hadoop" level="WARN"/>
+
+  <logger name="org.apache.cassandra.db.monitoring" level="DEBUG"/>
+
+  <!-- Do not change the name of this appender. LogbackStatusListener uses the thread name
+       tied to the appender name to know when to write to real stdout/stderr vs forwarding to logback -->
+  <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
+      <discardingThreshold>0</discardingThreshold>
+      <maxFlushTime>0</maxFlushTime>
+      <queueSize>1024</queueSize>
+      <appender-ref ref="TEE"/>
+      <includeCallerData>true</includeCallerData>
+  </appender>
+
+  <root level="DEBUG">
+    <appender-ref ref="ASYNC" />
+  </root>
+</configuration>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/Coordinator.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/Coordinator.java b/test/distributed/org/apache/cassandra/distributed/Coordinator.java
new file mode 100644
index 0000000..91ab480
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/Coordinator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+
+public class Coordinator
+{
+    final Instance instance;
+
+    public Coordinator(Instance instance)
+    {
+        this.instance = instance;
+    }
+
+    private static Object[][] coordinatorExecute(String query, int consistencyLevel, Object[] bindings)
+    {
+        CQLStatement prepared = QueryProcessor.getStatement(query, ClientState.forInternalCalls());
+        List<ByteBuffer> boundValues = new ArrayList<>();
+        for (Object binding : bindings)
+        {
+            boundValues.add(ByteBufferUtil.objectToBytes(binding));
+        }
+
+        ResultMessage res = prepared.execute(QueryState.forInternalCalls(),
+                                             QueryOptions.create(ConsistencyLevel.fromCode(consistencyLevel),
+                                                                 boundValues,
+                                                                 false,
+                                                                 10,
+                                                                 null,
+                                                                 null,
+                                                                 ProtocolVersion.V4,
+                                                                 null),
+                                             System.nanoTime());
+
+        if (res != null && res.kind == ResultMessage.Kind.ROWS)
+        {
+            return RowUtil.toObjects((ResultMessage.Rows) res);
+        }
+        else
+        {
+            return new Object[][]{};
+        }
+    }
+
+    public Object[][] execute(String query, ConsistencyLevel consistencyLevel, Object... boundValues)
+    {
+        return instance.appliesOnInstance(Coordinator::coordinatorExecute).apply(query, consistencyLevel.code, boundValues);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java b/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java
new file mode 100644
index 0000000..a61c8af
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/DistributedReadWritePathTest.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+
+import static org.apache.cassandra.net.MessagingService.Verb.READ_REPAIR;
+
+public class DistributedReadWritePathTest extends DistributedTestBase
+{
+    @Test
+    public void coordinatorRead() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(3))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
+            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
+
+            assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+                                                     ConsistencyLevel.ALL,
+                                                     1),
+                       row(1, 1, 1),
+                       row(1, 2, 2),
+                       row(1, 3, 3));
+        }
+    }
+
+    @Test
+    public void coordinatorWrite() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(3))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='none'");
+
+            cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)",
+                                          ConsistencyLevel.QUORUM);
+
+            for (int i = 0; i < 3; i++)
+            {
+                assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+                           row(1, 1, 1));
+            }
+
+            assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                     ConsistencyLevel.QUORUM),
+                       row(1, 1, 1));
+        }
+    }
+
+    @Test
+    public void readRepairTest() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(3))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+
+            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+
+            assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                     ConsistencyLevel.QUORUM),
+                       row(1, 1, 1));
+
+            // Verify that data got repaired to the third node
+            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+                       row(1, 1, 1));
+        }
+    }
+
+    @Test
+    public void failingReadRepairTest() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(3))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+
+            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+
+            cluster.verbs(READ_REPAIR).to(3).drop();
+            assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                     ConsistencyLevel.QUORUM),
+                       row(1, 1, 1));
+
+            // Data was not repaired
+            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+        }
+    }
+
+    @Test
+    public void writeWithSchemaDisagreement() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(3))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+            // Introduce schema disagreement
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+
+            Exception thrown = null;
+            try
+            {
+                cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
+                                              ConsistencyLevel.QUORUM);
+            }
+            catch (RuntimeException e)
+            {
+                thrown = e;
+            }
+
+            Assert.assertTrue(thrown.getMessage().contains("Exception occurred on the node"));
+            Assert.assertTrue(thrown.getCause().getMessage().contains("Unknown column v2 during deserialization"));
+        }
+    }
+
+    @Test
+    public void readWithSchemaDisagreement() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(3))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+            cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
+
+            // Introduce schema disagreement
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
+
+            Exception thrown = null;
+            try
+            {
+                assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                         ConsistencyLevel.ALL),
+                           row(1, 1, 1, null));
+            }
+            catch (Exception e)
+            {
+                thrown = e;
+            }
+            Assert.assertTrue(thrown.getMessage().contains("Exception occurred on the node"));
+            Assert.assertTrue(thrown.getCause().getMessage().contains("Unknown column v2 during deserialization"));
+        }
+    }
+
+    @Test
+    public void reAddColumnAsStatic() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(3))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+            for (int i = 1; i <= 3; i++)
+            {
+                cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, ?, ?)",
+                                              ConsistencyLevel.ALL,
+                                              1, i, i);
+            }
+
+            // Drop column
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v1");
+
+            assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                     ConsistencyLevel.ALL),
+                       row(1, 1),
+                       row(1, 2),
+                       row(1, 3));
+
+            // Drop column
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v1 int static");
+
+            assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                     ConsistencyLevel.ALL),
+                       row(1, 1, null),
+                       row(1, 2, null),
+                       row(1, 3, null));
+
+            cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, v1) VALUES (?, ?)",
+                                          ConsistencyLevel.ALL,
+                                          1, 1);
+
+            assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                     ConsistencyLevel.ALL),
+                       row(1, 1, 1),
+                       row(1, 2, 1),
+                       row(1, 3, 1));
+        }
+    }
+
+    @Test
+    public void reAddColumnAsStaticDisagreementCoordinatorSide() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(3))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
+
+            for (int i = 1; i <= 3; i++)
+            {
+                cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, ?, ?)",
+                                              ConsistencyLevel.ALL,
+                                              1, i, i);
+            }
+
+            // Drop column
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v1", 1);
+
+            Exception thrown = null;
+            try
+            {
+                cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                              ConsistencyLevel.ALL);
+            }
+            catch (Exception e)
+            {
+                thrown = e;
+            }
+
+            Assert.assertTrue(thrown.getCause().getMessage().contains("[v1] is not a subset of"));
+
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v1 int static", 1);
+
+            try
+            {
+                cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                              ConsistencyLevel.ALL);
+            }
+            catch (Exception e)
+            {
+                thrown = e;
+            }
+
+            Assert.assertTrue(thrown.getCause().getMessage().contains("[v1] is not a subset of"));
+        }
+    }
+
+    @Test
+    public void reAddColumnAsStaticDisagreementReplicaSide() throws Throwable
+    {
+        try (TestCluster cluster = createCluster(2))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+            for (int i = 1; i <= 3; i++)
+            {
+                cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, ?, ?)",
+                                              ConsistencyLevel.ALL,
+                                              1, i, i);
+            }
+
+            // Drop column on the replica
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v1", 2);
+
+            // Columns are going to be read and read-repaired as long as they're available
+            assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                     ConsistencyLevel.ALL),
+                       row(1, 1, 1),
+                       row(1, 2, 2),
+                       row(1, 3, 3));
+
+            assertRows(cluster.get(2).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+                       row(1, 1),
+                       row(1, 2),
+                       row(1, 3));
+
+            // Re-add as static on the replica
+            cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v1 int static", 2);
+
+            // Try reading
+            assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                     ConsistencyLevel.ALL),
+                       row(1, 1, 1),
+                       row(1, 2, 2),
+                       row(1, 3, 3));
+
+            // Make sure read-repair did not corrupt the data
+            assertRows(cluster.get(2).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+                       row(1, 1, null),
+                       row(1, 2, null),
+                       row(1, 3, null));
+
+            // Writing to the replica with disagreeing schema should not work
+            Exception thrown = null;
+            try
+            {
+                cluster.coordinator().execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, ?, ?)",
+                                              ConsistencyLevel.ALL,
+                                              1, 1, 5);
+            }
+            catch (Exception e)
+            {
+                thrown = e;
+            }
+
+            Assert.assertNotNull(thrown);
+
+            thrown = null;
+
+            // If somehow replica got new data, reading that data should not be possible, either
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, ?, ?)",
+                                           1, 1, 100);
+
+            try
+            {
+                assertRows(cluster.coordinator().execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                         ConsistencyLevel.ALL),
+                           row(1, 1, 1),
+                           row(1, 2, 2),
+                           row(1, 3, 3));
+            }
+            catch (Exception e)
+            {
+                thrown = e;
+            }
+
+            Assert.assertNotNull(thrown);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/DistributedTestBase.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/DistributedTestBase.java
new file mode 100644
index 0000000..f873fce
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/DistributedTestBase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+public class DistributedTestBase
+{
+    static String KEYSPACE = "distributed_test_keyspace";
+
+    @BeforeClass
+    public static void setup()
+    {
+        System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
+    }
+
+    TestCluster createCluster(int nodeCount) throws Throwable
+    {
+        TestCluster cluster = TestCluster.create(nodeCount);
+        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + nodeCount + "};");
+
+        return cluster;
+    }
+
+    public static void assertRows(Object[][] actual, Object[]... expected)
+    {
+        Assert.assertEquals(rowsNotEqualErrorMessage(expected, actual),
+                            expected.length, actual.length);
+
+        for (int i = 0; i < expected.length; i++)
+        {
+            Object[] expectedRow = expected[i];
+            Object[] actualRow = actual[i];
+            Assert.assertTrue(rowsNotEqualErrorMessage(actual, expected),
+                              Arrays.equals(expectedRow, actualRow));
+        }
+    }
+
+    public static String rowsNotEqualErrorMessage(Object[][] actual, Object[][] expected)
+    {
+        return String.format("Expected: %s\nActual:%s\n",
+                             rowsToString(expected),
+                             rowsToString(actual));
+    }
+
+    public static String rowsToString(Object[][] rows)
+    {
+        StringBuilder builder = new StringBuilder();
+        builder.append("[");
+        boolean isFirst = true;
+        for (Object[] row : rows)
+        {
+            if (isFirst)
+                isFirst = false;
+            else
+                builder.append(",");
+            builder.append(Arrays.toString(row));
+        }
+        builder.append("]");
+        return builder.toString();
+    }
+
+    public static Object[] row(Object... expected)
+    {
+        return expected;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/Instance.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/Instance.java b/test/distributed/org/apache/cassandra/distributed/Instance.java
new file mode 100644
index 0000000..f9ee5bb
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/Instance.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.concurrent.SharedExecutorPool;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryHandler;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.SimpleSeedProvider;
+import org.apache.cassandra.locator.SimpleSnitch;
+import org.apache.cassandra.net.IMessageSink;
+import org.apache.cassandra.net.MessageDeliveryTask;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.async.MessageInHandler;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.memory.BufferPool;
+
+public class Instance extends InvokableInstance
+{
+    public final InstanceConfig config;
+
+    public Instance(InstanceConfig config, ClassLoader classLoader)
+    {
+        super(classLoader);
+        this.config = config;
+    }
+
+    public InetAddressAndPort getBroadcastAddress() { return callOnInstance(FBUtilities::getBroadcastAddressAndPort); }
+
+    public Object[][] executeInternal(String query, Object... args)
+    {
+        return callOnInstance(() ->
+        {
+            QueryHandler.Prepared prepared = QueryProcessor.prepareInternal(query);
+            ResultMessage result = prepared.statement.executeLocally(QueryProcessor.internalQueryState(),
+                    QueryProcessor.makeInternalOptions(prepared.statement, args));
+
+            if (result instanceof ResultMessage.Rows)
+                return RowUtil.toObjects((ResultMessage.Rows)result);
+            else
+                return null;
+        });
+    }
+
+    public UUID getSchemaVersion()
+    {
+        // we do not use method reference syntax here, because we need to invoke on the node-local schema instance
+        //noinspection Convert2MethodRef
+        return callOnInstance(() -> Schema.instance.getVersion());
+    }
+
+    public void schemaChange(String query)
+    {
+        runOnInstance(() ->
+        {
+            try
+            {
+                ClientState state = ClientState.forInternalCalls(SchemaConstants.SYSTEM_KEYSPACE_NAME);
+                QueryState queryState = new QueryState(state);
+
+                CQLStatement statement = QueryProcessor.parseStatement(query, queryState.getClientState());
+                statement.validate(state);
+
+                QueryOptions options = QueryOptions.forInternalCalls(Collections.emptyList());
+                statement.executeLocally(queryState, options);
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException("Error setting schema for test (query was: " + query + ")", e);
+            }
+        });
+    }
+
+    private void registerMockMessaging(TestCluster cluster)
+    {
+        BiConsumer<InetAddressAndPort, Message> deliverToInstance = (to, message) -> cluster.get(to).receiveMessage(message);
+        BiConsumer<InetAddressAndPort, Message> deliverToInstanceIfNotFiltered = cluster.filters().filter(deliverToInstance);
+
+        acceptsOnInstance((BiConsumer<InetAddressAndPort, Message> deliver) ->
+                MessagingService.instance().addMessageSink(new MessageDeliverySink(deliver))
+        ).accept(deliverToInstanceIfNotFiltered);
+    }
+
+    private static class MessageDeliverySink implements IMessageSink
+    {
+        private final BiConsumer<InetAddressAndPort, Message> deliver;
+        MessageDeliverySink(BiConsumer<InetAddressAndPort, Message> deliver)
+        {
+            this.deliver = deliver;
+        }
+
+        public boolean allowOutgoingMessage(MessageOut messageOut, int id, InetAddressAndPort to)
+        {
+            try (DataOutputBuffer out = new DataOutputBuffer(1024))
+            {
+                InetAddressAndPort from = FBUtilities.getBroadcastAddressAndPort();
+                messageOut.serialize(out, MessagingService.current_version);
+                deliver.accept(to, new Message(messageOut.verb.getId(), out.toByteArray(), id, MessagingService.current_version, from));
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+            return false;
+        }
+
+        public boolean allowIncomingMessage(MessageIn message, int id)
+        {
+            // we can filter to our heart's content on the outgoing message; no need to worry about incoming
+            return true;
+        }
+    }
+
+    private void receiveMessage(Message message)
+    {
+        acceptsOnInstance((Message m) ->
+        {
+            try (DataInputBuffer in = new DataInputBuffer(m.bytes))
+            {
+                MessageIn<?> messageIn = MessageInHandler.deserialize(in, m.id, m.version, m.from);
+                Runnable deliver = new MessageDeliveryTask(messageIn, m.id);
+                deliver.run();
+            }
+            catch (Throwable t)
+            {
+                throw new RuntimeException("Exception occurred on the node " + FBUtilities.getBroadcastAddressAndPort(), t);
+            }
+
+        }).accept(message);
+    }
+
+    void launch(TestCluster cluster)
+    {
+        try
+        {
+            mkdirs();
+            int id = config.num;
+            runOnInstance(() -> InstanceIDDefiner.instanceId = id); // for logging
+
+            startup();
+            initializeRing(cluster);
+            registerMockMessaging(cluster);
+        }
+        catch (Throwable t)
+        {
+            if (t instanceof RuntimeException)
+                throw (RuntimeException) t;
+            throw new RuntimeException(t);
+        }
+    }
+
+    private void mkdirs()
+    {
+        new File(config.saved_caches_directory).mkdirs();
+        new File(config.hints_directory).mkdirs();
+        new File(config.commitlog_directory).mkdirs();
+        for (String dir : config.data_file_directories)
+            new File(dir).mkdirs();
+    }
+
+    private void startup()
+    {
+        acceptsOnInstance((InstanceConfig config) ->
+        {
+            DatabaseDescriptor.daemonInitialization(() -> loadConfig(config));
+
+            DatabaseDescriptor.createAllDirectories();
+            Keyspace.setInitialized();
+            SystemKeyspace.persistLocalMetadata();
+        }).accept(config);
+    }
+
+
+    public static Config loadConfig(InstanceConfig overrides)
+    {
+        Config config = new Config();
+        // Defaults
+        config.commitlog_sync = Config.CommitLogSync.batch;
+        config.endpoint_snitch = SimpleSnitch.class.getName();
+        config.seed_provider = new ParameterizedClass(SimpleSeedProvider.class.getName(),
+                                                      Collections.singletonMap("seeds", "127.0.0.1:7010"));
+        config.diagnostic_events_enabled = true; // necessary for schema change monitoring
+
+        // Overrides
+        config.partitioner = overrides.partitioner;
+        config.broadcast_address = overrides.broadcast_address;
+        config.listen_address = overrides.listen_address;
+        config.broadcast_rpc_address = overrides.broadcast_rpc_address;
+        config.rpc_address = overrides.rpc_address;
+        config.saved_caches_directory = overrides.saved_caches_directory;
+        config.data_file_directories = overrides.data_file_directories;
+        config.commitlog_directory = overrides.commitlog_directory;
+        config.hints_directory = overrides.hints_directory;
+        config.cdc_raw_directory = overrides.cdc_directory;
+        config.concurrent_writes = overrides.concurrent_writes;
+        config.concurrent_counter_writes = overrides.concurrent_counter_writes;
+        config.concurrent_materialized_view_writes = overrides.concurrent_materialized_view_writes;
+        config.concurrent_reads = overrides.concurrent_reads;
+        config.memtable_flush_writers = overrides.memtable_flush_writers;
+        config.concurrent_compactors = overrides.concurrent_compactors;
+        config.memtable_heap_space_in_mb = overrides.memtable_heap_space_in_mb;
+        config.initial_token = overrides.initial_token;
+        return config;
+    }
+
+    private void initializeRing(TestCluster cluster)
+    {
+        // This should be done outside instance in order to avoid serializing config
+        String partitionerName = config.partitioner;
+        List<String> initialTokens = new ArrayList<>();
+        List<InetAddressAndPort> hosts = new ArrayList<>();
+        List<UUID> hostIds = new ArrayList<>();
+        for (int i = 1 ; i <= cluster.size() ; ++i)
+        {
+            InstanceConfig config = cluster.get(i).config;
+            initialTokens.add(config.initial_token);
+            try
+            {
+                hosts.add(InetAddressAndPort.getByName(config.broadcast_address));
+            }
+            catch (UnknownHostException e)
+            {
+                throw new RuntimeException(e);
+            }
+            hostIds.add(config.hostId);
+        }
+
+        runOnInstance(() ->
+        {
+            try
+            {
+                IPartitioner partitioner = FBUtilities.newPartitioner(partitionerName);
+                StorageService storageService = StorageService.instance;
+                List<Token> tokens = new ArrayList<>();
+                for (String token : initialTokens)
+                    tokens.add(partitioner.getTokenFactory().fromString(token));
+
+                for (int i = 0; i < tokens.size(); i++)
+                {
+                    InetAddressAndPort ep = hosts.get(i);
+                    Gossiper.instance.initializeNodeUnsafe(ep, hostIds.get(i), 1);
+                    Gossiper.instance.injectApplicationState(ep,
+                            ApplicationState.TOKENS,
+                            new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(tokens.get(i))));
+                    storageService.onChange(ep,
+                            ApplicationState.STATUS_WITH_PORT,
+                            new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(tokens.get(i))));
+                    storageService.onChange(ep,
+                            ApplicationState.STATUS,
+                            new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(tokens.get(i))));
+                    Gossiper.instance.realMarkAlive(ep, Gossiper.instance.getEndpointStateForEndpoint(ep));
+                    MessagingService.instance().setVersion(ep, MessagingService.current_version);
+                }
+
+                // check that all nodes are in token metadata
+                for (int i = 0; i < tokens.size(); ++i)
+                    assert storageService.getTokenMetadata().isMember(hosts.get(i));
+            }
+            catch (Throwable e) // UnknownHostException
+            {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    void shutdown()
+    {
+        runOnInstance(() -> {
+            Throwable error = null;
+            error = runAndMergeThrowable(error,
+                    BatchlogManager.instance::shutdown,
+                    HintsService.instance::shutdownBlocking,
+                    CommitLog.instance::shutdownBlocking,
+                    CompactionManager.instance::forceShutdown,
+                    Gossiper.instance::stop,
+                    SecondaryIndexManager::shutdownExecutors,
+                    MessagingService.instance()::shutdown,
+                    ColumnFamilyStore::shutdownFlushExecutor,
+                    ColumnFamilyStore::shutdownPostFlushExecutor,
+                    ColumnFamilyStore::shutdownReclaimExecutor,
+                    ColumnFamilyStore::shutdownPerDiskFlushExecutors,
+                    PendingRangeCalculatorService.instance::shutdownExecutor,
+                    BufferPool::shutdownLocalCleaner,
+                    Ref::shutdownReferenceReaper,
+                    StageManager::shutdownAndWait,
+                    SharedExecutorPool::shutdownSharedPool,
+                    Memtable.MEMORY_POOL::shutdown,
+                    ScheduledExecutors::shutdownAndWait);
+            error = shutdownAndWait(error, ActiveRepairService.repairCommandExecutor);
+            Throwables.maybeFail(error);
+        });
+    }
+
+    private static Throwable shutdownAndWait(Throwable existing, ExecutorService executor)
+    {
+        return runAndMergeThrowable(existing, () -> {
+            executor.shutdownNow();
+            executor.awaitTermination(5, TimeUnit.SECONDS);
+            assert executor.isTerminated() && executor.isShutdown() : executor;
+        });
+    }
+
+    private static Throwable runAndMergeThrowable(Throwable existing, ThrowingRunnable runnable)
+    {
+        try
+        {
+            runnable.run();
+        }
+        catch (Throwable t)
+        {
+            return Throwables.merge(existing, t);
+        }
+
+        return existing;
+    }
+
+    private static Throwable runAndMergeThrowable(Throwable existing, ThrowingRunnable ... runnables)
+    {
+        for (ThrowingRunnable runnable : runnables)
+        {
+            try
+            {
+                runnable.run();
+            }
+            catch (Throwable t)
+            {
+                existing = Throwables.merge(existing, t);
+            }
+        }
+        return existing;
+    }
+
+    public static interface ThrowingRunnable
+    {
+        public void run() throws Throwable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/InstanceClassLoader.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/InstanceClassLoader.java b/test/distributed/org/apache/cassandra/distributed/InstanceClassLoader.java
new file mode 100644
index 0000000..6349d5a
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/InstanceClassLoader.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import com.google.common.base.Predicate;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.utils.Pair;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.IntFunction;
+
+public class InstanceClassLoader extends URLClassLoader
+{
+    // Classes that have to be shared between instances, for configuration or returning values
+    private final static Class<?>[] commonClasses = new Class[]
+            {
+                    Pair.class,
+                    InstanceConfig.class,
+                    Message.class,
+                    InetAddressAndPort.class,
+                    InvokableInstance.SerializableCallable.class,
+                    InvokableInstance.SerializableRunnable.class,
+                    InvokableInstance.SerializableConsumer.class,
+                    InvokableInstance.SerializableBiConsumer.class,
+                    InvokableInstance.SerializableFunction.class,
+                    InvokableInstance.SerializableBiFunction.class,
+                    InvokableInstance.SerializableTriFunction.class,
+                    InvokableInstance.InstanceFunction.class
+            };
+
+    private final int id; // for debug purposes
+    private final ClassLoader commonClassLoader;
+    private final Predicate<String> isCommonClassName;
+
+    InstanceClassLoader(int id, URL[] urls, Predicate<String> isCommonClassName, ClassLoader commonClassLoader)
+    {
+        super(urls, null);
+        this.id = id;
+        this.commonClassLoader = commonClassLoader;
+        this.isCommonClassName = isCommonClassName;
+    }
+
+    @Override
+    public Class<?> loadClass(String name) throws ClassNotFoundException
+    {
+        // Do not share:
+        //  * yaml, which  is a rare exception because it does mess with loading org.cassandra...Config class instances
+        //  * most of the rest of Cassandra classes (unless they were explicitly shared) g
+        if (name.startsWith("org.slf4j") ||
+                name.startsWith("ch.qos.logback") ||
+                name.startsWith("org.yaml") ||
+                (name.startsWith("org.apache.cassandra") && !isCommonClassName.test(name)))
+            return loadClassInternal(name);
+
+        return commonClassLoader.loadClass(name);
+    }
+
+    Class<?> loadClassInternal(String name) throws ClassNotFoundException
+    {
+        synchronized (getClassLoadingLock(name))
+        {
+            // First, check if the class has already been loaded
+            Class<?> c = findLoadedClass(name);
+
+            if (c == null)
+                c = findClass(name);
+
+            return c;
+        }
+    }
+
+    public static IntFunction<ClassLoader> createFactory(URLClassLoader contextClassLoader)
+    {
+        Set<String> commonClassNames = new HashSet<>();
+        for (Class<?> k : commonClasses)
+            commonClassNames.add(k.getName());
+
+        URL[] urls = contextClassLoader.getURLs();
+        return id -> new InstanceClassLoader(id, urls, commonClassNames::contains, contextClassLoader);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/InstanceConfig.java
new file mode 100644
index 0000000..49c2e1f
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/InstanceConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.UUID;
+
+public class InstanceConfig implements Serializable
+{
+    public final int num;
+    public final UUID hostId =java.util.UUID.randomUUID();
+    public final String partitioner = "org.apache.cassandra.dht.Murmur3Partitioner";
+    public final String broadcast_address;
+    public final String listen_address;
+    public final String broadcast_rpc_address;
+    public final String rpc_address;
+    public final String saved_caches_directory;
+    public final String[] data_file_directories;
+    public final String commitlog_directory;
+    public final String hints_directory;
+    public final String cdc_directory;
+    public final int concurrent_writes = 2;
+    public final int concurrent_counter_writes = 2;
+    public final int concurrent_materialized_view_writes = 2;
+    public final int concurrent_reads = 2;
+    public final int memtable_flush_writers = 1;
+    public final int concurrent_compactors = 1;
+    public final int memtable_heap_space_in_mb = 10;
+    public final String initial_token;
+
+    private InstanceConfig(int num,
+                           String broadcast_address,
+                           String listen_address,
+                           String broadcast_rpc_address,
+                           String rpc_address,
+                           String saved_caches_directory,
+                           String[] data_file_directories,
+                           String commitlog_directory,
+                           String hints_directory,
+                           String cdc_directory,
+                           String initial_token)
+    {
+        this.num = num;
+        this.broadcast_address = broadcast_address;
+        this.listen_address = listen_address;
+        this.broadcast_rpc_address = broadcast_rpc_address;
+        this.rpc_address = rpc_address;
+        this.saved_caches_directory = saved_caches_directory;
+        this.data_file_directories = data_file_directories;
+        this.commitlog_directory = commitlog_directory;
+        this.hints_directory = hints_directory;
+        this.cdc_directory = cdc_directory;
+        this.initial_token = initial_token;
+    }
+
+    public static InstanceConfig generate(int nodeNum, File root, String token)
+    {
+        return new InstanceConfig(nodeNum,
+                                  "127.0.0." + nodeNum,
+                                  "127.0.0." + nodeNum,
+                                  "127.0.0." + nodeNum,
+                                  "127.0.0." + nodeNum,
+                                  String.format("%s/node%d/saved_caches", root, nodeNum),
+                                  new String[] { String.format("%s/node%d/data", root, nodeNum) },
+                                  String.format("%s/node%d/commitlog", root, nodeNum),
+                                  String.format("%s/node%d/hints", root, nodeNum),
+                                  String.format("%s/node%d/cdc", root, nodeNum),
+                                  token);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/InstanceIDDefiner.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/InstanceIDDefiner.java b/test/distributed/org/apache/cassandra/distributed/InstanceIDDefiner.java
new file mode 100644
index 0000000..1167748
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/InstanceIDDefiner.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import ch.qos.logback.core.PropertyDefinerBase;
+
+/**
+ * Used by logback to find/define property value, see logback-dtest.xml
+ */
+public class InstanceIDDefiner extends PropertyDefinerBase
+{
+    // Instantiated per classloader, set by Instance
+    public static int instanceId = -1;
+
+    public String getPropertyValue()
+    {
+        if (instanceId == -1)
+            return "<main>";
+        else
+            return "INSTANCE" + instanceId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java
new file mode 100644
index 0000000..f646ae1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class InvokableInstance
+{
+    private final ClassLoader classLoader;
+    private final Method deserializeOnInstance;
+
+    public InvokableInstance(ClassLoader classLoader)
+    {
+        this.classLoader = classLoader;
+        try
+        {
+            this.deserializeOnInstance = classLoader.loadClass(InvokableInstance.class.getName()).getDeclaredMethod("deserializeOneObject", byte[].class);
+        }
+        catch (ClassNotFoundException | NoSuchMethodException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public interface SerializableCallable<T> extends Callable<T>, Serializable { public T call(); }
+    public <T> SerializableCallable<T> callsOnInstance(SerializableCallable<T> call) { return (SerializableCallable<T>) transferOneObject(call); }
+    public <T> T callOnInstance(SerializableCallable<T> call) { return callsOnInstance(call).call(); }
+
+    public interface SerializableRunnable extends Runnable, Serializable {}
+    public SerializableRunnable runsOnInstance(SerializableRunnable run) { return (SerializableRunnable) transferOneObject(run); }
+    public void runOnInstance(SerializableRunnable run) { runsOnInstance(run).run(); }
+
+    public interface SerializableConsumer<T> extends Consumer<T>, Serializable {}
+    public <T> SerializableConsumer<T> acceptsOnInstance(SerializableConsumer<T> consumer) { return (SerializableConsumer<T>) transferOneObject(consumer); }
+
+    public interface SerializableBiConsumer<T1, T2> extends BiConsumer<T1, T2>, Serializable {}
+    public <T1, T2> SerializableBiConsumer<T1, T2> acceptsOnInstance(SerializableBiConsumer<T1, T2> consumer) { return (SerializableBiConsumer<T1, T2>) transferOneObject(consumer); }
+
+    public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}
+    public <I, O> SerializableFunction<I, O> appliesOnInstance(SerializableFunction<I, O> f) { return (SerializableFunction<I, O>) transferOneObject(f); }
+
+    public interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, I2, O>, Serializable {}
+    public <I1, I2, O> SerializableBiFunction<I1, I2, O> appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return (SerializableBiFunction<I1, I2, O>) transferOneObject(f); }
+
+    public interface SerializableTriFunction<I1, I2, I3, O> extends Serializable
+    {
+        O apply(I1 i1, I2 i2, I3 i3);
+    }
+    public <I1, I2, I3, O> SerializableTriFunction<I1, I2, I3, O> appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return (SerializableTriFunction<I1, I2, I3, O>) transferOneObject(f); }
+
+    public interface InstanceFunction<I, O> extends SerializableBiFunction<Instance, I, O> {}
+
+    // E must be a functional interface, and lambda must be implemented by a lambda function
+    public <E extends Serializable> E invokesOnInstance(E lambda)
+    {
+        return (E) transferOneObject(lambda);
+    }
+
+    public Object transferOneObject(Object object)
+    {
+        byte[] bytes = serializeOneObject(object);
+        try
+        {
+            Object onInstance = deserializeOnInstance.invoke(null, bytes);
+            if (onInstance.getClass().getClassLoader() != classLoader)
+                throw new IllegalStateException(onInstance + " seemingly from wrong class loader: " + onInstance.getClass().getClassLoader() + ", but expected " + classLoader);
+
+            return onInstance;
+        }
+        catch (IllegalAccessException | InvocationTargetException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private byte[] serializeOneObject(Object object)
+    {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(baos))
+        {
+            oos.writeObject(object);
+            oos.close();
+            return baos.toByteArray();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @SuppressWarnings("unused") // called through method invocation
+    public static Object deserializeOneObject(byte[] bytes)
+    {
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+             ObjectInputStream ois = new ObjectInputStream(bais);)
+        {
+            return ois.readObject();
+        }
+        catch (IOException | ClassNotFoundException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/test/distributed/org/apache/cassandra/distributed/Message.java
----------------------------------------------------------------------
diff --git a/test/distributed/org/apache/cassandra/distributed/Message.java b/test/distributed/org/apache/cassandra/distributed/Message.java
new file mode 100644
index 0000000..b5492a2
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/Message.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+// a container for simplifying the method signature for per-instance message handling/delivery
+public class Message
+{
+    public final int verb;
+    public final byte[] bytes;
+    public final int id;
+    public final int version;
+    public final InetAddressAndPort from;
+
+    public Message(int verb, byte[] bytes, int id, int version, InetAddressAndPort from)
+    {
+        this.verb = verb;
+        this.bytes = bytes;
+        this.id = id;
+        this.version = version;
+        this.from = from;
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/3] cassandra git commit: Introduce in-jvm distributed tests

Posted by if...@apache.org.
Introduce in-jvm distributed tests

Patch by Alex Petrov and Benedict Elliott Smith; reviewed by Benedict Elliott Smith and Dinesh Joshi for CASSANDRA-14821.

Co-authored-by: Benedict Elliott Smith <be...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f22fec92
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f22fec92
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f22fec92

Branch: refs/heads/trunk
Commit: f22fec927de7ac291266660c2f34de5b8cc1c695
Parents: 7877035
Author: Alex Petrov <ol...@gmail.com>
Authored: Fri Nov 16 19:41:58 2018 +0100
Committer: Alex Petrov <ol...@gmail.com>
Committed: Fri Nov 16 19:41:58 2018 +0100

----------------------------------------------------------------------
 .circleci/config.yml                            |  14 +-
 build.xml                                       |  10 +-
 ide/idea-iml-file.xml                           |   1 +
 .../org/apache/cassandra/auth/AuthCache.java    |  33 +-
 .../cassandra/batchlog/BatchlogManager.java     |  48 ++-
 .../concurrent/InfiniteLoopExecutor.java        |  83 ++++
 .../JMXEnabledThreadPoolExecutor.java           |  24 +-
 .../concurrent/ScheduledExecutors.java          |  15 +
 .../concurrent/SharedExecutorPool.java          |  15 +-
 .../cassandra/concurrent/StageManager.java      |  10 +
 .../cassandra/config/DatabaseDescriptor.java    |   8 +-
 .../apache/cassandra/cql3/QueryProcessor.java   |   6 +-
 .../cassandra/db/BlacklistedDirectories.java    |  17 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  48 ++-
 .../cassandra/db/HintedHandOffManager.java      |  15 +-
 .../cassandra/db/commitlog/CommitLog.java       |  15 +-
 .../db/compaction/CompactionManager.java        |  18 +-
 .../cassandra/diag/DiagnosticEventService.java  |  13 +-
 .../cassandra/diag/LastEventIdBroadcaster.java  |  16 +-
 .../apache/cassandra/gms/FailureDetector.java   |  14 +-
 src/java/org/apache/cassandra/gms/Gossiper.java |  14 +-
 .../apache/cassandra/hints/HintsService.java    |  15 +-
 .../cassandra/index/SecondaryIndexManager.java  |  10 +
 .../io/sstable/IndexSummaryManager.java         |  30 +-
 .../cassandra/io/util/DataInputBuffer.java      |   8 +-
 .../locator/DynamicEndpointSnitch.java          |  24 +-
 .../cassandra/locator/EndpointSnitchInfo.java   |  16 +-
 .../metrics/CassandraMetricsRegistry.java       |  30 +-
 .../cassandra/net/ForwardToContainer.java       |   3 +-
 .../org/apache/cassandra/net/MessageIn.java     |  12 +-
 .../apache/cassandra/net/MessagingService.java  |  28 +-
 .../net/async/ByteBufDataInputPlus.java         |   8 +
 .../cassandra/net/async/MessageInHandler.java   |  67 +++-
 .../net/async/MessageInHandlerPre40.java        |  50 ++-
 .../cassandra/net/async/NettyFactory.java       |  12 +-
 .../apache/cassandra/schema/SchemaEvent.java    |   5 +-
 .../cassandra/service/ActiveRepairService.java  |  15 +-
 .../apache/cassandra/service/CacheService.java  |  16 +-
 .../cassandra/service/CassandraDaemon.java      |  11 +-
 .../service/PendingRangeCalculatorService.java  |   9 +
 .../apache/cassandra/service/StorageProxy.java  |  15 +-
 .../cassandra/service/StorageService.java       |  16 +-
 .../apache/cassandra/utils/ByteBufferUtil.java  |  26 ++
 .../apache/cassandra/utils/MBeanWrapper.java    | 179 +++++++++
 .../org/apache/cassandra/utils/Mx4jTool.java    |   4 +-
 .../apache/cassandra/utils/concurrent/Ref.java  |  35 +-
 .../cassandra/utils/memory/BufferPool.java      |  43 +-
 .../utils/memory/MemtableCleanerThread.java     |  77 ++--
 .../cassandra/utils/memory/MemtablePool.java    |   9 +
 test/conf/logback-dtest.xml                     |  79 ++++
 .../cassandra/distributed/Coordinator.java      |  80 ++++
 .../DistributedReadWritePathTest.java           | 348 ++++++++++++++++
 .../distributed/DistributedTestBase.java        |  86 ++++
 .../apache/cassandra/distributed/Instance.java  | 399 +++++++++++++++++++
 .../distributed/InstanceClassLoader.java        | 101 +++++
 .../cassandra/distributed/InstanceConfig.java   |  87 ++++
 .../distributed/InstanceIDDefiner.java          |  38 ++
 .../distributed/InvokableInstance.java          | 133 +++++++
 .../apache/cassandra/distributed/Message.java   |  41 ++
 .../cassandra/distributed/MessageFilters.java   | 175 ++++++++
 .../apache/cassandra/distributed/RowUtil.java   |  47 +++
 .../cassandra/distributed/TestCluster.java      | 308 ++++++++++++++
 .../cassandra/net/async/NettyFactoryTest.java   |   2 +-
 .../async/StreamCompressionSerializerTest.java  |   4 -
 64 files changed, 2661 insertions(+), 477 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/.circleci/config.yml
----------------------------------------------------------------------
diff --git a/.circleci/config.yml b/.circleci/config.yml
index 430354a..3b2b978 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -164,12 +164,20 @@ jobs:
 
             # get all of our unit test filenames
             set -eo pipefail && circleci tests glob "$HOME/cassandra/test/unit/**/*.java" > /tmp/all_java_unit_tests.txt
+            # append distributed tests
+            set -eo pipefail && circleci tests glob "$HOME/cassandra/test/distributed/**/*.java" > /tmp/all_java_distributed_tests.txt
 
             # split up the unit tests into groups based on the number of containers we have
             set -eo pipefail && circleci tests split --split-by=timings --timings-type=filename --index=${CIRCLE_NODE_INDEX} --total=${CIRCLE_NODE_TOTAL} /tmp/all_java_unit_tests.txt > /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt
             set -eo pipefail && cat /tmp/java_tests_${CIRCLE_NODE_INDEX}.txt | cut -c 37-1000000 | grep "Test\.java$" > /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
             echo "** /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt"
             cat /tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+
+            set -eo pipefail && circleci tests split --split-by=timings --timings-type=filename --index=${CIRCLE_NODE_INDEX} --total=${CIRCLE_NODE_TOTAL} /tmp/all_java_distributed_tests.txt > /tmp/java_dtests_${CIRCLE_NODE_INDEX}.txt
+            set +eo pipefail && cat /tmp/java_dtests_${CIRCLE_NODE_INDEX}.txt | cut -c 44-1000000 | grep "Test\.java$" > /tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt
+            echo "** /tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt"
+            cat /tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt
+
       - run:
          name: Run Unit Tests
          command: |
@@ -181,7 +189,11 @@ jobs:
 
             time mv ~/cassandra /tmp
             cd /tmp/cassandra
-            ant testclasslist -Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt
+            ant testclasslist -Dtest.classlistfile=/tmp/java_tests_${CIRCLE_NODE_INDEX}_final.txt -Dtest.classlistprefix=unit
+
+            if [ -s "/tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt" ]; then
+              ant testclasslist -Dtest.classlistfile=/tmp/java_dtests_${CIRCLE_NODE_INDEX}_final.txt -Dtest.classlistprefix=distributed
+            fi
          no_output_timeout: 15m
       - store_test_results:
             path: /tmp/cassandra/build/test/output/

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 3d3014c..f24647e 100644
--- a/build.xml
+++ b/build.xml
@@ -56,12 +56,14 @@
     <property name="test.data" value="${test.dir}/data"/>
     <property name="test.name" value="*Test"/>
     <property name="test.classlistfile" value="testlist.txt"/>
+    <property name="test.classlistprefix" value="unit"/>
     <property name="benchmark.name" value=""/>
     <property name="test.methods" value=""/>
     <property name="test.unit.src" value="${test.dir}/unit"/>
     <property name="test.long.src" value="${test.dir}/long"/>
     <property name="test.burn.src" value="${test.dir}/burn"/>
     <property name="test.microbench.src" value="${test.dir}/microbench"/>
+    <property name="test.distributed.src" value="${test.dir}/distributed"/>
     <property name="dist.dir" value="${build.dir}/dist"/>
     <property name="tmp.dir" value="${java.io.tmpdir}"/>
 
@@ -103,6 +105,7 @@
     <property name="test.timeout" value="240000" />
     <property name="test.long.timeout" value="600000" />
     <property name="test.burn.timeout" value="60000000" />
+    <property name="test.distributed.timeout" value="600000" />
 
     <!-- default for cql tests. Can be override by -Dcassandra.test.use_prepared=false -->
     <property name="cassandra.test.use_prepared" value="true" />
@@ -1253,6 +1256,7 @@
      <src path="${test.long.src}"/>
      <src path="${test.burn.src}"/>
      <src path="${test.microbench.src}"/>
+     <src path="${test.distributed.src}"/>
     </javac>
 
     <!-- Non-java resources needed by the test suite -->
@@ -1364,7 +1368,7 @@
     <attribute name="test.file.list"/>
     <attribute name="testlist.offset"/>
     <sequential>
-      <testmacrohelper inputdir="${test.unit.src}" filelist="@{test.file.list}" poffset="@{testlist.offset}" exclude="**/*.java" timeout="${test.timeout}">
+      <testmacrohelper inputdir="${test.dir}/${test.classlistprefix}" filelist="@{test.file.list}" poffset="@{testlist.offset}" exclude="**/*.java" timeout="${test.timeout}">
         <jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
         <jvmarg value="-Dinvalid-legacy-sstable-root=${test.data}/invalid-legacy-sstables"/>
         <jvmarg value="-Dcassandra.ring_delay_ms=1000"/>
@@ -1468,6 +1472,7 @@
     </concat>
     <path id="all-test-classes-path">
       <fileset dir="${test.unit.src}" includes="**/${test.name}.java" />
+      <fileset dir="${test.distributed.src}" includes="**/${test.name}.java" />
     </path>
     <property name="all-test-classes" refid="all-test-classes-path"/>
     <testparallel testdelegate="testlist-compression" />
@@ -1844,7 +1849,7 @@
   e.g. org/apache/cassandra/hints/HintMessageTest.java -->
   <target name="testclasslist" depends="build-test" description="Parallel-run tests given in file -Dtest.classlistfile (one-class-per-line, e.g. org/apache/cassandra/db/SomeTest.java)">
     <path id="all-test-classes-path">
-      <fileset dir="${test.unit.src}" includesfile="${test.classlistfile}"/>
+      <fileset dir="${test.dir}/${test.classlistprefix}" includesfile="${test.classlistfile}"/>
     </path>
     <property name="all-test-classes" refid="all-test-classes-path"/>
     <testparallel testdelegate="testlist"/>
@@ -1939,6 +1944,7 @@
   <classpathentry kind="src" path="conf" including="hotspot_compiler"/>
   <classpathentry kind="src" output="build/test/classes" path="test/unit"/>
   <classpathentry kind="src" output="build/test/classes" path="test/long"/>
+  <classpathentry kind="src" output="build/test/classes" path="test/distributed"/>
   <classpathentry kind="src" output="build/test/classes" path="test/resources" />
   <classpathentry kind="src" path="tools/stress/src"/>
   <classpathentry kind="src" path="tools/fqltool/src"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/ide/idea-iml-file.xml
----------------------------------------------------------------------
diff --git a/ide/idea-iml-file.xml b/ide/idea-iml-file.xml
index b83abfa..0045ae6 100644
--- a/ide/idea-iml-file.xml
+++ b/ide/idea-iml-file.xml
@@ -35,6 +35,7 @@
             <sourceFolder url="file://$MODULE_DIR$/test/long" isTestSource="true" />
             <sourceFolder url="file://$MODULE_DIR$/test/microbench" isTestSource="true" />
             <sourceFolder url="file://$MODULE_DIR$/test/burn" isTestSource="true" />
+            <sourceFolder url="file://$MODULE_DIR$/test/distributed" isTestSource="true" />
             <sourceFolder url="file://$MODULE_DIR$/test/resources" type="java-test-resource" />
             <sourceFolder url="file://$MODULE_DIR$/test/conf" type="java-test-resource" />
             <excludeFolder url="file://$MODULE_DIR$/.idea" />

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/auth/AuthCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AuthCache.java b/src/java/org/apache/cassandra/auth/AuthCache.java
index 3adf914..4bf15c1 100644
--- a/src/java/org/apache/cassandra/auth/AuthCache.java
+++ b/src/java/org/apache/cassandra/auth/AuthCache.java
@@ -18,22 +18,19 @@
 
 package org.apache.cassandra.auth;
 
-import java.lang.management.ManagementFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
 import java.util.function.Function;
 import java.util.function.IntConsumer;
 import java.util.function.IntSupplier;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
 
 import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -97,33 +94,17 @@ public class AuthCache<K, V> implements AuthCacheMBean
     protected void init()
     {
         cache = initCache(null);
-        try
-        {
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            mbs.registerMBean(this, getObjectName());
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, getObjectName());
     }
 
     protected void unregisterMBean()
     {
-        try
-        {
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            mbs.unregisterMBean(getObjectName());
-        }
-        catch (Exception e)
-        {
-            logger.warn("Error unregistering {} cache mbean", name, e);
-        }
+        MBeanWrapper.instance.unregisterMBean(getObjectName(), MBeanWrapper.OnException.LOG);
     }
 
-    protected ObjectName getObjectName() throws MalformedObjectNameException
+    protected String getObjectName()
     {
-        return new ObjectName(MBEAN_NAME_BASE + name);
+        return MBEAN_NAME_BASE + name;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 91129ed..b2b851d 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -18,28 +18,37 @@
 package org.apache.cassandra.batchlog;
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.RateLimiter;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.Replicas;
-import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -51,15 +60,20 @@ import org.apache.cassandra.hints.Hint;
 import org.apache.cassandra.hints.HintsService;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.Replicas;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.UUIDGen;
 
 import static com.google.common.collect.Iterables.transform;
@@ -93,15 +107,7 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     public void start()
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
 
         batchlogTasks.scheduleWithFixedDelay(this::replayFailedBatches,
                                              StorageService.RING_DELAY,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
new file mode 100644
index 0000000..1b8173e
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.concurrent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+public class InfiniteLoopExecutor
+{
+    private static final Logger logger = LoggerFactory.getLogger(InfiniteLoopExecutor.class);
+
+    public interface InterruptibleRunnable
+    {
+        void run() throws InterruptedException;
+    }
+
+    private final Thread thread;
+    private final InterruptibleRunnable runnable;
+    private volatile boolean isShutdown = false;
+
+    public InfiniteLoopExecutor(String name, InterruptibleRunnable runnable)
+    {
+        this.runnable = runnable;
+        this.thread = new Thread(this::loop, name);
+        this.thread.setDaemon(true);
+    }
+
+    private void loop()
+    {
+        while (!isShutdown)
+        {
+            try
+            {
+                runnable.run();
+            }
+            catch (InterruptedException ie)
+            {
+                if (isShutdown)
+                    return;
+                logger.error("Interrupted while executing {}, but not shutdown; continuing with loop", runnable, ie);
+            }
+            catch (Throwable t)
+            {
+                logger.error("Exception thrown by runnable, continuing with loop", t);
+            }
+        }
+    }
+
+    public InfiniteLoopExecutor start()
+    {
+        thread.start();
+        return this;
+    }
+
+    public void shutdown()
+    {
+        isShutdown = true;
+        thread.interrupt();
+    }
+
+    public void awaitTermination(long time, TimeUnit unit) throws InterruptedException
+    {
+        thread.join(unit.toMillis(time));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
index 278b399..0e61de9 100644
--- a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
@@ -17,16 +17,14 @@
  */
 package org.apache.cassandra.concurrent;
 
-import java.lang.management.ManagementFactory;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.TimeUnit;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 import org.apache.cassandra.metrics.ThreadPoolMetrics;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 /**
  * This is a wrapper class for the <i>ScheduledThreadPoolExecutor</i>. It provides an implementation
@@ -81,17 +79,8 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i
         super.prestartAllCoreThreads();
         metrics = new ThreadPoolMetrics(this, jmxPath, threadFactory.id).register();
 
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + threadFactory.id;
-
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(mbeanName));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, mbeanName);
     }
 
     public JMXEnabledThreadPoolExecutor(int corePoolSize,
@@ -114,14 +103,7 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor i
 
     private void unregisterMBean()
     {
-        try
-        {
-            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(mbeanName));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.unregisterMBean(mbeanName);
 
         // release metrics
         metrics.release();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
index 22dc769..e51e4c2 100644
--- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
+++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
@@ -17,6 +17,11 @@
  */
 package org.apache.cassandra.concurrent;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Centralized location for shared executors
  */
@@ -41,4 +46,14 @@ public class ScheduledExecutors
      * This executor is used for tasks that do not need to be waited for on shutdown/drain.
      */
     public static final DebuggableScheduledThreadPoolExecutor optionalTasks = new DebuggableScheduledThreadPoolExecutor("OptionalTasks");
+
+    @VisibleForTesting
+    public static void shutdownAndWait() throws InterruptedException
+    {
+        ExecutorService[] executors = new ExecutorService[] { scheduledFastTasks, scheduledTasks, nonPeriodicTasks, optionalTasks };
+        for (ExecutorService executor : executors)
+            executor.shutdown();
+        for (ExecutorService executor : executors)
+            executor.awaitTermination(60, TimeUnit.SECONDS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
index 3b0600f..5352ad7 100644
--- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@ -21,9 +21,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import static org.apache.cassandra.concurrent.SEPWorker.Work;
 
 /**
@@ -61,7 +64,7 @@ public class SharedExecutorPool
     final AtomicLong workerId = new AtomicLong();
 
     // the collection of executors serviced by this pool; periodically ordered by traffic volume
-    final List<SEPExecutor> executors = new CopyOnWriteArrayList<>();
+    public final List<SEPExecutor> executors = new CopyOnWriteArrayList<>();
 
     // the number of workers currently in a spinning state
     final AtomicInteger spinningCount = new AtomicInteger();
@@ -109,4 +112,14 @@ public class SharedExecutorPool
         executors.add(executor);
         return executor;
     }
+
+    @VisibleForTesting
+    public static void shutdownSharedPool() throws InterruptedException
+    {
+        for (SEPExecutor executor : SHARED.executors)
+            executor.shutdown();
+
+        for (SEPExecutor executor : SHARED.executors)
+            executor.awaitTermination(60, TimeUnit.SECONDS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index c102042..608a005 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.concurrent;
 import java.util.EnumMap;
 import java.util.concurrent.*;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,6 +113,15 @@ public class StageManager
         }
     }
 
+    @VisibleForTesting
+    public static void shutdownAndWait() throws InterruptedException
+    {
+        for (Stage stage : Stage.values())
+            StageManager.stages.get(stage).shutdown();
+        for (Stage stage : Stage.values())
+            StageManager.stages.get(stage).awaitTermination(60, TimeUnit.SECONDS);
+    }
+
     /**
      * The executor used for tracing.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index bc1e5a2..2f5f49f 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -27,6 +27,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -142,6 +143,11 @@ public class DatabaseDescriptor
 
     public static void daemonInitialization() throws ConfigurationException
     {
+        daemonInitialization(DatabaseDescriptor::loadConfig);
+    }
+
+    public static void daemonInitialization(Supplier<Config> config) throws ConfigurationException
+    {
         if (toolInitialized)
             throw new AssertionError("toolInitialization() already called");
         if (clientInitialized)
@@ -152,7 +158,7 @@ public class DatabaseDescriptor
             return;
         daemonInitialized = true;
 
-        setConfig(loadConfig());
+        setConfig(config.get());
         applyAll();
         AuthConfig.applyAuth();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 45db947..b8ec648 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -162,7 +162,8 @@ public class QueryProcessor implements QueryHandler
             SystemKeyspace.resetPreparedStatements();
     }
 
-    private static QueryState internalQueryState()
+    @VisibleForTesting
+    public static QueryState internalQueryState()
     {
         return new QueryState(InternalStateInstance.INSTANCE.clientState);
     }
@@ -265,7 +266,8 @@ public class QueryProcessor implements QueryHandler
             return null;
     }
 
-    private static QueryOptions makeInternalOptions(CQLStatement prepared, Object[] values)
+    @VisibleForTesting
+    public static QueryOptions makeInternalOptions(CQLStatement prepared, Object[] values)
     {
         return makeInternalOptions(prepared, values, ConsistencyLevel.ONE);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
index f090013..cff9a78 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
@@ -21,18 +21,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.lang.management.ManagementFactory;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
 {
@@ -48,17 +45,7 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
     private BlacklistedDirectories()
     {
         // Register this instance with JMX
-        try
-        {
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            JVMStabilityInspector.inspectThrowable(e);
-            logger.error("error registering MBean {}", MBEAN_NAME, e);
-            //Allow the server to start even if the bean can't be registered
-        }
+        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME, MBeanWrapper.OnException.LOG);
     }
 
     public Set<File> getUnreadableDirectories()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 877a3c5..c5149cf 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
@@ -41,7 +40,6 @@ import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.codahale.metrics.Snapshot;
 import org.apache.cassandra.cache.*;
 import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.*;
@@ -72,7 +70,6 @@ import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.metrics.Sampler;
 import org.apache.cassandra.metrics.Sampler.Sample;
 import org.apache.cassandra.metrics.Sampler.SamplerType;
@@ -220,12 +217,33 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private volatile boolean neverPurgeTombstones = false;
 
+    public static void shutdownFlushExecutor() throws InterruptedException
+    {
+        flushExecutor.shutdown();
+        flushExecutor.awaitTermination(60, TimeUnit.SECONDS);
+    }
+
+
     public static void shutdownPostFlushExecutor() throws InterruptedException
     {
         postFlushExecutor.shutdown();
         postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS);
     }
 
+    public static void shutdownReclaimExecutor() throws InterruptedException
+    {
+        reclaimExecutor.shutdown();
+        reclaimExecutor.awaitTermination(60, TimeUnit.SECONDS);
+    }
+
+    public static void shutdownPerDiskFlushExecutors() throws InterruptedException
+    {
+        for (ExecutorService executorService : perDiskflushExecutors)
+            executorService.shutdown();
+        for (ExecutorService executorService : perDiskflushExecutors)
+            executorService.awaitTermination(60, TimeUnit.SECONDS);
+    }
+
     public void reload()
     {
         // metadata object has been mutated directly. make all the members jibe with new settings.
@@ -425,19 +443,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             oldMBeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,columnfamily=%s",
                                          isIndex() ? "IndexColumnFamilies" : "ColumnFamilies",
                                          keyspace.getName(), name);
-            try
-            {
-                MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-                ObjectName[] objectNames = {new ObjectName(mbeanName), new ObjectName(oldMBeanName)};
-                for (ObjectName objectName : objectNames)
-                {
-                    mbs.registerMBean(this, objectName);
-                }
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
+
+            String[] objectNames = {mbeanName, oldMBeanName};
+            for (String objectName : objectNames)
+                MBeanWrapper.instance.registerMBean(this, objectName);
         }
         else
         {
@@ -548,14 +557,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         data.removeUnreadableSSTables(directory);
     }
 
-    void unregisterMBean() throws MalformedObjectNameException, InstanceNotFoundException, MBeanRegistrationException
+    void unregisterMBean() throws MalformedObjectNameException
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         ObjectName[] objectNames = {new ObjectName(mbeanName), new ObjectName(oldMBeanName)};
         for (ObjectName objectName : objectNames)
         {
-            if (mbs.isRegistered(objectName))
-                mbs.unregisterMBean(objectName);
+            if (MBeanWrapper.instance.isRegistered(objectName))
+                MBeanWrapper.instance.unregisterMBean(objectName);
         }
 
         // unregister metrics

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 3279acf..e26f658 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -17,13 +17,10 @@
  */
 package org.apache.cassandra.db;
 
-import java.lang.management.ManagementFactory;
 import java.util.List;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
 import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 /**
  * A proxy class that implement the deprecated legacy HintedHandoffManagerMBean interface.
@@ -44,15 +41,7 @@ public final class HintedHandOffManager implements HintedHandOffManagerMBean
 
     public void registerMBean()
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
     }
 
     public void deleteHintsForEndpoint(String host)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 6537adc..9d2a369 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -18,19 +18,15 @@
 package org.apache.cassandra.db.commitlog;
 
 import java.io.*;
-import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.zip.CRC32;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.*;
@@ -49,6 +45,7 @@ import org.apache.cassandra.security.EncryptionContext;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
 import static org.apache.cassandra.db.commitlog.CommitLogSegment.CommitLogSegmentFileComparator;
@@ -82,15 +79,7 @@ public class CommitLog implements CommitLogMBean
     {
         CommitLog log = new CommitLog(CommitLogArchiver.construct());
 
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(log, new ObjectName("org.apache.cassandra.db:type=Commitlog"));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(log, "org.apache.cassandra.db:type=Commitlog");
         return log.start();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index e56ed60..bc5a883 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -19,14 +19,11 @@ package org.apache.cassandra.db.compaction;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
@@ -36,7 +33,6 @@ import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 
 import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.locator.Replica;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -119,15 +115,8 @@ public class CompactionManager implements CompactionManagerMBean
     static
     {
         instance = new CompactionManager();
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(instance, new ObjectName(MBEAN_OBJECT_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+
+        MBeanWrapper.instance.registerMBean(instance, MBEAN_OBJECT_NAME);
     }
 
     private final CompactionExecutor executor = new CompactionExecutor();
@@ -232,6 +221,7 @@ public class CompactionManager implements CompactionManagerMBean
         executor.shutdown();
         validationExecutor.shutdown();
         viewBuildExecutor.shutdown();
+        cacheCleanupExecutor.shutdown();
 
         // interrupt compactions and validations
         for (Holder compactionHolder : CompactionMetrics.getCompactions())
@@ -242,7 +232,7 @@ public class CompactionManager implements CompactionManagerMBean
         // wait for tasks to terminate
         // compaction tasks are interrupted above, so it shuold be fairy quick
         // until not interrupted tasks to complete.
-        for (ExecutorService exec : Arrays.asList(executor, validationExecutor, viewBuildExecutor))
+        for (ExecutorService exec : Arrays.asList(executor, validationExecutor, viewBuildExecutor, cacheCleanupExecutor))
         {
             try
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/diag/DiagnosticEventService.java b/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
index 3f3de7c..5953a1d 100644
--- a/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
+++ b/src/java/org/apache/cassandra/diag/DiagnosticEventService.java
@@ -41,6 +41,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 /**
  * Service for publishing and consuming {@link DiagnosticEvent}s.
@@ -62,17 +63,7 @@ public final class DiagnosticEventService implements DiagnosticEventServiceMBean
 
     private DiagnosticEventService()
     {
-
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            ObjectName jmxObjectName = new ObjectName("org.apache.cassandra.diag:type=DiagnosticEventService");
-            mbs.registerMBean(this, jmxObjectName);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this,"org.apache.cassandra.diag:type=DiagnosticEventService");
 
         // register broadcasters for JMX events
         DiagnosticEventPersistence.start();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java b/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
index 9fe5c48..8e991e6 100644
--- a/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
+++ b/src/java/org/apache/cassandra/diag/LastEventIdBroadcaster.java
@@ -18,21 +18,19 @@
 
 package org.apache.cassandra.diag;
 
-import java.lang.management.ManagementFactory;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import javax.management.MBeanServer;
 import javax.management.Notification;
 import javax.management.NotificationBroadcasterSupport;
 import javax.management.NotificationFilter;
 import javax.management.NotificationListener;
-import javax.management.ObjectName;
 
 import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor;
 
 /**
@@ -61,16 +59,8 @@ final class LastEventIdBroadcaster extends NotificationBroadcasterSupport implem
         super(JMXBroadcastExecutor.executor);
 
         summary.put("last_updated_at", 0L);
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            ObjectName jmxObjectName = new ObjectName("org.apache.cassandra.diag:type=LastEventIdBroadcaster");
-            mbs.registerMBean(this, jmxObjectName);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+
+        MBeanWrapper.instance.registerMBean(this, "org.apache.cassandra.diag:type=LastEventIdBroadcaster");
     }
 
     public static LastEventIdBroadcaster instance()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index d7f73ab..4a16f2a 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -21,15 +21,12 @@ import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
 import java.nio.file.Path;
 import java.io.*;
-import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.*;
 
@@ -42,6 +39,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 /**
  * This FailureDetector is an implementation of the paper titled
@@ -88,15 +86,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
     public FailureDetector()
     {
         // Register this instance with JMX
-        try
-        {
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
     }
 
     private static long getInitialValue()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index aedcb04..b789fe7 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.gms;
 
-import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.Map.Entry;
@@ -28,8 +27,6 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Suppliers;
@@ -41,6 +38,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.CassandraVersion;
+import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -248,15 +246,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         // Register this instance with JMX
         if (registerJmx)
         {
-            try
-            {
-                MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-                mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
+            MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 73840d3..1a352c2 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.hints;
 
 import java.io.File;
-import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Collections;
@@ -29,9 +28,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import org.apache.cassandra.db.Keyspace;
@@ -51,6 +47,7 @@ import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 import static com.google.common.collect.Iterables.transform;
 
@@ -138,15 +135,7 @@ public final class HintsService implements HintsServiceMBean
 
     public void registerMBean()
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index c9a7cc6..ec54a65 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -1485,4 +1485,14 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
                                      false);
         }
     }
+
+    @VisibleForTesting
+    public static void shutdownExecutors() throws InterruptedException
+    {
+        ExecutorService[] executors = new ExecutorService[]{ asyncExecutor, blockingExecutor };
+        for (ExecutorService executor : executors)
+            executor.shutdown();
+        for (ExecutorService executor : executors)
+            executor.awaitTermination(60, TimeUnit.SECONDS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index b8d236a..3630c2a 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -18,29 +18,32 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
@@ -65,16 +68,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
     static
     {
         instance = new IndexSummaryManager();
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-
-        try
-        {
-            mbs.registerMBean(instance, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(instance, MBEAN_NAME);
     }
 
     private IndexSummaryManager()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataInputBuffer.java b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
index a68dcc2..9df9861 100644
--- a/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /**
@@ -57,14 +56,17 @@ public class DataInputBuffer extends RebufferingInputStream
     }
 
     @Override
-    protected void reBuffer() throws IOException
+    protected void reBuffer()
     {
         //nope, we don't rebuffer, we are done!
     }
 
     @Override
-    public int available() throws IOException
+    public int available()
     {
         return buffer.remaining();
     }
+
+    @Override
+    public void close() {}
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index d35f1fb..ddc8fba 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.locator;
 
-import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
@@ -28,8 +27,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import com.codahale.metrics.ExponentiallyDecayingReservoir;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 import com.codahale.metrics.Snapshot;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
@@ -41,6 +38,7 @@ import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 /**
  * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
@@ -141,15 +139,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
 
     private void registerMBean()
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(mbeanName));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, mbeanName);
     }
 
     public void close()
@@ -157,15 +147,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
         updateSchedular.cancel(false);
         resetSchedular.cancel(false);
 
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.unregisterMBean(new ObjectName(mbeanName));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.unregisterMBean(mbeanName);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
index da90a79..d836cd1 100644
--- a/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
+++ b/src/java/org/apache/cassandra/locator/EndpointSnitchInfo.java
@@ -17,28 +17,16 @@
  */
 package org.apache.cassandra.locator;
 
-
-import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 public class EndpointSnitchInfo implements EndpointSnitchInfoMBean
 {
     public static void create()
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(new EndpointSnitchInfo(), new ObjectName("org.apache.cassandra.db:type=EndpointSnitchInfo"));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(new EndpointSnitchInfo(), "org.apache.cassandra.db:type=EndpointSnitchInfo");
     }
 
     public String getDatacenter(String host) throws UnknownHostException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index 43d6609..74c3367 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.metrics;
 
-import java.lang.management.ManagementFactory;
 import java.lang.reflect.Method;
 import java.util.Collection;
 import java.util.Collections;
@@ -26,14 +25,14 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-
-import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
-import com.codahale.metrics.*;
 import com.google.common.annotations.VisibleForTesting;
 
+import com.codahale.metrics.*;
+import org.apache.cassandra.utils.MBeanWrapper;
+
 /**
  * Makes integrating 3.0 metrics API with 2.0.
  * <p>
@@ -45,7 +44,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
     public static final CassandraMetricsRegistry Metrics = new CassandraMetricsRegistry();
     private final Map<String, ThreadPoolMetrics> threadPoolMetrics = new ConcurrentHashMap<>();
 
-    private final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+    private final MBeanWrapper mBeanServer = MBeanWrapper.instance;
 
     private CassandraMetricsRegistry()
     {
@@ -159,11 +158,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
     {
         boolean removed = remove(name.getMetricName());
 
-        try
-        {
-            mBeanServer.unregisterMBean(name.getMBeanName());
-        } catch (Exception ignore) {}
-
+        mBeanServer.unregisterMBean(name.getMBeanName(), MBeanWrapper.OnException.IGNORE);
         return removed;
     }
 
@@ -194,13 +189,8 @@ public class CassandraMetricsRegistry extends MetricRegistry
         else
             throw new IllegalArgumentException("Unknown metric type: " + metric.getClass());
 
-        try
-        {
-            mBeanServer.registerMBean(mbean, name);
-        }
-        catch (Exception ignored)
-        {
-        }
+        if (!mBeanServer.isRegistered(name))
+            mBeanServer.registerMBean(mbean, name, MBeanWrapper.OnException.LOG);
     }
 
     private void registerAlias(MetricName existingName, MetricName aliasName)
@@ -213,10 +203,8 @@ public class CassandraMetricsRegistry extends MetricRegistry
 
     private void removeAlias(MetricName name)
     {
-        try
-        {
-            mBeanServer.unregisterMBean(name.getMBeanName());
-        } catch (Exception ignore) {}
+        if (mBeanServer.isRegistered(name.getMBeanName()))
+            MBeanWrapper.instance.unregisterMBean(name.getMBeanName(), MBeanWrapper.OnException.IGNORE);
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/ForwardToContainer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ForwardToContainer.java b/src/java/org/apache/cassandra/net/ForwardToContainer.java
index ac9e725..b22eed6 100644
--- a/src/java/org/apache/cassandra/net/ForwardToContainer.java
+++ b/src/java/org/apache/cassandra/net/ForwardToContainer.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.net;
 
+import java.io.Serializable;
 import java.util.Collection;
 
 import com.google.common.base.Preconditions;
@@ -28,7 +29,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
  * Contains forward to information until it can be serialized as part of a message using a version
  * specific serialization
  */
-public class ForwardToContainer
+public class ForwardToContainer implements Serializable
 {
     public final Collection<InetAddressAndPort> targets;
     public final int[] messageIds;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index 1cd39f3..c8f4bfc 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -49,12 +49,12 @@ public class MessageIn<T>
     public final int version;
     public final long constructionTime;
 
-    private MessageIn(InetAddressAndPort from,
-                      T payload,
-                      Map<ParameterType, Object> parameters,
-                      Verb verb,
-                      int version,
-                      long constructionTime)
+    public MessageIn(InetAddressAndPort from,
+                     T payload,
+                     Map<ParameterType, Object> parameters,
+                     Verb verb,
+                     int version,
+                     long constructionTime)
     {
         this.from = from;
         this.payload = payload;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index c6e8496..761e210 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.net;
 
 import java.io.IOError;
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -35,8 +34,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
@@ -112,6 +109,7 @@ import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.BooleanSerializer;
 import org.apache.cassandra.utils.ExpiringMap;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.NativeLibrary;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.StatusLogger;
@@ -464,6 +462,20 @@ public final class MessagingService implements MessagingServiceMBean
         }
     }
 
+    public static IVersionedSerializer<?> getVerbSerializer(Verb verb, int id)
+    {
+        IVersionedSerializer serializer = verbSerializers.get(verb);
+        if (serializer instanceof MessagingService.CallbackDeterminedSerializer)
+        {
+            CallbackInfo callback = MessagingService.instance().getRegisteredCallback(id);
+            if (callback == null)
+                return null;
+
+            serializer = callback.serializer;
+        }
+        return serializer;
+    }
+
     /* Lookup table for registering message handlers based on the verb. */
     private final Map<Verb, IVerbHandler> verbHandlers;
 
@@ -618,15 +630,7 @@ public final class MessagingService implements MessagingServiceMBean
 
         if (!testOnly)
         {
-            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            try
-            {
-                mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
+            MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
index 23e532c..e0be715 100644
--- a/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
+++ b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
@@ -22,6 +22,8 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
 import org.apache.cassandra.io.util.DataInputPlus;
 
+import java.io.IOException;
+
 public class ByteBufDataInputPlus extends ByteBufInputStream implements DataInputPlus
 {
     /**
@@ -40,4 +42,10 @@ public class ByteBufDataInputPlus extends ByteBufInputStream implements DataInpu
     {
         return buf;
     }
+
+    @Override
+    public String readUTF() throws IOException
+    {
+        return DataInputStreamPlus.readUTF(this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/MessageInHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/MessageInHandler.java b/src/java/org/apache/cassandra/net/async/MessageInHandler.java
index 0a194d4..dafa993 100644
--- a/src/java/org/apache/cassandra/net/async/MessageInHandler.java
+++ b/src/java/org/apache/cassandra/net/async/MessageInHandler.java
@@ -18,15 +18,17 @@
 
 package org.apache.cassandra.net.async;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
 
 import com.google.common.primitives.Ints;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,7 +104,7 @@ public class MessageInHandler extends BaseMessageInHandler
                     {
                         if (in.readableBytes() < messageHeader.parameterLength)
                             return;
-                        readParameters(in, inputPlus, messageHeader.parameterLength, messageHeader.parameters);
+                        readParameters(in, inputPlus, messagingVersion, messageHeader.parameterLength, messageHeader.parameters);
                     }
                     state = State.READ_PAYLOAD_SIZE;
                     // fall-through
@@ -134,17 +136,17 @@ public class MessageInHandler extends BaseMessageInHandler
         }
     }
 
-    private void readParameters(ByteBuf in, ByteBufDataInputPlus inputPlus, int parameterLength, Map<ParameterType, Object> parameters) throws IOException
+    private static void readParameters(ByteBuf buf, DataInputPlus in, int messagingVersion, int parameterLength, Map<ParameterType, Object> parameters) throws IOException
     {
         // makes the assumption we have all the bytes required to read the headers
-        final int endIndex = in.readerIndex() + parameterLength;
-        while (in.readerIndex() < endIndex)
+        final int endIndex = buf.readerIndex() + parameterLength;
+        while (buf.readerIndex() < endIndex)
         {
-            String key = DataInputStream.readUTF(inputPlus);
+            String key = in.readUTF();
             ParameterType parameterType = ParameterType.byName.get(key);
-            long valueLength =  VIntCoding.readUnsignedVInt(in);
+            long valueLength =  in.readUnsignedVInt();
             byte[] value = new byte[Ints.checkedCast(valueLength)];
-            in.readBytes(value);
+            in.readFully(value);
             try (DataInputBuffer buffer = new DataInputBuffer(value))
             {
                 parameters.put(parameterType, parameterType.serializer.deserialize(buffer, messagingVersion));
@@ -152,6 +154,55 @@ public class MessageInHandler extends BaseMessageInHandler
         }
     }
 
+    private static void readParameters(BooleanSupplier isDone, DataInputPlus in, int messagingVersion, Map<ParameterType, Object> parameters) throws IOException
+    {
+        // makes the assumption we have all the bytes required to read the headers
+        while (!isDone.getAsBoolean())
+        {
+            String key = in.readUTF();
+            ParameterType parameterType = ParameterType.byName.get(key);
+            in.readUnsignedVInt();
+            parameters.put(parameterType, parameterType.serializer.deserialize(in, messagingVersion));
+        }
+    }
+
+    public static MessageIn<?> deserialize(DataInputPlus in, int id, int version, InetAddressAndPort from) throws IOException
+    {
+        if (version >= MessagingService.VERSION_40)
+            return deserialize40(in, id, version, from);
+        else
+            return MessageInHandlerPre40.deserializePre40(in, id, version, from);
+    }
+
+    private static MessageIn<?> deserialize40(DataInputPlus in, int id, int version, InetAddressAndPort from) throws IOException
+    {
+        MessagingService.Verb verb = MessagingService.Verb.fromId(in.readInt());
+
+        Map<ParameterType, Object> parameters = Collections.emptyMap();
+        int parameterLength = (int) in.readUnsignedVInt();
+        if (parameterLength != 0)
+        {
+            parameters = new EnumMap<>(ParameterType.class);
+            byte[] bytes = new byte[parameterLength];
+            in.readFully(bytes);
+            try (DataInputBuffer buffer = new DataInputBuffer(bytes))
+            {
+                readParameters(() -> buffer.available() == 0, buffer, version, parameters);
+            }
+        }
+
+        Object payload = null;
+        int payloadSize = (int) in.readUnsignedVInt();
+        if (payloadSize > 0)
+        {
+            IVersionedSerializer serializer = MessagingService.getVerbSerializer(verb, id);
+            if (serializer == null) in.skipBytesFully(payloadSize);
+            else payload = serializer.deserialize(in, version);
+        }
+
+        return new MessageIn<>(from, payload, parameters, verb, version, System.nanoTime());
+    }
+
     @Override
     MessageHeader getMessageHeader()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java b/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java
index f5b6fc4..6eeeea7 100644
--- a/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java
+++ b/src/java/org/apache/cassandra/net/async/MessageInHandlerPre40.java
@@ -25,8 +25,11 @@ import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -155,10 +158,10 @@ public class MessageInHandlerPre40 extends BaseMessageInHandler
             if (!canReadNextParam(in))
                 return false;
 
-            String key = DataInputStream.readUTF(inputPlus);
+            String key = inputPlus.readUTF();
             ParameterType parameterType = ParameterType.byName.get(key);
-            byte[] value = new byte[in.readInt()];
-            in.readBytes(value);
+            byte[] value = new byte[inputPlus.readInt()];
+            inputPlus.readFully(value);
             try (DataInputBuffer buffer = new DataInputBuffer(value))
             {
                 parameters.put(parameterType, parameterType.serializer.deserialize(buffer, messagingVersion));
@@ -168,6 +171,47 @@ public class MessageInHandlerPre40 extends BaseMessageInHandler
         return true;
     }
 
+    private static boolean readParameters(DataInputPlus in, int messagingVersion, int parameterCount, Map<ParameterType, Object> parameters) throws IOException
+    {
+        // makes the assumption that map.size() is a constant time function (HashMap.size() is)
+        while (parameters.size() < parameterCount)
+        {
+            String key = in.readUTF();
+            ParameterType parameterType = ParameterType.byName.get(key);
+            in.readInt();
+            parameters.put(parameterType, parameterType.serializer.deserialize(in, messagingVersion));
+        }
+
+        return true;
+    }
+
+    static MessageIn<?> deserializePre40(DataInputPlus in, int id, int version, InetAddressAndPort from) throws IOException
+    {
+        assert from.equals(CompactEndpointSerializationHelper.instance.deserialize(in, version));
+        MessagingService.Verb verb = MessagingService.Verb.fromId(in.readInt());
+
+        Map<ParameterType, Object> parameters = Collections.emptyMap();
+        int parameterCount = in.readInt();
+        if (parameterCount != 0)
+        {
+            parameters = new EnumMap<>(ParameterType.class);
+            readParameters(in, version, parameterCount, parameters);
+        }
+
+        Object payload = null;
+        int payloadSize = in.readInt();
+        if (payloadSize > 0)
+        {
+            IVersionedSerializer serializer = MessagingService.getVerbSerializer(verb, id);
+            if (serializer == null) in.skipBytesFully(payloadSize);
+            else payload = serializer.deserialize(in, version);
+        }
+
+        return new MessageIn<>(from, payload, parameters, verb, version, System.nanoTime());
+    }
+
+
+
     /**
      * Determine if we can read the next parameter from the {@link ByteBuf}. This method will *always* set the {@code in}
      * readIndex back to where it was when this method was invoked.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/net/async/NettyFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java b/src/java/org/apache/cassandra/net/async/NettyFactory.java
index 989e33c..2366722 100644
--- a/src/java/org/apache/cassandra/net/async/NettyFactory.java
+++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java
@@ -2,6 +2,7 @@ package org.apache.cassandra.net.async;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.Checksum;
 
 import javax.annotation.Nullable;
@@ -384,12 +385,13 @@ public final class NettyFactory
         }
     }
 
-    public void close()
+    public void close() throws InterruptedException
     {
-        acceptGroup.shutdownGracefully();
-        outboundGroup.shutdownGracefully();
-        inboundGroup.shutdownGracefully();
-        streamingGroup.shutdownGracefully();
+        EventLoopGroup[] groups = new EventLoopGroup[] { acceptGroup, outboundGroup, inboundGroup, streamingGroup };
+        for (EventLoopGroup group : groups)
+            group.shutdownGracefully();
+        for (EventLoopGroup group : groups)
+            group.awaitTermination(60, TimeUnit.SECONDS);
     }
 
     static Lz4FrameEncoder createLz4Encoder(int protocolVersion)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/schema/SchemaEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaEvent.java b/src/java/org/apache/cassandra/schema/SchemaEvent.java
index e26cee5..00c8136 100644
--- a/src/java/org/apache/cassandra/schema/SchemaEvent.java
+++ b/src/java/org/apache/cassandra/schema/SchemaEvent.java
@@ -29,13 +29,14 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapDifference;
 
 import org.apache.cassandra.diag.DiagnosticEvent;
 import org.apache.cassandra.utils.Pair;
 
-final class SchemaEvent extends DiagnosticEvent
+public final class SchemaEvent extends DiagnosticEvent
 {
     private final SchemaEventType type;
 
@@ -62,7 +63,7 @@ final class SchemaEvent extends DiagnosticEvent
     @Nullable
     private final MapDifference<String,TableMetadata> indexesDiff;
 
-    enum SchemaEventType
+    public enum SchemaEventType
     {
         KS_METADATA_LOADED,
         KS_METADATA_RELOADED,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index b32f67e..1a54e75 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -18,15 +18,11 @@
 package org.apache.cassandra.service;
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
 import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -78,6 +74,7 @@ import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -169,15 +166,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
                                              .maximumSize(Long.getLong("cassandra.parent_repair_status_cache_size", 100_000))
                                              .build();
 
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
     }
 
     public void start()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 479470c..5eeaf20 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.service;
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -27,9 +26,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
 import com.google.common.util.concurrent.Futures;
 
 import org.slf4j.Logger;
@@ -53,6 +49,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.Pair;
 
 public class CacheService implements CacheServiceMBean
@@ -88,16 +85,7 @@ public class CacheService implements CacheServiceMBean
 
     private CacheService()
     {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
+        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
 
         keyCache = initKeyCache();
         rowCache = initRowCache();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index f0b2dc1..592419a 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -569,16 +569,7 @@ public class CassandraDaemon
         {
             applyConfig();
 
-            try
-            {
-                MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-                mbs.registerMBean(new StandardMBean(new NativeAccess(), NativeAccessMBean.class), new ObjectName(MBEAN_NAME));
-            }
-            catch (Exception e)
-            {
-                logger.error("error registering MBean {}", MBEAN_NAME, e);
-                //Allow the server to start even if the bean can't be registered
-            }
+            MBeanWrapper.instance.registerMBean(new StandardMBean(new NativeAccess(), NativeAccessMBean.class), MBEAN_NAME, MBeanWrapper.OnException.LOG);
 
             if (FBUtilities.isWindows)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f22fec92/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 7b6bd58..a3f6b52 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -30,6 +30,8 @@ import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class PendingRangeCalculatorService
 {
     public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService();
@@ -117,4 +119,11 @@ public class PendingRangeCalculatorService
     {
         StorageService.instance.getTokenMetadata().calculatePendingRanges(strategy, keyspaceName);
     }
+
+    @VisibleForTesting
+    public void shutdownExecutor() throws InterruptedException
+    {
+        executor.shutdown();
+        executor.awaitTermination(60, TimeUnit.SECONDS);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org