You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2021/10/07 15:30:29 UTC

[cassandra] branch trunk updated (5b82447 -> 0c444a7)

This is an automated email from the ASF dual-hosted git repository.

benedict pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 5b82447  [CASSANDRA-17013] CEP-10 Phase 1: in-jvm-dtest-api changes and version bump
     new ce2a0a2  [CASSANDRA-16932] CEP-10 Phase 2: Minor Gossip Fixes
     new fe9cff6  [CASSANDRA-16930] CEP-10 Phase 2: Improved Configuration For Controlling Determinism
     new 0c444a7  [CASSANDRA-16931] CEP-10 Phase 2: Improve DTest @Shared Annotation Functionality

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/cassandra/auth/AuthKeyspace.java    |   5 +-
 .../cassandra/concurrent/ExecutorBuilder.java      |   4 +
 .../concurrent/ExecutorBuilderFactory.java         |   6 +
 .../cassandra/concurrent/ExecutorFactory.java      |   4 +
 .../apache/cassandra/concurrent/ExecutorPlus.java  |   5 +
 .../apache/cassandra/concurrent/Interruptible.java |   5 +
 .../concurrent/LocalAwareExecutorPlus.java         |   5 +
 .../LocalAwareSequentialExecutorPlus.java          |   5 +
 .../cassandra/concurrent/NamedThreadFactory.java   |   1 +
 .../cassandra/concurrent/ResizableThreadPool.java  |   5 +
 .../concurrent/ScheduledExecutorPlus.java          |   5 +
 .../concurrent/SequentialExecutorPlus.java         |   6 +
 .../apache/cassandra/concurrent/Shutdownable.java  |   5 +
 .../concurrent/SingleThreadExecutorPlus.java       |   6 +
 .../cassandra/concurrent/SyncFutureTask.java       |  17 +-
 .../apache/cassandra/concurrent/TaskFactory.java   |   4 +
 .../concurrent/ThreadPoolExecutorBuilder.java      |   2 +-
 .../config/CassandraRelevantProperties.java        |  79 ++++++-
 src/java/org/apache/cassandra/config/Config.java   |   8 +
 .../cassandra/config/DatabaseDescriptor.java       |  36 ++++
 .../cassandra/config/ParameterizedClass.java       |   5 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   6 +
 src/java/org/apache/cassandra/db/Memtable.java     |  22 +-
 .../db/commitlog/BatchCommitLogService.java        |   4 +-
 .../apache/cassandra/db/lifecycle/LogReplica.java  |  10 +-
 .../cassandra/exceptions/CassandraException.java   |   5 +
 .../apache/cassandra/exceptions/ExceptionCode.java |   4 +
 .../exceptions/RequestExecutionException.java      |   5 +
 .../cassandra/exceptions/TransportException.java   |   5 +
 .../org/apache/cassandra/gms/FailureDetector.java  |   3 +-
 src/java/org/apache/cassandra/gms/Gossiper.java    | 170 ++++++++++++---
 .../apache/cassandra/io/sstable/Descriptor.java    |   3 +-
 .../cassandra/io/sstable/format/SSTableReader.java |  12 +-
 .../apache/cassandra/io/util/DataInputPlus.java    |   4 +
 .../apache/cassandra/io/util/DataOutputPlus.java   |   6 +-
 .../org/apache/cassandra/io/util/FileReader.java   |   1 -
 src/java/org/apache/cassandra/io/util/Memory.java  |   2 +-
 .../org/apache/cassandra/io/util/PathUtils.java    |   2 +-
 ...ewindableDataInput.java => ReadableMemory.java} |  13 +-
 .../locator/AbstractReplicaCollection.java         |  18 ++
 .../org/apache/cassandra/net/RequestCallbacks.java |   8 +-
 .../apache/cassandra/schema/CompressionParams.java |  13 +-
 .../cassandra/schema/MigrationCoordinator.java     |   3 +-
 src/java/org/apache/cassandra/schema/TableId.java  |   5 +
 .../org/apache/cassandra/schema/TableMetadata.java |   5 +-
 .../org/apache/cassandra/service/ClientState.java  |   7 +-
 .../apache/cassandra/service/StorageService.java   |  37 +++-
 .../cassandra/streaming/StreamingChannel.java      |   5 +
 .../streaming/StreamingDataInputPlus.java          |   4 +
 .../streaming/StreamingDataOutputPlus.java         |   5 +
 .../org/apache/cassandra/utils/ByteBufferUtil.java |   2 +
 src/java/org/apache/cassandra/utils/Clock.java     |   6 +-
 src/java/org/apache/cassandra/utils/Closeable.java |   3 +
 .../org/apache/cassandra/utils/FBUtilities.java    |  12 +-
 src/java/org/apache/cassandra/utils/Hex.java       |  17 ++
 .../java/org/apache/cassandra/utils}/Isolated.java |   4 +-
 .../{WrappedBoolean.java => LazyToString.java}     |  30 ++-
 .../org/apache/cassandra/utils/MonotonicClock.java |   8 +-
 .../cassandra/utils/MonotonicClockTranslation.java |   3 +
 .../org/apache/cassandra/utils/NativeLibrary.java  |  43 ++--
 .../java/org/apache/cassandra/utils}/Shared.java   |   8 +-
 .../org/apache/cassandra/utils/SigarLibrary.java   |   4 +-
 src/java/org/apache/cassandra/utils/UUIDGen.java   |  60 +++++-
 .../org/apache/cassandra/utils/WithResources.java  |   3 +
 .../cassandra/utils/concurrent/Awaitable.java      |   3 +
 .../cassandra/utils/concurrent/Condition.java      |   4 +
 .../cassandra/utils/concurrent/CountDownLatch.java |   4 +
 .../apache/cassandra/utils/concurrent/Future.java  |   4 +
 .../apache/cassandra/utils/concurrent/Promise.java |   5 +
 .../cassandra/utils/concurrent/RunnableFuture.java |   5 +
 .../cassandra/utils/concurrent/Semaphore.java      |   3 +
 .../apache/cassandra/utils/concurrent/Threads.java | 135 ++++++++++++
 .../concurrent/UncheckedInterruptedException.java  |   5 +
 .../cassandra/utils/concurrent/WaitQueue.java      |   5 +
 .../apache/cassandra/utils/memory/HeapPool.java    |  76 ++++++-
 .../cassandra/utils/memory/MemtableAllocator.java  |   2 +-
 .../cassandra/utils/memory/MemtablePool.java       |   3 +-
 .../apache/cassandra/utils/memory/NativePool.java  |   4 +-
 .../apache/cassandra/utils/memory/SlabPool.java    |   4 +-
 .../org/apache/cassandra/distributed/Cluster.java  |   3 +-
 .../distributed/impl/AbstractCluster.java          | 239 +++++++++++++++++++--
 .../cassandra/distributed/impl/Coordinator.java    |   2 +-
 .../distributed/impl/INodeProvisionStrategy.java   |   6 +-
 .../cassandra/distributed/impl/InstanceConfig.java |   2 -
 .../apache/cassandra/distributed/impl/Query.java   |   5 +-
 .../cassandra/distributed/shared/ClusterUtils.java |   1 +
 .../test/BootstrapBinaryDisabledTest.java          |   2 +-
 ...AsHibernatingNodeWithoutReplaceAddressTest.java |   8 +-
 .../upgrade/MixedModeMessageForwardTest.java       |   3 +-
 .../config/DatabaseDescriptorRefTest.java          |   1 +
 .../unit/org/apache/cassandra/db/CellSpecTest.java |   2 +-
 .../cassandra/db/ClusteringHeapSizeTest.java       |   2 +-
 .../org/apache/cassandra/db/NativeCellTest.java    |   2 +-
 .../cassandra/db/filter/ColumnFilterTest.java      |   1 +
 .../apache/cassandra/gms/ArrivalWindowTest.java    |   8 +
 95 files changed, 1189 insertions(+), 168 deletions(-)
 copy src/java/org/apache/cassandra/io/util/{RewindableDataInput.java => ReadableMemory.java} (77%)
 rename {test/distributed/org/apache/cassandra/distributed/shared => src/java/org/apache/cassandra/utils}/Isolated.java (95%)
 copy src/java/org/apache/cassandra/utils/{WrappedBoolean.java => LazyToString.java} (73%)
 rename {test/distributed/org/apache/cassandra/distributed/shared => src/java/org/apache/cassandra/utils}/Shared.java (82%)
 create mode 100644 src/java/org/apache/cassandra/utils/concurrent/Threads.java

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/03: [CASSANDRA-16932] CEP-10 Phase 2: Minor Gossip Fixes

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit ce2a0a28bc9ca21e1fae29f2a38448a877db06c3
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Mon Apr 26 12:09:20 2021 +0100

    [CASSANDRA-16932] CEP-10 Phase 2: Minor Gossip Fixes
    
    * Ensure we apply new states in correct order so as not to lose TOKEN message
    * Permit replacement nodes to join the ring with lower heartbeat state than
      node being replaced
---
 src/java/org/apache/cassandra/gms/Gossiper.java    | 23 +++++++++++++++++++++-
 .../apache/cassandra/service/StorageService.java   |  4 +++-
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 3219145..e2ebedc 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -1587,12 +1587,33 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         if (!hasMajorVersion3Nodes())
             localState.removeMajorVersion3LegacyApplicationStates();
 
+        // need to run STATUS or STATUS_WITH_PORT first to handle BOOT_REPLACE correctly (else won't be a member, so TOKENS won't be processed)
         for (Entry<ApplicationState, VersionedValue> updatedEntry : updatedStates)
         {
+            switch (updatedEntry.getKey())
+            {
+                default:
+                    continue;
+                case STATUS:
+                    if (localState.containsApplicationState(ApplicationState.STATUS_WITH_PORT))
+                        continue;
+                case STATUS_WITH_PORT:
+            }
+            doOnChangeNotifications(addr, updatedEntry.getKey(), updatedEntry.getValue());
+        }
+
+        for (Entry<ApplicationState, VersionedValue> updatedEntry : updatedStates)
+        {
+            switch (updatedEntry.getKey())
+            {
+                // We should have already handled these two states above:
+                case STATUS_WITH_PORT:
+                case STATUS:
+                    continue;
+            }
             // filters out legacy change notifications
             // only if local state already indicates that the peer has the new fields
             if ((ApplicationState.INTERNAL_IP == updatedEntry.getKey() && localState.containsApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT))
-                ||(ApplicationState.STATUS == updatedEntry.getKey() && localState.containsApplicationState(ApplicationState.STATUS_WITH_PORT))
                 || (ApplicationState.RPC_ADDRESS == updatedEntry.getKey() && localState.containsApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT)))
                 continue;
             doOnChangeNotifications(addr, updatedEntry.getKey(), updatedEntry.getValue());
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 1638505..5c0f4bb 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2728,7 +2728,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 tokensToUpdateInMetadata.add(token);
                 tokensToUpdateInSystemKeyspace.add(token);
             }
-            else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
+            // Note: in test scenarios, there may not be any delta between the heartbeat generations of the old
+            // and new nodes, so we first check whether the new endpoint is marked as a replacement for the old.
+            else if (endpoint.equals(tokenMetadata.getReplacementNode(currentOwner).orElse(null)) || Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
             {
                 tokensToUpdateInMetadata.add(token);
                 tokensToUpdateInSystemKeyspace.add(token);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 03/03: [CASSANDRA-16931] CEP-10 Phase 2: Improve DTest @Shared Annotation Functionality

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 0c444a75e79da8c157813a72d61f2b2f86e187ba
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Wed Jul 28 20:04:03 2021 +0100

    [CASSANDRA-16931] CEP-10 Phase 2: Improve DTest @Shared Annotation Functionality
---
 .../cassandra/concurrent/ExecutorBuilder.java      |   4 +
 .../concurrent/ExecutorBuilderFactory.java         |   6 +
 .../cassandra/concurrent/ExecutorFactory.java      |   4 +
 .../apache/cassandra/concurrent/ExecutorPlus.java  |   5 +
 .../apache/cassandra/concurrent/Interruptible.java |   5 +
 .../concurrent/LocalAwareExecutorPlus.java         |   5 +
 .../LocalAwareSequentialExecutorPlus.java          |   5 +
 .../cassandra/concurrent/ResizableThreadPool.java  |   5 +
 .../concurrent/ScheduledExecutorPlus.java          |   5 +
 .../concurrent/SequentialExecutorPlus.java         |   6 +
 .../apache/cassandra/concurrent/Shutdownable.java  |   5 +
 .../apache/cassandra/concurrent/TaskFactory.java   |   4 +
 .../cassandra/config/ParameterizedClass.java       |   5 +
 .../cassandra/exceptions/CassandraException.java   |   5 +
 .../apache/cassandra/exceptions/ExceptionCode.java |   4 +
 .../exceptions/RequestExecutionException.java      |   5 +
 .../cassandra/exceptions/TransportException.java   |   5 +
 .../apache/cassandra/io/util/DataInputPlus.java    |   4 +
 .../apache/cassandra/io/util/DataOutputPlus.java   |   6 +-
 .../org/apache/cassandra/io/util/FileReader.java   |   1 -
 src/java/org/apache/cassandra/io/util/Memory.java  |   2 +-
 .../util/ReadableMemory.java}                      |  14 +-
 .../cassandra/streaming/StreamingChannel.java      |   5 +
 .../streaming/StreamingDataInputPlus.java          |   4 +
 .../streaming/StreamingDataOutputPlus.java         |   5 +
 src/java/org/apache/cassandra/utils/Clock.java     |   2 +
 src/java/org/apache/cassandra/utils/Closeable.java |   3 +
 .../java/org/apache/cassandra/utils}/Isolated.java |   4 +-
 .../org/apache/cassandra/utils/MonotonicClock.java |   2 +
 .../cassandra/utils/MonotonicClockTranslation.java |   3 +
 .../java/org/apache/cassandra/utils}/Shared.java   |   8 +-
 .../org/apache/cassandra/utils/WithResources.java  |   3 +
 .../cassandra/utils/concurrent/Awaitable.java      |   3 +
 .../cassandra/utils/concurrent/Condition.java      |   4 +
 .../cassandra/utils/concurrent/CountDownLatch.java |   4 +
 .../apache/cassandra/utils/concurrent/Future.java  |   4 +
 .../apache/cassandra/utils/concurrent/Promise.java |   5 +
 .../cassandra/utils/concurrent/RunnableFuture.java |   5 +
 .../cassandra/utils/concurrent/Semaphore.java      |   3 +
 .../concurrent/UncheckedInterruptedException.java  |   5 +
 .../cassandra/utils/concurrent/WaitQueue.java      |   5 +
 .../org/apache/cassandra/distributed/Cluster.java  |   3 +-
 .../distributed/impl/AbstractCluster.java          | 239 +++++++++++++++++++--
 .../cassandra/distributed/impl/Coordinator.java    |   2 +-
 .../distributed/impl/INodeProvisionStrategy.java   |   6 +-
 .../cassandra/distributed/impl/InstanceConfig.java |   2 -
 .../apache/cassandra/distributed/impl/Query.java   |   5 +-
 .../cassandra/distributed/shared/ClusterUtils.java |   1 +
 .../test/BootstrapBinaryDisabledTest.java          |   2 +-
 ...AsHibernatingNodeWithoutReplaceAddressTest.java |   8 +-
 .../upgrade/MixedModeMessageForwardTest.java       |   3 +-
 51 files changed, 409 insertions(+), 49 deletions(-)

diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorBuilder.java b/src/java/org/apache/cassandra/concurrent/ExecutorBuilder.java
index 89ca28a..c1d39d5 100644
--- a/src/java/org/apache/cassandra/concurrent/ExecutorBuilder.java
+++ b/src/java/org/apache/cassandra/concurrent/ExecutorBuilder.java
@@ -26,11 +26,15 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
 /**
  * Configure an executor before creating it.
  * See {@link ThreadPoolExecutorBuilder}
  */
+@Shared(scope = SIMULATION)
 public interface ExecutorBuilder<E extends ExecutorService>
 {
     /**
diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorBuilderFactory.java b/src/java/org/apache/cassandra/concurrent/ExecutorBuilderFactory.java
index f96def8..465226a 100644
--- a/src/java/org/apache/cassandra/concurrent/ExecutorBuilderFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/ExecutorBuilderFactory.java
@@ -18,6 +18,11 @@
 
 package org.apache.cassandra.concurrent;
 
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
 /**
  * Entry point for configuring and creating new executors.
  *
@@ -29,6 +34,7 @@ package org.apache.cassandra.concurrent;
  * <li>{@link #configureSequential(String)}
  * <li>{@link #configurePooled(String, int)}
  */
+@Shared(scope = SIMULATION, inner = INTERFACES)
 public interface ExecutorBuilderFactory<E extends ExecutorPlus, S extends SequentialExecutorPlus>
 {
     /**
diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorFactory.java b/src/java/org/apache/cassandra/concurrent/ExecutorFactory.java
index 52ba94a..f1acd55 100644
--- a/src/java/org/apache/cassandra/concurrent/ExecutorFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/ExecutorFactory.java
@@ -21,12 +21,15 @@ package org.apache.cassandra.concurrent;
 import java.util.function.Consumer;
 
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Shared;
 
 import static java.lang.Thread.*;
 import static org.apache.cassandra.concurrent.NamedThreadFactory.createThread;
 import static org.apache.cassandra.concurrent.NamedThreadFactory.setupThread;
 import static org.apache.cassandra.concurrent.ThreadPoolExecutorBuilder.pooledJmx;
 import static org.apache.cassandra.concurrent.ThreadPoolExecutorBuilder.sequentialJmx;
+import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
 /**
  * Entry point for configuring and creating new executors.
@@ -52,6 +55,7 @@ import static org.apache.cassandra.concurrent.ThreadPoolExecutorBuilder.sequenti
  * Supports shared executors via sub-factory {@code localAware().withJMX()}
  * using {@link LocalAwareSubFactoryWithJMX#shared(String, int, ExecutorPlus.MaximumPoolSizeListener)}
  */
+@Shared(scope = SIMULATION, inner = INTERFACES)
 public interface ExecutorFactory extends ExecutorBuilderFactory.Jmxable<ExecutorPlus, SequentialExecutorPlus>
 {
     public interface LocalAwareSubFactoryWithJMX extends ExecutorBuilderFactory<LocalAwareExecutorPlus, LocalAwareSequentialExecutorPlus>
diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorPlus.java b/src/java/org/apache/cassandra/concurrent/ExecutorPlus.java
index c42a475..1fca66f 100644
--- a/src/java/org/apache/cassandra/concurrent/ExecutorPlus.java
+++ b/src/java/org/apache/cassandra/concurrent/ExecutorPlus.java
@@ -26,14 +26,19 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.cassandra.utils.Shared;
 import org.apache.cassandra.utils.WithResources;
 import org.apache.cassandra.utils.concurrent.Future;
 
+import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
 /**
  * Cassandra's extension of {@link ExecutorService}, using our own {@link Future}, supporting
  * {@link #inExecutor()}, and execution with associated resources {@link #execute(WithResources, Runnable)}
  * (which is primarily used for encapsulating {@link ExecutorLocals} without leaking implementing classes).
  */
+@Shared(scope = SIMULATION, inner = INTERFACES)
 public interface ExecutorPlus extends ExecutorService, ResizableThreadPool
 {
     interface MaximumPoolSizeListener
diff --git a/src/java/org/apache/cassandra/concurrent/Interruptible.java b/src/java/org/apache/cassandra/concurrent/Interruptible.java
index cc13a63..8641ec8 100644
--- a/src/java/org/apache/cassandra/concurrent/Interruptible.java
+++ b/src/java/org/apache/cassandra/concurrent/Interruptible.java
@@ -18,8 +18,13 @@
 
 package org.apache.cassandra.concurrent;
 
+import org.apache.cassandra.utils.Shared;
+
 import static org.apache.cassandra.concurrent.Interruptible.State.*;
+import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
+@Shared(scope = SIMULATION, inner = INTERFACES)
 public interface Interruptible extends Shutdownable
 {
     public enum State { NORMAL, INTERRUPTED, SHUTTING_DOWN }
diff --git a/src/java/org/apache/cassandra/concurrent/LocalAwareExecutorPlus.java b/src/java/org/apache/cassandra/concurrent/LocalAwareExecutorPlus.java
index 7509619..743cacc 100644
--- a/src/java/org/apache/cassandra/concurrent/LocalAwareExecutorPlus.java
+++ b/src/java/org/apache/cassandra/concurrent/LocalAwareExecutorPlus.java
@@ -18,9 +18,14 @@
 
 package org.apache.cassandra.concurrent;
 
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
 /**
  * An {@link ExecutorPlus} that is aware of, and propagates to execution, any ExecutorLocals
  */
+@Shared(scope = SIMULATION)
 public interface LocalAwareExecutorPlus extends ExecutorPlus
 {
 }
diff --git a/src/java/org/apache/cassandra/concurrent/LocalAwareSequentialExecutorPlus.java b/src/java/org/apache/cassandra/concurrent/LocalAwareSequentialExecutorPlus.java
index dbcff6c..99e44b0 100644
--- a/src/java/org/apache/cassandra/concurrent/LocalAwareSequentialExecutorPlus.java
+++ b/src/java/org/apache/cassandra/concurrent/LocalAwareSequentialExecutorPlus.java
@@ -18,9 +18,14 @@
 
 package org.apache.cassandra.concurrent;
 
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
 /**
  * A {@link SequentialExecutorPlus} that is aware of, and propagates to execution, any ExecutorLocals
  */
+@Shared(scope = SIMULATION)
 public interface LocalAwareSequentialExecutorPlus extends LocalAwareExecutorPlus, SequentialExecutorPlus
 {
 }
diff --git a/src/java/org/apache/cassandra/concurrent/ResizableThreadPool.java b/src/java/org/apache/cassandra/concurrent/ResizableThreadPool.java
index 760c06e..9c1dba6 100644
--- a/src/java/org/apache/cassandra/concurrent/ResizableThreadPool.java
+++ b/src/java/org/apache/cassandra/concurrent/ResizableThreadPool.java
@@ -18,6 +18,11 @@
 
 package org.apache.cassandra.concurrent;
 
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+@Shared(scope = SIMULATION)
 public interface ResizableThreadPool
 {
     /**
diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java b/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java
index 0b512ac..ecf073d 100644
--- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java
+++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutorPlus.java
@@ -20,6 +20,11 @@ package org.apache.cassandra.concurrent;
 
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+@Shared(scope = SIMULATION)
 public interface ScheduledExecutorPlus extends ExecutorPlus, ScheduledExecutorService
 {
 }
diff --git a/src/java/org/apache/cassandra/concurrent/SequentialExecutorPlus.java b/src/java/org/apache/cassandra/concurrent/SequentialExecutorPlus.java
index 7ea0e95..2b63f14 100644
--- a/src/java/org/apache/cassandra/concurrent/SequentialExecutorPlus.java
+++ b/src/java/org/apache/cassandra/concurrent/SequentialExecutorPlus.java
@@ -18,12 +18,18 @@
 
 package org.apache.cassandra.concurrent;
 
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
 /**
  * An {@link ExecutorPlus} that guarantees the order of execution matches the order of task submission,
  * and provides a simple mechanism for the recurring pattern of ensuring a job is executed at least once
  * after some point in time (i.e. ensures that at most one copy of the task is queued, with up to one
  * copy running as well)
  */
+@Shared(scope = SIMULATION, inner = INTERFACES)
 public interface SequentialExecutorPlus extends ExecutorPlus
 {
     public interface AtLeastOnceTrigger
diff --git a/src/java/org/apache/cassandra/concurrent/Shutdownable.java b/src/java/org/apache/cassandra/concurrent/Shutdownable.java
index db89217..185875b 100644
--- a/src/java/org/apache/cassandra/concurrent/Shutdownable.java
+++ b/src/java/org/apache/cassandra/concurrent/Shutdownable.java
@@ -20,6 +20,11 @@ package org.apache.cassandra.concurrent;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+@Shared(scope = SIMULATION)
 public interface Shutdownable
 {
     boolean isTerminated();
diff --git a/src/java/org/apache/cassandra/concurrent/TaskFactory.java b/src/java/org/apache/cassandra/concurrent/TaskFactory.java
index a25a45f..56087d9 100644
--- a/src/java/org/apache/cassandra/concurrent/TaskFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/TaskFactory.java
@@ -20,10 +20,13 @@ package org.apache.cassandra.concurrent;
 
 import java.util.concurrent.Callable;
 
+import org.apache.cassandra.utils.Shared;
 import org.apache.cassandra.utils.WithResources;
 import org.apache.cassandra.utils.concurrent.RunnableFuture;
 
 import static org.apache.cassandra.concurrent.FutureTask.callable;
+import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
 /**
  * A simple mechanism to impose our desired semantics on the execution of a task without requiring a specialised
@@ -32,6 +35,7 @@ import static org.apache.cassandra.concurrent.FutureTask.callable;
  * The encapsulations handle any exceptions in our standard way, as well as ensuring {@link ExecutorLocals} are
  * propagated in the case of {@link #localAware()}
  */
+@Shared(scope = SIMULATION, inner = INTERFACES)
 public interface TaskFactory
 {
     Runnable toExecute(Runnable runnable);
diff --git a/src/java/org/apache/cassandra/config/ParameterizedClass.java b/src/java/org/apache/cassandra/config/ParameterizedClass.java
index 4b8cf5a..9b00178 100644
--- a/src/java/org/apache/cassandra/config/ParameterizedClass.java
+++ b/src/java/org/apache/cassandra/config/ParameterizedClass.java
@@ -22,6 +22,11 @@ import java.util.Map;
 
 import com.google.common.base.Objects;
 
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+@Shared(scope = SIMULATION)
 public class ParameterizedClass
 {
     public static final String CLASS_NAME = "class_name";
diff --git a/src/java/org/apache/cassandra/exceptions/CassandraException.java b/src/java/org/apache/cassandra/exceptions/CassandraException.java
index 58521df..119daac 100644
--- a/src/java/org/apache/cassandra/exceptions/CassandraException.java
+++ b/src/java/org/apache/cassandra/exceptions/CassandraException.java
@@ -17,6 +17,11 @@
  */
 package org.apache.cassandra.exceptions;
 
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+@Shared(scope = SIMULATION)
 public abstract class CassandraException extends RuntimeException implements TransportException
 {
     private final ExceptionCode code;
diff --git a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
index 1766951..8bb0cfd 100644
--- a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
+++ b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
@@ -21,10 +21,14 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
 /**
  * Exceptions code, as defined by the binary protocol.
  */
+@Shared(scope = SIMULATION)
 public enum ExceptionCode
 {
     SERVER_ERROR    (0x0000),
diff --git a/src/java/org/apache/cassandra/exceptions/RequestExecutionException.java b/src/java/org/apache/cassandra/exceptions/RequestExecutionException.java
index 4db108a..d80559a 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestExecutionException.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestExecutionException.java
@@ -17,6 +17,11 @@
  */
 package org.apache.cassandra.exceptions;
 
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+@Shared(scope = SIMULATION)
 public abstract class RequestExecutionException extends CassandraException
 {
     protected RequestExecutionException(ExceptionCode code, String msg)
diff --git a/src/java/org/apache/cassandra/exceptions/TransportException.java b/src/java/org/apache/cassandra/exceptions/TransportException.java
index 70d1da5..9807749 100644
--- a/src/java/org/apache/cassandra/exceptions/TransportException.java
+++ b/src/java/org/apache/cassandra/exceptions/TransportException.java
@@ -17,6 +17,11 @@
  */
 package org.apache.cassandra.exceptions;
 
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+@Shared(scope = SIMULATION)
 public interface TransportException
 {
     /**
diff --git a/src/java/org/apache/cassandra/io/util/DataInputPlus.java b/src/java/org/apache/cassandra/io/util/DataInputPlus.java
index 41b422a..bda8461 100644
--- a/src/java/org/apache/cassandra/io/util/DataInputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataInputPlus.java
@@ -19,11 +19,15 @@ package org.apache.cassandra.io.util;
 
 import java.io.*;
 
+import org.apache.cassandra.utils.Shared;
 import org.apache.cassandra.utils.vint.VIntCoding;
 
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
 /**
  * Extension to DataInput that provides support for reading varints
  */
+@Shared(scope = SIMULATION)
 public interface DataInputPlus extends DataInput
 {
     default long readVInt() throws IOException
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
index b94d097..a8f545e 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
@@ -21,18 +21,22 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.utils.Shared;
 import org.apache.cassandra.utils.vint.VIntCoding;
 
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
 /**
  * Extension to DataOutput that provides for writing ByteBuffer and Memory, potentially with an efficient
  * implementation that is zero copy or at least has reduced bounds checking overhead.
  */
+@Shared(scope = SIMULATION)
 public interface DataOutputPlus extends DataOutput
 {
     // write the buffer without modifying its position
     void write(ByteBuffer buffer) throws IOException;
 
-    default void write(Memory memory, long offset, long length) throws IOException
+    default void write(ReadableMemory memory, long offset, long length) throws IOException
     {
         for (ByteBuffer buffer : memory.asByteBuffers(offset, length))
             write(buffer);
diff --git a/src/java/org/apache/cassandra/io/util/FileReader.java b/src/java/org/apache/cassandra/io/util/FileReader.java
index 55b8fbb..86d0388 100644
--- a/src/java/org/apache/cassandra/io/util/FileReader.java
+++ b/src/java/org/apache/cassandra/io/util/FileReader.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.io.util;
 
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
 
 public class FileReader extends InputStreamReader
 {
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index eaa6e91..c55f047 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -32,7 +32,7 @@ import sun.misc.Unsafe;
 /**
  * An off-heap region of memory that must be manually free'd when no longer needed.
  */
-public class Memory implements AutoCloseable
+public class Memory implements AutoCloseable, ReadableMemory
 {
     private static final Unsafe unsafe;
     static
diff --git a/src/java/org/apache/cassandra/streaming/StreamingDataInputPlus.java b/src/java/org/apache/cassandra/io/util/ReadableMemory.java
similarity index 73%
copy from src/java/org/apache/cassandra/streaming/StreamingDataInputPlus.java
copy to src/java/org/apache/cassandra/io/util/ReadableMemory.java
index f3e6111..ccb717d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingDataInputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/ReadableMemory.java
@@ -16,14 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.streaming;
+package org.apache.cassandra.io.util;
 
-import java.io.Closeable;
+import java.nio.ByteBuffer;
 
-import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.utils.Shared;
 
-public interface StreamingDataInputPlus extends DataInputPlus, Closeable
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+@Shared(scope = SIMULATION)
+public interface ReadableMemory
 {
-    @Override
-    void close();
+    ByteBuffer[] asByteBuffers(long offset, long length);
 }
diff --git a/src/java/org/apache/cassandra/streaming/StreamingChannel.java b/src/java/org/apache/cassandra/streaming/StreamingChannel.java
index af6e68e..18bb2b7 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingChannel.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingChannel.java
@@ -24,7 +24,12 @@ import java.util.function.IntFunction;
 
 import io.netty.util.concurrent.Future;
 import org.apache.cassandra.streaming.async.NettyStreamingConnectionFactory;
+import org.apache.cassandra.utils.Shared;
 
+import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+@Shared(scope = SIMULATION, inner = INTERFACES)
 public interface StreamingChannel
 {
     public interface Factory
diff --git a/src/java/org/apache/cassandra/streaming/StreamingDataInputPlus.java b/src/java/org/apache/cassandra/streaming/StreamingDataInputPlus.java
index f3e6111..0cfcc0d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingDataInputPlus.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingDataInputPlus.java
@@ -21,7 +21,11 @@ package org.apache.cassandra.streaming;
 import java.io.Closeable;
 
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.utils.Shared;
 
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+@Shared(scope = SIMULATION)
 public interface StreamingDataInputPlus extends DataInputPlus, Closeable
 {
     @Override
diff --git a/src/java/org/apache/cassandra/streaming/StreamingDataOutputPlus.java b/src/java/org/apache/cassandra/streaming/StreamingDataOutputPlus.java
index d4a514b..3f68b3a 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingDataOutputPlus.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingDataOutputPlus.java
@@ -25,7 +25,12 @@ import java.nio.channels.FileChannel;
 
 import io.netty.channel.FileRegion;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.Shared;
 
+import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+@Shared(scope = SIMULATION, inner = INTERFACES)
 public interface StreamingDataOutputPlus extends DataOutputPlus, Closeable
 {
     interface BufferSupplier
diff --git a/src/java/org/apache/cassandra/utils/Clock.java b/src/java/org/apache/cassandra/utils/Clock.java
index 6fd0efd..1f39493 100644
--- a/src/java/org/apache/cassandra/utils/Clock.java
+++ b/src/java/org/apache/cassandra/utils/Clock.java
@@ -23,6 +23,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.cassandra.config.CassandraRelevantProperties.CLOCK_GLOBAL;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
 /**
  * Wrapper around time related functions that are either implemented by using the default JVM calls
@@ -33,6 +34,7 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.CLOCK_GLOB
  * Please note that {@link java.time.Clock} wasn't used, as it would not be possible to provide an
  * implementation for {@link #nanoTime()} with the exact same properties of {@link System#nanoTime()}.
  */
+@Shared(scope = SIMULATION)
 public interface Clock
 {
     static final Logger logger = LoggerFactory.getLogger(Clock.class);
diff --git a/src/java/org/apache/cassandra/utils/Closeable.java b/src/java/org/apache/cassandra/utils/Closeable.java
index ccc33ea..1a8c1e9 100644
--- a/src/java/org/apache/cassandra/utils/Closeable.java
+++ b/src/java/org/apache/cassandra/utils/Closeable.java
@@ -18,6 +18,9 @@
 
 package org.apache.cassandra.utils;
 
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+@Shared(scope = SIMULATION)
 public interface Closeable extends java.io.Closeable
 {
     public void close();
diff --git a/test/distributed/org/apache/cassandra/distributed/shared/Isolated.java b/src/java/org/apache/cassandra/utils/Isolated.java
similarity index 95%
rename from test/distributed/org/apache/cassandra/distributed/shared/Isolated.java
rename to src/java/org/apache/cassandra/utils/Isolated.java
index 898631f..12f88e3 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/Isolated.java
+++ b/src/java/org/apache/cassandra/utils/Isolated.java
@@ -16,13 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.distributed.shared;
+package org.apache.cassandra.utils;
 
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
+import org.apache.cassandra.utils.Shared;
+
 /**
  * Tells jvm-dtest that a class should be isolated and loaded into the instance class loader.
  *
diff --git a/src/java/org/apache/cassandra/utils/MonotonicClock.java b/src/java/org/apache/cassandra/utils/MonotonicClock.java
index 2ef563c..e14fd45 100644
--- a/src/java/org/apache/cassandra/utils/MonotonicClock.java
+++ b/src/java/org/apache/cassandra/utils/MonotonicClock.java
@@ -33,6 +33,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.cassandra.config.CassandraRelevantProperties.CLOCK_MONOTONIC_APPROX;
 import static org.apache.cassandra.config.CassandraRelevantProperties.CLOCK_MONOTONIC_PRECISE;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
 /**
  * Wrapper around time related functions that are either implemented by using the default JVM calls
@@ -45,6 +46,7 @@ import static org.apache.cassandra.utils.Clock.Global.nanoTime;
  *
  * TODO better rationalise MonotonicClock/Clock
  */
+@Shared(scope = SIMULATION)
 public interface MonotonicClock
 {
     /**
diff --git a/src/java/org/apache/cassandra/utils/MonotonicClockTranslation.java b/src/java/org/apache/cassandra/utils/MonotonicClockTranslation.java
index f7f83e4..cef8bd8 100644
--- a/src/java/org/apache/cassandra/utils/MonotonicClockTranslation.java
+++ b/src/java/org/apache/cassandra/utils/MonotonicClockTranslation.java
@@ -18,6 +18,9 @@
 
 package org.apache.cassandra.utils;
 
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+@Shared(scope = SIMULATION)
 public interface MonotonicClockTranslation
 {
     /** accepts millis since epoch, returns nanoTime in the related clock */
diff --git a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java b/src/java/org/apache/cassandra/utils/Shared.java
similarity index 82%
rename from test/distributed/org/apache/cassandra/distributed/shared/Shared.java
rename to src/java/org/apache/cassandra/utils/Shared.java
index bb67070..e576c86 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
+++ b/src/java/org/apache/cassandra/utils/Shared.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.distributed.shared;
+package org.apache.cassandra.utils;
 
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
@@ -36,4 +36,10 @@ import java.lang.annotation.Target;
 @Target({ ElementType.TYPE })
 public @interface Shared
 {
+    enum Scope { ANY, SIMULATION }
+    enum Recursive { NONE, INTERFACES /*(and enums and exceptions) */, ALL }
+    Scope[] scope() default Scope.ANY;
+    Recursive inner() default Recursive.NONE;
+    Recursive ancestors() default Recursive.NONE;
+    Recursive members() default Recursive.NONE;
 }
diff --git a/src/java/org/apache/cassandra/utils/WithResources.java b/src/java/org/apache/cassandra/utils/WithResources.java
index 76e218c..0c0bb92 100644
--- a/src/java/org/apache/cassandra/utils/WithResources.java
+++ b/src/java/org/apache/cassandra/utils/WithResources.java
@@ -20,12 +20,15 @@ package org.apache.cassandra.utils;
 
 import org.apache.cassandra.concurrent.ExecutorPlus;
 
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
 /**
  * A generic interface for encapsulating a Runnable task with related work before and after execution,
  * using the built-in try-with-resources functionality offered by {@link Closeable}.
  *
  * See {@link ExecutorPlus#execute(WithResources, Runnable)}
  */
+@Shared(scope = SIMULATION)
 public interface WithResources
 {
     static class None implements WithResources
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Awaitable.java b/src/java/org/apache/cassandra/utils/concurrent/Awaitable.java
index 5b3d315..03aab5f 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Awaitable.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Awaitable.java
@@ -23,17 +23,20 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Predicate;
 
 import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.utils.Shared;
 
 import org.apache.cassandra.utils.Intercept;
 
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
 /**
  * A generic signal consumer, supporting all of the typical patterns used in Cassandra.
  * All of the methods defined in {@link Awaitable} may be waited on without a loop,
  * as this interface declares that there are no spurious wake-ups.
  */
+@Shared(scope = SIMULATION)
 public interface Awaitable
 {
     /**
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Condition.java b/src/java/org/apache/cassandra/utils/concurrent/Condition.java
index f47e20f..eb97848 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Condition.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Condition.java
@@ -19,6 +19,9 @@
 package org.apache.cassandra.utils.concurrent;
 
 import org.apache.cassandra.utils.Intercept;
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
 /**
  * Simpler API than java.util.concurrent.Condition; would be nice to extend it, but also nice
@@ -26,6 +29,7 @@ import org.apache.cassandra.utils.Intercept;
  *
  * {@link Awaitable} for explicit external signals.
  */
+@Shared(scope = SIMULATION)
 public interface Condition extends Awaitable
 {
     /**
diff --git a/src/java/org/apache/cassandra/utils/concurrent/CountDownLatch.java b/src/java/org/apache/cassandra/utils/concurrent/CountDownLatch.java
index 5988375..976abb7 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/CountDownLatch.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/CountDownLatch.java
@@ -21,7 +21,11 @@ package org.apache.cassandra.utils.concurrent;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.cassandra.utils.Intercept;
+import org.apache.cassandra.utils.Shared;
 
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+@Shared(scope = SIMULATION)
 public interface CountDownLatch extends Awaitable
 {
     /**
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Future.java b/src/java/org/apache/cassandra/utils/concurrent/Future.java
index e21a654..4454a2e 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Future.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Future.java
@@ -30,13 +30,17 @@ import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import io.netty.util.internal.PlatformDependent;
+import org.apache.cassandra.utils.Shared;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
 /**
  * A Future that integrates several different (but equivalent) APIs used within Cassandra into a single concept,
  * integrating also with our {@link Awaitable} abstraction, to overall improve coherency and clarity in the codebase.
  */
+@Shared(scope = SIMULATION, ancestors = INTERFACES)
 public interface Future<V> extends io.netty.util.concurrent.Future<V>, ListenableFuture<V>, Awaitable
 {
     /**
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Promise.java b/src/java/org/apache/cassandra/utils/concurrent/Promise.java
index 10d6275..d9e4623 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Promise.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Promise.java
@@ -25,11 +25,16 @@ import java.util.function.Consumer;
 import com.google.common.util.concurrent.FutureCallback;
 
 import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
 /**
  * A Promise that integrates {@link io.netty.util.concurrent.Promise} with our {@link Future} API
  * to improve clarity and coherence in the codebase.
  */
+@Shared(scope = SIMULATION, ancestors = INTERFACES)
 public interface Promise<V> extends io.netty.util.concurrent.Promise<V>, Future<V>
 {
     public static <V> GenericFutureListener<? extends Future<V>> listener(FutureCallback<V> callback)
diff --git a/src/java/org/apache/cassandra/utils/concurrent/RunnableFuture.java b/src/java/org/apache/cassandra/utils/concurrent/RunnableFuture.java
index 74d8678..6ec1daa4 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/RunnableFuture.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/RunnableFuture.java
@@ -18,6 +18,11 @@
 
 package org.apache.cassandra.utils.concurrent;
 
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+@Shared(scope = SIMULATION)
 public interface RunnableFuture<V> extends Future<V>, java.util.concurrent.RunnableFuture<V>
 {
 }
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java b/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java
index f16ffdd..66dd543 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Semaphore.java
@@ -24,10 +24,13 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import net.openhft.chronicle.core.util.ThrowingConsumer;
 import org.apache.cassandra.utils.Intercept;
+import org.apache.cassandra.utils.Shared;
 
 import static java.lang.System.nanoTime;
 import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
+@Shared(scope = SIMULATION)
 public interface Semaphore
 {
     /**
diff --git a/src/java/org/apache/cassandra/utils/concurrent/UncheckedInterruptedException.java b/src/java/org/apache/cassandra/utils/concurrent/UncheckedInterruptedException.java
index 8e85b84..d7248e8 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/UncheckedInterruptedException.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/UncheckedInterruptedException.java
@@ -18,9 +18,14 @@
 
 package org.apache.cassandra.utils.concurrent;
 
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
 /**
  * Unchecked {@link InterruptedException}, to be thrown in places where an interrupt is unexpected
  */
+@Shared(scope = SIMULATION)
 public class UncheckedInterruptedException extends RuntimeException
 {
     public UncheckedInterruptedException()
diff --git a/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java b/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java
index 4fbe7c6..e9dcdf8 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java
@@ -26,9 +26,13 @@ import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 
 import org.apache.cassandra.utils.Intercept;
+import org.apache.cassandra.utils.Shared;
 import org.apache.cassandra.utils.concurrent.Awaitable.AbstractAwaitable;
 
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
 /**
  * <p>A relatively easy to use utility for general purpose thread signalling.</p>
  * <p>Usage on a thread awaiting a state change using a WaitQueue q is:</p>
@@ -73,6 +77,7 @@ import static org.apache.cassandra.utils.Clock.Global.nanoTime;
  *
  * TODO: this class should not be backed by CLQ (should use an intrusive linked-list with lower overhead)
  */
+@Shared(scope = SIMULATION, inner = INTERFACES)
 public interface WaitQueue
 {
     /**
diff --git a/test/distributed/org/apache/cassandra/distributed/Cluster.java b/test/distributed/org/apache/cassandra/distributed/Cluster.java
index 05ea799..116dcd6 100644
--- a/test/distributed/org/apache/cassandra/distributed/Cluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/Cluster.java
@@ -24,14 +24,13 @@ import java.util.function.Consumer;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.impl.AbstractCluster;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
-import org.apache.cassandra.distributed.shared.Shared;
 import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.utils.Shared;
 
 /**
  * A simple cluster supporting only the 'current' Cassandra version, offering easy access to the convenience methods
  * of IInvokableInstance on each node.
  */
-@Shared
 public class Cluster extends AbstractCluster<IInvokableInstance>
 {
     private Cluster(Builder builder)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 2f146bf..be49daa 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -19,6 +19,9 @@
 package org.apache.cassandra.distributed.impl;
 
 import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
 import java.net.InetSocketAddress;
 import java.nio.file.FileSystem;
 import java.nio.file.Files;
@@ -27,12 +30,14 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
@@ -40,6 +45,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiPredicate;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -47,6 +53,7 @@ import java.util.stream.Stream;
 
 import javax.annotation.concurrent.GuardedBy;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,22 +82,29 @@ import org.apache.cassandra.distributed.api.LogAction;
 import org.apache.cassandra.distributed.api.NodeToolResult;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.InstanceClassLoader;
-import org.apache.cassandra.distributed.shared.Isolated;
+import org.apache.cassandra.utils.Isolated;
 import org.apache.cassandra.distributed.shared.MessageFilters;
 import org.apache.cassandra.distributed.shared.Metrics;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
-import org.apache.cassandra.distributed.shared.Shared;
+import org.apache.cassandra.utils.Shared;
 import org.apache.cassandra.distributed.shared.ShutdownException;
 import org.apache.cassandra.distributed.shared.Versions;
 import org.apache.cassandra.io.util.PathUtils;
 import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.utils.Shared.Recursive;
 import org.apache.cassandra.utils.concurrent.Condition;
 import org.apache.cassandra.utils.FBUtilities;
 import org.reflections.Reflections;
+import org.reflections.scanners.TypeAnnotationsScanner;
 import org.reflections.util.ConfigurationBuilder;
 
+import static java.util.stream.Stream.of;
 import static org.apache.cassandra.distributed.shared.NetworkTopology.addressAndPort;
+import static org.apache.cassandra.utils.Shared.Recursive.ALL;
+import static org.apache.cassandra.utils.Shared.Recursive.NONE;
+import static org.apache.cassandra.utils.Shared.Scope.ANY;
 import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeCondition;
+import static org.reflections.ReflectionUtils.forNames;
 
 /**
  * AbstractCluster creates, initializes and manages Cassandra instances ({@link Instance}.
@@ -116,7 +130,6 @@ import static org.apache.cassandra.utils.concurrent.Condition.newOneTimeConditio
  * handlers for internode to have more control over it. Messaging is wired by passing verbs manually.
  * coordinator-handling code and hooks to the callbacks can be found in {@link Coordinator}.
  */
-@Shared
 public abstract class AbstractCluster<I extends IInstance> implements ICluster<I>, AutoCloseable
 {
     public static Versions.Version CURRENT_VERSION = new Versions.Version(FBUtilities.getReleaseVersionString(), Versions.getClassPath());
@@ -128,16 +141,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
     private static final AtomicInteger GENERATION = new AtomicInteger();
 
     // include byteman so tests can use
-    private static final Set<String> SHARED_CLASSES = findClassesMarkedForSharedClassLoader();
-    private static final Set<String> ISOLATED_CLASSES = findClassesMarkedForInstanceClassLoader();
-    public static final Predicate<String> SHARED_PREDICATE = s -> {
-        if (ISOLATED_CLASSES.contains(s))
-            return false;
-
-        return SHARED_CLASSES.contains(s) ||
-               InstanceClassLoader.getDefaultLoadSharedFilter().test(s) ||
-               s.startsWith("org.jboss.byteman");
-    };
+    public static final Predicate<String> SHARED_PREDICATE = getSharedClassPredicate(ANY);
 
     private final UUID clusterId = UUID.randomUUID();
     private final Path root;
@@ -996,22 +1000,278 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
                .collect(Collectors.toList());
     }
 
-    private static Set<String> findClassesMarkedForSharedClassLoader()
+    /** Varargs convenience overload: a scope is accepted when it is contained in {@code scopes}. */
+    private static Set<String> findClassesMarkedForSharedClassLoader(Class<?>[] share, Shared.Scope ... scopes)
+    {
+        return findClassesMarkedForSharedClassLoader(share, ImmutableSet.copyOf(scopes)::contains);
+    }
+
+    /**
+     * Collects the names of the classes to load in the shared class loader: every class annotated with
+     * {@link Shared} declaring at least one scope accepted by {@code scopes} (expanded as requested by its
+     * annotation), plus the explicitly supplied {@code share} classes.
+     *
+     * @throws AssertionError if {@link #assertTransitiveClosure} finds a dependency missing from that set
+     */
+    private static Set<String> findClassesMarkedForSharedClassLoader(Class<?>[] share, Predicate<Shared.Scope> scopes)
     {
-        return findClassesMarkedWith(Shared.class);
+        Set<Class<?>> classes = findClassesMarkedWith(Shared.class, a -> of(a.scope()).anyMatch(scopes));
+        Collections.addAll(classes, share);
+        assertTransitiveClosure(classes);
+        return toNames(classes);
     }
 
-    private static Set<String> findClassesMarkedForInstanceClassLoader()
+    /** Names of every class annotated with {@link Isolated}, plus the explicitly supplied {@code isolate} classes. */
+    private static Set<String> findClassesMarkedForInstanceClassLoader(Class<?>[] isolate)
     {
-        return findClassesMarkedWith(Isolated.class);
+        Set<Class<?>> classes = findClassesMarkedWith(Isolated.class, ignore -> true);
+        Collections.addAll(classes, isolate);
+        return toNames(classes);
     }
 
-    private static Set<String> findClassesMarkedWith(Class<? extends Annotation> annotation)
+    /** Shared-class predicate for {@code scopes}, with no additional isolated or shared classes. */
+    public static Predicate<String> getSharedClassPredicate(Shared.Scope ... scopes)
     {
-        return new Reflections(ConfigurationBuilder.build("org.apache.cassandra").setExpandSuperTypes(false))
-               .getTypesAnnotatedWith(annotation).stream()
-               .map(Class::getName)
+        return getSharedClassPredicate(new Class[0], new Class[0], scopes);
+    }
+
+    /**
+     * Builds the predicate deciding, by class name, whether a class is loaded by the shared class loader.
+     * Isolated names are always rejected; any other name is accepted when it is in the shared set, passes
+     * {@link InstanceClassLoader#getDefaultLoadSharedFilter()}, or starts with {@code org.jboss.byteman}.
+     */
+    public static Predicate<String> getSharedClassPredicate(Class<?>[] isolate, Class<?>[] share, Shared.Scope ... scopes)
+    {
+        Set<String> shared = findClassesMarkedForSharedClassLoader(share, scopes);
+        Set<String> isolated = findClassesMarkedForInstanceClassLoader(isolate);
+        return s -> {
+            if (isolated.contains(s))
+                return false;
+
+            return shared.contains(s) ||
+                   InstanceClassLoader.getDefaultLoadSharedFilter().test(s) ||
+                   s.startsWith("org.jboss.byteman");
+        };
+    }
+
+    /**
+     * Finds the types annotated with {@code annotation} using a Reflections scan configured for
+     * {@code org.apache.cassandra}, keeps those accepted by {@code testAnnotation}, and expands each of them
+     * via {@link #expander()}.
+     *
+     * @return a mutable set, because callers add further classes to it
+     */
+    private static <A extends Annotation> Set<Class<?>> findClassesMarkedWith(Class<A> annotation, Predicate<A> testAnnotation)
+    {
+        Reflections reflections = new Reflections(ConfigurationBuilder.build("org.apache.cassandra").setExpandSuperTypes(false));
+        return forNames(reflections.getStore().getAll(TypeAnnotationsScanner.class, annotation.getName()),
+                        reflections.getConfiguration().getClassLoaders())
+               .stream()
+               .filter(testAnnotation(annotation, testAnnotation))
+               .flatMap(expander())
-               .collect(Collectors.toSet());
+               // Collectors.toSet() gives no mutability guarantee, but callers use Collections.addAll on the result
+               .collect(Collectors.toCollection(HashSet::new));
     }
+
+    /** Maps each class to its {@link Class#getName() name}. */
+    private static Set<String> toNames(Set<Class<?>> classes)
+    {
+        return classes.stream().map(Class::getName).collect(Collectors.toSet());
+    }
+
+    /**
+     * Returns a predicate that is true when every annotation of type {@code annotation} declared directly on
+     * the class passes {@code test}; it is therefore also true for a class declaring none.
+     */
+    private static <A extends Annotation> Predicate<Class<?>> testAnnotation(Class<A> annotation, Predicate<A> test)
+    {
+        return clazz -> {
+            A[] annotations = clazz.getDeclaredAnnotationsByType(annotation);
+            for (A a : annotations)
+            {
+                if (!test.test(a))
+                    return false;
+            }
+            return true;
+        };
+    }
+
+    /**
+     * Walks, from every class in {@code classes}, its ancestors and the types used by its non-private fields
+     * and methods, and fails if any visited class is absent from {@code classes}. Types rejected by
+     * {@link #consider} are not visited.
+     *
+     * @throws AssertionError naming the class being walked and the first missing dependency
+     */
+    private static void assertTransitiveClosure(Set<Class<?>> classes)
+    {
+        Set<Class<?>> tested = new HashSet<>();
+        for (Class<?> clazz : classes)
+        {
+            forEach(test -> {
+                if (!classes.contains(test))
+                    throw new AssertionError(clazz.getName() + " is shared, but its dependency " + test + " is not");
+            }, new SharedParams(ALL, ALL, NONE), clazz, tested);
+        }
+    }
+
+    /** How far a traversal follows ancestors, member types and nested classes; see {@link Shared}. */
+    private static class SharedParams
+    {
+        final Recursive ancestors, members, inner;
+
+        private SharedParams(Recursive ancestors, Recursive members, Recursive inner)
+        {
+            this.ancestors = ancestors;
+            this.members = members;
+            this.inner = inner;
+        }
+
+        private SharedParams(Shared shared)
+        {
+            this.ancestors = shared.ancestors();
+            this.members = shared.members();
+            this.inner = shared.inner();
+        }
+    }
+
+    /**
+     * Visits {@code cur} (unless {@link #consider} rejects it) and then recurses, as permitted by
+     * {@code shared}, into its superclass and interfaces, the types of its non-private fields, the return
+     * and parameter types of its non-private methods, and its declared nested classes.
+     *
+     * @param done classes already seen; each class is visited at most once per set
+     */
+    private static void forEach(Consumer<Class<?>> forEach, SharedParams shared, Class<?> cur, Set<Class<?>> done)
+    {
+        if (null == (cur = consider(cur, done)))
+            return;
+
+        forEach.accept(cur);
+
+        switch (shared.ancestors)
+        {
+            case ALL:
+                forEach(forEach, shared, cur.getSuperclass(), done);
+                // deliberate fall-through: ALL covers the superclass and the interfaces
+            case INTERFACES:
+                for (Class<?> i : cur.getInterfaces())
+                    forEach(forEach, shared, i, done);
+        }
+
+        if (shared.members != NONE)
+        {
+            for (Field field : cur.getDeclaredFields())
+            {
+                if ((field.getModifiers() & Modifier.PRIVATE) == 0)
+                    forEachMatch(shared.members, forEach, shared, field.getType(), done);
+            }
+
+            for (Method method : cur.getDeclaredMethods())
+            {
+                if ((method.getModifiers() & Modifier.PRIVATE) == 0)
+                {
+                    forEachMatch(shared.members, forEach, shared, method.getReturnType(), done);
+                    forEachMatch(shared.members, forEach, shared, method.getParameterTypes(), done);
+                }
+            }
+        }
+
+        if (shared.inner != NONE)
+            forEachMatch(shared.inner, forEach, shared, cur.getDeclaredClasses(), done);
+    }
+
+    /** Applies {@link #forEachMatch(Recursive, Consumer, SharedParams, Class, Set)} to each element of {@code classes}. */
+    private static void forEachMatch(Recursive ifMatches, Consumer<Class<?>> forEach, SharedParams shared, Class<?>[] classes, Set<Class<?>> done)
+    {
+        for (Class<?> cur : classes)
+            forEachMatch(ifMatches, forEach, shared, cur, done);
+    }
+
+    /** Recurses into {@code cur} when {@code ifMatches} is {@code ALL}, or when {@code cur} passes {@link #isInterface}. */
+    private static void forEachMatch(Recursive ifMatches, Consumer<Class<?>> forEach, SharedParams shared, Class<?> cur, Set<Class<?>> done)
+    {
+        if (ifMatches == ALL || isInterface(cur))
+            forEach(forEach, shared, cur, done);
+    }
+
+    /** True for interfaces, enums and {@link Throwable} types - the types {@code Recursive.INTERFACES} admits. */
+    private static boolean isInterface(Class<?> test)
+    {
+        return test.isInterface() || test.isEnum() || Throwable.class.isAssignableFrom(test);
+    }
+
+    /** Creates an expansion function whose visited-set is shared by all the classes it is applied to. */
+    private static Function<Class<?>, Stream<Class<?>>> expander()
+    {
+        Set<Class<?>> done = new HashSet<>();
+        return clazz -> expand(clazz, done);
+    }
+
+    /**
+     * Expands a class into the classes its {@link Shared} annotation asks to share with it. A class without
+     * the annotation, or whose annotation leaves inner, members and ancestors at {@code NONE}, yields itself.
+     * NOTE(review): a class already present in {@code done} yields an empty stream - confirm this is intended.
+     */
+    private static Stream<Class<?>> expand(Class<?> clazz, Set<Class<?>> done)
+    {
+        Optional<Shared> maybeShared = of(clazz.getDeclaredAnnotationsByType(Shared.class)).findFirst();
+        if (!maybeShared.isPresent())
+            return Stream.of(clazz);
+
+        Shared shared = maybeShared.get();
+        if (shared.inner() == NONE && shared.members() == NONE && shared.ancestors() == NONE)
+            return Stream.of(clazz);
+
+        Set<Class<?>> closure = new HashSet<>();
+        forEach(closure::add, new SharedParams(shared), clazz, done);
+        return closure.stream();
+    }
+
+    /**
+     * Decides whether a traversal should visit a type, unwrapping arrays to their component type first.
+     *
+     * @return the type to visit, or null if it is null, primitive, in a {@code java.} package, already in
+     *         {@code considered}, or accepted by the default load-shared filter
+     */
+    private static Class<?> consider(Class<?> consider, Set<Class<?>> considered)
+    {
+        if (consider == null) return null;
+        while (consider.isArray()) // TODO (future): this is inadequate handling of array types (fine for now)
+            consider = consider.getComponentType();
+
+        if (consider.isPrimitive()) return null;
+        if (consider.getPackage() != null && consider.getPackage().getName().startsWith("java.")) return null;
+        if (!considered.add(consider)) return null;
+        if (InstanceClassLoader.getDefaultLoadSharedFilter().test(consider.getName())) return null;
+
+        return consider;
+    }
+
+    // 3.0 and earlier clusters must have unique InetAddressAndPort for each InetAddress
+    public static <I extends IInstance> Map<InetSocketAddress, I> getUniqueAddressLookup(ICluster<I> cluster)
+    {
+        return getUniqueAddressLookup(cluster, Function.identity());
+    }
+
+    /**
+     * Maps each instance's broadcast address to {@code function} applied to that instance.
+     *
+     * @throws IllegalStateException if an instance and its config disagree on the broadcast address, or if
+     *         an address is already mapped to a non-null value
+     */
+    public static <I extends IInstance, V> Map<InetSocketAddress, V> getUniqueAddressLookup(ICluster<I> cluster, Function<I, V> function)
+    {
+        Map<InetSocketAddress, V> lookup = new HashMap<>();
+        cluster.stream().forEach(instance -> {
+            InetSocketAddress address = instance.broadcastAddress();
+            if (!address.equals(instance.config().broadcastAddress()))
+                throw new IllegalStateException("addressAndPort mismatch: " + address + " vs " + instance.config().broadcastAddress());
+            V prev = lookup.put(address, function.apply(instance));
+            if (null != prev)
+                throw new IllegalStateException("This version of Cassandra does not support multiple nodes with the same InetAddress: " + address + " vs " + prev);
+        });
+        return lookup;
+    }
 }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index 9fdebed..b409c88 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@ -210,7 +210,7 @@ public class Coordinator implements ICoordinator
         }).call();
     }
 
-    static ClientState makeFakeClientState()
+    public static ClientState makeFakeClientState()
     {
         return ClientState.forExternalCalls(new InetSocketAddress(FBUtilities.getJustLocalAddress(), 9042));
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java b/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
index 32f82c0..1ec844e 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
@@ -18,9 +18,13 @@
 
 package org.apache.cassandra.distributed.impl;
 
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
+
+@Shared(inner = INTERFACES)
 public interface INodeProvisionStrategy
 {
-
     public enum Strategy
     {
         OneNetworkInterface
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index 9978697..c7ae775 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -33,12 +33,10 @@ import com.vdurmont.semver4j.Semver;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
-import org.apache.cassandra.distributed.shared.Shared;
 import org.apache.cassandra.distributed.upgrade.UpgradeTestBase;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.SimpleSeedProvider;
 
-@Shared
 public class InstanceConfig implements IInstanceConfig
 {
     public final int num;
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Query.java b/test/distributed/org/apache/cassandra/distributed/impl/Query.java
index 1b03996..7950d75 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Query.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Query.java
@@ -35,11 +35,10 @@ import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.apache.cassandra.distributed.impl.Coordinator.toCassandraCL;
-
-// TODO: maybe just keep with Simulator?
 public class Query implements IIsolatedExecutor.SerializableCallable<Object[][]>
 {
+    private static final long serialVersionUID = 1L;
+
     final String query;
     final long timestamp;
     final org.apache.cassandra.distributed.api.ConsistencyLevel commitConsistencyOrigin;
diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index 1821f9c..35d6323 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.distributed.impl.AbstractCluster;
 import org.apache.cassandra.distributed.impl.InstanceConfig;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Isolated;
 
 import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
diff --git a/test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java b/test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java
index a7ac605..55b6c86 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java
@@ -36,7 +36,7 @@ import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.Byteman;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
-import org.apache.cassandra.distributed.shared.Shared;
+import org.apache.cassandra.utils.Shared;
 
 /**
  * Replaces python dtest bootstrap_test.py::TestBootstrap::test_bootstrap_binary_disabled
diff --git a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/NodeCannotJoinAsHibernatingNodeWithoutReplaceAddressTest.java b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/NodeCannotJoinAsHibernatingNodeWithoutReplaceAddressTest.java
index 78d55e3..7d69248 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/NodeCannotJoinAsHibernatingNodeWithoutReplaceAddressTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/NodeCannotJoinAsHibernatingNodeWithoutReplaceAddressTest.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -35,14 +34,15 @@ import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.Constants;
 import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.impl.InstanceIDDefiner;
 import org.apache.cassandra.distributed.shared.ClusterUtils;
-import org.apache.cassandra.distributed.shared.Shared;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Shared;
 import org.assertj.core.api.Assertions;
 
 import static net.bytebuddy.matcher.ElementMatchers.named;
@@ -109,7 +109,7 @@ public class NodeCannotJoinAsHibernatingNodeWithoutReplaceAddressTest extends Te
     @Shared
     public static class SharedState
     {
-        public static volatile Cluster cluster;
+        public static volatile ICluster cluster;
         // Instance.shutdown can only be called once so only the caller knows when its done (isShutdown looks at a field set BEFORE shutting down..)
         // since the test needs to know when shutdown completes, add this static state so the caller (bytebuddy rewrite) can update it
         public static final CountDownLatch shutdownComplete = new CountDownLatch(1);
@@ -121,7 +121,7 @@ public class NodeCannotJoinAsHibernatingNodeWithoutReplaceAddressTest extends Te
         {
             fn.run();
             int id = Integer.parseInt(InstanceIDDefiner.getInstanceId().replace("node", ""));
-            Cluster cluster = Objects.requireNonNull(SharedState.cluster);
+            ICluster cluster = Objects.requireNonNull(SharedState.cluster);
             // can't stop here as the stop method and start method share a lock; and block gets called in start...
             ForkJoinPool.commonPool().execute(() -> {
                 ClusterUtils.stopAbrupt(cluster, cluster.get(id));
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
index 78e2219..72b7f1a 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeMessageForwardTest.java
@@ -28,11 +28,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
-import org.apache.cassandra.distributed.shared.Shared;
+import org.apache.cassandra.utils.Shared;
 
 import static org.apache.cassandra.distributed.shared.AssertUtils.*;
 
-@Shared
 public class MixedModeMessageForwardTest extends UpgradeTestBase
 {
     private static final Logger logger = LoggerFactory.getLogger(MixedModeMessageForwardTest.class);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 02/03: [CASSANDRA-16930] CEP-10 Phase 2: Improved Configuration For Controlling Determinism

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit fe9cff663b48fecdb964caaded2004e83a0c89f4
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Wed Jul 28 20:03:09 2021 +0100

    [CASSANDRA-16930] CEP-10 Phase 2: Improved Configuration For Controlling Determinism
    
    Co-authored-by: Benedict Elliott Smith <be...@apache.org>
    Co-authored-by: Sam Tunnicliffe <sa...@apache.org>
    Co-authored-by: Caleb Rackliffe <ca...@gmail.com>
---
 .../org/apache/cassandra/auth/AuthKeyspace.java    |   5 +-
 .../cassandra/concurrent/NamedThreadFactory.java   |   1 +
 .../concurrent/SingleThreadExecutorPlus.java       |   6 +
 .../cassandra/concurrent/SyncFutureTask.java       |  17 ++-
 .../concurrent/ThreadPoolExecutorBuilder.java      |   2 +-
 .../config/CassandraRelevantProperties.java        |  79 ++++++++++-
 src/java/org/apache/cassandra/config/Config.java   |   8 ++
 .../cassandra/config/DatabaseDescriptor.java       |  36 +++++
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   6 +
 src/java/org/apache/cassandra/db/Memtable.java     |  22 ++-
 .../db/commitlog/BatchCommitLogService.java        |   4 +-
 .../apache/cassandra/db/lifecycle/LogReplica.java  |  10 +-
 .../org/apache/cassandra/gms/FailureDetector.java  |   3 +-
 src/java/org/apache/cassandra/gms/Gossiper.java    | 147 ++++++++++++++++-----
 .../apache/cassandra/io/sstable/Descriptor.java    |   3 +-
 .../cassandra/io/sstable/format/SSTableReader.java |  12 +-
 .../org/apache/cassandra/io/util/PathUtils.java    |   2 +-
 .../locator/AbstractReplicaCollection.java         |  18 +++
 .../org/apache/cassandra/net/RequestCallbacks.java |   8 +-
 .../apache/cassandra/schema/CompressionParams.java |  13 +-
 .../cassandra/schema/MigrationCoordinator.java     |   3 +-
 src/java/org/apache/cassandra/schema/TableId.java  |   5 +
 .../org/apache/cassandra/schema/TableMetadata.java |   5 +-
 .../org/apache/cassandra/service/ClientState.java  |   7 +-
 .../apache/cassandra/service/StorageService.java   |  33 ++++-
 .../org/apache/cassandra/utils/ByteBufferUtil.java |   2 +
 src/java/org/apache/cassandra/utils/Clock.java     |   4 +-
 .../org/apache/cassandra/utils/FBUtilities.java    |  12 +-
 src/java/org/apache/cassandra/utils/Hex.java       |  17 +++
 .../org/apache/cassandra/utils/LazyToString.java   |  36 +++++
 .../org/apache/cassandra/utils/MonotonicClock.java |   6 +-
 .../org/apache/cassandra/utils/NativeLibrary.java  |  43 ++++--
 .../org/apache/cassandra/utils/SigarLibrary.java   |   4 +-
 src/java/org/apache/cassandra/utils/UUIDGen.java   |  60 ++++++++-
 .../apache/cassandra/utils/concurrent/Threads.java | 135 +++++++++++++++++++
 .../apache/cassandra/utils/memory/HeapPool.java    |  76 ++++++++++-
 .../cassandra/utils/memory/MemtableAllocator.java  |   2 +-
 .../cassandra/utils/memory/MemtablePool.java       |   3 +-
 .../apache/cassandra/utils/memory/NativePool.java  |   4 +-
 .../apache/cassandra/utils/memory/SlabPool.java    |   4 +-
 .../config/DatabaseDescriptorRefTest.java          |   1 +
 .../unit/org/apache/cassandra/db/CellSpecTest.java |   2 +-
 .../cassandra/db/ClusteringHeapSizeTest.java       |   2 +-
 .../org/apache/cassandra/db/NativeCellTest.java    |   2 +-
 .../cassandra/db/filter/ColumnFilterTest.java      |   1 +
 .../apache/cassandra/gms/ArrivalWindowTest.java    |   8 ++
 46 files changed, 780 insertions(+), 99 deletions(-)

diff --git a/src/java/org/apache/cassandra/auth/AuthKeyspace.java b/src/java/org/apache/cassandra/auth/AuthKeyspace.java
index a57257c..2271eff 100644
--- a/src/java/org/apache/cassandra/auth/AuthKeyspace.java
+++ b/src/java/org/apache/cassandra/auth/AuthKeyspace.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.auth;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
@@ -35,6 +36,8 @@ public final class AuthKeyspace
     {
     }
 
+    private static final int DEFAULT_RF = CassandraRelevantProperties.SYSTEM_AUTH_DEFAULT_RF.getInt();
+
     /**
      * Generation is used as a timestamp for automatic table creation on startup.
      * If you make any changes to the tables below, make sure to increment the
@@ -109,7 +112,7 @@ public final class AuthKeyspace
     public static KeyspaceMetadata metadata()
     {
         return KeyspaceMetadata.create(SchemaConstants.AUTH_KEYSPACE_NAME,
-                                       KeyspaceParams.simple(1),
+                                       KeyspaceParams.simple(DEFAULT_RF),
                                        Tables.of(Roles, RoleMembers, RolePermissions, ResourceRoleIndex, NetworkPermissions));
     }
 }
diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index 32df3f3..88e0d10 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -96,6 +96,7 @@ public class NamedThreadFactory implements ThreadFactory
     {
         this(id, priority, contextClassLoader, threadGroup, JVMStabilityInspector::uncaughtException);
     }
+
     public NamedThreadFactory(String id, int priority, ClassLoader contextClassLoader, ThreadGroup threadGroup, Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
     {
         this.id = id;
diff --git a/src/java/org/apache/cassandra/concurrent/SingleThreadExecutorPlus.java b/src/java/org/apache/cassandra/concurrent/SingleThreadExecutorPlus.java
index e72a6a4..eb28277 100644
--- a/src/java/org/apache/cassandra/concurrent/SingleThreadExecutorPlus.java
+++ b/src/java/org/apache/cassandra/concurrent/SingleThreadExecutorPlus.java
@@ -59,6 +59,12 @@ public class SingleThreadExecutorPlus extends ThreadPoolExecutorPlus implements
             set(false);
             run.run();
         }
+
+        @Override
+        public String toString()
+        {
+            return run.toString();
+        }
     }
 
     SingleThreadExecutorPlus(ThreadPoolExecutorBuilder<? extends SingleThreadExecutorPlus> builder)
diff --git a/src/java/org/apache/cassandra/concurrent/SyncFutureTask.java b/src/java/org/apache/cassandra/concurrent/SyncFutureTask.java
index 19ca27f..4f4aa67 100644
--- a/src/java/org/apache/cassandra/concurrent/SyncFutureTask.java
+++ b/src/java/org/apache/cassandra/concurrent/SyncFutureTask.java
@@ -36,10 +36,21 @@ public class SyncFutureTask<T> extends SyncFuture<T> implements RunnableFuture<T
 
     public SyncFutureTask(WithResources withResources, Callable<T> call)
     {
-        this.call = () -> {
-            try (Closeable close = withResources.get())
+        this.call = new Callable<T>()
+        {
+            @Override
+            public T call() throws Exception
+            {
+                try (Closeable close = withResources.get())
+                {
+                    return call.call();
+                }
+            }
+
+            @Override
+            public String toString()
             {
-                return call.call();
+                return call.toString();
             }
         };
     }
diff --git a/src/java/org/apache/cassandra/concurrent/ThreadPoolExecutorBuilder.java b/src/java/org/apache/cassandra/concurrent/ThreadPoolExecutorBuilder.java
index 7c8dd93..5e578a5 100644
--- a/src/java/org/apache/cassandra/concurrent/ThreadPoolExecutorBuilder.java
+++ b/src/java/org/apache/cassandra/concurrent/ThreadPoolExecutorBuilder.java
@@ -40,7 +40,7 @@ import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQu
  * <li>The default {@link UncaughtExceptionHandler} is inherited from {@link MetaFactory}, which in turn receives it
  *     from the {@link ExecutorBuilderFactory}
  */
-class ThreadPoolExecutorBuilder<E extends ExecutorPlus> extends MetaFactory implements ExecutorBuilder<E>
+public class ThreadPoolExecutorBuilder<E extends ExecutorPlus> extends MetaFactory implements ExecutorBuilder<E>
 {
     static <E extends SequentialExecutorPlus> ExecutorBuilder<E> sequential(Function<ThreadPoolExecutorBuilder<E>, E> constructor, ClassLoader contextClassLoader, ThreadGroup threadGroup, UncaughtExceptionHandler uncaughtExceptionHandler, String name)
     {
diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 3eb5df1..b4a20da 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -136,6 +136,8 @@ public enum CassandraRelevantProperties
     /** mx4jport */
     MX4JPORT ("mx4jport"),
 
+    RING_DELAY("cassandra.ring_delay_ms"),
+
     /**
      * When bootstraping we wait for all schema versions found in gossip to be seen, and if not seen in time we fail
      * the bootstrap; this property will avoid failing and allow bootstrap to continue if set to true.
@@ -152,6 +154,10 @@ public enum CassandraRelevantProperties
      */
     GOSSIPER_QUARANTINE_DELAY("cassandra.gossip_quarantine_delay_ms"),
 
+    GOSSIPER_SKIP_WAITING_TO_SETTLE("cassandra.skip_wait_for_gossip_to_settle", "-1"),
+
+    SHUTDOWN_ANNOUNCE_DELAY_IN_MS("cassandra.shutdown_announce_in_ms", "2000"),
+
     /**
      * When doing a host replacement its possible that the gossip state is "empty" meaning that the endpoint is known
      * but the current state isn't known.  If the host replacement is needed to repair this state, this property must
@@ -171,6 +177,9 @@ public enum CassandraRelevantProperties
     LOG_DIR_AUDIT("cassandra.logdir.audit"),
 
     CONSISTENT_DIRECTORY_LISTINGS("cassandra.consistent_directory_listings", "false"),
+    CLOCK_GLOBAL("cassandra.clock", null),
+    CLOCK_MONOTONIC_APPROX("cassandra.monotonic_clock.approx", null),
+    CLOCK_MONOTONIC_PRECISE("cassandra.monotonic_clock.precise", null),
 
     //cassandra properties (without the "cassandra." prefix)
 
@@ -183,9 +192,6 @@ public enum CassandraRelevantProperties
 
     DEFAULT_PROVIDE_OVERLAPPING_TOMBSTONES ("default.provide.overlapping.tombstones"),
     ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION ("org.apache.cassandra.disable_mbean_registration"),
-    //only for testing
-    ORG_APACHE_CASSANDRA_CONF_CASSANDRA_RELEVANT_PROPERTIES_TEST("org.apache.cassandra.conf.CassandraRelevantPropertiesTest"),
-    ORG_APACHE_CASSANDRA_DB_VIRTUAL_SYSTEM_PROPERTIES_TABLE_TEST("org.apache.cassandra.db.virtual.SystemPropertiesTableTest"),
 
     /** This property indicates whether disable_mbean_registration is true */
     IS_DISABLED_MBEAN_REGISTRATION("org.apache.cassandra.disable_mbean_registration"),
@@ -200,7 +206,39 @@ public enum CassandraRelevantProperties
     SNAPSHOT_MIN_ALLOWED_TTL_SECONDS("cassandra.snapshot.min_allowed_ttl_seconds", "60"),
 
     /** what class to use for mbean registeration */
-    MBEAN_REGISTRATION_CLASS("org.apache.cassandra.mbean_registration_class");
+    MBEAN_REGISTRATION_CLASS("org.apache.cassandra.mbean_registration_class"),
+
+    BATCH_COMMIT_LOG_SYNC_INTERVAL("cassandra.batch_commitlog_sync_interval_millis", "1000"),
+
+    SYSTEM_AUTH_DEFAULT_RF("cassandra.system_auth.default_rf", "1"),
+
+    MEMTABLE_OVERHEAD_SIZE("cassandra.memtable.row_overhead_size", "-1"),
+    MEMTABLE_OVERHEAD_COMPUTE_STEPS("cassandra.memtable_row_overhead_computation_step", "100000"),
+    MIGRATION_DELAY("cassandra.migration_delay_ms", "60000"),
+
+    PAXOS_REPAIR_RETRY_TIMEOUT_IN_MS("cassandra.paxos_repair_retry_timeout_millis", "60000"),
+
+    // properties for debugging simulator ASM output
+    TEST_SIMULATOR_PRINT_ASM("cassandra.test.simulator.print_asm", "none"),
+    TEST_SIMULATOR_PRINT_ASM_TYPES("cassandra.test.simulator.print_asm_types", ""),
+
+    // determinism properties for testing
+    DETERMINISM_SSTABLE_COMPRESSION_DEFAULT("cassandra.sstable_compression_default", "true"),
+    DETERMINISM_CONSISTENT_DIRECTORY_LISTINGS("cassandra.consistent_directory_listings", "false"),
+    DETERMINISM_UNSAFE_UUID_NODE("cassandra.unsafe.deterministicuuidnode", "false"),
+
+    // properties to disable certain behaviours for testing
+    DISABLE_GOSSIP_ENDPOINT_REMOVAL("cassandra.gossip.disable_endpoint_removal"),
+    IGNORE_MISSING_NATIVE_FILE_HINTS("cassandra.require_native_file_hints", "false"),
+    DISABLE_SSTABLE_ACTIVITY_TRACKING("cassandra.sstable_activity_tracking", "true"),
+    TEST_IGNORE_SIGAR("cassandra.test.ignore_sigar", "false"),
+    PAXOS_EXECUTE_ON_SELF("cassandra.paxos.use_self_execution", "true"),
+
+    // for specific tests
+    ORG_APACHE_CASSANDRA_CONF_CASSANDRA_RELEVANT_PROPERTIES_TEST("org.apache.cassandra.conf.CassandraRelevantPropertiesTest"),
+    ORG_APACHE_CASSANDRA_DB_VIRTUAL_SYSTEM_PROPERTIES_TABLE_TEST("org.apache.cassandra.db.virtual.SystemPropertiesTableTest"),
+
+    ;
 
 
     CassandraRelevantProperties(String key, String defaultVal)
@@ -302,6 +340,17 @@ public enum CassandraRelevantProperties
 
     /**
      * Gets the value of a system property as a int.
+     * @return system property long value if it exists, defaultValue otherwise.
+     */
+    public long getLong()
+    {
+        String value = System.getProperty(key);
+
+        return LONG_CONVERTER.convert(value == null ? defaultVal : value);
+    }
+
+    /**
+     * Gets the value of a system property as a int.
      * @return system property int value if it exists, overrideDefaultValue otherwise.
      */
     public int getInt(int overrideDefaultValue)
@@ -322,6 +371,15 @@ public enum CassandraRelevantProperties
         System.setProperty(key, Integer.toString(value));
     }
 
+    /**
+     * Sets the value into system properties.
+     * @param value to set
+     */
+    public void setLong(long value)
+    {
+        System.setProperty(key, Long.toString(value));
+    }
+
     private interface PropertyConverter<T>
     {
         T convert(String value);
@@ -344,6 +402,19 @@ public enum CassandraRelevantProperties
         }
     };
 
+    private static final PropertyConverter<Long> LONG_CONVERTER = value ->
+    {
+        try
+        {
+            return Long.decode(value);
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException(String.format("Invalid value for system property: " +
+                                                           "expected integer value but got '%s'", value));
+        }
+    };
+
     /**
      * @return whether a system property is present or not.
      */
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index caa4965..8c498e4 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -81,6 +81,8 @@ public class Config
     public DiskFailurePolicy disk_failure_policy = DiskFailurePolicy.ignore;
     public CommitFailurePolicy commit_failure_policy = CommitFailurePolicy.stop;
 
+    public volatile boolean use_deterministic_table_id = false;
+
     /* initial token in the ring */
     public String initial_token;
     public Integer num_tokens;
@@ -118,6 +120,7 @@ public class Config
     public int concurrent_writes = 32;
     public int concurrent_counter_writes = 32;
     public int concurrent_materialized_view_writes = 32;
+    public int available_processors = -1;
 
     @Deprecated
     public Integer concurrent_replicates = null;
@@ -280,6 +283,8 @@ public class Config
     public int dynamic_snitch_reset_interval_in_ms = 600000;
     public double dynamic_snitch_badness_threshold = 1.0;
 
+    public String failure_detector = "FailureDetector";
+
     public EncryptionOptions.ServerEncryptionOptions server_encryption_options = new EncryptionOptions.ServerEncryptionOptions();
     public EncryptionOptions client_encryption_options = new EncryptionOptions();
 
@@ -313,6 +318,8 @@ public class Config
     public volatile int counter_cache_save_period = 7200;
     public volatile int counter_cache_keys_to_save = Integer.MAX_VALUE;
 
+    public Long paxos_cache_size_in_mb = null;
+
     private static boolean isClientMode = false;
     private static Supplier<Config> overrideLoadConfig = null;
 
@@ -620,6 +627,7 @@ public class Config
     public enum MemtableAllocationType
     {
         unslabbed_heap_buffers,
+        unslabbed_heap_buffers_logged,
         heap_buffers,
         offheap_buffers,
         offheap_objects
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 878c9ec..9d08a62 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -32,6 +32,7 @@ import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.RateLimiter;
 
+import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.io.util.File;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,6 +104,7 @@ public class DatabaseDescriptor
      */
     static final long LOWEST_ACCEPTED_TIMEOUT = 10L;
 
+    private static Supplier<IFailureDetector> newFailureDetector;
     private static IEndpointSnitch snitch;
     private static InetAddress listenAddress; // leave null so we can fall through to getLocalHost
     private static InetAddress broadcastAddress;
@@ -1131,6 +1133,7 @@ public class DatabaseDescriptor
                 return 1;
             return 0;
         };
+        newFailureDetector = () -> createFailureDetector(conf.failure_detector);
     }
 
     // definitely not safe for tools + clients - implicitly instantiates schema
@@ -1187,6 +1190,14 @@ public class DatabaseDescriptor
         return dynamic ? new DynamicEndpointSnitch(snitch) : snitch;
     }
 
+    private static IFailureDetector createFailureDetector(String detectorClassName) throws ConfigurationException
+    {
+        // fail with a configuration error rather than an NPE on the contains() call below
+        if (detectorClassName == null) throw new ConfigurationException("Missing failure_detector class name");
+        if (!detectorClassName.contains(".")) // unqualified names are resolved against the gms package
+            detectorClassName = "org.apache.cassandra.gms." + detectorClassName;
+        return FBUtilities.construct(detectorClassName, "failure detector");
+    }
+
     public static IAuthenticator getAuthenticator()
     {
         return authenticator;
@@ -1405,6 +1416,16 @@ public class DatabaseDescriptor
         snitch = eps;
     }
 
+    public static IFailureDetector newFailureDetector()
+    {
+        return newFailureDetector.get();
+    }
+
+    public static void setDefaultFailureDetector()
+    {
+        newFailureDetector = () -> createFailureDetector("FailureDetector");
+    }
+
     public static int getColumnIndexSize()
     {
         return (int) ByteUnit.KIBI_BYTES.toBytes(conf.column_index_size_in_kb);
@@ -1734,6 +1755,11 @@ public class DatabaseDescriptor
         return conf.memtable_flush_writers;
     }
 
+    public static int getAvailableProcessors()
+    {
+        return conf == null ? -1 : conf.available_processors;
+    }
+
     public static int getConcurrentCompactors()
     {
         return conf.concurrent_compactors;
@@ -2437,6 +2463,16 @@ public class DatabaseDescriptor
         return conf.hinted_handoff_disabled_datacenters;
     }
 
+    public static boolean useDeterministicTableID()
+    {
+        return conf != null && conf.use_deterministic_table_id;
+    }
+
+    public static void useDeterministicTableID(boolean value)
+    {
+        conf.use_deterministic_table_id = value;
+    }
+
     public static void enableHintsForDC(String dc)
     {
         conf.hinted_handoff_disabled_datacenters.remove(dc);
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 8a60bff..5eabda7 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1228,6 +1228,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 }
             }, reclaimExecutor);
         }
+
+        @Override
+        public String toString()
+        {
+            return "Flush " + keyspace + '.' + name;
+        }
     }
 
     // atomically set the upper bound for the commit log
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 6e23cec..c755591 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -76,6 +76,8 @@ import org.apache.cassandra.utils.memory.NativePool;
 import org.apache.cassandra.utils.memory.SlabPool;
 
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+import static org.apache.cassandra.config.CassandraRelevantProperties.MEMTABLE_OVERHEAD_COMPUTE_STEPS;
+import static org.apache.cassandra.config.CassandraRelevantProperties.MEMTABLE_OVERHEAD_SIZE;
 
 public class Memtable implements Comparable<Memtable>
 {
@@ -91,6 +93,8 @@ public class Memtable implements Comparable<Memtable>
         final MemtableCleaner cleaner = ColumnFamilyStore::flushLargestMemtable;
         switch (DatabaseDescriptor.getMemtableAllocationType())
         {
+            case unslabbed_heap_buffers_logged:
+                return new HeapPool.Logged(heapLimit, cleaningThreshold, cleaner);
             case unslabbed_heap_buffers:
                 return new HeapPool(heapLimit, cleaningThreshold, cleaner);
             case heap_buffers:
@@ -104,7 +108,13 @@ public class Memtable implements Comparable<Memtable>
         }
     }
 
-    private static final int ROW_OVERHEAD_HEAP_SIZE = estimateRowOverhead(Integer.parseInt(System.getProperty("cassandra.memtable_row_overhead_computation_step", "100000")));
+    private static final int ROW_OVERHEAD_HEAP_SIZE;
+    static
+    {
+        int userDefinedOverhead = MEMTABLE_OVERHEAD_SIZE.getInt(-1);
+        if (userDefinedOverhead > 0) ROW_OVERHEAD_HEAP_SIZE = userDefinedOverhead;
+        else ROW_OVERHEAD_HEAP_SIZE = estimateRowOverhead(MEMTABLE_OVERHEAD_COMPUTE_STEPS.getInt());
+    }
 
     private final MemtableAllocator allocator;
     private final AtomicLong liveDataSize = new AtomicLong(0);
@@ -157,7 +167,7 @@ public class Memtable implements Comparable<Memtable>
     {
         this.cfs = cfs;
         this.commitLogLowerBound = commitLogLowerBound;
-        this.allocator = MEMORY_POOL.newAllocator();
+        this.allocator = MEMORY_POOL.newAllocator(cfs);
         this.initialComparator = cfs.metadata().comparator;
         this.cfs.scheduleFlush();
         this.columnsCollector = new ColumnsCollector(cfs.metadata().regularAndStaticColumns());
@@ -531,6 +541,12 @@ public class Memtable implements Comparable<Memtable>
             writeSortedContents();
             return writer;
         }
+
+        @Override
+        public String toString()
+        {
+            return "Flush " + cfs.keyspace + '.' + cfs.name;
+        }
     }
 
     private static int estimateRowOverhead(final int count)
@@ -539,7 +555,7 @@ public class Memtable implements Comparable<Memtable>
         try (final OpOrder.Group group = new OpOrder().start())
         {
             int rowOverhead;
-            MemtableAllocator allocator = MEMORY_POOL.newAllocator();
+            MemtableAllocator allocator = MEMORY_POOL.newAllocator(null);
             ConcurrentNavigableMap<PartitionPosition, Object> partitions = new ConcurrentSkipListMap<>();
             final Object val = new Object();
             for (int i = 0 ; i < count ; i++)
diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
index 78bf30c..e913e67 100644
--- a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.db.commitlog;
 
+import static org.apache.cassandra.config.CassandraRelevantProperties.BATCH_COMMIT_LOG_SYNC_INTERVAL;
+
 class BatchCommitLogService extends AbstractCommitLogService
 {
     /**
@@ -24,7 +26,7 @@ class BatchCommitLogService extends AbstractCommitLogService
      * the disk sync. Instead we trigger it explicitly in {@link #maybeWaitForSync(CommitLogSegment.Allocation)}.
      * This value here is largely irrelevant, but should be high enough so the sync thread is not continually waking up.
      */
-    private static final int POLL_TIME_MILLIS = 1000;
+    private static final int POLL_TIME_MILLIS = BATCH_COMMIT_LOG_SYNC_INTERVAL.getInt();
 
     public BatchCommitLogService(CommitLog commitLog)
     {
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java b/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java
index efd56d8..8be5031 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplica.java
@@ -33,6 +33,8 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.NativeLibrary;
 
+import static org.apache.cassandra.config.CassandraRelevantProperties.IGNORE_MISSING_NATIVE_FILE_HINTS;
+
 /**
  * Because a column family may have sstables on different disks and disks can
  * be removed, we duplicate log files into many replicas so as to have a file
@@ -47,6 +49,7 @@ import org.apache.cassandra.utils.NativeLibrary;
 final class LogReplica implements AutoCloseable
 {
     private static final Logger logger = LoggerFactory.getLogger(LogReplica.class);
+    private static final boolean REQUIRE_FD = !FBUtilities.isWindows && !IGNORE_MISSING_NATIVE_FILE_HINTS.getBoolean();
 
     private final File file;
     private int directoryDescriptor;
@@ -55,7 +58,7 @@ final class LogReplica implements AutoCloseable
     static LogReplica create(File directory, String fileName)
     {
         int folderFD = NativeLibrary.tryOpenDirectory(directory.path());
-        if (folderFD == -1 && !FBUtilities.isWindows)
+        if (folderFD == -1  && REQUIRE_FD)
             throw new FSReadError(new IOException(String.format("Invalid folder descriptor trying to create log replica %s", directory.path())), directory.path());
 
         return new LogReplica(new File(fileName), folderFD);
@@ -181,4 +184,9 @@ final class LogReplica implements AutoCloseable
             str.append(System.lineSeparator());
         }
     }
+
+    public int hashCode()
+    {
+        return file.hashCode();
+    }
 }
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index 522e082..40a2de5 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MBeanWrapper;
 
 import static org.apache.cassandra.config.CassandraRelevantProperties.LINE_SEPARATOR;
+import static org.apache.cassandra.config.DatabaseDescriptor.newFailureDetector;
 import static org.apache.cassandra.utils.MonotonicClock.preciseTime;
 
 /**
@@ -71,7 +72,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
             return DEFAULT_MAX_PAUSE;
     }
 
-    public static final IFailureDetector instance = new FailureDetector();
+    public static final IFailureDetector instance = newFailureDetector();
     public static final Predicate<InetAddressAndPort> isEndpointAlive = instance::isAlive;
     public static final Predicate<Replica> isReplicaAlive = r -> isEndpointAlive.test(r.endpoint());
 
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index e2ebedc..5630b5b 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -59,7 +59,6 @@ import org.apache.cassandra.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.SystemKeyspace;
@@ -68,21 +67,23 @@ import org.apache.cassandra.net.RequestCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.CassandraVersion;
-import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.MBeanWrapper;
-import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.RecomputingSupplier;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
+import org.apache.cassandra.utils.concurrent.NotScheduledFuture;
 
 import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static org.apache.cassandra.config.CassandraRelevantProperties.DISABLE_GOSSIP_ENDPOINT_REMOVAL;
 import static org.apache.cassandra.config.CassandraRelevantProperties.GOSSIPER_QUARANTINE_DELAY;
+import static org.apache.cassandra.config.CassandraRelevantProperties.GOSSIPER_SKIP_WAITING_TO_SETTLE;
+import static org.apache.cassandra.config.CassandraRelevantProperties.SHUTDOWN_ANNOUNCE_DELAY_IN_MS;
+import static org.apache.cassandra.config.DatabaseDescriptor.getClusterName;
+import static org.apache.cassandra.config.DatabaseDescriptor.getPartitionerName;
 import static org.apache.cassandra.net.NoPayload.noPayload;
 import static org.apache.cassandra.net.Verb.ECHO_REQ;
 import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_SYN;
+import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
@@ -253,6 +254,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     }
 
     private static final boolean disableThreadValidation = Boolean.getBoolean(Props.DISABLE_THREAD_VALIDATION);
+    private static volatile boolean disableEndpointRemoval = DISABLE_GOSSIP_ENDPOINT_REMOVAL.getBoolean();
 
     private static long getVeryLongTime()
     {
@@ -298,16 +300,17 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                 taskLock.lock();
 
                 /* Update the local heartbeat counter. */
-                endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().updateHeartBeat();
+                endpointStateMap.get(getBroadcastAddressAndPort()).getHeartBeatState().updateHeartBeat();
                 if (logger.isTraceEnabled())
                     logger.trace("My heartbeat is now {}", endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().getHeartBeatVersion());
                 final List<GossipDigest> gDigests = new ArrayList<>();
+
                 Gossiper.instance.makeRandomGossipDigest(gDigests);
 
                 if (gDigests.size() > 0)
                 {
-                    GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
-                                                                           DatabaseDescriptor.getPartitionerName(),
+                    GossipDigestSyn digestSynMessage = new GossipDigestSyn(getClusterName(),
+                                                                           getPartitionerName(),
                                                                            gDigests);
                     Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, digestSynMessage);
                     /* Gossip to some random live member */
@@ -453,8 +456,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     public Set<InetAddressAndPort> getLiveMembers()
     {
         Set<InetAddressAndPort> liveMembers = new HashSet<>(liveEndpoints);
-        if (!liveMembers.contains(FBUtilities.getBroadcastAddressAndPort()))
-            liveMembers.add(FBUtilities.getBroadcastAddressAndPort());
+        if (!liveMembers.contains(getBroadcastAddressAndPort()))
+            liveMembers.add(getBroadcastAddressAndPort());
         return liveMembers;
     }
 
@@ -532,7 +535,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
             return;
         }
 
-        FutureTask task = new FutureTask<>(runnable);
+        FutureTask<?> task = new FutureTask<>(runnable);
         Stage.GOSSIP.execute(task);
         try
         {
@@ -652,6 +655,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                 logger.warn("Seeds list is now empty!");
         }
 
+        if (disableEndpointRemoval)
+            return;
+
         liveEndpoints.remove(endpoint);
         unreachableEndpoints.remove(endpoint);
         MessagingService.instance().versions.reset(endpoint);
@@ -690,6 +696,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
      */
     private void quarantineEndpoint(InetAddressAndPort endpoint, long quarantineExpiration)
     {
+        if (disableEndpointRemoval)
+            return;
         justRemovedEndpoints.put(endpoint, quarantineExpiration);
         GossiperDiagnostics.quarantinedEndpoint(this, endpoint, quarantineExpiration);
     }
@@ -701,7 +709,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     public void replacementQuarantine(InetAddressAndPort endpoint)
     {
         // remember, quarantineEndpoint will effectively already add QUARANTINE_DELAY, so this is 2x
-        logger.debug("");
         quarantineEndpoint(endpoint, currentTimeMillis() + QUARANTINE_DELAY);
         GossiperDiagnostics.replacementQuarantine(this, endpoint);
     }
@@ -951,7 +958,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         int size = seeds.size();
         if (size > 0)
         {
-            if (size == 1 && seeds.contains(FBUtilities.getBroadcastAddressAndPort()))
+            if (size == 1 && seeds.contains(getBroadcastAddressAndPort()))
             {
                 return;
             }
@@ -1064,7 +1071,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         Set<InetAddressAndPort> eps = endpointStateMap.keySet();
         for (InetAddressAndPort endpoint : eps)
         {
-            if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
+            if (endpoint.equals(getBroadcastAddressAndPort()))
                 continue;
 
             FailureDetector.instance.interpret(endpoint);
@@ -1264,6 +1271,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     void notifyFailureDetector(InetAddressAndPort endpoint, EndpointState remoteEndpointState)
     {
+        if (remoteEndpointState == null)
+            return;
+
         EndpointState localEndpointState = endpointStateMap.get(endpoint);
         /*
          * If the local endpoint state exists then report to the FD only
@@ -1365,8 +1375,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     private void silentlyMarkDead(InetAddressAndPort addr, EndpointState localState)
     {
         localState.markDead();
-        liveEndpoints.remove(addr);
-        unreachableEndpoints.put(addr, nanoTime());
+        if (!disableEndpointRemoval)
+        {
+            liveEndpoints.remove(addr);
+            unreachableEndpoints.put(addr, nanoTime());
+        }
     }
 
     /**
@@ -1485,7 +1498,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         for (Entry<InetAddressAndPort, EndpointState> entry : epStateMap.entrySet())
         {
             InetAddressAndPort ep = entry.getKey();
-            if ( ep.equals(FBUtilities.getBroadcastAddressAndPort()) && !isInShadowRound())
+            if (ep.equals(getBroadcastAddressAndPort()) && !isInShadowRound())
                 continue;
             if (justRemovedEndpoints.containsKey(ep))
             {
@@ -1792,7 +1805,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         buildSeedsList();
         /* initialize the heartbeat state for this localEndpoint */
         maybeInitializeLocalState(generationNbr);
-        EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort());
+        EndpointState localState = endpointStateMap.get(getBroadcastAddressAndPort());
         localState.addApplicationStates(preloadLocalStates);
         minVersionSupplier.recompute();
 
@@ -1839,7 +1852,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         if (seeds.isEmpty() && peers.isEmpty())
             return endpointShadowStateMap;
 
-        boolean isSeed = DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddressAndPort());
+        boolean isSeed = DatabaseDescriptor.getSeeds().contains(getBroadcastAddressAndPort());
         // We double RING_DELAY if we're not a seed to increase chance of successful startup during a full cluster bounce,
         // giving the seeds a chance to startup before we fail the shadow round
         int shadowRoundDelay =  isSeed ? StorageService.RING_DELAY : StorageService.RING_DELAY * 2;
@@ -1847,9 +1860,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         endpointShadowStateMap.clear();
         // send a completely empty syn
         List<GossipDigest> gDigests = new ArrayList<>();
-        GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
-                DatabaseDescriptor.getPartitionerName(),
-                gDigests);
+        GossipDigestSyn digestSynMessage = new GossipDigestSyn(getClusterName(), getPartitionerName(), gDigests);
         Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, digestSynMessage);
 
         inShadowRound = true;
@@ -1905,7 +1916,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     {
         for (InetAddressAndPort seed : DatabaseDescriptor.getSeeds())
         {
-            if (seed.equals(FBUtilities.getBroadcastAddressAndPort()))
+            if (seed.equals(getBroadcastAddressAndPort()))
                 continue;
             seeds.add(seed);
         }
@@ -1924,7 +1935,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         {
             for (InetAddressAndPort seed : DatabaseDescriptor.getSeeds())
             {
-                if (seed.equals(FBUtilities.getBroadcastAddressAndPort()))
+                if (seed.equals(getBroadcastAddressAndPort()))
                     continue;
                 tmp.add(seed);
             }
@@ -1978,12 +1989,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         HeartBeatState hbState = new HeartBeatState(generationNbr);
         EndpointState localState = new EndpointState(hbState);
         localState.markAlive();
-        endpointStateMap.putIfAbsent(FBUtilities.getBroadcastAddressAndPort(), localState);
+        endpointStateMap.putIfAbsent(getBroadcastAddressAndPort(), localState);
     }
 
     public void forceNewerGeneration()
     {
-        EndpointState epstate = endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort());
+        EndpointState epstate = endpointStateMap.get(getBroadcastAddressAndPort());
         epstate.getHeartBeatState().forceNewerGenerationUnsafe();
     }
 
@@ -1994,7 +2005,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     public void addSavedEndpoint(InetAddressAndPort ep)
     {
         checkProperThreadForStateMutation();
-        if (ep.equals(FBUtilities.getBroadcastAddressAndPort()))
+        if (ep.equals(getBroadcastAddressAndPort()))
         {
             logger.debug("Attempt to add self as saved endpoint");
             return;
@@ -2023,7 +2034,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     private void addLocalApplicationStateInternal(ApplicationState state, VersionedValue value)
     {
         assert taskLock.isHeldByCurrentThread();
-        InetAddressAndPort epAddr = FBUtilities.getBroadcastAddressAndPort();
+        InetAddressAndPort epAddr = getBroadcastAddressAndPort();
         EndpointState epState = endpointStateMap.get(epAddr);
         assert epState != null : "Can't find endpoint state for " + epAddr;
         // Fire "before change" notifications:
@@ -2061,7 +2072,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     public void stop()
     {
-        EndpointState mystate = endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort());
+        EndpointState mystate = endpointStateMap.get(getBroadcastAddressAndPort());
         if (mystate != null && !isSilentShutdownState(mystate) && StorageService.instance.isJoined())
         {
             logger.info("Announcing shutdown");
@@ -2070,7 +2081,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
             Message message = Message.out(Verb.GOSSIP_SHUTDOWN, noPayload);
             for (InetAddressAndPort ep : liveEndpoints)
                 MessagingService.instance().send(message, ep);
-            Uninterruptibles.sleepUninterruptibly(Integer.getInteger("cassandra.shutdown_announce_in_ms", 2000), TimeUnit.MILLISECONDS);
+            Uninterruptibles.sleepUninterruptibly(SHUTDOWN_ANNOUNCE_DELAY_IN_MS.getInt(), TimeUnit.MILLISECONDS);
         }
         else
             logger.warn("No local state, state is in silent shutdown, or node hasn't joined, not announcing shutdown");
@@ -2175,6 +2186,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         Map<ApplicationState, VersionedValue> states = new EnumMap<>(ApplicationState.class);
         states.put(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion(netVersion));
         states.put(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(uuid));
+        states.put(ApplicationState.RPC_ADDRESS, StorageService.instance.valueFactory.rpcaddress(addr.getAddress()));
+        states.put(ApplicationState.INTERNAL_ADDRESS_AND_PORT, StorageService.instance.valueFactory.internalAddressAndPort(addr));
+        states.put(ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion());
         localState.addApplicationStates(states);
     }
 
@@ -2246,7 +2260,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     public static void waitToSettle()
     {
-        int forceAfter = Integer.getInteger("cassandra.skip_wait_for_gossip_to_settle", -1);
+        int forceAfter = GOSSIPER_SKIP_WAITING_TO_SETTLE.getInt();
         if (forceAfter == 0)
         {
             return;
@@ -2403,7 +2417,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         CassandraVersion minVersion = null;
 
         for (InetAddressAndPort addr : Iterables.concat(Gossiper.instance.getLiveMembers(),
-                                                 Gossiper.instance.getUnreachableMembers()))
+                                                        Gossiper.instance.getUnreachableMembers()))
         {
             String versionString = getReleaseVersionString(addr);
             // Raced with changes to gossip state, wait until next iteration
@@ -2432,4 +2446,71 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
         return minVersion;
     }
+
+    public void unsafeSetEnabled()
+    {
+        scheduledGossipTask = new NotScheduledFuture<>();
+        firstSynSendAt = 1;
+    }
+
+    public Collection<InetAddressAndPort> unsafeClearRemoteState()
+    {
+        List<InetAddressAndPort> removed = new ArrayList<>();
+        for (InetAddressAndPort ep : endpointStateMap.keySet())
+        {
+            if (ep.equals(getBroadcastAddressAndPort()))
+                continue;
+
+            for (IEndpointStateChangeSubscriber subscriber : subscribers)
+                subscriber.onRemove(ep);
+
+            removed.add(ep);
+        }
+        this.endpointStateMap.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort()));
+        this.endpointShadowStateMap.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort()));
+        this.expireTimeEndpointMap.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort()));
+        this.justRemovedEndpoints.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort()));
+        this.unreachableEndpoints.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort()));
+        return removed;
+    }
+
+    public void unsafeGossipWith(InetAddressAndPort ep)
+    {
+        /* Update the local heartbeat counter. */
+        EndpointState epState = endpointStateMap.get(getBroadcastAddressAndPort());
+        if (epState != null)
+        {
+            epState.getHeartBeatState().updateHeartBeat();
+            if (logger.isTraceEnabled())
+                logger.trace("My heartbeat is now {}", epState.getHeartBeatState().getHeartBeatVersion());
+        }
+
+        final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+        Gossiper.instance.makeRandomGossipDigest(gDigests);
+
+        GossipDigestSyn digestSynMessage = new GossipDigestSyn(getClusterName(),
+                getPartitionerName(),
+                gDigests);
+        Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, digestSynMessage);
+
+        MessagingService.instance().send(message, ep);
+    }
+
+    public void unsafeSendShutdown(InetAddressAndPort to)
+    {
+        Message<?> message = Message.out(Verb.GOSSIP_SHUTDOWN, noPayload);
+        MessagingService.instance().send(message, to);
+    }
+
+    public void unsafeSendLocalEndpointStateTo(InetAddressAndPort ep)
+    {
+        /* Look up the local endpoint state; it must already exist. */
+        EndpointState epState = endpointStateMap.get(getBroadcastAddressAndPort());
+        if (epState == null)
+            throw new IllegalStateException();
+
+        GossipDigestAck2 digestAck2Message = new GossipDigestAck2(Collections.singletonMap(getBroadcastAddressAndPort(), epState));
+        Message<GossipDigestAck2> message = Message.out(Verb.GOSSIP_DIGEST_ACK2, digestAck2Message);
+        MessagingService.instance().send(message, ep);
+    }
 }
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index faca487..f4e7f00 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -96,7 +96,8 @@ public class Descriptor
         this.generation = generation;
         this.formatType = formatType;
 
-        hashCode = Objects.hashCode(version, this.directory, generation, ksname, cfname, formatType);
+        // directory is unnecessary for hashCode, and for simulator consistency we do not include it
+        hashCode = Objects.hashCode(version, generation, ksname, cfname, formatType);
     }
 
     public Descriptor withGeneration(int newGeneration)
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index ebdb2ef..4af5743 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -144,6 +144,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 {
     private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
 
+    private static final boolean TRACK_ACTIVITY = CassandraRelevantProperties.DISABLE_SSTABLE_ACTIVITY_TRACKING.getBoolean();
+
     private static final ScheduledExecutorPlus syncExecutor = initSyncExecutor();
     private static ScheduledExecutorPlus initSyncExecutor()
     {
@@ -1981,7 +1983,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
 
     void setup(boolean trackHotness)
     {
-        tidy.setup(this, trackHotness);
+        tidy.setup(this, TRACK_ACTIVITY && trackHotness);
         this.readMeter = tidy.global.readMeter;
     }
 
@@ -2097,6 +2099,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                     if (logger.isTraceEnabled())
                         logger.trace("Async instance tidier for {}, completed", descriptor);
                 }
+
+                @Override
+                public String toString()
+                {
+                    return "Tidy " + descriptor.ksname + '.' + descriptor.cfname + '-' + descriptor.generation;
+                }
             });
         }
 
@@ -2150,7 +2158,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             // Don't track read rates for tables in the system keyspace and don't bother trying to load or persist
             // the read meter when in client mode.
             // Also, do not track read rates when running in client or tools mode (syncExecutor isn't available in these modes)
-            if (SchemaConstants.isLocalSystemKeyspace(desc.ksname) || DatabaseDescriptor.isClientOrToolInitialized())
+            if (!TRACK_ACTIVITY || SchemaConstants.isLocalSystemKeyspace(desc.ksname) || DatabaseDescriptor.isClientOrToolInitialized())
             {
                 readMeter = null;
                 readMeterSyncFuture = NULL;
diff --git a/src/java/org/apache/cassandra/io/util/PathUtils.java b/src/java/org/apache/cassandra/io/util/PathUtils.java
index 6df5be1..28ce020 100644
--- a/src/java/org/apache/cassandra/io/util/PathUtils.java
+++ b/src/java/org/apache/cassandra/io/util/PathUtils.java
@@ -56,7 +56,7 @@ import static org.apache.cassandra.utils.Throwables.merge;
  */
 public final class PathUtils
 {
-    private static final boolean consistentDirectoryListings = CassandraRelevantProperties.CONSISTENT_DIRECTORY_LISTINGS.getBoolean();
+    private static final boolean consistentDirectoryListings = CassandraRelevantProperties.DETERMINISM_CONSISTENT_DIRECTORY_LISTINGS.getBoolean();
 
     private static final Set<StandardOpenOption> READ_OPTIONS = unmodifiableSet(EnumSet.of(READ));
     private static final Set<StandardOpenOption> WRITE_OPTIONS = unmodifiableSet(EnumSet.of(WRITE, CREATE));
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
index 2ec555c..16db492 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
@@ -23,12 +23,14 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 
+import java.util.AbstractList;
 import java.util.AbstractMap;
 import java.util.AbstractSet;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiConsumer;
@@ -396,6 +398,22 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect
         return snapshot(subList);
     }
 
+    public final <T> List<T> asList(Function<Replica, T> viewTransform)
+    {
+        return new AbstractList<T>()
+        {
+            public T get(int index)
+            {
+                return viewTransform.apply(list.get(index));
+            }
+
+            public int size()
+            {
+                return list.size;
+            }
+        };
+    }
+
     /** see {@link ReplicaCollection#count(Predicate)}*/
     public int count(Predicate<Replica> predicate)
     {
diff --git a/src/java/org/apache/cassandra/net/RequestCallbacks.java b/src/java/org/apache/cassandra/net/RequestCallbacks.java
index 8cec096..6275c15 100644
--- a/src/java/org/apache/cassandra/net/RequestCallbacks.java
+++ b/src/java/org/apache/cassandra/net/RequestCallbacks.java
@@ -89,7 +89,8 @@ public class RequestCallbacks implements OutboundMessageCallbacks
      * Remove and return the {@link CallbackInfo} associated with given id and peer, if known.
      */
     @Nullable
-    CallbackInfo remove(long id, InetAddressAndPort peer)
+    @VisibleForTesting
+    public CallbackInfo remove(long id, InetAddressAndPort peer)
     {
         return callbacks.remove(key(id, peer));
     }
@@ -250,13 +251,14 @@ public class RequestCallbacks implements OutboundMessageCallbacks
         }
     }
 
-    static class CallbackInfo
+    @VisibleForTesting
+    public static class CallbackInfo
     {
         final long createdAtNanos;
         final long expiresAtNanos;
 
         final InetAddressAndPort peer;
-        final RequestCallback callback;
+        public final RequestCallback callback;
 
         @Deprecated // for 3.0 compatibility purposes only
         public final Verb responseVerb;
diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java
index c68bdf1d..fca7e4c 100644
--- a/src/java/org/apache/cassandra/schema/CompressionParams.java
+++ b/src/java/org/apache/cassandra/schema/CompressionParams.java
@@ -34,6 +34,7 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -65,11 +66,13 @@ public final class CompressionParams
     public static final String ENABLED = "enabled";
     public static final String MIN_COMPRESS_RATIO = "min_compress_ratio";
 
-    public static final CompressionParams DEFAULT = new CompressionParams(LZ4Compressor.create(Collections.<String, String>emptyMap()),
-                                                                          DEFAULT_CHUNK_LENGTH,
-                                                                          calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
-                                                                          DEFAULT_MIN_COMPRESS_RATIO,
-                                                                          Collections.emptyMap());
+    public static final CompressionParams DEFAULT = !CassandraRelevantProperties.DETERMINISM_SSTABLE_COMPRESSION_DEFAULT.getBoolean()
+                                                    ? noCompression()
+                                                    : new CompressionParams(LZ4Compressor.create(Collections.emptyMap()),
+                                                                            DEFAULT_CHUNK_LENGTH,
+                                                                            calcMaxCompressedLength(DEFAULT_CHUNK_LENGTH, DEFAULT_MIN_COMPRESS_RATIO),
+                                                                            DEFAULT_MIN_COMPRESS_RATIO,
+                                                                            Collections.emptyMap());
 
     public static final CompressionParams NOOP = new CompressionParams(NoopCompressor.create(Collections.emptyMap()),
                                                                        // 4 KiB is often the underlying disk block size
diff --git a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
index f0f700c..3050886 100644
--- a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
+++ b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java
@@ -39,6 +39,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import org.apache.cassandra.concurrent.FutureTask;
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 import org.slf4j.Logger;
@@ -78,7 +79,7 @@ public class MigrationCoordinator
     }
 
 
-    private static final int MIGRATION_DELAY_IN_MS = 60000;
+    private static final int MIGRATION_DELAY_IN_MS = CassandraRelevantProperties.MIGRATION_DELAY.getInt();
     private static final int MAX_OUTSTANDING_VERSION_REQUESTS = 3;
 
     public static final MigrationCoordinator instance = new MigrationCoordinator();
diff --git a/src/java/org/apache/cassandra/schema/TableId.java b/src/java/org/apache/cassandra/schema/TableId.java
index 695147f..0633105 100644
--- a/src/java/org/apache/cassandra/schema/TableId.java
+++ b/src/java/org/apache/cassandra/schema/TableId.java
@@ -68,6 +68,11 @@ public class TableId
     public static TableId forSystemTable(String keyspace, String table)
     {
         assert SchemaConstants.isLocalSystemKeyspace(keyspace) || SchemaConstants.isReplicatedSystemKeyspace(keyspace);
+        return unsafeDeterministic(keyspace, table);
+    }
+
+    /**
+     * Derives a {@link TableId} purely from the keyspace and table names: the bytes of both names are
+     * concatenated and handed to {@link UUID#nameUUIDFromBytes(byte[])}, so the same pair of names
+     * always maps to the same id. No check is made here on which keyspace the table belongs to.
+     * <p>
+     * The names are concatenated without a separator, so ("ab", "c") and ("a", "bc") produce the same id.
+     * <p>
+     * NOTE(review): {@code getBytes()} is called without a charset, so the bytes depend on the platform
+     * default charset - confirm this cannot differ between nodes for names outside ASCII.
+     */
+    public static TableId unsafeDeterministic(String keyspace, String table)
+    {
         return new TableId(UUID.nameUUIDFromBytes(ArrayUtils.addAll(keyspace.getBytes(), table.getBytes())));
     }
 
diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java b/src/java/org/apache/cassandra/schema/TableMetadata.java
index a5530ed..1e85d88 100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@ -739,7 +739,10 @@ public class TableMetadata implements SchemaElement
                 partitioner = DatabaseDescriptor.getPartitioner();
 
             if (id == null)
-                id = TableId.generate();
+            {
+                if (DatabaseDescriptor.useDeterministicTableID()) id = TableId.unsafeDeterministic(keyspace, name);
+                else id = TableId.generate();
+            }
 
             if (Flag.isCQLTable(flags))
                 return new TableMetadata(this);
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index 24099c3..ed53a20 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -250,7 +250,7 @@ public class ClientState
      * it may be returned multiple times). Note that we still ensure Paxos "ballot" are unique (for different
      * proposal) by (securely) randomizing the non-timestamp part of the UUID.
      */
-    public long getTimestampForPaxos(long minTimestampToUse)
+    public static long getTimestampForPaxos(long minTimestampToUse)
     {
         while (true)
         {
@@ -266,6 +266,11 @@ public class ClientState
         }
     }
 
+    /**
+     * Exposes the current value held by {@code lastTimestampMicros}.
+     *
+     * @return the value read from {@code lastTimestampMicros} at the time of the call
+     */
+    public static long getLastTimestampMicros()
+    {
+        final long mostRecent = lastTimestampMicros.get();
+        return mostRecent;
+    }
+
     public Optional<String> getDriverName()
     {
         return Optional.ofNullable(driverName);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5c0f4bb..93bf478 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import java.util.regex.MatchResult;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -61,6 +62,7 @@ import com.google.common.util.concurrent.*;
 
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.dht.RangeStreamer.FetchReplica;
 import org.apache.cassandra.fql.FullQueryLogger;
@@ -139,6 +141,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
+import static org.apache.cassandra.concurrent.FutureTask.callable;
 import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
 import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
 import static org.apache.cassandra.config.CassandraRelevantProperties.REPLACEMENT_ALLOW_EMPTY;
@@ -171,7 +174,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     private static int getRingDelay()
     {
-        String newdelay = System.getProperty("cassandra.ring_delay_ms");
+        String newdelay = CassandraRelevantProperties.RING_DELAY.getString();
         if (newdelay != null)
         {
             logger.info("Overriding RING_DELAY to {}ms", newdelay);
@@ -1818,6 +1821,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public Future<StreamState> startBootstrap(Collection<Token> tokens)
     {
+        return startBootstrap(tokens, replacing);
+    }
+
+    public Future<StreamState> startBootstrap(Collection<Token> tokens, boolean replacing)
+    {
         setMode(Mode.JOINING, "Starting to bootstrap...", true);
         BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddressAndPort(), tokens, tokenMetadata);
         bootstrapper.addProgressListener(progressSupport);
@@ -4118,6 +4126,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     public Pair<Integer, Future<?>> repair(String keyspace, Map<String, String> repairSpec, List<ProgressListener> listeners)
     {
         RepairOption option = RepairOption.parse(repairSpec, tokenMetadata.partitioner);
+        return repair(keyspace, option, listeners);
+    }
+
+    public Pair<Integer, Future<?>> repair(String keyspace, RepairOption option, List<ProgressListener> listeners)
+    {
         // if ranges are not specified
         if (option.getRanges().isEmpty())
         {
@@ -4209,6 +4222,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         ActiveRepairService.instance.terminateSessions();
     }
 
+    // TODO: remove this after forward-porting Paxos, before final commit
+    /**
+     * Placeholder that performs no work: it ignores {@code reason} and hands back a future that has
+     * already completed successfully with a {@code null} result.
+     *
+     * @param reason currently unused
+     * @return an already-successful future
+     */
+    public Future<?> startRepairPaxosForTopologyChange(String reason)
+    {
+        // placeholder kept so the Simulator can be ported simply without breaking correctness
+        Future<?> alreadyDone = ImmediateFuture.success(null);
+        return alreadyDone;
+    }
+
     @Nullable
     public List<String> getParentRepairStatus(int cmd)
     {
@@ -4593,7 +4613,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         Uninterruptibles.sleepUninterruptibly(delay, MILLISECONDS);
     }
 
-    private void unbootstrap(Runnable onFinish) throws ExecutionException, InterruptedException
+    public Supplier<Future<StreamState>> prepareUnbootstrapStreaming()
     {
         Map<String, EndpointsByReplica> rangesToStream = new HashMap<>();
 
@@ -4607,11 +4627,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             rangesToStream.put(keyspaceName, rangesMM);
         }
 
+        return () -> streamRanges(rangesToStream);
+    }
+
+    private void unbootstrap(Runnable onFinish) throws ExecutionException, InterruptedException
+    {
+        Supplier<Future<StreamState>> startStreaming = prepareUnbootstrapStreaming();
+
         setMode(Mode.LEAVING, "replaying batch log and streaming data to other nodes", true);
 
         // Start with BatchLog replay, which may create hints but no writes since this is no longer a valid endpoint.
         Future<?> batchlogReplay = BatchlogManager.instance.startBatchlogReplay();
-        Future<StreamState> streamSuccess = streamRanges(rangesToStream);
+        Future<StreamState> streamSuccess = startStreaming.get();
 
         // Wait for batch log to complete before streaming hints.
         logger.debug("waiting for batch log processing.");
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index d0ab6b2..9319fba 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -522,6 +522,8 @@ public class ByteBufferUtil
             return ByteBufferUtil.bytes((InetAddress) obj);
         else if (obj instanceof String)
             return ByteBufferUtil.bytes((String) obj);
+        else if (obj instanceof byte[])
+            return ByteBuffer.wrap((byte[]) obj);
         else if (obj instanceof ByteBuffer)
             return (ByteBuffer) obj;
         else
diff --git a/src/java/org/apache/cassandra/utils/Clock.java b/src/java/org/apache/cassandra/utils/Clock.java
index 629a585..6fd0efd 100644
--- a/src/java/org/apache/cassandra/utils/Clock.java
+++ b/src/java/org/apache/cassandra/utils/Clock.java
@@ -22,6 +22,8 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.cassandra.config.CassandraRelevantProperties.CLOCK_GLOBAL;
+
 /**
  * Wrapper around time related functions that are either implemented by using the default JVM calls
  * or by using a custom implementation for testing purposes.
@@ -46,7 +48,7 @@ public interface Clock
 
         static
         {
-            String classname = System.getProperty("cassandra.clock");
+            String classname = CLOCK_GLOBAL.getString();
             Clock clock = new Default();
             if (classname != null)
             {
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 3905a77..912d907 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -115,11 +115,17 @@ public class FBUtilities
 
     private static volatile String previousReleaseVersionString;
 
+    private static int availableProcessors = Integer.getInteger("cassandra.available_processors", DatabaseDescriptor.getAvailableProcessors());
+
+    /**
+     * Replaces the processor count stored in {@code availableProcessors}.
+     * {@link #getAvailableProcessors()} reports this value only while it is positive.
+     *
+     * @param value the processor count to store
+     */
+    public static void setAvailableProcessors(int value)
+    {
+        FBUtilities.availableProcessors = value;
+    }
+
     public static int getAvailableProcessors()
     {
-        String availableProcessors = System.getProperty("cassandra.available_processors");
-        if (!Strings.isNullOrEmpty(availableProcessors))
-            return Integer.parseInt(availableProcessors);
+        if (availableProcessors > 0)
+            return availableProcessors;
         else
             return Runtime.getRuntime().availableProcessors();
     }
diff --git a/src/java/org/apache/cassandra/utils/Hex.java b/src/java/org/apache/cassandra/utils/Hex.java
index 9163067..b8044b8 100644
--- a/src/java/org/apache/cassandra/utils/Hex.java
+++ b/src/java/org/apache/cassandra/utils/Hex.java
@@ -86,6 +86,23 @@ public class Hex
         return wrapCharArray(c);
     }
 
+    /**
+     * Parses the hex digits in {@code hex[start, end)} as a big-endian long, without creating a substring.
+     * <p>
+     * Both lower- and upper-case digits are accepted. Sixteen digits fill all 64 bits, so a leading
+     * digit of 8 or above yields a negative value. An empty range yields 0.
+     *
+     * @param hex the string containing the digits
+     * @param start index of the first digit (inclusive)
+     * @param end index after the last digit (exclusive)
+     * @return the value encoded by the digits
+     * @throws IllegalArgumentException if the range holds more than 16 digits
+     * @throws NumberFormatException if the range contains a character that is not a hex digit
+     */
+    public static long parseLong(String hex, int start, int end)
+    {
+        int len = end - start;
+        if (len > 16)
+            throw new IllegalArgumentException("At most 16 hex digits fit in a long, got " + len);
+
+        long result = 0;
+        for (int i = start ; i < end ; ++i)
+        {
+            char c = hex.charAt(i);
+            int digit;
+            if (c >= '0' && c <= '9') digit = c - '0';
+            else if (c >= 'a' && c <= 'f') digit = c - 'a' + 10;
+            else if (c >= 'A' && c <= 'F') digit = c - 'A' + 10; // upper case used to fall into the '0' branch
+            else throw new NumberFormatException("Non-hex character in digit string");
+
+            // shifting the accumulator keeps the first digit most significant
+            result = (result << 4) | digit;
+        }
+        return result;
+    }
+
     /**
      * Create a String from a char array with zero-copy (if available), using reflection to access a package-protected constructor of String.
      * */
diff --git a/src/java/org/apache/cassandra/utils/LazyToString.java b/src/java/org/apache/cassandra/utils/LazyToString.java
new file mode 100644
index 0000000..e719445
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/LazyToString.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.function.Supplier;
+
+/**
+ * Marker type for objects whose string form is computed on demand by {@link Object#toString()}.
+ */
+public interface LazyToString
+{
+    /**
+     * Wraps a supplier so that it is invoked each time {@code toString()} is called on the result,
+     * and not before.
+     *
+     * @param castAsLambda produces the string; evaluated on every {@code toString()} call
+     * @return an object whose {@code toString()} delegates to {@code castAsLambda}
+     */
+    static LazyToString lazy(Supplier<String> castAsLambda)
+    {
+        final class Deferred implements LazyToString
+        {
+            @Override
+            public String toString()
+            {
+                return castAsLambda.get();
+            }
+        }
+
+        return new Deferred();
+    }
+}
diff --git a/src/java/org/apache/cassandra/utils/MonotonicClock.java b/src/java/org/apache/cassandra/utils/MonotonicClock.java
index ae98466..2ef563c 100644
--- a/src/java/org/apache/cassandra/utils/MonotonicClock.java
+++ b/src/java/org/apache/cassandra/utils/MonotonicClock.java
@@ -30,6 +30,8 @@ import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.Config;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.cassandra.config.CassandraRelevantProperties.CLOCK_MONOTONIC_APPROX;
+import static org.apache.cassandra.config.CassandraRelevantProperties.CLOCK_MONOTONIC_PRECISE;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
 /**
@@ -79,7 +81,7 @@ public interface MonotonicClock
 
         private static MonotonicClock precise()
         {
-            String sclock = System.getProperty("cassandra.monotonic_clock.precise");
+            String sclock = CLOCK_MONOTONIC_PRECISE.getString();
 
             if (sclock != null)
             {
@@ -99,7 +101,7 @@ public interface MonotonicClock
 
         private static MonotonicClock approx(MonotonicClock precise)
         {
-            String sclock = System.getProperty("cassandra.monotonic_clock.approx");
+            String sclock = CLOCK_MONOTONIC_APPROX.getString();
             if (sclock != null)
             {
                 try
diff --git a/src/java/org/apache/cassandra/utils/NativeLibrary.java b/src/java/org/apache/cassandra/utils/NativeLibrary.java
index 01225aa..b34f626 100644
--- a/src/java/org/apache/cassandra/utils/NativeLibrary.java
+++ b/src/java/org/apache/cassandra/utils/NativeLibrary.java
@@ -23,6 +23,7 @@ import java.lang.reflect.Field;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileInputStreamPlus;
 import org.slf4j.Logger;
@@ -32,6 +33,7 @@ import com.sun.jna.LastErrorException;
 
 import org.apache.cassandra.io.FSWriteError;
 
+import static org.apache.cassandra.config.CassandraRelevantProperties.IGNORE_MISSING_NATIVE_FILE_HINTS;
 import static org.apache.cassandra.config.CassandraRelevantProperties.OS_ARCH;
 import static org.apache.cassandra.config.CassandraRelevantProperties.OS_NAME;
 import static org.apache.cassandra.utils.NativeLibrary.OSType.LINUX;
@@ -42,6 +44,7 @@ import static org.apache.cassandra.utils.NativeLibrary.OSType.AIX;
 public final class NativeLibrary
 {
     private static final Logger logger = LoggerFactory.getLogger(NativeLibrary.class);
+    private static final boolean REQUIRE = !IGNORE_MISSING_NATIVE_FILE_HINTS.getBoolean();
 
     public enum OSType
     {
@@ -160,7 +163,8 @@ public final class NativeLibrary
         }
         catch (NoSuchMethodError x)
         {
-            logger.warn("Obsolete version of JNA present; unable to read errno. Upgrade to JNA 3.2.7 or later");
+            if (REQUIRE)
+                logger.warn("Obsolete version of JNA present; unable to read errno. Upgrade to JNA 3.2.7 or later");
             return 0;
         }
     }
@@ -292,7 +296,8 @@ public final class NativeLibrary
             if (!(e instanceof LastErrorException))
                 throw e;
 
-            logger.warn("fcntl({}, {}, {}) failed, errno ({}).", fd, command, flags, errno(e));
+            if (REQUIRE)
+                logger.warn("fcntl({}, {}, {}) failed, errno ({}).", fd, command, flags, errno(e));
         }
 
         return result;
@@ -315,7 +320,8 @@ public final class NativeLibrary
             if (!(e instanceof LastErrorException))
                 throw e;
 
-            logger.warn("open({}, O_RDONLY) failed, errno ({}).", path, errno(e));
+            if (REQUIRE)
+                logger.warn("open({}, O_RDONLY) failed, errno ({}).", path, errno(e));
         }
 
         return fd;
@@ -339,9 +345,12 @@ public final class NativeLibrary
             if (!(e instanceof LastErrorException))
                 throw e;
 
-            String errMsg = String.format("fsync(%s) failed, errno (%s) %s", fd, errno(e), e.getMessage());
-            logger.warn(errMsg);
-            throw new FSWriteError(e, errMsg);
+            if (REQUIRE)
+            {
+                String errMsg = String.format("fsync(%s) failed, errno (%s) %s", fd, errno(e), e.getMessage());
+                logger.warn(errMsg);
+                throw new FSWriteError(e, errMsg);
+            }
         }
     }
 
@@ -363,9 +372,12 @@ public final class NativeLibrary
             if (!(e instanceof LastErrorException))
                 throw e;
 
-            String errMsg = String.format("close(%d) failed, errno (%d).", fd, errno(e));
-            logger.warn(errMsg);
-            throw new FSWriteError(e, errMsg);
+            if (REQUIRE)
+            {
+                String errMsg = String.format("close(%d) failed, errno (%d).", fd, errno(e));
+                logger.warn(errMsg);
+                throw new FSWriteError(e, errMsg);
+            }
         }
     }
 
@@ -377,7 +389,8 @@ public final class NativeLibrary
         }
         catch (IllegalArgumentException|IllegalAccessException e)
         {
-            logger.warn("Unable to read fd field from FileChannel");
+            if (REQUIRE)
+                logger.warn("Unable to read fd field from FileChannel", e);
         }
         return -1;
     }
@@ -395,8 +408,11 @@ public final class NativeLibrary
         }
         catch (Exception e)
         {
-            JVMStabilityInspector.inspectThrowable(e);
-            logger.warn("Unable to read fd field from FileDescriptor");
+            if (REQUIRE)
+            {
+                JVMStabilityInspector.inspectThrowable(e);
+                logger.warn("Unable to read fd field from FileDescriptor", e);
+            }
         }
 
         return -1;
@@ -417,7 +433,8 @@ public final class NativeLibrary
         }
         catch (Exception e)
         {
-            logger.info("Failed to get PID from JNA", e);
+            if (REQUIRE)
+                logger.info("Failed to get PID from JNA", e);
         }
 
         return -1;
diff --git a/src/java/org/apache/cassandra/utils/SigarLibrary.java b/src/java/org/apache/cassandra/utils/SigarLibrary.java
index 246a9c8..81e44d2 100644
--- a/src/java/org/apache/cassandra/utils/SigarLibrary.java
+++ b/src/java/org/apache/cassandra/utils/SigarLibrary.java
@@ -21,6 +21,8 @@ import org.hyperic.sigar.*;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
 
+import org.apache.cassandra.config.CassandraRelevantProperties;
+
 public class SigarLibrary
 {
     private Logger logger = LoggerFactory.getLogger(SigarLibrary.class);
@@ -169,7 +171,7 @@ public class SigarLibrary
             boolean goodAddressSpace = hasAcceptableAddressSpace();
             boolean goodFileLimits = hasAcceptableFileLimits();
             boolean goodProcNumber = hasAcceptableProcNumber();
-            if (swapEnabled || !goodAddressSpace || !goodFileLimits || !goodProcNumber)
+            if (swapEnabled || !goodAddressSpace || !goodFileLimits || !goodProcNumber || CassandraRelevantProperties.TEST_IGNORE_SIGAR.getBoolean())
             {
                 logger.warn("Cassandra server running in degraded mode. Is swap disabled? : {},  Address space adequate? : {}, " +
                             " nofile limit adequate? : {}, nproc limit adequate? : {} ", !swapEnabled, goodAddressSpace,
diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java b/src/java/org/apache/cassandra/utils/UUIDGen.java
index 1e727d4..cab7256 100644
--- a/src/java/org/apache/cassandra/utils/UUIDGen.java
+++ b/src/java/org/apache/cassandra/utils/UUIDGen.java
@@ -36,6 +36,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import com.google.common.primitives.Ints;
@@ -104,6 +105,16 @@ public class UUIDGen
     }
 
     /**
+     * Creates a type 1 UUID (time-based UUID) from a timestamp and a caller-supplied clock-sequence-and-node value.
+     *
+     * @param when the timestamp, in milliseconds
+     * @param clockSeqAndNode used as-is for the least significant 64 bits of the UUID (this parameter
+     *                        shadows the static field of the same name)
+     * @return a UUID instance
+     */
+    public static UUID getTimeUUIDWithClockSeqAndNode(long when, long clockSeqAndNode)
+    {
+        long timestampBits = createTime(fromUnixTimestamp(when));
+        return new UUID(timestampBits, clockSeqAndNode);
+    }
+
+    /**
      * Returns a version 1 UUID using the provided timestamp and the local clock and sequence.
      * <p>
      * Note that this method is generally only safe to use if you can guarantee that the provided
@@ -130,16 +141,44 @@ public class UUIDGen
      * through randomization.
      *
      * @param whenInMicros a unix time in microseconds.
+     * @param flag a value between 0 and 9 (inclusive) to put in least significant (and unused) part of the timestamp
      * @return a new UUID {@code id} such that {@code microsTimestamp(id) == whenInMicros}. The UUID returned
      * by different calls will be unique even if {@code whenInMicros} is not.
      */
-    public static UUID getRandomTimeUUIDFromMicros(long whenInMicros)
+    public static UUID getRandomTimeUUIDFromMicros(long whenInMicros, int flag)
     {
+        Preconditions.checkArgument(0 <= flag  && flag < 10);
         long whenInMillis = whenInMicros / 1000;
-        long nanos = (whenInMicros - (whenInMillis * 1000)) * 10;
+        long nanos = (whenInMicros - (whenInMillis * 1000)) * 10 + flag;
         return new UUID(createTime(fromUnixTimestamp(whenInMillis, nanos)), secureRandom.nextLong());
     }
 
+    /**
+     * Same as {@link #getRandomTimeUUIDFromMicros(long, int)} with a {@code flag} of 0.
+     *
+     * @param whenInMicros a unix time in microseconds.
+     * @return the UUID produced by the two-argument overload for {@code (whenInMicros, 0)}
+     * @deprecated call {@link #getRandomTimeUUIDFromMicros(long, int)} and pass the flag explicitly
+     */
+    @Deprecated
+    public static UUID getRandomTimeUUIDFromMicros(long whenInMicros)
+    {
+        final int noFlag = 0;
+        return getRandomTimeUUIDFromMicros(whenInMicros, noFlag);
+    }
+
+    /**
+     * Builds a time-based UUID from a microsecond timestamp, taking the least significant 64 bits
+     * verbatim from the caller.
+     * <p>
+     * Unlike {@link #getRandomTimeUUIDFromMicros(long, int)}, nothing is randomised here: the result is
+     * fully determined by the arguments, so uniqueness across calls is up to the caller's choice of
+     * {@code randomPart}.
+     *
+     * @param whenInMicros a unix time in microseconds.
+     * @param randomPart the value used as-is for the least significant 64 bits of the UUID
+     * @param flag a value between 0 and 9 (inclusive) to put in least significant (and unused) part of the timestamp
+     * @return a new UUID whose timestamp half encodes {@code whenInMicros} and {@code flag}, and whose
+     * other half is {@code randomPart}
+     * @throws IllegalArgumentException if {@code flag} is not between 0 and 9 (inclusive)
+     */
+    public static UUID getRandomTimeUUIDFromMicrosAndRandom(long whenInMicros, long randomPart, int flag)
+    {
+        Preconditions.checkArgument(0 <= flag && flag < 10);
+        long millis = whenInMicros / 1000;
+        // sub-millisecond remainder in 100ns units, with the flag occupying the lowest decimal digit
+        long subMillis = (whenInMicros % 1000) * 10 + flag;
+        long timestampBits = createTime(fromUnixTimestamp(millis, subMillis));
+        return new UUID(timestampBits, randomPart);
+    }
+
     public static UUID getTimeUUID(long when, long nanos)
     {
         return new UUID(createTime(fromUnixTimestamp(when, nanos)), clockSeqAndNode);
@@ -280,7 +319,11 @@ public class UUIDGen
 
     private static byte[] createTimeUUIDBytes(long msb)
     {
-        long lsb = clockSeqAndNode;
+        return createTimeUUIDBytes(msb, clockSeqAndNode);
+    }
+
+    public static byte[] createTimeUUIDBytes(long msb, long lsb)
+    {
         byte[] uuidBytes = new byte[16];
 
         for (int i = 0; i < 8; i++)
@@ -308,6 +351,15 @@ public class UUIDGen
 
     private static long makeClockSeqAndNode()
     {
+        if (Boolean.getBoolean("cassandra.unsafe.deterministicuuidnode"))
+            return FBUtilities.getBroadcastAddressAndPort().addressBytes[3];
+
+        Long specified = Long.getLong("cassandra.unsafe.timeuuidnode");
+        if (specified != null)
+            return specified
+                    ^ FBUtilities.getBroadcastAddressAndPort().addressBytes[3]
+                    ^ (FBUtilities.getBroadcastAddressAndPort().addressBytes[2] << 8);
+
         long clock = new SecureRandom().nextLong();
 
         long lsb = 0;
@@ -352,7 +404,7 @@ public class UUIDGen
         return createTime(nanosSince);
     }
 
-    private static long createTime(long nanosSince)
+    public static long createTime(long nanosSince)
     {
         long msb = 0L;
         msb |= (0x00000000ffffffffL & nanosSince) << 32;
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Threads.java b/src/java/org/apache/cassandra/utils/concurrent/Threads.java
new file mode 100644
index 0000000..439a77f
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/Threads.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Stream;
+
+/**
+ * Helpers for rendering stack traces as compact, single-line strings.
+ */
+public class Threads
+{
+    /**
+     * A {@link Collector} that joins {@link StackTraceElement}s into
+     * {@code prefix + frame + delimiter + frame + ... + suffix}, rendering each frame as
+     * {@code className.methodName:lineNumber}.
+     * <p>
+     * The instance acts as its own {@link #supplier()} and {@link #finisher()}.
+     */
+    public static class StackTraceCombiner implements Collector<StackTraceElement, StringBuilder, String>, Supplier<StringBuilder>, Function<StringBuilder, String>
+    {
+        final boolean printBriefPackages;
+        final String prefix;
+        final String delimiter;
+        final String suffix;
+
+        /**
+         * @param printBriefPackages if true, every package segment of the class name is cut to its first character
+         * @param prefix text placed before the first frame
+         * @param delimiter text placed between consecutive frames
+         * @param suffix text placed after the last frame
+         */
+        public StackTraceCombiner(boolean printBriefPackages, String prefix, String delimiter, String suffix)
+        {
+            this.printBriefPackages = printBriefPackages;
+            this.prefix = prefix;
+            this.delimiter = delimiter;
+            this.suffix = suffix;
+        }
+
+        @Override
+        public Supplier<StringBuilder> supplier()
+        {
+            return this;
+        }
+
+        @Override
+        public BiConsumer<StringBuilder, StackTraceElement> accumulator()
+        {
+            return (sb, ste) ->
+            {
+                // every builder starts out holding just the prefix, so anything longer already has a frame
+                if (sb.length() > prefix.length())
+                    sb.append(delimiter);
+
+                String className = ste.getClassName();
+
+                if (printBriefPackages)
+                {
+                    // emit the first character of each dot-separated segment, then the last segment in full
+                    int afterPrevDot = 0;
+                    while (true)
+                    {
+                        int dot = className.indexOf('.', afterPrevDot);
+                        if (dot < 0)
+                            break;
+
+                        sb.append(className.charAt(afterPrevDot));
+                        sb.append('.');
+                        afterPrevDot = dot + 1;
+                    }
+                    sb.append(className, afterPrevDot, className.length());
+                }
+                else
+                {
+                    sb.append(className);
+                }
+                sb.append('.');
+                sb.append(ste.getMethodName());
+                sb.append(':');
+                sb.append(ste.getLineNumber());
+            };
+        }
+
+        /**
+         * Merges two partial results so that the outcome matches what sequential accumulation would have
+         * produced: the right-hand builder's own copy of the prefix is dropped, and the configured
+         * delimiter is inserted only when both sides hold at least one frame.
+         */
+        @Override
+        public BinaryOperator<StringBuilder> combiner()
+        {
+            return (left, right) ->
+            {
+                int prefixLength = prefix.length();
+                if (right.length() <= prefixLength)
+                    return left; // right holds no frames
+
+                if (left.length() > prefixLength)
+                    left.append(delimiter);
+
+                return left.append(right, prefixLength, right.length());
+            };
+        }
+
+        @Override
+        public Function<StringBuilder, String> finisher()
+        {
+            return this;
+        }
+
+        @Override
+        public Set<Characteristics> characteristics()
+        {
+            return Collections.emptySet();
+        }
+
+        /** Supplier role: a fresh builder seeded with the prefix. */
+        @Override
+        public StringBuilder get()
+        {
+            return new StringBuilder(prefix);
+        }
+
+        /** Finisher role: appends the suffix and returns the built string. */
+        @Override
+        public String apply(StringBuilder finish)
+        {
+            finish.append(suffix);
+            return finish.toString();
+        }
+    }
+
+    /** Renders the current stack trace of {@code thread} with no prefix or suffix. */
+    public static String prettyPrintStackTrace(Thread thread, boolean printBriefPackages, String delimiter)
+    {
+        return prettyPrint(thread.getStackTrace(), printBriefPackages, delimiter);
+    }
+
+    /** Renders the current stack trace of {@code thread} between {@code prefix} and {@code suffix}. */
+    public static String prettyPrintStackTrace(Thread thread, boolean printBriefPackages, String prefix, String delimiter, String suffix)
+    {
+        return prettyPrint(thread.getStackTrace(), printBriefPackages, prefix, delimiter, suffix);
+    }
+
+    /** Renders {@code st} with no prefix or suffix. */
+    public static String prettyPrint(StackTraceElement[] st, boolean printBriefPackages, String delimiter)
+    {
+        return prettyPrint(st, printBriefPackages, "", delimiter, "");
+    }
+
+    /**
+     * Renders {@code st} as {@code prefix + frames joined by delimiter + suffix}.
+     *
+     * @param st the frames to render, in order
+     * @param printBriefPackages if true, every package segment of each class name is cut to its first character
+     * @return the rendered string; just {@code prefix + suffix} when {@code st} is empty
+     */
+    public static String prettyPrint(StackTraceElement[] st, boolean printBriefPackages, String prefix, String delimiter, String suffix)
+    {
+        return Stream.of(st).collect(new StackTraceCombiner(printBriefPackages, prefix, delimiter, suffix));
+    }
+}
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
index 6371bda..0596aeb 100644
--- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
@@ -20,23 +20,25 @@ package org.apache.cassandra.utils.memory;
 
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 public class HeapPool extends MemtablePool
 {
+    private static final EnsureOnHeap ENSURE_NOOP = new EnsureOnHeap.NoOp(); // hoisted from the nested Allocator to the outer class so that Logged.Allocator can return the same instance
+
     public HeapPool(long maxOnHeapMemory, float cleanupThreshold, MemtableCleaner cleaner)
     {
         super(maxOnHeapMemory, 0, cleanupThreshold, cleaner);
     }
 
-    public MemtableAllocator newAllocator()
+    public MemtableAllocator newAllocator(ColumnFamilyStore table) // table is not used by this pool; the parameter exists to match the abstract MemtablePool.newAllocator signature
     {
         return new Allocator(this);
     }
 
     private static class Allocator extends MemtableBufferAllocator
     {
-        private static final EnsureOnHeap ENSURE_NOOP = new EnsureOnHeap.NoOp();
         Allocator(HeapPool pool)
         {
             super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator());
@@ -53,4 +55,74 @@ public class HeapPool extends MemtablePool
             return ENSURE_NOOP;
         }
     }
+
+    public static class Logged extends MemtablePool // on-heap pool (off-heap limit is 0) that reports every sub-allocation to a static Listener
+    {
+        public interface Listener // callback receiving the requested size and the owning table's name
+        {
+            public void accept(long size, String table);
+        }
+
+        private static volatile Listener onAllocated = (i, table) -> {}; // volatile: written by setListener and read by allocating threads; defaults to a no-op
+
+        class LoggedSubPool extends SubPool // SubPool whose allocators notify the listener
+        {
+            public LoggedSubPool(long limit, float cleanThreshold)
+            {
+                super(limit, cleanThreshold);
+            }
+
+            public MemtableAllocator.SubAllocator newAllocator(String table) // table is the name passed to the listener on each allocation
+            {
+                return new MemtableAllocator.SubAllocator(this)
+                {
+                    public void allocate(long size, OpOrder.Group opGroup)
+                    {
+                        onAllocated.accept(size, table); // notified before the allocation is attempted, so it fires even if super.allocate does not complete
+                        super.allocate(size, opGroup);
+                    }
+                };
+            }
+        }
+
+        public Logged(long maxOnHeapMemory, float cleanupThreshold, MemtableCleaner cleaner)
+        {
+            super(maxOnHeapMemory, 0, cleanupThreshold, cleaner); // second argument 0: no off-heap budget
+        }
+
+        public MemtableAllocator newAllocator(ColumnFamilyStore cfs) // cfs may be null, in which case allocations are reported under an empty table name
+        {
+            return new Allocator(this, cfs == null ? "" : cfs.keyspace.getName() + '.' + cfs.name);
+        }
+
+        private static class Allocator extends MemtableBufferAllocator
+        {
+            Allocator(Logged pool, String table)
+            {
+                super(((LoggedSubPool) pool.onHeap).newAllocator(table), ((LoggedSubPool) pool.offHeap).newAllocator(table)); // casts rely on both sub-pools having been created by getSubPool below
+            }
+
+            public ByteBuffer allocate(int size, OpOrder.Group opGroup)
+            {
+                super.onHeap().allocate(size, opGroup); // charge the bytes to the on-heap sub-allocator (presumably the logging one built in the constructor) before handing out memory
+                return ByteBuffer.allocate(size); // the buffer itself is an ordinary heap ByteBuffer
+            }
+
+            @Override
+            public EnsureOnHeap ensureOnHeap()
+            {
+                return ENSURE_NOOP;
+            }
+        }
+
+        SubPool getSubPool(long limit, float cleanThreshold) // presumably overrides MemtablePool's sub-pool factory hook - confirm; supplies LoggedSubPool instances
+        {
+            return new LoggedSubPool(limit, cleanThreshold);
+        }
+
+        public static void setListener(Listener listener) // installs the process-wide listener; null restores the no-op instead of causing a NullPointerException on the next allocation
+        {
+            onAllocated = listener == null ? (size, table) -> {} : listener;
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
index 2883830..d7076f5 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
@@ -103,7 +103,7 @@ public abstract class MemtableAllocator
     }
 
     /** Mark the BB as unused, permitting it to be reclaimed */
-    public static final class SubAllocator
+    public static class SubAllocator
     {
         // the tracker we are owning memory from
         private final MemtablePool.SubPool parent;
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index 707f164..b43973d 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
 
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Timer;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.metrics.DefaultNameFactory;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
@@ -81,7 +82,7 @@ public abstract class MemtablePool
         ExecutorUtils.shutdownNowAndWait(timeout, unit, cleaner);
     }
 
-    public abstract MemtableAllocator newAllocator();
+    public abstract MemtableAllocator newAllocator(ColumnFamilyStore table); // table may be null: tests in this change pass null and HeapPool.Logged checks for it; the other pools in this change ignore the argument
 
     public boolean needsCleaning()
     {
diff --git a/src/java/org/apache/cassandra/utils/memory/NativePool.java b/src/java/org/apache/cassandra/utils/memory/NativePool.java
index e88b4d7..03c42e1 100644
--- a/src/java/org/apache/cassandra/utils/memory/NativePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativePool.java
@@ -18,6 +18,8 @@
  */
 package org.apache.cassandra.utils.memory;
 
+import org.apache.cassandra.db.ColumnFamilyStore;
+
 public class NativePool extends MemtablePool
 {
     public NativePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, MemtableCleaner cleaner)
@@ -26,7 +28,7 @@ public class NativePool extends MemtablePool
     }
 
     @Override
-    public NativeAllocator newAllocator()
+    public NativeAllocator newAllocator(ColumnFamilyStore table) // table is not used by this pool; the return type stays narrowed to NativeAllocator
     {
         return new NativeAllocator(this);
     }
diff --git a/src/java/org/apache/cassandra/utils/memory/SlabPool.java b/src/java/org/apache/cassandra/utils/memory/SlabPool.java
index 416d1dd..adeb14f 100644
--- a/src/java/org/apache/cassandra/utils/memory/SlabPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/SlabPool.java
@@ -18,6 +18,8 @@
  */
 package org.apache.cassandra.utils.memory;
 
+import org.apache.cassandra.db.ColumnFamilyStore;
+
 public class SlabPool extends MemtablePool
 {
     final boolean allocateOnHeap;
@@ -28,7 +30,7 @@ public class SlabPool extends MemtablePool
         this.allocateOnHeap = maxOffHeapMemory == 0;
     }
 
-    public MemtableAllocator newAllocator()
+    public MemtableAllocator newAllocator(ColumnFamilyStore table) // table is not used by this pool; the parameter exists to match the abstract MemtablePool.newAllocator signature
     {
         return new SlabAllocator(onHeap.newAllocator(), offHeap.newAllocator(), allocateOnHeap);
     }
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 3d4de68..3478fb3 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -132,6 +132,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.exceptions.TransportException",
     "org.apache.cassandra.fql.FullQueryLogger",
     "org.apache.cassandra.fql.FullQueryLoggerOptions",
+    "org.apache.cassandra.gms.IFailureDetector",
     "org.apache.cassandra.locator.IEndpointSnitch",
     "org.apache.cassandra.io.FSWriteError",
     "org.apache.cassandra.io.FSError",
diff --git a/test/unit/org/apache/cassandra/db/CellSpecTest.java b/test/unit/org/apache/cassandra/db/CellSpecTest.java
index eac724a..24a3b12 100644
--- a/test/unit/org/apache/cassandra/db/CellSpecTest.java
+++ b/test/unit/org/apache/cassandra/db/CellSpecTest.java
@@ -138,7 +138,7 @@ public class CellSpecTest
         byte[] rawBytes = { 0, 1, 2, 3, 4, 5, 6 };
         ByteBuffer bbBytes = ByteBuffer.wrap(rawBytes);
         NativePool pool = new NativePool(1024, 1024, 1, () -> ImmediateFuture.success(true));
-        NativeAllocator allocator = pool.newAllocator();
+        NativeAllocator allocator = pool.newAllocator(null);
         OpOrder order = new OpOrder();
 
         List<Cell<?>> tests = new ArrayList<>();
diff --git a/test/unit/org/apache/cassandra/db/ClusteringHeapSizeTest.java b/test/unit/org/apache/cassandra/db/ClusteringHeapSizeTest.java
index dbf9166..e97f067 100644
--- a/test/unit/org/apache/cassandra/db/ClusteringHeapSizeTest.java
+++ b/test/unit/org/apache/cassandra/db/ClusteringHeapSizeTest.java
@@ -72,7 +72,7 @@ public class ClusteringHeapSizeTest
         return Arrays.asList(new Object[][] {
         { array },
         { buffer },
-        { new NativeClustering(pool.newAllocator(), order.getCurrent(), array)}
+        { new NativeClustering(pool.newAllocator(null), order.getCurrent(), array)}
         });
     }
 }
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/db/NativeCellTest.java b/test/unit/org/apache/cassandra/db/NativeCellTest.java
index 713f26b..fad9e2d 100644
--- a/test/unit/org/apache/cassandra/db/NativeCellTest.java
+++ b/test/unit/org/apache/cassandra/db/NativeCellTest.java
@@ -48,7 +48,7 @@ public class NativeCellTest
     private static final NativeAllocator nativeAllocator = new NativePool(Integer.MAX_VALUE,
                                                                           Integer.MAX_VALUE,
                                                                           1f,
-                                                                          () -> ImmediateFuture.success(true)).newAllocator();
+                                                                          () -> ImmediateFuture.success(true)).newAllocator(null);
     @SuppressWarnings("resource")
     private static final OpOrder.Group group = new OpOrder().start();
     private static Random rand;
diff --git a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
index 86b81e5..3e28040 100644
--- a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
@@ -94,6 +94,7 @@ public class ColumnFilterTest
     {
         DatabaseDescriptor.setSeedProvider(Arrays::asList);
         DatabaseDescriptor.setEndpointSnitch(new SimpleSnitch());
+        DatabaseDescriptor.setDefaultFailureDetector();
         Gossiper.instance.start(0);
     }
 
diff --git a/test/unit/org/apache/cassandra/gms/ArrivalWindowTest.java b/test/unit/org/apache/cassandra/gms/ArrivalWindowTest.java
index ea59300..3a07ea3 100644
--- a/test/unit/org/apache/cassandra/gms/ArrivalWindowTest.java
+++ b/test/unit/org/apache/cassandra/gms/ArrivalWindowTest.java
@@ -23,13 +23,21 @@ package org.apache.cassandra.gms;
 
 import static org.junit.Assert.*;
 
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.FBUtilities;
 
 public class ArrivalWindowTest
 {
+    @BeforeClass
+    public static void beforeClass()
+    {
+        DatabaseDescriptor.setDefaultFailureDetector(); // presumably needed because the failure detector is now configured through DatabaseDescriptor and must be set before the tests run - confirm
+    }
+
     @Test
     public void testWithNanoTime()
     {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org