Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/03/08 17:10:41 UTC

[cassandra] branch trunk updated: Improve replication tests

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 953c18d  Improve replication tests
953c18d is described below

commit 953c18df33ab3e009ced15a16785e2753843418a
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Mon Mar 8 17:08:36 2021 +0000

    Improve replication tests
    
    patch by Caleb Rackliffe; reviewed by Andrés de la Peña and Ekaterina Dimitrova for CASSANDRA-16181
---
 src/java/org/apache/cassandra/batchlog/Batch.java  |  27 +++-
 .../apache/cassandra/batchlog/BatchlogManager.java |   5 +-
 .../org/apache/cassandra/hints/HintMessage.java    |   1 -
 .../org/apache/cassandra/hints/HintsService.java   |   6 +
 .../locator/AbstractReplicationStrategy.java       |  24 ++--
 .../apache/cassandra/locator/LocalStrategy.java    |   5 +-
 .../cassandra/locator/NetworkTopologyStrategy.java |   7 +-
 .../cassandra/locator/ReplicationFactor.java       |  10 +-
 .../apache/cassandra/locator/SimpleStrategy.java   |   5 +-
 .../cassandra/distributed/action/GossipHelper.java |  13 +-
 .../distributed/impl/AbstractCluster.java          |   2 +-
 .../cassandra/distributed/impl/Instance.java       |  49 ++++++-
 .../test/HintedHandoffAddRemoveNodesTest.java      | 156 ++++++++++++++++++++
 .../test/HintedHandoffNodetoolTest.java            | 149 +++++++++++++++++++
 .../cassandra/distributed/test/TestBaseImpl.java   |  16 ++-
 .../distributed/test/ring/BootstrapTest.java       |  11 +-
 .../ring/CommunicationDuringDecommissionTest.java  |   4 +-
 .../upgrade/MixedModeBatchTestBase.java            | 157 +++++++++++++++++++++
 .../upgrade/MixedModeFrom2LoggedBatchTest.java     |  38 +++++
 .../upgrade/MixedModeFrom2ReplicationTest.java     |  38 +++++
 .../upgrade/MixedModeFrom2UnloggedBatchTest.java   |  38 +++++
 .../upgrade/MixedModeFrom3LoggedBatchTest.java     |  44 ++++++
 .../upgrade/MixedModeFrom3ReplicationTest.java     |  44 ++++++
 .../upgrade/MixedModeFrom3UnloggedBatchTest.java   |  44 ++++++
 .../upgrade/MixedModeReplicationTestBase.java      |  83 +++++++++++
 .../distributed/upgrade/UpgradeTestBase.java       |  31 ++++
 .../batchlog/BatchlogEndpointFilterTest.java       |  41 +++++-
 .../cassandra/batchlog/BatchlogManagerTest.java    |   5 +-
 .../cql3/validation/operations/BatchTest.java      |  60 ++++++--
 .../apache/cassandra/hints/HintMessageTest.java    |  67 +++++++--
 .../locator/NetworkTopologyStrategyTest.java       |  62 +++++++-
 .../cassandra/locator/ReplicationFactorTest.java   |  75 ++++++----
 .../ReplicationStrategyEndpointCacheTest.java      |  68 +--------
 .../cassandra/locator/SimpleStrategyTest.java      |  33 +++++
 34 files changed, 1237 insertions(+), 181 deletions(-)

diff --git a/src/java/org/apache/cassandra/batchlog/Batch.java b/src/java/org/apache/cassandra/batchlog/Batch.java
index fb6c5d5..7b205e0 100644
--- a/src/java/org/apache/cassandra/batchlog/Batch.java
+++ b/src/java/org/apache/cassandra/batchlog/Batch.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -65,6 +67,7 @@ public final class Batch
      *
      * The mutations will always be encoded using the current messaging version.
      */
+    @SuppressWarnings("RedundantTypeArguments")
     public static Batch createRemote(UUID id, long creationTime, Collection<ByteBuffer> mutations)
     {
         return new Batch(id, creationTime, Collections.<Mutation>emptyList(), mutations);
@@ -77,12 +80,30 @@ public final class Batch
     {
         return decodedMutations.size() + encodedMutations.size();
     }
+    
+    @VisibleForTesting
+    public Collection<ByteBuffer> getEncodedMutations()
+    {
+        return encodedMutations;
+    }
 
-    static final class Serializer implements IVersionedSerializer<Batch>
+    /**
+     * Local batches contain only already decoded {@link Mutation} instances. Unlike remote 
+     * batches, which contain mutations encoded as {@link ByteBuffer} instances, local batches 
+     * can be serialized and sent over the wire.
+     * 
+     * @return {@code true} if there are no encoded mutations present, and {@code false} otherwise 
+     */
+    public boolean isLocal()
+    {
+        return encodedMutations.isEmpty();
+    }
+    
+    public static final class Serializer implements IVersionedSerializer<Batch>
     {
         public long serializedSize(Batch batch, int version)
         {
-            assert batch.encodedMutations.isEmpty() : "attempted to serialize a 'remote' batch";
+            assert batch.isLocal() : "attempted to serialize a 'remote' batch";
 
             long size = UUIDSerializer.serializer.serializedSize(batch.id, version);
             size += sizeof(batch.creationTime);
@@ -100,7 +121,7 @@ public final class Batch
 
         public void serialize(Batch batch, DataOutputPlus out, int version) throws IOException
         {
-            assert batch.encodedMutations.isEmpty() : "attempted to serialize a 'remote' batch";
+            assert batch.isLocal() : "attempted to serialize a 'remote' batch";
 
             UUIDSerializer.serializer.serialize(batch.id, out, version);
             out.writeLong(batch.creationTime);
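
For reference, a small illustrative snippet (not part of the patch) showing the distinction the new isLocal() accessor captures; it assumes a non-empty Collection<ByteBuffer> named encodedMutations plus the existing UUIDGen and FBUtilities utilities:

    // A batch built from already-encoded mutations is "remote": isLocal() returns false,
    // so the Serializer above would refuse to serialize it directly.
    Batch remote = Batch.createRemote(UUIDGen.getTimeUUID(), FBUtilities.timestampMicros(), encodedMutations);
    assert !remote.isLocal();
    assert !remote.getEncodedMutations().isEmpty();
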
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index f140332..9a009dc 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -447,8 +447,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             for (Mutation mutation : mutations)
             {
                 ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, hintedNodes);
-                if (handler != null)
-                    handlers.add(handler);
+                handlers.add(handler);
             }
             return handlers;
         }
@@ -457,7 +456,7 @@ public class BatchlogManager implements BatchlogManagerMBean
          * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
          * when a replica is down or a write request times out.
          *
-         * @return direct delivery handler to wait on or null, if no live nodes found
+         * @return direct delivery handler to wait on
          */
         private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation,
                                                                                      long writtenAt,
diff --git a/src/java/org/apache/cassandra/hints/HintMessage.java b/src/java/org/apache/cassandra/hints/HintMessage.java
index 60adb85..60d7641 100644
--- a/src/java/org/apache/cassandra/hints/HintMessage.java
+++ b/src/java/org/apache/cassandra/hints/HintMessage.java
@@ -24,7 +24,6 @@ import java.util.Objects;
 import java.util.UUID;
 import javax.annotation.Nullable;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.db.TypeSizes;
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 60de478..f7c4c9b 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -393,4 +393,10 @@ public final class HintsService implements HintsServiceMBean
     {
         return isShutDown;
     }
+    
+    @VisibleForTesting
+    public boolean isDispatchPaused()
+    {
+        return isDispatchPaused.get();
+    }
 }
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index dfcfdc0..7891895 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -99,12 +99,12 @@ public abstract class AbstractReplicationStrategy
      * @param searchPosition the position the natural endpoints are requested for
      * @return a copy of the natural endpoints for the given token
      */
-    public EndpointsForToken getNaturalReplicasForToken(RingPosition searchPosition)
+    public EndpointsForToken getNaturalReplicasForToken(RingPosition<?> searchPosition)
     {
         return getNaturalReplicas(searchPosition).forToken(searchPosition.getToken());
     }
 
-    public EndpointsForRange getNaturalReplicas(RingPosition searchPosition)
+    public EndpointsForRange getNaturalReplicas(RingPosition<?> searchPosition)
     {
         Token searchToken = searchPosition.getToken();
         Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
@@ -121,7 +121,7 @@ public abstract class AbstractReplicationStrategy
         return endpoints;
     }
 
-    public Replica getLocalReplicaFor(RingPosition searchPosition)
+    public Replica getLocalReplicaFor(RingPosition<?> searchPosition)
     {
         return getNaturalReplicas(searchPosition)
                .byEndpoint()
@@ -160,7 +160,7 @@ public abstract class AbstractReplicationStrategy
                                                                        long queryStartNanoTime,
                                                                        ConsistencyLevel idealConsistencyLevel)
     {
-        AbstractWriteResponseHandler resultResponseHandler;
+        AbstractWriteResponseHandler<T> resultResponseHandler;
         if (replicaPlan.consistencyLevel().isDatacenterLocal())
         {
             // block for in this context will be localnodes block.
@@ -188,11 +188,11 @@ public abstract class AbstractReplicationStrategy
             else
             {
                 //Construct a delegate response handler to use to track the ideal consistency level
-                AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(replicaPlan.withConsistencyLevel(idealConsistencyLevel),
-                                                                                    callback,
-                                                                                    writeType,
-                                                                                    queryStartNanoTime,
-                                                                                    idealConsistencyLevel);
+                AbstractWriteResponseHandler<T> idealHandler = getWriteResponseHandler(replicaPlan.withConsistencyLevel(idealConsistencyLevel),
+                                                                                       callback,
+                                                                                       writeType,
+                                                                                       queryStartNanoTime,
+                                                                                       idealConsistencyLevel);
                 resultResponseHandler.setIdealCLResponseHandler(idealHandler);
             }
         }
@@ -319,7 +319,7 @@ public abstract class AbstractReplicationStrategy
         throws ConfigurationException
     {
         AbstractReplicationStrategy strategy;
-        Class [] parameterTypes = new Class[] {String.class, TokenMetadata.class, IEndpointSnitch.class, Map.class};
+        Class<?>[] parameterTypes = new Class[] {String.class, TokenMetadata.class, IEndpointSnitch.class, Map.class};
         try
         {
             Constructor<? extends AbstractReplicationStrategy> constructor = strategyClass.getConstructor(parameterTypes);
@@ -436,7 +436,7 @@ public abstract class AbstractReplicationStrategy
             if (rf.hasTransientReplicas())
             {
                 if (DatabaseDescriptor.getNumTokens() > 1)
-                    throw new ConfigurationException(String.format("Transient replication is not supported with vnodes yet"));
+                    throw new ConfigurationException("Transient replication is not supported with vnodes yet");
             }
         }
         catch (IllegalArgumentException e)
@@ -447,7 +447,7 @@ public abstract class AbstractReplicationStrategy
 
     protected void validateExpectedOptions() throws ConfigurationException
     {
-        Collection expectedOptions = recognizedOptions();
+        Collection<String> expectedOptions = recognizedOptions();
         if (expectedOptions == null)
             return;
 
diff --git a/src/java/org/apache/cassandra/locator/LocalStrategy.java b/src/java/org/apache/cassandra/locator/LocalStrategy.java
index e7da769..0e3a918 100644
--- a/src/java/org/apache/cassandra/locator/LocalStrategy.java
+++ b/src/java/org/apache/cassandra/locator/LocalStrategy.java
@@ -50,7 +50,7 @@ public class LocalStrategy extends AbstractReplicationStrategy
      * LocalStrategy may be used before tokens are set up.
      */
     @Override
-    public EndpointsForRange getNaturalReplicas(RingPosition searchPosition)
+    public EndpointsForRange getNaturalReplicas(RingPosition<?> searchPosition)
     {
         return replicas;
     }
@@ -74,9 +74,10 @@ public class LocalStrategy extends AbstractReplicationStrategy
     {
     }
 
+    @Override
     public Collection<String> recognizedOptions()
     {
         // LocalStrategy doesn't expect any options.
-        return Collections.<String>emptySet();
+        return Collections.emptySet();
     }
 }
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 9e6ad6d..9029dc1 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -55,10 +55,11 @@ import com.google.common.collect.Multimap;
  */
 public class NetworkTopologyStrategy extends AbstractReplicationStrategy
 {
+    public static final String REPLICATION_FACTOR = "replication_factor";
+    
     private final Map<String, ReplicationFactor> datacenters;
     private final ReplicationFactor aggregateRf;
     private static final Logger logger = LoggerFactory.getLogger(NetworkTopologyStrategy.class);
-    private static final String REPLICATION_FACTOR = "replication_factor";
 
     public NetworkTopologyStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) throws ConfigurationException
     {
@@ -170,6 +171,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
     /**
      * calculate endpoints in one pass through the tokens by tracking our progress in each DC.
      */
+    @Override
     public EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata)
     {
         // we want to preserve insertion order so that the first added endpoint becomes primary
@@ -229,6 +231,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
         return collection != null ? collection.size() : 0;
     }
 
+    @Override
     public ReplicationFactor getReplicationFactor()
     {
         return aggregateRf;
@@ -245,6 +248,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
         return datacenters.keySet();
     }
 
+    @Override
     public Collection<String> recognizedOptions()
     {
         // only valid options are valid DC names.
@@ -286,6 +290,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
         options.values().removeAll(Collections.singleton("0"));
     }
 
+    @Override
     protected void validateExpectedOptions() throws ConfigurationException
     {
         // Do not accept query with no data centers specified.
diff --git a/src/java/org/apache/cassandra/locator/ReplicationFactor.java b/src/java/org/apache/cassandra/locator/ReplicationFactor.java
index 91ce770..ee971d9 100644
--- a/src/java/org/apache/cassandra/locator/ReplicationFactor.java
+++ b/src/java/org/apache/cassandra/locator/ReplicationFactor.java
@@ -76,11 +76,11 @@ public class ReplicationFactor
                                                                     .filter(endpoint -> Gossiper.instance.getReleaseVersion(endpoint) != null && Gossiper.instance.getReleaseVersion(endpoint).major < 4)
                                                                     .collect(Collectors.toList());
             if (!badVersionEndpoints.isEmpty())
-                throw new AssertionError("Transient replication is not supported in mixed version clusters with nodes < 4.0. Bad nodes: " + badVersionEndpoints);
+                throw new IllegalArgumentException("Transient replication is not supported in mixed version clusters with nodes < 4.0. Bad nodes: " + badVersionEndpoints);
         }
         else if (transientRF < 0)
         {
-            throw new AssertionError(String.format("Amount of transient nodes should be strictly positive, but was: '%d'", transientRF));
+            throw new IllegalArgumentException(String.format("Amount of transient nodes should be strictly positive, but was: '%d'", transientRF));
         }
     }
 
@@ -114,17 +114,17 @@ public class ReplicationFactor
             String[] parts = s.split("/");
             Preconditions.checkArgument(parts.length == 2,
                                         "Replication factor format is <replicas> or <replicas>/<transient>");
-            return new ReplicationFactor(Integer.valueOf(parts[0]), Integer.valueOf(parts[1]));
+            return new ReplicationFactor(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]));
         }
         else
         {
-            return new ReplicationFactor(Integer.valueOf(s), 0);
+            return new ReplicationFactor(Integer.parseInt(s), 0);
         }
     }
 
     public String toParseableString()
     {
-        return String.valueOf(allReplicas) + (hasTransientReplicas() ? "/" + transientReplicas() : "");
+        return allReplicas + (hasTransientReplicas() ? "/" + transientReplicas() : "");
     }
 
     @Override
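
As a usage illustration (not part of the patch), the parsing above accepts either "<replicas>" or "<replicas>/<transient>" and round-trips through toParseableString(); allReplicas, transientReplicas() and hasTransientReplicas() are the existing public members referenced in the code above:

    ReplicationFactor rf = ReplicationFactor.fromString("3/1");
    assert rf.allReplicas == 3;          // 3 replicas in total...
    assert rf.transientReplicas() == 1;  // ...one of which is transient
    assert rf.toParseableString().equals("3/1");
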
diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
index 672c9ff..928ac97 100644
--- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java
+++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
@@ -53,6 +53,7 @@ public class SimpleStrategy extends AbstractReplicationStrategy
         this.rf = ReplicationFactor.fromString(this.configOptions.get(REPLICATION_FACTOR));
     }
 
+    @Override
     public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
     {
         ArrayList<Token> ring = metadata.sortedTokens();
@@ -78,12 +79,13 @@ public class SimpleStrategy extends AbstractReplicationStrategy
         return replicas.build();
     }
 
+    @Override
     public ReplicationFactor getReplicationFactor()
     {
         return rf;
     }
 
-    private final static void validateOptionsInternal(Map<String, String> configOptions) throws ConfigurationException
+    private static void validateOptionsInternal(Map<String, String> configOptions) throws ConfigurationException
     {
         if (configOptions.get(REPLICATION_FACTOR) == null)
             throw new ConfigurationException("SimpleStrategy requires a replication_factor strategy option.");
@@ -116,6 +118,7 @@ public class SimpleStrategy extends AbstractReplicationStrategy
         }
     }
 
+    @Override
     public Collection<String> recognizedOptions()
     {
         return Collections.singleton(REPLICATION_FACTOR);
diff --git a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
index 255f6e2..dc4f89c 100644
--- a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
+++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
@@ -30,9 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
 
-import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.api.IInstance;
@@ -48,8 +46,6 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.MigrationCoordinator;
-import org.apache.cassandra.schema.MigrationManager;
-import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -274,11 +270,16 @@ public class GossipHelper
         }
     }
 
-    public static InstanceAction decomission()
+    public static InstanceAction decommission()
     {
-        return (target) -> target.nodetoolResult("decommission").asserts().success();
+        return decommission(false);
     }
 
+    public static InstanceAction decommission(boolean force)
+    {
+        return force ? (target) -> target.nodetoolResult("decommission",  "--force").asserts().success()
+                     : (target) -> target.nodetoolResult("decommission").asserts().success();
+    }
 
     public static VersionedApplicationState tokens(IInvokableInstance instance)
     {
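
A brief usage note (mirroring the new HintedHandoffAddRemoveNodesTest further down): a plain nodetool decommission refuses to remove a node when that would leave keyspaces under-replicated, whereas the forced variant maps to "decommission --force" and proceeds anyway:

    cluster.run(GossipHelper.decommission(), 1);      // safe decommission, fails if it would violate RF
    cluster.run(GossipHelper.decommission(true), 2);  // forced decommission, may drop below RF
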
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index babc2c3..8026357 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -111,7 +111,7 @@ import static org.apache.cassandra.distributed.shared.NetworkTopology.addressAnd
  */
 public abstract class AbstractCluster<I extends IInstance> implements ICluster<I>, AutoCloseable
 {
-    public static Versions.Version CURRENT_VERSION = new Versions.Version(FBUtilities.getReleaseVersionString(), Versions.getClassPath());;
+    public static Versions.Version CURRENT_VERSION = new Versions.Version(FBUtilities.getReleaseVersionString(), Versions.getClassPath());
 
     // WARNING: we have this logger not (necessarily) for logging, but
     // to ensure we have instantiated the main classloader's LoggerFactory (and any LogbackStatusListener)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index b79ccbc..4a865a9 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.security.Permission;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -44,6 +45,7 @@ import javax.management.NotificationListener;
 import com.google.common.annotations.VisibleForTesting;
 
 import io.netty.util.concurrent.GlobalEventExecutor;
+import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.concurrent.ExecutorLocals;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
@@ -92,6 +94,7 @@ import org.apache.cassandra.io.sstable.IndexSummaryManager;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.CassandraMetricsRegistry;
@@ -125,6 +128,7 @@ import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.UUIDSerializer;
 import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor;
 import org.apache.cassandra.utils.memory.BufferPools;
@@ -135,6 +139,7 @@ import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.fromCassandraInetAddressAndPort;
 import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
+import static org.apache.cassandra.net.Verb.BATCH_STORE_REQ;
 
 public class Instance extends IsolatedExecutor implements IInvokableInstance
 {
@@ -319,11 +324,33 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
 
         try (DataOutputBuffer out = new DataOutputBuffer(1024))
         {
+            // On a 4.0+ node, C* makes a distinction between "local" and "remote" batches, where only the former can 
+            // be serialized and sent to a remote node, where they are deserialized and written to the batch commitlog
+            // without first being converted into mutation objects. Batch serialization is therefore not symmetric, and
+            // we use a special procedure here that "re-serializes" a "remote" batch to build the message.
+            if (fromVersion >= MessagingService.VERSION_40 && messageOut.verb().id == BATCH_STORE_REQ.id)
+            {
+                Object maybeBatch = messageOut.payload;
+
+                if (maybeBatch instanceof Batch)
+                {
+                    Batch batch = (Batch) maybeBatch;
+
+                    // If the batch is local, it can be serialized along the normal path.
+                    if (!batch.isLocal())
+                    {
+                        reserialize(batch, out, toVersion);
+                        byte[] bytes = out.toByteArray();
+                        return new MessageImpl(messageOut.verb().id, bytes, messageOut.id(), toVersion, fromCassandraInetAddressAndPort(from));
+                    }
+                }
+            }
+            
             Message.serializer.serialize(messageOut, out, toVersion);
             byte[] bytes = out.toByteArray();
             if (messageOut.serializedSize(toVersion) != bytes.length)
                 throw new AssertionError(String.format("Message serializedSize(%s) does not match what was written with serialize(out, %s) for verb %s and serializer %s; " +
-                                                       "expected %s, actual %s", toVersion, toVersion, messageOut.verb(), messageOut.serializer.getClass(),
+                                                       "expected %s, actual %s", toVersion, toVersion, messageOut.verb(), Message.serializer.getClass(),
                                                        messageOut.serializedSize(toVersion), bytes.length));
             return new MessageImpl(messageOut.verb().id, bytes, messageOut.id(), toVersion, fromCassandraInetAddressAndPort(from));
         }
@@ -333,6 +360,26 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         }
     }
 
+    /**
+     * Only "local" batches can be passed through {@link Batch.Serializer#serialize(Batch, DataOutputPlus, int)} and 
+     * sent to a remote node during normal operation, but there are testing scenarios where we may intercept and 
+     * forward a "remote" batch. This method allows us to put the already encoded mutations back onto a stream.
+     */
+    private static void reserialize(Batch batch, DataOutputPlus out, int version) throws IOException
+    {
+        assert !batch.isLocal() : "attempted to reserialize a 'local' batch";
+
+        UUIDSerializer.serializer.serialize(batch.id, out, version);
+        out.writeLong(batch.creationTime);
+
+        out.writeUnsignedVInt(batch.getEncodedMutations().size());
+
+        for (ByteBuffer mutation : batch.getEncodedMutations())
+        {
+            out.write(mutation);
+        }
+    }
+
     @VisibleForTesting
     public static Message<?> deserializeMessage(IMessage message)
     {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
new file mode 100644
index 0000000..4bd5d44
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.apache.cassandra.distributed.action.GossipHelper;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.service.StorageService;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.cassandra.distributed.action.GossipHelper.decommission;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.awaitility.Awaitility.*;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests around removing and adding nodes from and to a cluster while hints are still outstanding.
+ */
+public class HintedHandoffAddRemoveNodesTest extends TestBaseImpl
+{
+    /**
+     * Replaces Python dtest {@code hintedhandoff_test.py:TestHintedHandoff.test_hintedhandoff_decom()}.
+     */
+    @Test
+    public void shouldStreamHintsDuringDecommission() throws Exception
+    {
+        try (Cluster cluster = builder().withNodes(4)
+                                        .withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
+                                        .start())
+        {
+            cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2}"));
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.decom_hint_test (key int PRIMARY KEY, value int)"));
+            
+            cluster.get(4).shutdown().get();
+            
+            // Write data using the second node as the coordinator...
+            populate(cluster, "decom_hint_test", 2, 0, 128, ConsistencyLevel.ONE);
+            Long totalHints = countTotalHints(cluster);
+            // ...and verify that we've accumulated hints intended for node 4, which is down.
+            assertThat(totalHints).isGreaterThan(0);
+
+            // Decommission node 1...
+            assertEquals(4, endpointsKnownTo(cluster, 2));
+            cluster.run(decommission(), 1);
+            await().pollDelay(1, SECONDS).until(() -> endpointsKnownTo(cluster, 2) == 3);
+            // ...and verify that all data still exists on either node 2 or 3.
+            verify(cluster, "decom_hint_test", 2, 0, 128, ConsistencyLevel.ONE);
+            
+            // Start node 4 back up and verify that all hints were delivered.
+            cluster.get(4).startup();
+            await().atMost(30, SECONDS).pollDelay(3, SECONDS).until(() -> count(cluster, "decom_hint_test", 4).equals(totalHints));
+
+            // Now decommission both nodes 2 and 3...
+            cluster.run(GossipHelper.decommission(true), 2);
+            cluster.run(GossipHelper.decommission(true), 3);
+            await().pollDelay(1, SECONDS).until(() -> endpointsKnownTo(cluster, 4) == 1);
+            // ...and verify that even if we drop below the replication factor of 2, all data has been preserved.
+            verify(cluster, "decom_hint_test", 4, 0, 128, ConsistencyLevel.ONE);
+        }
+    }
+
+    @Test
+    public void shouldBootstrapWithHintsOutstanding() throws Exception
+    {
+        try (Cluster cluster = builder().withNodes(3)
+                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
+                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
+                                        .withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
+                                        .start())
+        {
+            cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2}"));
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.boot_hint_test (key int PRIMARY KEY, value int)"));
+
+            cluster.get(3).shutdown().get();
+
+            // Write data using the second node as the coordinator...
+            populate(cluster, "boot_hint_test", 2, 0, 128, ConsistencyLevel.ONE);
+            Long totalHints = countTotalHints(cluster);
+            // ...and verify that we've accumulated hints intended for node 3, which is down.
+            assertThat(totalHints).isGreaterThan(0);
+
+            // Bootstrap a new/4th node into the cluster...
+            bootstrapAndJoinNode(cluster);
+
+            // ...and verify that all data is available.
+            verify(cluster, "boot_hint_test", 4, 0, 128, ConsistencyLevel.ONE);
+
+            // Finally, bring node 3 back up and verify that all hints were delivered.
+            cluster.get(3).startup();
+            await().atMost(30, SECONDS).pollDelay(3, SECONDS).until(() -> count(cluster, "boot_hint_test", 3).equals(totalHints));
+            verify(cluster, "boot_hint_test", 3, 0, 128, ConsistencyLevel.ONE);
+            verify(cluster, "boot_hint_test", 3, 0, 128, ConsistencyLevel.TWO);
+        }
+    }
+
+    @SuppressWarnings("Convert2MethodRef")
+    private Long countTotalHints(Cluster cluster)
+    {
+        return cluster.get(2).callOnInstance(() -> StorageMetrics.totalHints.getCount());
+    }
+
+    @SuppressWarnings("SameParameterValue")
+    private void populate(Cluster cluster, String table, int coordinator, int start, int count, ConsistencyLevel cl)
+    {
+        for (int i = start; i < start + count; i++)
+            cluster.coordinator(coordinator)
+                   .execute("INSERT INTO " + KEYSPACE + '.' + table + " (key, value) VALUES (?, ?)", cl, i, i);
+    }
+
+    @SuppressWarnings("SameParameterValue")
+    private void verify(Cluster cluster, String table, int coordinator, int start, int count, ConsistencyLevel cl)
+    {
+        for (int i = start; i < start + count; i++)
+        {
+            Object[][] row = cluster.coordinator(coordinator)
+                                    .execute("SELECT key, value FROM " + KEYSPACE + '.' + table + " WHERE key = ?", cl, i);
+            assertRows(row, row(i, i));
+        }
+    }
+
+    private Long count(Cluster cluster, String table, int node)
+    {
+        return (Long) cluster.get(node).executeInternal("SELECT COUNT(*) FROM " + KEYSPACE + '.' + table)[0][0];
+    }
+
+    private int endpointsKnownTo(Cluster cluster, int node)
+    {
+        return cluster.get(node).callOnInstance(() -> StorageService.instance.getTokenMetadata().getAllEndpoints().size());
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffNodetoolTest.java b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffNodetoolTest.java
new file mode 100644
index 0000000..5045301
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffNodetoolTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.service.StorageProxy;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * These tests replace the nodetool-related Python dtests in hintedhandoff_test.py.
+ */
+@RunWith(Parameterized.class)
+public class HintedHandoffNodetoolTest extends TestBaseImpl
+{
+    private static Cluster cluster;
+
+    @Parameterized.Parameter
+    public int node;
+
+    @Parameterized.Parameters(name = "node={0}")
+    public static List<Object[]> data()
+    {   
+        List<Object[]> result = new ArrayList<>();
+        result.add(new Object[]{ 1 });
+        result.add(new Object[]{ 2 });
+        return result;
+    }
+
+    @BeforeClass
+    public static void before() throws IOException
+    {
+        cluster = init(Cluster.build().withNodes(2).withDCs(2).start());
+    }
+
+    @AfterClass
+    public static void after()
+    {
+        if (cluster != null)
+            cluster.close();
+    }
+    
+    @Before
+    public void enableHandoff()
+    {
+        cluster.get(1).nodetoolResult("enablehandoff");
+        cluster.get(2).nodetoolResult("enablehandoff");
+    }
+
+    @SuppressWarnings("Convert2MethodRef")
+    @Test
+    public void testEnableHandoff()
+    {
+        cluster.get(node).nodetoolResult("statushandoff").asserts().success().stdoutContains("Hinted handoff is running");
+        assertTrue(cluster.get(node).callOnInstance(() -> StorageProxy.instance.getHintedHandoffEnabled()));
+    }
+
+    @Test
+    public void testDisableHandoff()
+    {
+        cluster.get(node).nodetoolResult("statushandoff").asserts().success().stdoutContains("Hinted handoff is running");
+        cluster.get(node).nodetoolResult("disablehandoff");
+        cluster.get(node).nodetoolResult("statushandoff").asserts().success().stdoutContains("Hinted handoff is not running");
+        cluster.get(node).nodetoolResult("enablehandoff");
+        cluster.get(node).nodetoolResult("statushandoff").asserts().success().stdoutContains("Hinted handoff is running");
+    }
+
+    @Test
+    public void testDisableForDC()
+    {
+        cluster.get(node).nodetoolResult("disablehintsfordc", "datacenter1");
+        cluster.get(node).nodetoolResult("statushandoff").asserts().success().stdoutContains("Data center datacenter1 is disabled");
+        cluster.get(node).nodetoolResult("enablehintsfordc", "datacenter1");
+        cluster.get(node).nodetoolResult("statushandoff").asserts().success().stdoutNotContains("Data center datacenter1 is disabled");
+    }
+
+    @Test
+    public void testPauseHandoff()
+    {
+        cluster.get(node).nodetoolResult("statushandoff").asserts().success().stdoutContains("Hinted handoff is running");
+        
+        cluster.get(node).nodetoolResult("pausehandoff").asserts().success();
+        Boolean isPaused = cluster.get(node).callOnInstance(() -> HintsService.instance.isDispatchPaused());
+        assertTrue(isPaused);
+
+        cluster.get(node).nodetoolResult("resumehandoff").asserts().success();
+        isPaused = cluster.get(node).callOnInstance(() -> HintsService.instance.isDispatchPaused());
+        assertFalse(isPaused);
+    }
+
+    @Test
+    public void testThrottle()
+    {
+        Integer throttleInKB = cluster.get(node).callOnInstance(DatabaseDescriptor::getHintedHandoffThrottleInKB);
+        cluster.get(node).nodetoolResult("sethintedhandoffthrottlekb", String.valueOf(throttleInKB * 2)).asserts().success();
+        Integer newThrottleInKB = cluster.get(node).callOnInstance(DatabaseDescriptor::getHintedHandoffThrottleInKB);
+        assertEquals(throttleInKB * 2, newThrottleInKB.intValue());
+    }
+
+    @SuppressWarnings("Convert2MethodRef")
+    @Test
+    public void testMaxHintWindow()
+    {
+        Integer hintWindowMillis = cluster.get(node).callOnInstance(() -> StorageProxy.instance.getMaxHintWindow());
+        
+        cluster.get(node).nodetoolResult("getmaxhintwindow")
+                         .asserts()
+                         .success()
+                         .stdoutContains("Current max hint window: " + hintWindowMillis + " ms");
+
+        cluster.get(node).nodetoolResult("setmaxhintwindow", String.valueOf(hintWindowMillis * 2)).asserts().success();
+
+        cluster.get(node).nodetoolResult("getmaxhintwindow")
+                         .asserts()
+                         .success()
+                         .stdoutContains("Current max hint window: " + hintWindowMillis * 2 + " ms");
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index 37f43bd..343ccc8 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -23,7 +23,6 @@ import java.math.BigInteger;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Arrays;
 import java.util.Date;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -38,8 +37,13 @@ import org.apache.cassandra.cql3.Duration;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.shared.DistributedTestBase;
 
+import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
+import static org.apache.cassandra.distributed.action.GossipHelper.withProperty;
+
 public class TestBaseImpl extends DistributedTestBase
 {
     public static final Object[][] EMPTY_ROWS = new Object[0][];
@@ -100,6 +104,16 @@ public class TestBaseImpl extends DistributedTestBase
         return TupleType.buildValue(bbs);
     }
 
+    protected void bootstrapAndJoinNode(Cluster cluster)
+    {
+        IInstanceConfig config = cluster.newInstanceConfig();
+        config.set("auto_bootstrap", true);
+        IInvokableInstance newInstance = cluster.bootstrap(config);
+        withProperty(BOOTSTRAP_SCHEMA_DELAY_MS.getKey(), Integer.toString(90 * 1000),
+                     () -> withProperty("cassandra.join_ring", false, () -> newInstance.startup(cluster)));
+        newInstance.nodetoolResult("join").asserts().success();
+    }
+
     @SuppressWarnings("unchecked")
     private static ByteBuffer makeByteBuffer(Object value)
     {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
index 15a8d00..81861ab 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
@@ -35,7 +35,6 @@ import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 
 import static java.util.Arrays.asList;
-import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
 import static org.apache.cassandra.distributed.action.GossipHelper.bootstrap;
 import static org.apache.cassandra.distributed.action.GossipHelper.pullSchemaFrom;
 import static org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap;
@@ -90,15 +89,7 @@ public class BootstrapTest extends TestBaseImpl
                                         .start())
         {
             populate(cluster,0, 100);
-
-            IInstanceConfig config = cluster.newInstanceConfig();
-            config.set("auto_bootstrap", true);
-            IInvokableInstance newInstance = cluster.bootstrap(config);
-            withProperty(BOOTSTRAP_SCHEMA_DELAY_MS.getKey(), Integer.toString(90 * 1000),
-                         () -> withProperty("cassandra.join_ring", false,
-                                            () -> newInstance.startup(cluster)));
-
-            newInstance.nodetoolResult("join").asserts().success();
+            bootstrapAndJoinNode(cluster);
 
             for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
                 Assert.assertEquals("Node " + e.getKey() + " has incorrect row state", e.getValue().longValue(), 100L);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/CommunicationDuringDecommissionTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/CommunicationDuringDecommissionTest.java
index 37b1802..d6825d9 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ring/CommunicationDuringDecommissionTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ring/CommunicationDuringDecommissionTest.java
@@ -29,7 +29,7 @@ import org.junit.Test;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 
-import static org.apache.cassandra.distributed.action.GossipHelper.decomission;
+import static org.apache.cassandra.distributed.action.GossipHelper.decommission;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
@@ -45,7 +45,7 @@ public class CommunicationDuringDecommissionTest extends TestBaseImpl
         {
             BootstrapTest.populate(cluster, 0, 100);
 
-            cluster.run(decomission(), 1);
+            cluster.run(decommission(), 1);
 
             cluster.filters().allVerbs().from(1).messagesMatching((i, i1, iMessage) -> {
                 throw new AssertionError("Decomissioned node should not send any messages");
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeBatchTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeBatchTestBase.java
new file mode 100644
index 0000000..f18647c
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeBatchTestBase.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.apache.cassandra.net.Verb.BATCH_STORE_REQ;
+import static org.apache.cassandra.net.Verb.REQUEST_RSP;
+
+/**
+ * A base class for testing the replication of logged/unlogged batches on mixed-version clusters.
+ * 
+ * The tests based on this class partially replace the Python dtests in batch_test.py created for CASSANDRA-9673.
+ */
+public class MixedModeBatchTestBase extends UpgradeTestBase
+{
+    private static final int KEYS_PER_BATCH = 10;
+
+    protected void testSimpleStrategy(Versions.Major from, Versions.Major to, boolean isLogged) throws Throwable
+    {
+        String insert = "INSERT INTO test_simple.names (key, name) VALUES (%d, '%s')";
+        String select = "SELECT * FROM test_simple.names WHERE key = ?";
+
+        new TestCase()
+        .nodes(3)
+        .nodesToUpgrade(1, 2)
+        .upgrade(from, to)
+        .setup(cluster -> {
+            cluster.schemaChange("CREATE KEYSPACE test_simple WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};");
+            cluster.schemaChange("CREATE TABLE test_simple.names (key int PRIMARY KEY, name text)");
+        })
+        .runAfterNodeUpgrade((cluster, upgraded) -> {
+            if (isLogged)
+            {
+                // If we're testing logged batches, exercise the case where batchlog writes fail.
+                IMessageFilters.Filter dropBatchlogWrite = cluster.filters().inbound().verbs(BATCH_STORE_REQ.id, REQUEST_RSP.id).drop();
+                dropBatchlogWrite.on();
+                testBatches(true, true, insert, select, cluster, upgraded);
+                cluster.filters().reset();
+            }
+
+            cluster.coordinator(1).execute("TRUNCATE test_simple.names", ConsistencyLevel.ALL);
+            testBatches(isLogged, false, insert, select, cluster, upgraded);
+            cluster.coordinator(1).execute("TRUNCATE test_simple.names", ConsistencyLevel.ALL);
+        })
+        .run();
+    }
+
+    private void testBatches(boolean isLogged, boolean failBatchlog, String insert, String select, UpgradeableCluster cluster, int upgraded)
+    {
+        List<Long> initialTokens = new ArrayList<>(cluster.size());
+
+        for (int i = 1; i <= cluster.size(); i++)
+            initialTokens.add(Long.valueOf(cluster.get(i).config().get("initial_token").toString()));
+
+        // Exercise all the coordinators...
+        for (int i = 1; i <= cluster.size(); i++)
+        {
+            StringBuilder batchBuilder = new StringBuilder("BEGIN " + (isLogged ? "" : "UNLOGGED ") + "BATCH\n");
+            String name = "Julia";
+            Runnable[] tests = new Runnable[KEYS_PER_BATCH];
+
+            // ...and sample enough keys that we cover the ring.
+            for (int j = 0; j < KEYS_PER_BATCH; j++)
+            {
+                int key = j + (i * KEYS_PER_BATCH);
+                batchBuilder.append(String.format(insert, key, name)).append('\n');
+
+                // Track the test that will later verify that this mutation was replicated properly.
+                tests[j] = () -> {
+                    Object[] row = row(key, name);
+                    Long token = tokenFrom(key);
+                    int node = primaryReplica(initialTokens, token);
+                    
+                    Object[][] primaryResult = cluster.get(node).executeInternal(select, key);
+                    
+                    // We shouldn't expect to see any results if the batchlog write failed.
+                    if (failBatchlog)
+                        assertRows(primaryResult);
+                    else
+                        assertRows(primaryResult, row);
+
+                    node = nextNode(node, cluster.size());
+
+                    Object[][] nextResult = cluster.get(node).executeInternal(select, key);
+
+                    if (failBatchlog)
+                        assertRows(nextResult);
+                    else
+                        assertRows(nextResult, row);
+
+                    // At RF=2, this node should not have received the write.
+                    node = nextNode(node, cluster.size());
+                    assertRows(cluster.get(node).executeInternal(select, key));
+                };
+            }
+
+            String batch = batchBuilder.append("APPLY BATCH").toString();
+            
+            try
+            {
+                cluster.coordinator(i).execute(batch, ConsistencyLevel.ALL);
+            }
+            catch (Throwable t)
+            {
+                if (!failBatchlog || !exceptionMatches(t, WriteTimeoutException.class))
+                {
+                    // The standard write failure exception won't give us any context for what actually failed.
+                    if (exceptionMatches(t, WriteFailureException.class))
+                    {
+                        String message = "Failed to write following batch to coordinator %d after upgrading node %d:\n%s";
+                        throw new AssertionError(String.format(message, i, upgraded, batch), t);
+                    }
+
+                    throw t;
+                }
+                
+                // Failing the batchlog write will involve a timeout, so that's expected. Just continue...
+            }
+            
+            for (Runnable test : tests)
+                test.run();
+        }
+    }
+
+    private boolean exceptionMatches(Throwable t, Class<?> clazz)
+    {
+        return t.getClass().getSimpleName().equals(clazz.getSimpleName()) 
+               || t.getCause() != null && t.getCause().getClass().getSimpleName().equals(clazz.getSimpleName());
+    }
+}
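
The tokenFrom(), primaryReplica() and nextNode() helpers used above live in UpgradeTestBase, whose diff is not included in this excerpt; tokenFrom() presumably hashes the key with the partitioner in use. As a rough sketch of the idea only (the actual helpers may differ in detail), assuming one token per node and SimpleStrategy: the primary replica for a key is the node owning the first initial_token at or after the key's token, wrapping around the ring, and the second replica at RF=2 is simply the next node in ring order:

    // Illustrative sketch, not the committed implementation.
    private static int primaryReplica(List<Long> initialTokens, Long token)
    {
        for (int i = 0; i < initialTokens.size(); i++)
            if (token <= initialTokens.get(i))
                return i + 1;   // in-JVM dtest instance ids are 1-based
        return 1;               // wrapped past the highest token in the ring
    }

    private static int nextNode(int node, int clusterSize)
    {
        return node == clusterSize ? 1 : node + 1;
    }
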
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2LoggedBatchTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2LoggedBatchTest.java
new file mode 100644
index 0000000..575642c
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2LoggedBatchTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+public class MixedModeFrom2LoggedBatchTest extends MixedModeBatchTestBase
+{
+    @Test
+    public void testSimpleStrategy22to30() throws Throwable
+    {
+        testSimpleStrategy(Versions.Major.v22, Versions.Major.v30, true);
+    }
+
+    @Test
+    public void testSimpleStrategy22to3X() throws Throwable
+    {
+        testSimpleStrategy(Versions.Major.v22, Versions.Major.v3X, true);
+    }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2ReplicationTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2ReplicationTest.java
new file mode 100644
index 0000000..38efbaf
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2ReplicationTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+public class MixedModeFrom2ReplicationTest extends MixedModeReplicationTestBase
+{
+    @Test
+    public void testSimpleStrategy22to30() throws Throwable
+    {
+        testSimpleStrategy(Versions.Major.v22, Versions.Major.v30);
+    }
+
+    @Test
+    public void testSimpleStrategy22to3X() throws Throwable
+    {
+        testSimpleStrategy(Versions.Major.v22, Versions.Major.v3X);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2UnloggedBatchTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2UnloggedBatchTest.java
new file mode 100644
index 0000000..f01958e
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2UnloggedBatchTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+public class MixedModeFrom2UnloggedBatchTest extends MixedModeBatchTestBase
+{
+    @Test
+    public void testSimpleStrategy22to30() throws Throwable
+    {
+        testSimpleStrategy(Versions.Major.v22, Versions.Major.v30, false);
+    }
+
+    @Test
+    public void testSimpleStrategy22to3X() throws Throwable
+    {
+        testSimpleStrategy(Versions.Major.v22, Versions.Major.v3X, false);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3LoggedBatchTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3LoggedBatchTest.java
new file mode 100644
index 0000000..bb2008e
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3LoggedBatchTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+public class MixedModeFrom3LoggedBatchTest extends MixedModeBatchTestBase
+{
+    @Test
+    public void testSimpleStrategy30to3X() throws Throwable
+    {
+        testSimpleStrategy(Versions.Major.v30, Versions.Major.v3X, true);
+    }
+
+    @Test
+    public void testSimpleStrategy30to4() throws Throwable
+    {
+        testSimpleStrategy(Versions.Major.v30, Versions.Major.v4, true);
+    }
+
+    @Test
+    public void testSimpleStrategy3Xto4() throws Throwable
+    {
+        testSimpleStrategy(Versions.Major.v3X, Versions.Major.v4, true);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3ReplicationTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3ReplicationTest.java
new file mode 100644
index 0000000..8eae4b4
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3ReplicationTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+public class MixedModeFrom3ReplicationTest extends MixedModeReplicationTestBase
+{
+    @Test
+    public void testSimpleStrategy30to3X() throws Throwable
+    {
+        testSimpleStrategy(Versions.Major.v30, Versions.Major.v3X);
+    }
+
+    @Test
+    public void testSimpleStrategy30to4() throws Throwable
+    {
+        testSimpleStrategy(Versions.Major.v30, Versions.Major.v4);
+    }
+
+    @Test
+    public void testSimpleStrategy3Xto4() throws Throwable
+    {
+        testSimpleStrategy(Versions.Major.v3X, Versions.Major.v4);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3UnloggedBatchTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3UnloggedBatchTest.java
new file mode 100644
index 0000000..b39177b
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3UnloggedBatchTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+public class MixedModeFrom3UnloggedBatchTest extends MixedModeBatchTestBase
+{
+    @Test
+    public void testSimpleStrategy30to3X() throws Throwable
+    {
+        testSimpleStrategy(Versions.Major.v30, Versions.Major.v3X, false);
+    }
+
+    @Test
+    public void testSimpleStrategy30to4() throws Throwable
+    {
+        testSimpleStrategy(Versions.Major.v30, Versions.Major.v4, false);
+    }
+
+    @Test
+    public void testSimpleStrategy3Xto4() throws Throwable
+    {
+        testSimpleStrategy(Versions.Major.v3X, Versions.Major.v4, false);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReplicationTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReplicationTestBase.java
new file mode 100644
index 0000000..153a8a5
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReplicationTestBase.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.Versions;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+
+/**
+ * A base class for testing basic replication on mixed-version clusters.
+ */
+public class MixedModeReplicationTestBase extends UpgradeTestBase
+{
+    protected void testSimpleStrategy(Versions.Major from, Versions.Major to) throws Throwable
+    {
+        String insert = "INSERT INTO test_simple.names (key, name) VALUES (?, ?)";
+        String select = "SELECT * FROM test_simple.names WHERE key = ?";
+
+        new TestCase()
+        .nodes(3)
+        .nodesToUpgrade(1, 2)
+        .upgrade(from, to)
+        .setup(cluster -> {
+            cluster.schemaChange("CREATE KEYSPACE test_simple WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};");
+            cluster.schemaChange("CREATE TABLE test_simple.names (key int PRIMARY KEY, name text)");
+        })
+        .runAfterNodeUpgrade((cluster, upgraded) -> {
+            List<Long> initialTokens = new ArrayList<>(cluster.size() + 1);
+            initialTokens.add(null); // Padding entry so the first valid token sits at index 1, matching 1-based node numbers below.
+
+            for (int i = 1; i <= cluster.size(); i++)
+                initialTokens.add(Long.valueOf(cluster.get(i).config().get("initial_token").toString()));
+
+            List<Long> validTokens = initialTokens.subList(1, cluster.size() + 1);
+
+            // Exercise all the coordinators...
+            for (int i = 1; i <= cluster.size(); i++)
+            {
+                // ...and sample enough keys that we cover the ring.
+                for (int j = 0; j < 10; j++)
+                {
+                    int key = j + (i * 10);
+                    Object[] row = row(key, "Nero");
+                    Long token = tokenFrom(key);
+
+                    cluster.coordinator(i).execute(insert, ConsistencyLevel.ALL, row);
+
+                    int node = primaryReplica(validTokens, token);
+                    assertRows(cluster.get(node).executeInternal(select, key), row);
+
+                    node = nextNode(node, cluster.size());
+                    assertRows(cluster.get(node).executeInternal(select, key), row);
+
+                    // At RF=2, this node should not have received the write.
+                    node = nextNode(node, cluster.size());
+                    assertRows(cluster.get(node).executeInternal(select, key));
+                }
+            }
+        })
+        .run();
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 3653837..39957e9 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -28,12 +28,15 @@ import java.util.function.Consumer;
 import org.junit.After;
 import org.junit.BeforeClass;
 
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.impl.Instance;
 import org.apache.cassandra.distributed.shared.DistributedTestBase;
 import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.distributed.shared.Versions.Major;
 import static org.apache.cassandra.distributed.shared.Versions.Version;
@@ -206,4 +209,32 @@ public class UpgradeTestBase extends DistributedTestBase
                              .upgrade(Versions.Major.v3X, Versions.Major.v4)
                              .nodesToUpgrade(toUpgrade);
     }
+
+    protected static int primaryReplica(List<Long> initialTokens, Long token)
+    {
+        int primary = 1;
+
+        for (Long initialToken : initialTokens)
+        {
+            if (token <= initialToken)
+            {
+                break;
+            }
+
+            primary++;
+        }
+
+        return primary;
+    }
+
+    protected static Long tokenFrom(int key)
+    {
+        DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(key));
+        return (Long) dk.getToken().getTokenValue();
+    }
+
+    protected static int nextNode(int current, int numNodes)
+    {
+        return current == numNodes ? 1 : current + 1;
+    }
 }
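
The primaryReplica/nextNode helpers added above drive the replica-placement assertions in the mixed-mode tests. As a rough illustration, here is a minimal standalone sketch of that ring math; the token values and class name are hypothetical stand-ins, since tokenFrom() in the real helpers derives tokens via Murmur3Partitioner from each instance's initial_token.

import java.util.Arrays;
import java.util.List;

// Standalone sketch of the ring math used by the upgrade tests; token values are hypothetical.
public class RingMathSketch
{
    // 1-based index of the node whose range ends at the first initial token >= the key's token.
    static int primaryReplica(List<Long> initialTokens, long token)
    {
        int primary = 1;
        for (long initialToken : initialTokens)
        {
            if (token <= initialToken)
                break;
            primary++;
        }
        return primary;
    }

    // Wrap around the ring: the node after the last one is node 1.
    static int nextNode(int current, int numNodes)
    {
        return current == numNodes ? 1 : current + 1;
    }

    public static void main(String[] args)
    {
        // Three evenly spaced (hypothetical) initial tokens for a 3-node cluster.
        List<Long> tokens = Arrays.asList(-3074457345618258603L, 3074457345618258602L, Long.MAX_VALUE);
        long keyToken = 0L; // falls between the first and second initial tokens

        int primary = primaryReplica(tokens, keyToken);
        int second = nextNode(primary, tokens.size());

        // At RF=2, a write for this key should land on 'primary' and 'second' and skip the remaining node.
        System.out.println("primary=" + primary + ", second replica=" + second);
    }
}
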
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
index c2b9fc9..8057e99 100644
--- a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
@@ -20,15 +20,12 @@ package org.apache.cassandra.batchlog;
 import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Multimap;
 import org.junit.Test;
-import org.junit.matchers.JUnitMatchers;
 
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.locator.ReplicaPlans;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -40,7 +37,7 @@ public class BatchlogEndpointFilterTest
     private static final String LOCAL = "local";
 
     @Test
-    public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException
+    public void shouldSelect2HostsFromNonLocalRacks() throws UnknownHostException
     {
         Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
                 .put(LOCAL, InetAddressAndPort.getByName("0"))
@@ -57,6 +54,27 @@ public class BatchlogEndpointFilterTest
     }
 
     @Test
+    public void shouldSelectLastHostsFromLastNonLocalRacks() throws UnknownHostException
+    {
+        Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
+                                                         .put(LOCAL, InetAddressAndPort.getByName("00"))
+                                                         .put("1", InetAddressAndPort.getByName("11"))
+                                                         .put("2", InetAddressAndPort.getByName("2"))
+                                                         .put("2", InetAddressAndPort.getByName("22"))
+                                                         .put("3", InetAddressAndPort.getByName("3"))
+                                                         .put("3", InetAddressAndPort.getByName("33"))
+                                                         .build();
+        
+        Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints);
+        assertThat(result.size(), is(2));
+
+        // result should be the last replicas of the last two racks
+        // (Collections.shuffle has been replaced with Collections.reverse for testing)
+        assertTrue(result.contains(InetAddressAndPort.getByName("22")));
+        assertTrue(result.contains(InetAddressAndPort.getByName("33")));
+    }
+
+    @Test
     public void shouldSelectHostFromLocal() throws UnknownHostException
     {
         Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
@@ -71,7 +89,7 @@ public class BatchlogEndpointFilterTest
     }
 
     @Test
-    public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException
+    public void shouldReturnPassedEndpointForSingleNodeDC() throws UnknownHostException
     {
         Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
                 .put(LOCAL, InetAddressAndPort.getByName("0"))
@@ -116,6 +134,19 @@ public class BatchlogEndpointFilterTest
         assertTrue(result.contains(InetAddressAndPort.getByName("1111")));
     }
 
+    @Test
+    public void shouldSelectOnlyTwoHostsEvenIfLocal() throws UnknownHostException
+    {
+        Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
+                                                         .put(LOCAL, InetAddressAndPort.getByName("1"))
+                                                         .put(LOCAL, InetAddressAndPort.getByName("11"))
+                                                         .build();
+        Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints);
+        assertThat(result.size(), is(2));
+        assertTrue(result.contains(InetAddressAndPort.getByName("1")));
+        assertTrue(result.contains(InetAddressAndPort.getByName("11")));
+    }
+
     private Collection<InetAddressAndPort> filterBatchlogEndpoints(Multimap<String, InetAddressAndPort> endpoints)
     {
         return ReplicaPlans.filterBatchlogEndpoints(LOCAL, endpoints,
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
index 361759f..b86b0d3 100644
--- a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.batchlog;
 
-import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 
@@ -91,7 +90,6 @@ public class BatchlogManagerTest
     }
 
     @Before
-    @SuppressWarnings("deprecation")
     public void setUp() throws Exception
     {
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
@@ -127,7 +125,6 @@ public class BatchlogManagerTest
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void testReplay() throws Exception
     {
         long initialAllBatches = BatchlogManager.instance.countAllBatches();
@@ -273,7 +270,7 @@ public class BatchlogManagerTest
     }
 
     @Test
-    public void testAddBatch() throws IOException
+    public void testAddBatch()
     {
         long initialAllBatches = BatchlogManager.instance.countAllBatches();
         TableMetadata cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata();
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java
index bc220fd..cbe5b04 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java
@@ -33,14 +33,17 @@ public class BatchTest extends CQLTester
     {
         createTable("CREATE TABLE %s (userid text PRIMARY KEY, name text, password text)");
 
-        String query = "BEGIN BATCH\n"
-                       + "INSERT INTO %1$s (userid, password, name) VALUES ('user2', 'ch@ngem3b', 'second user');\n"
-                       + "UPDATE %1$s SET password = 'ps22dhds' WHERE userid = 'user3';\n"
-                       + "INSERT INTO %1$s (userid, password) VALUES ('user4', 'ch@ngem3c');\n"
-                       + "DELETE name FROM %1$s WHERE userid = 'user1';\n"
-                       + "APPLY BATCH;";
-
-        execute(query);
+        execute("BEGIN BATCH\n" +
+                "INSERT INTO %1$s (userid, password, name) VALUES ('user2', 'ch@ngem3b', 'second user');\n" + 
+                "UPDATE %1$s SET password = 'ps22dhds' WHERE userid = 'user3';\n" + 
+                "INSERT INTO %1$s (userid, password) VALUES ('user4', 'ch@ngem3c');\n" + 
+                "DELETE name FROM %1$s WHERE userid = 'user1';\n" + 
+                "APPLY BATCH;");
+
+        assertRows(execute("SELECT userid FROM %s"),
+                   row("user2"),
+                   row("user4"),
+                   row("user3"));
     }
 
     /**
@@ -52,24 +55,53 @@ public class BatchTest extends CQLTester
         createTable("CREATE TABLE %s (k int PRIMARY KEY, l list<int>)");
 
         execute("BEGIN BATCH " +
-                "UPDATE %1$s SET l = l +[ 1 ] WHERE k = 0; " +
-                "UPDATE %1$s SET l = l + [ 2 ] WHERE k = 0; " +
-                "UPDATE %1$s SET l = l + [ 3 ] WHERE k = 0; " +
+                "UPDATE %1$s SET l = l + [1] WHERE k = 0; " +
+                "UPDATE %1$s SET l = l + [2] WHERE k = 0; " +
+                "UPDATE %1$s SET l = l + [3] WHERE k = 0; " +
                 "APPLY BATCH");
 
         assertRows(execute("SELECT l FROM %s WHERE k = 0"),
                    row(list(1, 2, 3)));
 
         execute("BEGIN BATCH " +
-                "UPDATE %1$s SET l =[ 1 ] + l WHERE k = 1; " +
-                "UPDATE %1$s SET l = [ 2 ] + l WHERE k = 1; " +
-                "UPDATE %1$s SET l = [ 3 ] + l WHERE k = 1; " +
+                "UPDATE %1$s SET l = [1] + l WHERE k = 1; " +
+                "UPDATE %1$s SET l = [2] + l WHERE k = 1; " +
+                "UPDATE %1$s SET l = [3] + l WHERE k = 1; " +
                 "APPLY BATCH ");
 
         assertRows(execute("SELECT l FROM %s WHERE k = 1"),
                    row(list(3, 2, 1)));
     }
 
+    @Test
+    public void testBatchAndMap() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, m map<int, int>)");
+
+        execute("BEGIN BATCH " +
+                "UPDATE %1$s SET m[1] = 2 WHERE k = 0; " +
+                "UPDATE %1$s SET m[3] = 4 WHERE k = 0; " +
+                "APPLY BATCH");
+
+        assertRows(execute("SELECT m FROM %s WHERE k = 0"),
+                   row(map(1, 2, 3, 4)));
+    }
+
+    @Test
+    public void testBatchAndSet() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, s set<int>)");
+
+        execute("BEGIN BATCH " +
+                "UPDATE %1$s SET s = s + {1} WHERE k = 0; " +
+                "UPDATE %1$s SET s = s + {2} WHERE k = 0; " +
+                "UPDATE %1$s SET s = s + {3} WHERE k = 0; " +
+                "APPLY BATCH");
+
+        assertRows(execute("SELECT s FROM %s WHERE k = 0"),
+                   row(set(1, 2, 3)));
+    }
+
     /**
      * Migrated from cql_tests.py:TestCQL.bug_6115_test()
      */
diff --git a/test/unit/org/apache/cassandra/hints/HintMessageTest.java b/test/unit/org/apache/cassandra/hints/HintMessageTest.java
index bb015a8..3565a12 100644
--- a/test/unit/org/apache/cassandra/hints/HintMessageTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintMessageTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.hints;
 import java.io.IOException;
 import java.util.UUID;
 
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
@@ -34,33 +35,74 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static junit.framework.Assert.assertEquals;
-
 import static org.apache.cassandra.hints.HintsTestUtil.assertHintsEqual;
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 public class HintMessageTest
 {
     private static final String KEYSPACE = "hint_message_test";
     private static final String TABLE = "table";
 
-    @Test
-    public void testSerializer() throws IOException
+    @BeforeClass
+    public static void setup()
     {
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+    }
 
+    @Test
+    public void testSerializer() throws IOException
+    {
         UUID hostId = UUID.randomUUID();
         long now = FBUtilities.timestampMicros();
+        TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+        
+        Mutation mutation = 
+            new RowUpdateBuilder(table, now, bytes("key")).clustering("column").add("val", "val" + 1234).build();
+        
+        Hint hint = Hint.create(mutation, now / 1000);
+        HintMessage message = new HintMessage(hostId, hint);
 
+        // serialize
+        int serializedSize = (int) HintMessage.serializer.serializedSize(message, MessagingService.current_version);
+        HintMessage deserializedMessage;
+        
+        try (DataOutputBuffer dob = new DataOutputBuffer())
+        {
+            HintMessage.serializer.serialize(message, dob, MessagingService.current_version);
+            assertEquals(serializedSize, dob.getLength());
+
+            // deserialize
+            DataInputPlus di = new DataInputBuffer(dob.buffer(), true);
+            deserializedMessage = HintMessage.serializer.deserialize(di, MessagingService.current_version);
+        }
+
+        // compare before/after
+        assertEquals(hostId, deserializedMessage.hostId);
+        assertNotNull(deserializedMessage.hint);
+        assertHintsEqual(hint, deserializedMessage.hint);
+    }
+
+    @Test
+    public void testEncodedSerializer() throws IOException
+    {
+        UUID hostId = UUID.randomUUID();
+        long now = FBUtilities.timestampMicros();
         TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+        
         Mutation mutation =
-            new RowUpdateBuilder(table, now, bytes("key"))
-                .clustering("column")
-                .add("val", "val" + 1234)
-                .build();
+            new RowUpdateBuilder(table, now, bytes("key")).clustering("column").add("val", "val" + 1234).build();
+        
         Hint hint = Hint.create(mutation, now / 1000);
-        HintMessage message = new HintMessage(hostId, hint);
+        HintMessage.Encoded message;
+        
+        try (DataOutputBuffer dob = new DataOutputBuffer())
+        {
+            Hint.serializer.serialize(hint, dob, MessagingService.current_version);
+            message = new HintMessage.Encoded(hostId, dob.buffer(), MessagingService.current_version);
+        } 
 
         // serialize
         int serializedSize = (int) HintMessage.serializer.serializedSize(message, MessagingService.current_version);
@@ -69,11 +111,12 @@ public class HintMessageTest
         assertEquals(serializedSize, dob.getLength());
 
         // deserialize
-        DataInputPlus di = new DataInputBuffer(dob.buffer(), true);
-        HintMessage deserializedMessage = HintMessage.serializer.deserialize(di, MessagingService.current_version);
+        DataInputPlus dip = new DataInputBuffer(dob.buffer(), true);
+        HintMessage deserializedMessage = HintMessage.serializer.deserialize(dip, MessagingService.current_version);
 
         // compare before/after
         assertEquals(hostId, deserializedMessage.hostId);
-        assertHintsEqual(message.hint, deserializedMessage.hint);
+        assertNotNull(deserializedMessage.hint);
+        assertHintsEqual(hint, deserializedMessage.hint);
     }
 }
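
Both tests above repeat the same serialize / size-check / deserialize sequence. A generic helper along these lines could factor that out; this is a sketch assuming the IVersionedSerializer, DataOutputBuffer and DataInputBuffer types already used in the test, with the helper name invented here.

// Sketch: round-trip a value through an IVersionedSerializer, asserting that
// serializedSize() matches the bytes actually written.
private static <T> T roundTrip(IVersionedSerializer<T> serializer, T value, int version) throws IOException
{
    long expectedSize = serializer.serializedSize(value, version);
    try (DataOutputBuffer out = new DataOutputBuffer())
    {
        serializer.serialize(value, out, version);
        assertEquals(expectedSize, out.getLength());
        return serializer.deserialize(new DataInputBuffer(out.buffer(), true), version);
    }
}
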
diff --git a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
index 3960bd0..8ab1bfa 100644
--- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
@@ -31,7 +31,9 @@ import com.google.common.collect.Multimap;
 
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,14 +47,18 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.TokenMetadata.Topology;
+import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 
+import static org.apache.cassandra.locator.NetworkTopologyStrategy.REPLICATION_FACTOR;
 import static org.apache.cassandra.locator.Replica.fullReplica;
 import static org.apache.cassandra.locator.Replica.transientReplica;
+import static org.junit.Assert.assertTrue;
 
 public class NetworkTopologyStrategyTest
 {
-    private String keyspaceName = "Keyspace1";
+    private static final String KEYSPACE = "Keyspace1";
     private static final Logger logger = LoggerFactory.getLogger(NetworkTopologyStrategyTest.class);
 
     @BeforeClass
@@ -76,7 +82,7 @@ public class NetworkTopologyStrategyTest
         configOptions.put("DC3", "1");
 
         // Set the localhost to the tokenmetadata. Embedded cassandra way?
-        NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions);
+        NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(KEYSPACE, metadata, snitch, configOptions);
         assert strategy.getReplicationFactor("DC1").allReplicas == 3;
         assert strategy.getReplicationFactor("DC2").allReplicas == 2;
         assert strategy.getReplicationFactor("DC3").allReplicas == 1;
@@ -101,7 +107,7 @@ public class NetworkTopologyStrategyTest
         configOptions.put("DC3", "0");
 
         // Set the localhost to the tokenmetadata. Embedded cassandra way?
-        NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions);
+        NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(KEYSPACE, metadata, snitch, configOptions);
         assert strategy.getReplicationFactor("DC1").allReplicas == 3;
         assert strategy.getReplicationFactor("DC2").allReplicas == 3;
         assert strategy.getReplicationFactor("DC3").allReplicas == 0;
@@ -144,7 +150,7 @@ public class NetworkTopologyStrategyTest
         }
         metadata.updateNormalTokens(tokens);
 
-        NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions);
+        NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(KEYSPACE, metadata, snitch, configOptions);
 
         for (String testToken : new String[]{"123456", "200000", "000402", "ffffff", "400200"})
         {
@@ -418,7 +424,7 @@ public class NetworkTopologyStrategyTest
         Map<String, String> configOptions = new HashMap<String, String>();
         configOptions.put(snitch.getDatacenter((InetAddressAndPort) null), "3/1");
 
-        NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions);
+        NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(KEYSPACE, metadata, snitch, configOptions);
 
         Util.assertRCEquals(EndpointsForRange.of(fullReplica(endpoints.get(0), range(400, 100)),
                                                fullReplica(endpoints.get(1), range(400, 100)),
@@ -431,4 +437,50 @@ public class NetworkTopologyStrategyTest
                                                transientReplica(endpoints.get(3), range(100, 200))),
                             strategy.getNaturalReplicasForToken(tk(101)));
     }
+
+    @Rule
+    public ExpectedException expectedEx = ExpectedException.none();
+
+    @Test
+    public void shouldRejectReplicationFactorOption() throws ConfigurationException
+    {
+        expectedEx.expect(ConfigurationException.class);
+        expectedEx.expectMessage(REPLICATION_FACTOR + " should not appear");
+
+        IEndpointSnitch snitch = new SimpleSnitch();
+        TokenMetadata metadata = new TokenMetadata();
+        
+        Map<String, String> configOptions = new HashMap<>();
+        configOptions.put(REPLICATION_FACTOR, "1");
+
+        @SuppressWarnings("unused") 
+        NetworkTopologyStrategy strategy = new NetworkTopologyStrategy("ks", metadata, snitch, configOptions);
+    }
+
+    @Test
+    public void shouldWarnOnHigherReplicationFactorThanNodesInDC()
+    {
+        HashMap<String, String> configOptions = new HashMap<>();
+        configOptions.put("DC1", "2");
+        
+        IEndpointSnitch snitch = new AbstractNetworkTopologySnitch()
+        {
+            public String getRack(InetAddressAndPort endpoint)
+            {
+                return "rack1";
+            }
+
+            public String getDatacenter(InetAddressAndPort endpoint)
+            {
+                return "DC1";
+            }
+        };
+
+        NetworkTopologyStrategy strategy = new NetworkTopologyStrategy("ks", new TokenMetadata(), snitch, configOptions);
+        StorageService.instance.getTokenMetadata().updateHostId(UUID.randomUUID(), FBUtilities.getBroadcastAddressAndPort());
+        
+        ClientWarn.instance.captureWarnings();
+        strategy.maybeWarnOnOptions();
+        assertTrue(ClientWarn.instance.getWarnings().stream().anyMatch(s -> s.contains("Your replication factor")));
+    }
 }
diff --git a/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java b/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java
index 7d85b44..55f54ad 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java
@@ -24,10 +24,12 @@ import org.junit.Test;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.gms.Gossiper;
+import org.assertj.core.api.Assertions;
+
+import static org.junit.Assert.assertEquals;
 
 public class ReplicationFactorTest
 {
-
     @BeforeClass
     public static void setupClass()
     {
@@ -36,7 +38,44 @@ public class ReplicationFactorTest
         Gossiper.instance.start(1);
     }
 
-    private static void assertRfParseFailure(String s)
+    @Test
+    public void shouldParseValidRF()
+    {
+        assertRfParse("0", 0, 0);
+        assertRfParse("3", 3, 0);
+        assertRfParse("3/1", 3, 1);
+        assertRfParse("5", 5, 0);
+        assertRfParse("5/2", 5, 2);
+    }
+
+    @Test
+    public void shouldFailOnInvalidRF()
+    {
+        assertRfParseFailure("-1", "Replication factor must be non-negative");
+        assertRfParseFailure("3/3", "Transient replicas must be zero, or less than total replication factor");
+        assertRfParseFailure("3/-1", "Amount of transient nodes should be strictly positive");
+        assertRfParseFailure("3/4", "Transient replicas must be zero, or less than total replication factor");
+        assertRfParseFailure("3/", "Replication factor format is <replicas> or <replicas>/<transient>");
+        assertRfParseFailure("1/a", "For input string");
+        assertRfParseFailure("a/1", "For input string");
+        assertRfParseFailure("", "For input string");
+    }
+
+    @Test
+    public void shouldRoundTripParseSimpleRF()
+    {
+        String rf = "3";
+        assertEquals(rf, ReplicationFactor.fromString(rf).toParseableString());
+    }
+
+    @Test
+    public void shouldRoundTripParseTransientRF()
+    {
+        String rf = "3/1";
+        assertEquals(rf, ReplicationFactor.fromString(rf).toParseableString());
+    }
+
+    private static void assertRfParseFailure(String s, String error)
     {
         try
         {
@@ -45,39 +84,15 @@ public class ReplicationFactorTest
         }
         catch (IllegalArgumentException e)
         {
-            // expected
+            Assertions.assertThat(e.getMessage()).contains(error);
         }
     }
 
     private static void assertRfParse(String s, int expectedReplicas, int expectedTrans)
     {
         ReplicationFactor rf = ReplicationFactor.fromString(s);
-        Assert.assertEquals(expectedReplicas, rf.allReplicas);
-        Assert.assertEquals(expectedTrans, rf.transientReplicas());
-        Assert.assertEquals(expectedReplicas - expectedTrans, rf.fullReplicas);
-    }
-
-    @Test
-    public void parseTest()
-    {
-        assertRfParse("3", 3, 0);
-        assertRfParse("3/1", 3, 1);
-
-        assertRfParse("5", 5, 0);
-        assertRfParse("5/2", 5, 2);
-
-        assertRfParseFailure("-1");
-        assertRfParseFailure("3/3");
-        assertRfParseFailure("3/4");
-    }
-
-    @Test
-    public void roundTripParseTest()
-    {
-        String input = "3";
-        Assert.assertEquals(input, ReplicationFactor.fromString(input).toParseableString());
-
-        String transientInput = "3/1";
-        Assert.assertEquals(transientInput, ReplicationFactor.fromString(transientInput).toParseableString());
+        assertEquals(expectedReplicas, rf.allReplicas);
+        assertEquals(expectedTrans, rf.transientReplicas());
+        assertEquals(expectedReplicas - expectedTrans, rf.fullReplicas);
     }
 }
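
For reference, the "<replicas>" and "<replicas>/<transient>" format those cases exercise can be sketched as below. This is a standalone illustration of the validation rules the assertions check, not the ReplicationFactor implementation itself; in particular, the real parser rejects a trailing '/' with a dedicated format message, which this sketch does not reproduce.

// Standalone sketch of the RF string format exercised above (illustration only).
final class RfFormatSketch
{
    final int allReplicas;
    final int transientReplicas;

    private RfFormatSketch(int allReplicas, int transientReplicas)
    {
        if (allReplicas < 0)
            throw new IllegalArgumentException("Replication factor must be non-negative");
        if (transientReplicas < 0)
            throw new IllegalArgumentException("Amount of transient nodes should be strictly positive");
        if (transientReplicas != 0 && transientReplicas >= allReplicas)
            throw new IllegalArgumentException("Transient replicas must be zero, or less than total replication factor");
        this.allReplicas = allReplicas;
        this.transientReplicas = transientReplicas;
    }

    static RfFormatSketch fromString(String s)
    {
        // NB: unlike the real parser, a trailing '/' here simply fails inside Integer.parseInt().
        int slash = s.indexOf('/');
        if (slash < 0)
            return new RfFormatSketch(Integer.parseInt(s), 0);
        return new RfFormatSketch(Integer.parseInt(s.substring(0, slash)),
                                  Integer.parseInt(s.substring(slash + 1)));
    }

    public static void main(String[] args)
    {
        RfFormatSketch rf = fromString("3/1");
        System.out.println(rf.allReplicas + " total, " + rf.transientReplicas + " transient, "
                           + (rf.allReplicas - rf.transientReplicas) + " full");
    }
}
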
diff --git a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
index 2e9e32d..6ccfcde 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
@@ -17,9 +17,6 @@
  */
 package org.apache.cassandra.locator;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.commons.lang3.StringUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -40,52 +37,33 @@ public class ReplicationStrategyEndpointCacheTest
     public static final String KEYSPACE = "ReplicationStrategyEndpointCacheTest";
 
     @BeforeClass
-    public static void defineSchema() throws Exception
+    public static void defineSchema()
     {
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(5));
     }
 
-    public void setup(Class stratClass, Map<String, String> strategyOptions) throws Exception
+    public void setup() throws Exception
     {
         tmd = new TokenMetadata();
         searchToken = new BigIntegerToken(String.valueOf(15));
-
         strategy = getStrategyWithNewTokenMetadata(Keyspace.open(KEYSPACE).getReplicationStrategy(), tmd);
 
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(10)), InetAddressAndPort.getByName("127.0.0.1"));
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(20)), InetAddressAndPort.getByName("127.0.0.2"));
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddressAndPort.getByName("127.0.0.3"));
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(40)), InetAddressAndPort.getByName("127.0.0.4"));
-        //tmd.updateNormalToken(new BigIntegerToken(String.valueOf(50)), InetAddressAndPort.getByName("127.0.0.5", null, null));
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(60)), InetAddressAndPort.getByName("127.0.0.6"));
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(70)), InetAddressAndPort.getByName("127.0.0.7"));
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(80)), InetAddressAndPort.getByName("127.0.0.8"));
+        for (int i = 1; i <= 8; i++)
+            tmd.updateNormalToken(new BigIntegerToken(String.valueOf(i * 10)), InetAddressAndPort.getByName("127.0.0." + i));
     }
 
     @Test
     public void testEndpointsWereCached() throws Exception
     {
-        runEndpointsWereCachedTest(FakeSimpleStrategy.class, null);
-        runEndpointsWereCachedTest(FakeNetworkTopologyStrategy.class, new HashMap<String, String>());
-    }
-
-    public void runEndpointsWereCachedTest(Class stratClass, Map<String, String> configOptions) throws Exception
-    {
-        setup(stratClass, configOptions);
+        setup();
         Util.assertRCEquals(strategy.getNaturalReplicasForToken(searchToken), strategy.getNaturalReplicasForToken(searchToken));
     }
 
     @Test
     public void testCacheRespectsTokenChanges() throws Exception
     {
-        runCacheRespectsTokenChangesTest(SimpleStrategy.class, null);
-        runCacheRespectsTokenChangesTest(NetworkTopologyStrategy.class, new HashMap<String, String>());
-    }
-
-    public void runCacheRespectsTokenChangesTest(Class stratClass, Map<String, String> configOptions) throws Exception
-    {
-        setup(stratClass, configOptions);
+        setup();
         EndpointsForToken initial;
         EndpointsForToken replicas;
 
@@ -116,40 +94,6 @@ public class ReplicationStrategyEndpointCacheTest
         Util.assertNotRCEquals(replicas, initial);
     }
 
-    protected static class FakeSimpleStrategy extends SimpleStrategy
-    {
-        private boolean called = false;
-
-        public FakeSimpleStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
-        {
-            super(keyspaceName, tokenMetadata, snitch, configOptions);
-        }
-
-        public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
-        {
-            assert !called : "calculateNaturalReplicas was already called, result should have been cached";
-            called = true;
-            return super.calculateNaturalReplicas(token, metadata);
-        }
-    }
-
-    protected static class FakeNetworkTopologyStrategy extends NetworkTopologyStrategy
-    {
-        private boolean called = false;
-
-        public FakeNetworkTopologyStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) throws ConfigurationException
-        {
-            super(keyspaceName, tokenMetadata, snitch, configOptions);
-        }
-
-        public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
-        {
-            assert !called : "calculateNaturalReplicas was already called, result should have been cached";
-            called = true;
-            return super.calculateNaturalReplicas(token, metadata);
-        }
-    }
-
     private AbstractReplicationStrategy getStrategyWithNewTokenMetadata(AbstractReplicationStrategy strategy, TokenMetadata newTmd) throws ConfigurationException
     {
         return AbstractReplicationStrategy.createReplicationStrategy(
diff --git a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
index 9e24de7..4c1ff26 100644
--- a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
@@ -48,9 +49,12 @@ import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.StorageServiceAccessor;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -316,8 +320,37 @@ public class SimpleStrategyTest
 
         Map<String, String> configOptions = new HashMap<>();
 
+        @SuppressWarnings("unused")
         SimpleStrategy strategy = new SimpleStrategy("ks", metadata, snitch, configOptions);
     }
+    
+    @Test
+    public void shouldReturnNoEndpointsForEmptyRing()
+    {
+        TokenMetadata metadata = new TokenMetadata();
+        
+        HashMap<String, String> configOptions = new HashMap<>();
+        configOptions.put("replication_factor", "1");
+        
+        SimpleStrategy strategy = new SimpleStrategy("ks", metadata, new SimpleSnitch(), configOptions);
+
+        EndpointsForRange replicas = strategy.calculateNaturalReplicas(null, metadata);
+        assertTrue(replicas.endpoints().isEmpty());
+    }
+
+    @Test
+    public void shouldWarnOnHigherReplicationFactorThanNodes()
+    {
+        HashMap<String, String> configOptions = new HashMap<>();
+        configOptions.put("replication_factor", "2");
+
+        SimpleStrategy strategy = new SimpleStrategy("ks", new TokenMetadata(), new SimpleSnitch(), configOptions);
+        StorageService.instance.getTokenMetadata().updateHostId(UUID.randomUUID(), FBUtilities.getBroadcastAddressAndPort());
+        
+        ClientWarn.instance.captureWarnings();
+        strategy.maybeWarnOnOptions();
+        assertTrue(ClientWarn.instance.getWarnings().stream().anyMatch(s -> s.contains("Your replication factor")));
+    }
 
     private AbstractReplicationStrategy getStrategy(String keyspaceName, TokenMetadata tmd, IEndpointSnitch snitch)
     {

