You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/03/08 17:10:41 UTC
[cassandra] branch trunk updated: Improve replication tests
This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 953c18d Improve replication tests
953c18d is described below
commit 953c18df33ab3e009ced15a16785e2753843418a
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Mon Mar 8 17:08:36 2021 +0000
Improve replication tests
patch by Caleb Rackliffe; reviewed by Andrés de la Peña and Ekaterina Dimitrova for CASSANDRA-16181
---
src/java/org/apache/cassandra/batchlog/Batch.java | 27 +++-
.../apache/cassandra/batchlog/BatchlogManager.java | 5 +-
.../org/apache/cassandra/hints/HintMessage.java | 1 -
.../org/apache/cassandra/hints/HintsService.java | 6 +
.../locator/AbstractReplicationStrategy.java | 24 ++--
.../apache/cassandra/locator/LocalStrategy.java | 5 +-
.../cassandra/locator/NetworkTopologyStrategy.java | 7 +-
.../cassandra/locator/ReplicationFactor.java | 10 +-
.../apache/cassandra/locator/SimpleStrategy.java | 5 +-
.../cassandra/distributed/action/GossipHelper.java | 13 +-
.../distributed/impl/AbstractCluster.java | 2 +-
.../cassandra/distributed/impl/Instance.java | 49 ++++++-
.../test/HintedHandoffAddRemoveNodesTest.java | 156 ++++++++++++++++++++
.../test/HintedHandoffNodetoolTest.java | 149 +++++++++++++++++++
.../cassandra/distributed/test/TestBaseImpl.java | 16 ++-
.../distributed/test/ring/BootstrapTest.java | 11 +-
.../ring/CommunicationDuringDecommissionTest.java | 4 +-
.../upgrade/MixedModeBatchTestBase.java | 157 +++++++++++++++++++++
.../upgrade/MixedModeFrom2LoggedBatchTest.java | 38 +++++
.../upgrade/MixedModeFrom2ReplicationTest.java | 38 +++++
.../upgrade/MixedModeFrom2UnloggedBatchTest.java | 38 +++++
.../upgrade/MixedModeFrom3LoggedBatchTest.java | 44 ++++++
.../upgrade/MixedModeFrom3ReplicationTest.java | 44 ++++++
.../upgrade/MixedModeFrom3UnloggedBatchTest.java | 44 ++++++
.../upgrade/MixedModeReplicationTestBase.java | 83 +++++++++++
.../distributed/upgrade/UpgradeTestBase.java | 31 ++++
.../batchlog/BatchlogEndpointFilterTest.java | 41 +++++-
.../cassandra/batchlog/BatchlogManagerTest.java | 5 +-
.../cql3/validation/operations/BatchTest.java | 60 ++++++--
.../apache/cassandra/hints/HintMessageTest.java | 67 +++++++--
.../locator/NetworkTopologyStrategyTest.java | 62 +++++++-
.../cassandra/locator/ReplicationFactorTest.java | 75 ++++++----
.../ReplicationStrategyEndpointCacheTest.java | 68 +--------
.../cassandra/locator/SimpleStrategyTest.java | 33 +++++
34 files changed, 1237 insertions(+), 181 deletions(-)
diff --git a/src/java/org/apache/cassandra/batchlog/Batch.java b/src/java/org/apache/cassandra/batchlog/Batch.java
index fb6c5d5..7b205e0 100644
--- a/src/java/org/apache/cassandra/batchlog/Batch.java
+++ b/src/java/org/apache/cassandra/batchlog/Batch.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -65,6 +67,7 @@ public final class Batch
*
* The mutations will always be encoded using the current messaging version.
*/
+ @SuppressWarnings("RedundantTypeArguments")
public static Batch createRemote(UUID id, long creationTime, Collection<ByteBuffer> mutations)
{
return new Batch(id, creationTime, Collections.<Mutation>emptyList(), mutations);
@@ -77,12 +80,30 @@ public final class Batch
{
return decodedMutations.size() + encodedMutations.size();
}
+
+ @VisibleForTesting
+ public Collection<ByteBuffer> getEncodedMutations()
+ {
+ return encodedMutations;
+ }
- static final class Serializer implements IVersionedSerializer<Batch>
+ /**
+ * Local batches contain only already decoded {@link Mutation} instances. Unlike remote
+ * batches, which contain mutations encoded as {@link ByteBuffer} instances, local batches
+ * can be serialized and sent over the wire.
+ *
+ * @return {@code true} if there are no encoded mutations present, and {@code false} otherwise
+ */
+ public boolean isLocal()
+ {
+ return encodedMutations.isEmpty();
+ }
+
+ public static final class Serializer implements IVersionedSerializer<Batch>
{
public long serializedSize(Batch batch, int version)
{
- assert batch.encodedMutations.isEmpty() : "attempted to serialize a 'remote' batch";
+ assert batch.isLocal() : "attempted to serialize a 'remote' batch";
long size = UUIDSerializer.serializer.serializedSize(batch.id, version);
size += sizeof(batch.creationTime);
@@ -100,7 +121,7 @@ public final class Batch
public void serialize(Batch batch, DataOutputPlus out, int version) throws IOException
{
- assert batch.encodedMutations.isEmpty() : "attempted to serialize a 'remote' batch";
+ assert batch.isLocal() : "attempted to serialize a 'remote' batch";
UUIDSerializer.serializer.serialize(batch.id, out, version);
out.writeLong(batch.creationTime);
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index f140332..9a009dc 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -447,8 +447,7 @@ public class BatchlogManager implements BatchlogManagerMBean
for (Mutation mutation : mutations)
{
ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, hintedNodes);
- if (handler != null)
- handlers.add(handler);
+ handlers.add(handler);
}
return handlers;
}
@@ -457,7 +456,7 @@ public class BatchlogManager implements BatchlogManagerMBean
* We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
* when a replica is down or a write request times out.
*
- * @return direct delivery handler to wait on or null, if no live nodes found
+ * @return direct delivery handler to wait on
*/
private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation,
long writtenAt,
diff --git a/src/java/org/apache/cassandra/hints/HintMessage.java b/src/java/org/apache/cassandra/hints/HintMessage.java
index 60adb85..60d7641 100644
--- a/src/java/org/apache/cassandra/hints/HintMessage.java
+++ b/src/java/org/apache/cassandra/hints/HintMessage.java
@@ -24,7 +24,6 @@ import java.util.Objects;
import java.util.UUID;
import javax.annotation.Nullable;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
import org.apache.cassandra.db.TypeSizes;
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 60de478..f7c4c9b 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -393,4 +393,10 @@ public final class HintsService implements HintsServiceMBean
{
return isShutDown;
}
+
+ @VisibleForTesting
+ public boolean isDispatchPaused()
+ {
+ return isDispatchPaused.get();
+ }
}
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index dfcfdc0..7891895 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -99,12 +99,12 @@ public abstract class AbstractReplicationStrategy
* @param searchPosition the position the natural endpoints are requested for
* @return a copy of the natural endpoints for the given token
*/
- public EndpointsForToken getNaturalReplicasForToken(RingPosition searchPosition)
+ public EndpointsForToken getNaturalReplicasForToken(RingPosition<?> searchPosition)
{
return getNaturalReplicas(searchPosition).forToken(searchPosition.getToken());
}
- public EndpointsForRange getNaturalReplicas(RingPosition searchPosition)
+ public EndpointsForRange getNaturalReplicas(RingPosition<?> searchPosition)
{
Token searchToken = searchPosition.getToken();
Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
@@ -121,7 +121,7 @@ public abstract class AbstractReplicationStrategy
return endpoints;
}
- public Replica getLocalReplicaFor(RingPosition searchPosition)
+ public Replica getLocalReplicaFor(RingPosition<?> searchPosition)
{
return getNaturalReplicas(searchPosition)
.byEndpoint()
@@ -160,7 +160,7 @@ public abstract class AbstractReplicationStrategy
long queryStartNanoTime,
ConsistencyLevel idealConsistencyLevel)
{
- AbstractWriteResponseHandler resultResponseHandler;
+ AbstractWriteResponseHandler<T> resultResponseHandler;
if (replicaPlan.consistencyLevel().isDatacenterLocal())
{
// block for in this context will be localnodes block.
@@ -188,11 +188,11 @@ public abstract class AbstractReplicationStrategy
else
{
//Construct a delegate response handler to use to track the ideal consistency level
- AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(replicaPlan.withConsistencyLevel(idealConsistencyLevel),
- callback,
- writeType,
- queryStartNanoTime,
- idealConsistencyLevel);
+ AbstractWriteResponseHandler<T> idealHandler = getWriteResponseHandler(replicaPlan.withConsistencyLevel(idealConsistencyLevel),
+ callback,
+ writeType,
+ queryStartNanoTime,
+ idealConsistencyLevel);
resultResponseHandler.setIdealCLResponseHandler(idealHandler);
}
}
@@ -319,7 +319,7 @@ public abstract class AbstractReplicationStrategy
throws ConfigurationException
{
AbstractReplicationStrategy strategy;
- Class [] parameterTypes = new Class[] {String.class, TokenMetadata.class, IEndpointSnitch.class, Map.class};
+ Class<?>[] parameterTypes = new Class[] {String.class, TokenMetadata.class, IEndpointSnitch.class, Map.class};
try
{
Constructor<? extends AbstractReplicationStrategy> constructor = strategyClass.getConstructor(parameterTypes);
@@ -436,7 +436,7 @@ public abstract class AbstractReplicationStrategy
if (rf.hasTransientReplicas())
{
if (DatabaseDescriptor.getNumTokens() > 1)
- throw new ConfigurationException(String.format("Transient replication is not supported with vnodes yet"));
+ throw new ConfigurationException("Transient replication is not supported with vnodes yet");
}
}
catch (IllegalArgumentException e)
@@ -447,7 +447,7 @@ public abstract class AbstractReplicationStrategy
protected void validateExpectedOptions() throws ConfigurationException
{
- Collection expectedOptions = recognizedOptions();
+ Collection<String> expectedOptions = recognizedOptions();
if (expectedOptions == null)
return;
diff --git a/src/java/org/apache/cassandra/locator/LocalStrategy.java b/src/java/org/apache/cassandra/locator/LocalStrategy.java
index e7da769..0e3a918 100644
--- a/src/java/org/apache/cassandra/locator/LocalStrategy.java
+++ b/src/java/org/apache/cassandra/locator/LocalStrategy.java
@@ -50,7 +50,7 @@ public class LocalStrategy extends AbstractReplicationStrategy
* LocalStrategy may be used before tokens are set up.
*/
@Override
- public EndpointsForRange getNaturalReplicas(RingPosition searchPosition)
+ public EndpointsForRange getNaturalReplicas(RingPosition<?> searchPosition)
{
return replicas;
}
@@ -74,9 +74,10 @@ public class LocalStrategy extends AbstractReplicationStrategy
{
}
+ @Override
public Collection<String> recognizedOptions()
{
// LocalStrategy doesn't expect any options.
- return Collections.<String>emptySet();
+ return Collections.emptySet();
}
}
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 9e6ad6d..9029dc1 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -55,10 +55,11 @@ import com.google.common.collect.Multimap;
*/
public class NetworkTopologyStrategy extends AbstractReplicationStrategy
{
+ public static final String REPLICATION_FACTOR = "replication_factor";
+
private final Map<String, ReplicationFactor> datacenters;
private final ReplicationFactor aggregateRf;
private static final Logger logger = LoggerFactory.getLogger(NetworkTopologyStrategy.class);
- private static final String REPLICATION_FACTOR = "replication_factor";
public NetworkTopologyStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) throws ConfigurationException
{
@@ -170,6 +171,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
/**
* calculate endpoints in one pass through the tokens by tracking our progress in each DC.
*/
+ @Override
public EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata)
{
// we want to preserve insertion order so that the first added endpoint becomes primary
@@ -229,6 +231,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
return collection != null ? collection.size() : 0;
}
+ @Override
public ReplicationFactor getReplicationFactor()
{
return aggregateRf;
@@ -245,6 +248,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
return datacenters.keySet();
}
+ @Override
public Collection<String> recognizedOptions()
{
// only valid options are valid DC names.
@@ -286,6 +290,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
options.values().removeAll(Collections.singleton("0"));
}
+ @Override
protected void validateExpectedOptions() throws ConfigurationException
{
// Do not accept query with no data centers specified.
diff --git a/src/java/org/apache/cassandra/locator/ReplicationFactor.java b/src/java/org/apache/cassandra/locator/ReplicationFactor.java
index 91ce770..ee971d9 100644
--- a/src/java/org/apache/cassandra/locator/ReplicationFactor.java
+++ b/src/java/org/apache/cassandra/locator/ReplicationFactor.java
@@ -76,11 +76,11 @@ public class ReplicationFactor
.filter(endpoint -> Gossiper.instance.getReleaseVersion(endpoint) != null && Gossiper.instance.getReleaseVersion(endpoint).major < 4)
.collect(Collectors.toList());
if (!badVersionEndpoints.isEmpty())
- throw new AssertionError("Transient replication is not supported in mixed version clusters with nodes < 4.0. Bad nodes: " + badVersionEndpoints);
+ throw new IllegalArgumentException("Transient replication is not supported in mixed version clusters with nodes < 4.0. Bad nodes: " + badVersionEndpoints);
}
else if (transientRF < 0)
{
- throw new AssertionError(String.format("Amount of transient nodes should be strictly positive, but was: '%d'", transientRF));
+ throw new IllegalArgumentException(String.format("Amount of transient nodes should be strictly positive, but was: '%d'", transientRF));
}
}
@@ -114,17 +114,17 @@ public class ReplicationFactor
String[] parts = s.split("/");
Preconditions.checkArgument(parts.length == 2,
"Replication factor format is <replicas> or <replicas>/<transient>");
- return new ReplicationFactor(Integer.valueOf(parts[0]), Integer.valueOf(parts[1]));
+ return new ReplicationFactor(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]));
}
else
{
- return new ReplicationFactor(Integer.valueOf(s), 0);
+ return new ReplicationFactor(Integer.parseInt(s), 0);
}
}
public String toParseableString()
{
- return String.valueOf(allReplicas) + (hasTransientReplicas() ? "/" + transientReplicas() : "");
+ return allReplicas + (hasTransientReplicas() ? "/" + transientReplicas() : "");
}
@Override
diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
index 672c9ff..928ac97 100644
--- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java
+++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
@@ -53,6 +53,7 @@ public class SimpleStrategy extends AbstractReplicationStrategy
this.rf = ReplicationFactor.fromString(this.configOptions.get(REPLICATION_FACTOR));
}
+ @Override
public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
{
ArrayList<Token> ring = metadata.sortedTokens();
@@ -78,12 +79,13 @@ public class SimpleStrategy extends AbstractReplicationStrategy
return replicas.build();
}
+ @Override
public ReplicationFactor getReplicationFactor()
{
return rf;
}
- private final static void validateOptionsInternal(Map<String, String> configOptions) throws ConfigurationException
+ private static void validateOptionsInternal(Map<String, String> configOptions) throws ConfigurationException
{
if (configOptions.get(REPLICATION_FACTOR) == null)
throw new ConfigurationException("SimpleStrategy requires a replication_factor strategy option.");
@@ -116,6 +118,7 @@ public class SimpleStrategy extends AbstractReplicationStrategy
}
}
+ @Override
public Collection<String> recognizedOptions()
{
return Collections.singleton(REPLICATION_FACTOR);
diff --git a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
index 255f6e2..dc4f89c 100644
--- a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
+++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
@@ -30,9 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.api.IInstance;
@@ -48,8 +46,6 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.MigrationCoordinator;
-import org.apache.cassandra.schema.MigrationManager;
-import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
@@ -274,11 +270,16 @@ public class GossipHelper
}
}
- public static InstanceAction decomission()
+ public static InstanceAction decommission()
{
- return (target) -> target.nodetoolResult("decommission").asserts().success();
+ return decommission(false);
}
+ public static InstanceAction decommission(boolean force)
+ {
+ return force ? (target) -> target.nodetoolResult("decommission", "--force").asserts().success()
+ : (target) -> target.nodetoolResult("decommission").asserts().success();
+ }
public static VersionedApplicationState tokens(IInvokableInstance instance)
{
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index babc2c3..8026357 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -111,7 +111,7 @@ import static org.apache.cassandra.distributed.shared.NetworkTopology.addressAnd
*/
public abstract class AbstractCluster<I extends IInstance> implements ICluster<I>, AutoCloseable
{
- public static Versions.Version CURRENT_VERSION = new Versions.Version(FBUtilities.getReleaseVersionString(), Versions.getClassPath());;
+ public static Versions.Version CURRENT_VERSION = new Versions.Version(FBUtilities.getReleaseVersionString(), Versions.getClassPath());
// WARNING: we have this logger not (necessarily) for logging, but
// to ensure we have instantiated the main classloader's LoggerFactory (and any LogbackStatusListener)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index b79ccbc..4a865a9 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -24,6 +24,7 @@ import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import java.security.Permission;
import java.util.ArrayList;
import java.util.Collections;
@@ -44,6 +45,7 @@ import javax.management.NotificationListener;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.GlobalEventExecutor;
+import org.apache.cassandra.batchlog.Batch;
import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.concurrent.ExecutorLocals;
import org.apache.cassandra.concurrent.ScheduledExecutors;
@@ -92,6 +94,7 @@ import org.apache.cassandra.io.sstable.IndexSummaryManager;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.CassandraMetricsRegistry;
@@ -125,6 +128,7 @@ import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.UUIDSerializer;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor;
import org.apache.cassandra.utils.memory.BufferPools;
@@ -135,6 +139,7 @@ import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.fromCassandraInetAddressAndPort;
import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
+import static org.apache.cassandra.net.Verb.BATCH_STORE_REQ;
public class Instance extends IsolatedExecutor implements IInvokableInstance
{
@@ -319,11 +324,33 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
try (DataOutputBuffer out = new DataOutputBuffer(1024))
{
+ // On a 4.0+ node, C* makes a distinction between "local" and "remote" batches, where only the former can
+ // be serialized and sent to a remote node, where they are deserialized and written to the batch commitlog
+ // without first being converted into mutation objects. Batch serialization is therefore not symmetric, and
+ // we use a special procedure here that "re-serializes" a "remote" batch to build the message.
+ if (fromVersion >= MessagingService.VERSION_40 && messageOut.verb().id == BATCH_STORE_REQ.id)
+ {
+ Object maybeBatch = messageOut.payload;
+
+ if (maybeBatch instanceof Batch)
+ {
+ Batch batch = (Batch) maybeBatch;
+
+ // If the batch is local, it can be serialized along the normal path.
+ if (!batch.isLocal())
+ {
+ reserialize(batch, out, toVersion);
+ byte[] bytes = out.toByteArray();
+ return new MessageImpl(messageOut.verb().id, bytes, messageOut.id(), toVersion, fromCassandraInetAddressAndPort(from));
+ }
+ }
+ }
+
Message.serializer.serialize(messageOut, out, toVersion);
byte[] bytes = out.toByteArray();
if (messageOut.serializedSize(toVersion) != bytes.length)
throw new AssertionError(String.format("Message serializedSize(%s) does not match what was written with serialize(out, %s) for verb %s and serializer %s; " +
- "expected %s, actual %s", toVersion, toVersion, messageOut.verb(), messageOut.serializer.getClass(),
+ "expected %s, actual %s", toVersion, toVersion, messageOut.verb(), Message.serializer.getClass(),
messageOut.serializedSize(toVersion), bytes.length));
return new MessageImpl(messageOut.verb().id, bytes, messageOut.id(), toVersion, fromCassandraInetAddressAndPort(from));
}
@@ -333,6 +360,26 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
}
}
+ /**
+ * Only "local" batches can be passed through {@link Batch.Serializer#serialize(Batch, DataOutputPlus, int)} and
+ * sent to a remote node during normal operation, but there are testing scenarios where we may intercept and
+ * forward a "remote" batch. This method allows us to put the already encoded mutations back onto a stream.
+ */
+ private static void reserialize(Batch batch, DataOutputPlus out, int version) throws IOException
+ {
+ assert !batch.isLocal() : "attempted to reserialize a 'local' batch";
+
+ UUIDSerializer.serializer.serialize(batch.id, out, version);
+ out.writeLong(batch.creationTime);
+
+ out.writeUnsignedVInt(batch.getEncodedMutations().size());
+
+ for (ByteBuffer mutation : batch.getEncodedMutations())
+ {
+ out.write(mutation);
+ }
+ }
+
@VisibleForTesting
public static Message<?> deserializeMessage(IMessage message)
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
new file mode 100644
index 0000000..4bd5d44
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffAddRemoveNodesTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.apache.cassandra.distributed.action.GossipHelper;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.service.StorageService;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.cassandra.distributed.action.GossipHelper.decommission;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.awaitility.Awaitility.*;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests around removing and adding nodes from and to a cluster while hints are still outstanding.
+ */
+public class HintedHandoffAddRemoveNodesTest extends TestBaseImpl
+{
+ /**
+ * Replaces Python dtest {@code hintedhandoff_test.py:TestHintedHandoff.test_hintedhandoff_decom()}.
+ */
+ @Test
+ public void shouldStreamHintsDuringDecommission() throws Exception
+ {
+ try (Cluster cluster = builder().withNodes(4)
+ .withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
+ .start())
+ {
+ cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2}"));
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.decom_hint_test (key int PRIMARY KEY, value int)"));
+
+ cluster.get(4).shutdown().get();
+
+ // Write data using the second node as the coordinator...
+ populate(cluster, "decom_hint_test", 2, 0, 128, ConsistencyLevel.ONE);
+ Long totalHints = countTotalHints(cluster);
+ // ...and verify that we've accumulated hints intended for node 4, which is down.
+ assertThat(totalHints).isGreaterThan(0);
+
+ // Decommission node 1...
+ assertEquals(4, endpointsKnownTo(cluster, 2));
+ cluster.run(decommission(), 1);
+ await().pollDelay(1, SECONDS).until(() -> endpointsKnownTo(cluster, 2) == 3);
+ // ...and verify that all data still exists on either node 2 or 3.
+ verify(cluster, "decom_hint_test", 2, 0, 128, ConsistencyLevel.ONE);
+
+ // Start node 4 back up and verify that all hints were delivered.
+ cluster.get(4).startup();
+ await().atMost(30, SECONDS).pollDelay(3, SECONDS).until(() -> count(cluster, "decom_hint_test", 4).equals(totalHints));
+
+ // Now decommission both nodes 2 and 3...
+ cluster.run(GossipHelper.decommission(true), 2);
+ cluster.run(GossipHelper.decommission(true), 3);
+ await().pollDelay(1, SECONDS).until(() -> endpointsKnownTo(cluster, 4) == 1);
+ // ...and verify that even if we drop below the replication factor of 2, all data has been preserved.
+ verify(cluster, "decom_hint_test", 4, 0, 128, ConsistencyLevel.ONE);
+ }
+ }
+
+ @Test
+ public void shouldBootstrapWithHintsOutstanding() throws Exception
+ {
+ try (Cluster cluster = builder().withNodes(3)
+ .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
+ .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
+ .withConfig(config -> config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
+ .start())
+ {
+ cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2}"));
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.boot_hint_test (key int PRIMARY KEY, value int)"));
+
+ cluster.get(3).shutdown().get();
+
+ // Write data using the second node as the coordinator...
+ populate(cluster, "boot_hint_test", 2, 0, 128, ConsistencyLevel.ONE);
+ Long totalHints = countTotalHints(cluster);
+ // ...and verify that we've accumulated hints intended for node 3, which is down.
+ assertThat(totalHints).isGreaterThan(0);
+
+ // Bootstrap a new/4th node into the cluster...
+ bootstrapAndJoinNode(cluster);
+
+ // ...and verify that all data is available.
+ verify(cluster, "boot_hint_test", 4, 0, 128, ConsistencyLevel.ONE);
+
+ // Finally, bring node 3 back up and verify that all hints were delivered.
+ cluster.get(3).startup();
+ await().atMost(30, SECONDS).pollDelay(3, SECONDS).until(() -> count(cluster, "boot_hint_test", 3).equals(totalHints));
+ verify(cluster, "boot_hint_test", 3, 0, 128, ConsistencyLevel.ONE);
+ verify(cluster, "boot_hint_test", 3, 0, 128, ConsistencyLevel.TWO);
+ }
+ }
+
+ @SuppressWarnings("Convert2MethodRef")
+ private Long countTotalHints(Cluster cluster)
+ {
+ return cluster.get(2).callOnInstance(() -> StorageMetrics.totalHints.getCount());
+ }
+
+ @SuppressWarnings("SameParameterValue")
+ private void populate(Cluster cluster, String table, int coordinator, int start, int count, ConsistencyLevel cl)
+ {
+ for (int i = start; i < start + count; i++)
+ cluster.coordinator(coordinator)
+ .execute("INSERT INTO " + KEYSPACE + '.' + table + " (key, value) VALUES (?, ?)", cl, i, i);
+ }
+
+ @SuppressWarnings("SameParameterValue")
+ private void verify(Cluster cluster, String table, int coordinator, int start, int count, ConsistencyLevel cl)
+ {
+ for (int i = start; i < start + count; i++)
+ {
+ Object[][] row = cluster.coordinator(coordinator)
+ .execute("SELECT key, value FROM " + KEYSPACE + '.' + table + " WHERE key = ?", cl, i);
+ assertRows(row, row(i, i));
+ }
+ }
+
+ private Long count(Cluster cluster, String table, int node)
+ {
+ return (Long) cluster.get(node).executeInternal("SELECT COUNT(*) FROM " + KEYSPACE + '.' + table)[0][0];
+ }
+
+ private int endpointsKnownTo(Cluster cluster, int node)
+ {
+ return cluster.get(node).callOnInstance(() -> StorageService.instance.getTokenMetadata().getAllEndpoints().size());
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffNodetoolTest.java b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffNodetoolTest.java
new file mode 100644
index 0000000..5045301
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/HintedHandoffNodetoolTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.service.StorageProxy;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * These tests replace the nodetool-related Python dtests in hintedhandoff_test.py.
+ */
+@RunWith(Parameterized.class)
+public class HintedHandoffNodetoolTest extends TestBaseImpl
+{
+ private static Cluster cluster;
+
+ @Parameterized.Parameter
+ public int node;
+
+ @Parameterized.Parameters(name = "node={0}")
+ public static List<Object[]> data()
+ {
+ List<Object[]> result = new ArrayList<>();
+ result.add(new Object[]{ 1 });
+ result.add(new Object[]{ 2 });
+ return result;
+ }
+
+ @BeforeClass
+ public static void before() throws IOException
+ {
+ cluster = init(Cluster.build().withNodes(2).withDCs(2).start());
+ }
+
+ @AfterClass
+ public static void after()
+ {
+ if (cluster != null)
+ cluster.close();
+ }
+
+ @Before
+ public void enableHandoff()
+ {
+ cluster.get(1).nodetoolResult("enablehandoff");
+ cluster.get(2).nodetoolResult("enablehandoff");
+ }
+
+ @SuppressWarnings("Convert2MethodRef")
+ @Test
+ public void testEnableHandoff()
+ {
+ cluster.get(node).nodetoolResult("statushandoff").asserts().success().stdoutContains("Hinted handoff is running");
+ assertTrue(cluster.get(node).callOnInstance(() -> StorageProxy.instance.getHintedHandoffEnabled()));
+ }
+
+ @Test
+ public void testDisableHandoff()
+ {
+ cluster.get(node).nodetoolResult("statushandoff").asserts().success().stdoutContains("Hinted handoff is running");
+ cluster.get(node).nodetoolResult("disablehandoff");
+ cluster.get(node).nodetoolResult("statushandoff").asserts().success().stdoutContains("Hinted handoff is not running");
+ cluster.get(node).nodetoolResult("enablehandoff");
+ cluster.get(node).nodetoolResult("statushandoff").asserts().success().stdoutContains("Hinted handoff is running");
+ }
+
+ @Test
+ public void testDisableForDC()
+ {
+ cluster.get(node).nodetoolResult("disablehintsfordc", "datacenter1");
+ cluster.get(node).nodetoolResult("statushandoff").asserts().success().stdoutContains("Data center datacenter1 is disabled");
+ cluster.get(node).nodetoolResult("enablehintsfordc", "datacenter1");
+ cluster.get(node).nodetoolResult("statushandoff").asserts().success().stdoutNotContains("Data center datacenter1 is disabled");
+ }
+
+ @Test
+ public void testPauseHandoff()
+ {
+ cluster.get(node).nodetoolResult("statushandoff").asserts().success().stdoutContains("Hinted handoff is running");
+
+ cluster.get(node).nodetoolResult("pausehandoff").asserts().success();
+ Boolean isPaused = cluster.get(node).callOnInstance(() -> HintsService.instance.isDispatchPaused());
+ assertTrue(isPaused);
+
+ cluster.get(node).nodetoolResult("resumehandoff").asserts().success();
+ isPaused = cluster.get(node).callOnInstance(() -> HintsService.instance.isDispatchPaused());
+ assertFalse(isPaused);
+ }
+
+ @Test
+ public void testThrottle()
+ {
+ Integer throttleInKB = cluster.get(node).callOnInstance(DatabaseDescriptor::getHintedHandoffThrottleInKB);
+ cluster.get(node).nodetoolResult("sethintedhandoffthrottlekb", String.valueOf(throttleInKB * 2)).asserts().success();
+ Integer newThrottleInKB = cluster.get(node).callOnInstance(DatabaseDescriptor::getHintedHandoffThrottleInKB);
+ assertEquals(throttleInKB * 2, newThrottleInKB.intValue());
+ }
+
+ @SuppressWarnings("Convert2MethodRef")
+ @Test
+ public void testMaxHintWindow()
+ {
+ Integer hintWindowMillis = cluster.get(node).callOnInstance(() -> StorageProxy.instance.getMaxHintWindow());
+
+ cluster.get(node).nodetoolResult("getmaxhintwindow")
+ .asserts()
+ .success()
+ .stdoutContains("Current max hint window: " + hintWindowMillis + " ms");
+
+ cluster.get(node).nodetoolResult("setmaxhintwindow", String.valueOf(hintWindowMillis * 2)).asserts().success();
+
+ cluster.get(node).nodetoolResult("getmaxhintwindow")
+ .asserts()
+ .success()
+ .stdoutContains("Current max hint window: " + hintWindowMillis * 2 + " ms");
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index 37f43bd..343ccc8 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -23,7 +23,6 @@ import java.math.BigInteger;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.Arrays;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -38,8 +37,13 @@ import org.apache.cassandra.cql3.Duration;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.shared.DistributedTestBase;
+import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
+import static org.apache.cassandra.distributed.action.GossipHelper.withProperty;
+
public class TestBaseImpl extends DistributedTestBase
{
public static final Object[][] EMPTY_ROWS = new Object[0][];
@@ -100,6 +104,16 @@ public class TestBaseImpl extends DistributedTestBase
return TupleType.buildValue(bbs);
}
+ protected void bootstrapAndJoinNode(Cluster cluster)
+ {
+ IInstanceConfig config = cluster.newInstanceConfig();
+ config.set("auto_bootstrap", true);
+ IInvokableInstance newInstance = cluster.bootstrap(config);
+ withProperty(BOOTSTRAP_SCHEMA_DELAY_MS.getKey(), Integer.toString(90 * 1000),
+ () -> withProperty("cassandra.join_ring", false, () -> newInstance.startup(cluster)));
+ newInstance.nodetoolResult("join").asserts().success();
+ }
+
@SuppressWarnings("unchecked")
private static ByteBuffer makeByteBuffer(Object value)
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
index 15a8d00..81861ab 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
@@ -35,7 +35,6 @@ import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import static java.util.Arrays.asList;
-import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
import static org.apache.cassandra.distributed.action.GossipHelper.bootstrap;
import static org.apache.cassandra.distributed.action.GossipHelper.pullSchemaFrom;
import static org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap;
@@ -90,15 +89,7 @@ public class BootstrapTest extends TestBaseImpl
.start())
{
populate(cluster,0, 100);
-
- IInstanceConfig config = cluster.newInstanceConfig();
- config.set("auto_bootstrap", true);
- IInvokableInstance newInstance = cluster.bootstrap(config);
- withProperty(BOOTSTRAP_SCHEMA_DELAY_MS.getKey(), Integer.toString(90 * 1000),
- () -> withProperty("cassandra.join_ring", false,
- () -> newInstance.startup(cluster)));
-
- newInstance.nodetoolResult("join").asserts().success();
+ bootstrapAndJoinNode(cluster);
for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
Assert.assertEquals("Node " + e.getKey() + " has incorrect row state", e.getValue().longValue(), 100L);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/CommunicationDuringDecommissionTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/CommunicationDuringDecommissionTest.java
index 37b1802..d6825d9 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ring/CommunicationDuringDecommissionTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ring/CommunicationDuringDecommissionTest.java
@@ -29,7 +29,7 @@ import org.junit.Test;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.test.TestBaseImpl;
-import static org.apache.cassandra.distributed.action.GossipHelper.decomission;
+import static org.apache.cassandra.distributed.action.GossipHelper.decommission;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
@@ -45,7 +45,7 @@ public class CommunicationDuringDecommissionTest extends TestBaseImpl
{
BootstrapTest.populate(cluster, 0, 100);
- cluster.run(decomission(), 1);
+ cluster.run(decommission(), 1);
cluster.filters().allVerbs().from(1).messagesMatching((i, i1, iMessage) -> {
throw new AssertionError("Decomissioned node should not send any messages");
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeBatchTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeBatchTestBase.java
new file mode 100644
index 0000000..f18647c
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeBatchTestBase.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.apache.cassandra.net.Verb.BATCH_STORE_REQ;
+import static org.apache.cassandra.net.Verb.REQUEST_RSP;
+
+/**
+ * A base class for testing the replication of logged/unlogged batches on mixed-version clusters.
+ *
+ * The tests based on this class partially replace the Python dtests in batch_test.py created for CASSANDRA-9673.
+ */
+public class MixedModeBatchTestBase extends UpgradeTestBase
+{
+ private static final int KEYS_PER_BATCH = 10;
+
+ protected void testSimpleStrategy(Versions.Major from, Versions.Major to, boolean isLogged) throws Throwable
+ {
+ String insert = "INSERT INTO test_simple.names (key, name) VALUES (%d, '%s')";
+ String select = "SELECT * FROM test_simple.names WHERE key = ?";
+
+ new TestCase()
+ .nodes(3)
+ .nodesToUpgrade(1, 2)
+ .upgrade(from, to)
+ .setup(cluster -> {
+ cluster.schemaChange("CREATE KEYSPACE test_simple WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};");
+ cluster.schemaChange("CREATE TABLE test_simple.names (key int PRIMARY KEY, name text)");
+ })
+ .runAfterNodeUpgrade((cluster, upgraded) -> {
+ if (isLogged)
+ {
+ // If we're testing logged batches, exercise the case where batchlog writes fail.
+ IMessageFilters.Filter dropBatchlogWrite = cluster.filters().inbound().verbs(BATCH_STORE_REQ.id, REQUEST_RSP.id).drop();
+ dropBatchlogWrite.on();
+ testBatches(true, true, insert, select, cluster, upgraded);
+ cluster.filters().reset();
+ }
+
+ cluster.coordinator(1).execute("TRUNCATE test_simple.names", ConsistencyLevel.ALL);
+ testBatches(isLogged, false, insert, select, cluster, upgraded);
+ cluster.coordinator(1).execute("TRUNCATE test_simple.names", ConsistencyLevel.ALL);
+ })
+ .run();
+ }
+
+ private void testBatches(boolean isLogged, boolean failBatchlog, String insert, String select, UpgradeableCluster cluster, int upgraded)
+ {
+ List<Long> initialTokens = new ArrayList<>(cluster.size());
+
+ for (int i = 1; i <= cluster.size(); i++)
+ initialTokens.add(Long.valueOf(cluster.get(i).config().get("initial_token").toString()));
+
+ // Exercise all the coordinators...
+ for (int i = 1; i <= cluster.size(); i++)
+ {
+ StringBuilder batchBuilder = new StringBuilder("BEGIN " + (isLogged ? "" : "UNLOGGED ") + "BATCH\n");
+ String name = "Julia";
+ Runnable[] tests = new Runnable[KEYS_PER_BATCH];
+
+ // ...and sample enough keys that we cover the ring.
+ for (int j = 0; j < KEYS_PER_BATCH; j++)
+ {
+ int key = j + (i * KEYS_PER_BATCH);
+ batchBuilder.append(String.format(insert, key, name)).append('\n');
+
+ // Track the test that will later verify that this mutation was replicated properly.
+ tests[j] = () -> {
+ Object[] row = row(key, name);
+ Long token = tokenFrom(key);
+ int node = primaryReplica(initialTokens, token);
+
+ Object[][] primaryResult = cluster.get(node).executeInternal(select, key);
+
+ // We shouldn't expect to see any results if the batchlog write failed.
+ if (failBatchlog)
+ assertRows(primaryResult);
+ else
+ assertRows(primaryResult, row);
+
+ node = nextNode(node, cluster.size());
+
+ Object[][] nextResult = cluster.get(node).executeInternal(select, key);
+
+ if (failBatchlog)
+ assertRows(nextResult);
+ else
+ assertRows(nextResult, row);
+
+ // At RF=2, this node should not have received the write.
+ node = nextNode(node, cluster.size());
+ assertRows(cluster.get(node).executeInternal(select, key));
+ };
+ }
+
+ String batch = batchBuilder.append("APPLY BATCH").toString();
+
+ try
+ {
+ cluster.coordinator(i).execute(batch, ConsistencyLevel.ALL);
+ }
+ catch (Throwable t)
+ {
+ if (!failBatchlog || !exceptionMatches(t, WriteTimeoutException.class))
+ {
+ // The standard write failure exception won't give us any context for what actually failed.
+ if (exceptionMatches(t, WriteFailureException.class))
+ {
+ String message = "Failed to write following batch to coordinator %d after upgrading node %d:\n%s";
+ throw new AssertionError(String.format(message, i, upgraded, batch), t);
+ }
+
+ throw t;
+ }
+
+ // Failing the batchlog write will involve a timeout, so that's expected. Just continue...
+ }
+
+ for (Runnable test : tests)
+ test.run();
+ }
+ }
+
+ private boolean exceptionMatches(Throwable t, Class<?> clazz)
+ {
+ return t.getClass().getSimpleName().equals(clazz.getSimpleName())
+ || t.getCause() != null && t.getCause().getClass().getSimpleName().equals(clazz.getSimpleName());
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2LoggedBatchTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2LoggedBatchTest.java
new file mode 100644
index 0000000..575642c
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2LoggedBatchTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+public class MixedModeFrom2LoggedBatchTest extends MixedModeBatchTestBase
+{
+ @Test
+ public void testSimpleStrategy22to30() throws Throwable
+ {
+ testSimpleStrategy(Versions.Major.v22, Versions.Major.v30, true);
+ }
+
+ @Test
+ public void testSimpleStrategy22to3X() throws Throwable
+ {
+ testSimpleStrategy(Versions.Major.v22, Versions.Major.v3X, true);
+ }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2ReplicationTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2ReplicationTest.java
new file mode 100644
index 0000000..38efbaf
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2ReplicationTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+public class MixedModeFrom2ReplicationTest extends MixedModeReplicationTestBase
+{
+ @Test
+ public void testSimpleStrategy22to30() throws Throwable
+ {
+ testSimpleStrategy(Versions.Major.v22, Versions.Major.v30);
+ }
+
+ @Test
+ public void testSimpleStrategy22to3X() throws Throwable
+ {
+ testSimpleStrategy(Versions.Major.v22, Versions.Major.v3X);
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2UnloggedBatchTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2UnloggedBatchTest.java
new file mode 100644
index 0000000..f01958e
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom2UnloggedBatchTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+public class MixedModeFrom2UnloggedBatchTest extends MixedModeBatchTestBase
+{
+ @Test
+ public void testSimpleStrategy22to30() throws Throwable
+ {
+ testSimpleStrategy(Versions.Major.v22, Versions.Major.v30, false);
+ }
+
+ @Test
+ public void testSimpleStrategy22to3X() throws Throwable
+ {
+ testSimpleStrategy(Versions.Major.v22, Versions.Major.v3X, false);
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3LoggedBatchTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3LoggedBatchTest.java
new file mode 100644
index 0000000..bb2008e
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3LoggedBatchTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+public class MixedModeFrom3LoggedBatchTest extends MixedModeBatchTestBase
+{
+ @Test
+ public void testSimpleStrategy30to3X() throws Throwable
+ {
+ testSimpleStrategy(Versions.Major.v30, Versions.Major.v3X, true);
+ }
+
+ @Test
+ public void testSimpleStrategy30to4() throws Throwable
+ {
+ testSimpleStrategy(Versions.Major.v30, Versions.Major.v4, true);
+ }
+
+ @Test
+ public void testSimpleStrategy3Xto4() throws Throwable
+ {
+ testSimpleStrategy(Versions.Major.v3X, Versions.Major.v4, true);
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3ReplicationTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3ReplicationTest.java
new file mode 100644
index 0000000..8eae4b4
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3ReplicationTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+public class MixedModeFrom3ReplicationTest extends MixedModeReplicationTestBase
+{
+ @Test
+ public void testSimpleStrategy30to3X() throws Throwable
+ {
+ testSimpleStrategy(Versions.Major.v30, Versions.Major.v3X);
+ }
+
+ @Test
+ public void testSimpleStrategy30to4() throws Throwable
+ {
+ testSimpleStrategy(Versions.Major.v30, Versions.Major.v4);
+ }
+
+ @Test
+ public void testSimpleStrategy3Xto4() throws Throwable
+ {
+ testSimpleStrategy(Versions.Major.v3X, Versions.Major.v4);
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3UnloggedBatchTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3UnloggedBatchTest.java
new file mode 100644
index 0000000..b39177b
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeFrom3UnloggedBatchTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.shared.Versions;
+
+public class MixedModeFrom3UnloggedBatchTest extends MixedModeBatchTestBase
+{
+ @Test
+ public void testSimpleStrategy30to3X() throws Throwable
+ {
+ testSimpleStrategy(Versions.Major.v30, Versions.Major.v3X, false);
+ }
+
+ @Test
+ public void testSimpleStrategy30to4() throws Throwable
+ {
+ testSimpleStrategy(Versions.Major.v30, Versions.Major.v4, false);
+ }
+
+ @Test
+ public void testSimpleStrategy3Xto4() throws Throwable
+ {
+ testSimpleStrategy(Versions.Major.v3X, Versions.Major.v4, false);
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReplicationTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReplicationTestBase.java
new file mode 100644
index 0000000..153a8a5
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeReplicationTestBase.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.Versions;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+
+/**
+ * A base class for testing basic replication on mixed-version clusters.
+ */
+public class MixedModeReplicationTestBase extends UpgradeTestBase
+{
+ protected void testSimpleStrategy(Versions.Major from, Versions.Major to) throws Throwable
+ {
+ String insert = "INSERT INTO test_simple.names (key, name) VALUES (?, ?)";
+ String select = "SELECT * FROM test_simple.names WHERE key = ?";
+
+ new TestCase()
+ .nodes(3)
+ .nodesToUpgrade(1, 2)
+ .upgrade(from, to)
+ .setup(cluster -> {
+ cluster.schemaChange("CREATE KEYSPACE test_simple WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};");
+ cluster.schemaChange("CREATE TABLE test_simple.names (key int PRIMARY KEY, name text)");
+ })
+ .runAfterNodeUpgrade((cluster, upgraded) -> {
+ List<Long> initialTokens = new ArrayList<>(cluster.size() + 1);
+ initialTokens.add(null); // The first valid token is at 1 to avoid offset math below.
+
+ for (int i = 1; i <= cluster.size(); i++)
+ initialTokens.add(Long.valueOf(cluster.get(i).config().get("initial_token").toString()));
+
+ List<Long> validTokens = initialTokens.subList(1, cluster.size() + 1);
+
+ // Exercise all the coordinators...
+ for (int i = 1; i <= cluster.size(); i++)
+ {
+ // ...and sample enough keys that we cover the ring.
+ for (int j = 0; j < 10; j++)
+ {
+ int key = j + (i * 10);
+ Object[] row = row(key, "Nero");
+ Long token = tokenFrom(key);
+
+ cluster.coordinator(i).execute(insert, ConsistencyLevel.ALL, row);
+
+ int node = primaryReplica(validTokens, token);
+ assertRows(cluster.get(node).executeInternal(select, key), row);
+
+ node = nextNode(node, cluster.size());
+ assertRows(cluster.get(node).executeInternal(select, key), row);
+
+ // At RF=2, this node should not have received the write.
+ node = nextNode(node, cluster.size());
+ assertRows(cluster.get(node).executeInternal(select, key));
+ }
+ }
+ })
+ .run();
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 3653837..39957e9 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -28,12 +28,15 @@ import java.util.function.Consumer;
import org.junit.After;
import org.junit.BeforeClass;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.impl.Instance;
import org.apache.cassandra.distributed.shared.DistributedTestBase;
import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.utils.ByteBufferUtil;
import static org.apache.cassandra.distributed.shared.Versions.Major;
import static org.apache.cassandra.distributed.shared.Versions.Version;
@@ -206,4 +209,32 @@ public class UpgradeTestBase extends DistributedTestBase
.upgrade(Versions.Major.v3X, Versions.Major.v4)
.nodesToUpgrade(toUpgrade);
}
+
+ protected static int primaryReplica(List<Long> initialTokens, Long token)
+ {
+ int primary = 1;
+
+ for (Long initialToken : initialTokens)
+ {
+ if (token <= initialToken)
+ {
+ break;
+ }
+
+ primary++;
+ }
+
+ return primary;
+ }
+
+ protected static Long tokenFrom(int key)
+ {
+ DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(key));
+ return (Long) dk.getToken().getTokenValue();
+ }
+
+ protected static int nextNode(int current, int numNodes)
+ {
+ return current == numNodes ? 1 : current + 1;
+ }
}
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
index c2b9fc9..8057e99 100644
--- a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
@@ -20,15 +20,12 @@ package org.apache.cassandra.batchlog;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import org.junit.Test;
-import org.junit.matchers.JUnitMatchers;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaPlans;
import static org.hamcrest.CoreMatchers.is;
@@ -40,7 +37,7 @@ public class BatchlogEndpointFilterTest
private static final String LOCAL = "local";
@Test
- public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException
+ public void shouldSelect2HostsFromNonLocalRacks() throws UnknownHostException
{
Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
.put(LOCAL, InetAddressAndPort.getByName("0"))
@@ -57,6 +54,27 @@ public class BatchlogEndpointFilterTest
}
@Test
+ public void shouldSelectLastHostsFromLastNonLocalRacks() throws UnknownHostException
+ {
+ Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
+ .put(LOCAL, InetAddressAndPort.getByName("00"))
+ .put("1", InetAddressAndPort.getByName("11"))
+ .put("2", InetAddressAndPort.getByName("2"))
+ .put("2", InetAddressAndPort.getByName("22"))
+ .put("3", InetAddressAndPort.getByName("3"))
+ .put("3", InetAddressAndPort.getByName("33"))
+ .build();
+
+ Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints);
+ assertThat(result.size(), is(2));
+
+ // result should be the last replicas of the last two racks
+ // (Collections.shuffle has been replaced with Collections.reverse for testing)
+ assertTrue(result.contains(InetAddressAndPort.getByName("22")));
+ assertTrue(result.contains(InetAddressAndPort.getByName("33")));
+ }
+
+ @Test
public void shouldSelectHostFromLocal() throws UnknownHostException
{
Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
@@ -71,7 +89,7 @@ public class BatchlogEndpointFilterTest
}
@Test
- public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException
+ public void shouldReturnPassedEndpointForSingleNodeDC() throws UnknownHostException
{
Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
.put(LOCAL, InetAddressAndPort.getByName("0"))
@@ -116,6 +134,19 @@ public class BatchlogEndpointFilterTest
assertTrue(result.contains(InetAddressAndPort.getByName("1111")));
}
+ @Test
+ public void shouldSelectOnlyTwoHostsEvenIfLocal() throws UnknownHostException
+ {
+ Multimap<String, InetAddressAndPort> endpoints = ImmutableMultimap.<String, InetAddressAndPort> builder()
+ .put(LOCAL, InetAddressAndPort.getByName("1"))
+ .put(LOCAL, InetAddressAndPort.getByName("11"))
+ .build();
+ Collection<InetAddressAndPort> result = filterBatchlogEndpoints(endpoints);
+ assertThat(result.size(), is(2));
+ assertTrue(result.contains(InetAddressAndPort.getByName("1")));
+ assertTrue(result.contains(InetAddressAndPort.getByName("11")));
+ }
+
private Collection<InetAddressAndPort> filterBatchlogEndpoints(Multimap<String, InetAddressAndPort> endpoints)
{
return ReplicaPlans.filterBatchlogEndpoints(LOCAL, endpoints,
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
index 361759f..b86b0d3 100644
--- a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.batchlog;
-import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
@@ -91,7 +90,6 @@ public class BatchlogManagerTest
}
@Before
- @SuppressWarnings("deprecation")
public void setUp() throws Exception
{
TokenMetadata metadata = StorageService.instance.getTokenMetadata();
@@ -127,7 +125,6 @@ public class BatchlogManagerTest
}
@Test
- @SuppressWarnings("deprecation")
public void testReplay() throws Exception
{
long initialAllBatches = BatchlogManager.instance.countAllBatches();
@@ -273,7 +270,7 @@ public class BatchlogManagerTest
}
@Test
- public void testAddBatch() throws IOException
+ public void testAddBatch()
{
long initialAllBatches = BatchlogManager.instance.countAllBatches();
TableMetadata cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata();
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java
index bc220fd..cbe5b04 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java
@@ -33,14 +33,17 @@ public class BatchTest extends CQLTester
{
createTable("CREATE TABLE %s (userid text PRIMARY KEY, name text, password text)");
- String query = "BEGIN BATCH\n"
- + "INSERT INTO %1$s (userid, password, name) VALUES ('user2', 'ch@ngem3b', 'second user');\n"
- + "UPDATE %1$s SET password = 'ps22dhds' WHERE userid = 'user3';\n"
- + "INSERT INTO %1$s (userid, password) VALUES ('user4', 'ch@ngem3c');\n"
- + "DELETE name FROM %1$s WHERE userid = 'user1';\n"
- + "APPLY BATCH;";
-
- execute(query);
+ execute("BEGIN BATCH\n" +
+ "INSERT INTO %1$s (userid, password, name) VALUES ('user2', 'ch@ngem3b', 'second user');\n" +
+ "UPDATE %1$s SET password = 'ps22dhds' WHERE userid = 'user3';\n" +
+ "INSERT INTO %1$s (userid, password) VALUES ('user4', 'ch@ngem3c');\n" +
+ "DELETE name FROM %1$s WHERE userid = 'user1';\n" +
+ "APPLY BATCH;");
+
+ assertRows(execute("SELECT userid FROM %s"),
+ row("user2"),
+ row("user4"),
+ row("user3"));
}
/**
@@ -52,24 +55,53 @@ public class BatchTest extends CQLTester
createTable("CREATE TABLE %s (k int PRIMARY KEY, l list<int>)");
execute("BEGIN BATCH " +
- "UPDATE %1$s SET l = l +[ 1 ] WHERE k = 0; " +
- "UPDATE %1$s SET l = l + [ 2 ] WHERE k = 0; " +
- "UPDATE %1$s SET l = l + [ 3 ] WHERE k = 0; " +
+ "UPDATE %1$s SET l = l + [1] WHERE k = 0; " +
+ "UPDATE %1$s SET l = l + [2] WHERE k = 0; " +
+ "UPDATE %1$s SET l = l + [3] WHERE k = 0; " +
"APPLY BATCH");
assertRows(execute("SELECT l FROM %s WHERE k = 0"),
row(list(1, 2, 3)));
execute("BEGIN BATCH " +
- "UPDATE %1$s SET l =[ 1 ] + l WHERE k = 1; " +
- "UPDATE %1$s SET l = [ 2 ] + l WHERE k = 1; " +
- "UPDATE %1$s SET l = [ 3 ] + l WHERE k = 1; " +
+ "UPDATE %1$s SET l = [1] + l WHERE k = 1; " +
+ "UPDATE %1$s SET l = [2] + l WHERE k = 1; " +
+ "UPDATE %1$s SET l = [3] + l WHERE k = 1; " +
"APPLY BATCH ");
assertRows(execute("SELECT l FROM %s WHERE k = 1"),
row(list(3, 2, 1)));
}
+ @Test
+ public void testBatchAndMap() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, m map<int, int>)");
+
+ execute("BEGIN BATCH " +
+ "UPDATE %1$s SET m[1] = 2 WHERE k = 0; " +
+ "UPDATE %1$s SET m[3] = 4 WHERE k = 0; " +
+ "APPLY BATCH");
+
+ assertRows(execute("SELECT m FROM %s WHERE k = 0"),
+ row(map(1, 2, 3, 4)));
+ }
+
+ @Test
+ public void testBatchAndSet() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, s set<int>)");
+
+ execute("BEGIN BATCH " +
+ "UPDATE %1$s SET s = s + {1} WHERE k = 0; " +
+ "UPDATE %1$s SET s = s + {2} WHERE k = 0; " +
+ "UPDATE %1$s SET s = s + {3} WHERE k = 0; " +
+ "APPLY BATCH");
+
+ assertRows(execute("SELECT s FROM %s WHERE k = 0"),
+ row(set(1, 2, 3)));
+ }
+
/**
* Migrated from cql_tests.py:TestCQL.bug_6115_test()
*/
diff --git a/test/unit/org/apache/cassandra/hints/HintMessageTest.java b/test/unit/org/apache/cassandra/hints/HintMessageTest.java
index bb015a8..3565a12 100644
--- a/test/unit/org/apache/cassandra/hints/HintMessageTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintMessageTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.hints;
import java.io.IOException;
import java.util.UUID;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
@@ -34,33 +35,74 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.FBUtilities;
-import static junit.framework.Assert.assertEquals;
-
import static org.apache.cassandra.hints.HintsTestUtil.assertHintsEqual;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
public class HintMessageTest
{
private static final String KEYSPACE = "hint_message_test";
private static final String TABLE = "table";
- @Test
- public void testSerializer() throws IOException
+ @BeforeClass
+ public static void setup()
{
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+ }
+ @Test
+ public void testSerializer() throws IOException
+ {
UUID hostId = UUID.randomUUID();
long now = FBUtilities.timestampMicros();
+ TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+
+ Mutation mutation =
+ new RowUpdateBuilder(table, now, bytes("key")).clustering("column").add("val", "val" + 1234).build();
+
+ Hint hint = Hint.create(mutation, now / 1000);
+ HintMessage message = new HintMessage(hostId, hint);
+ // serialize
+ int serializedSize = (int) HintMessage.serializer.serializedSize(message, MessagingService.current_version);
+ HintMessage deserializedMessage;
+
+ try (DataOutputBuffer dob = new DataOutputBuffer())
+ {
+ HintMessage.serializer.serialize(message, dob, MessagingService.current_version);
+ assertEquals(serializedSize, dob.getLength());
+
+ // deserialize
+ DataInputPlus di = new DataInputBuffer(dob.buffer(), true);
+ deserializedMessage = HintMessage.serializer.deserialize(di, MessagingService.current_version);
+ }
+
+ // compare before/after
+ assertEquals(hostId, deserializedMessage.hostId);
+ assertNotNull(deserializedMessage.hint);
+ assertHintsEqual(hint, deserializedMessage.hint);
+ }
+
+ @Test
+ public void testEncodedSerializer() throws IOException
+ {
+ UUID hostId = UUID.randomUUID();
+ long now = FBUtilities.timestampMicros();
TableMetadata table = Schema.instance.getTableMetadata(KEYSPACE, TABLE);
+
Mutation mutation =
- new RowUpdateBuilder(table, now, bytes("key"))
- .clustering("column")
- .add("val", "val" + 1234)
- .build();
+ new RowUpdateBuilder(table, now, bytes("key")).clustering("column").add("val", "val" + 1234) .build();
+
Hint hint = Hint.create(mutation, now / 1000);
- HintMessage message = new HintMessage(hostId, hint);
+ HintMessage.Encoded message;
+
+ try (DataOutputBuffer dob = new DataOutputBuffer())
+ {
+ Hint.serializer.serialize(hint, dob, MessagingService.current_version);
+ message = new HintMessage.Encoded(hostId, dob.buffer(), MessagingService.current_version);
+ }
// serialize
int serializedSize = (int) HintMessage.serializer.serializedSize(message, MessagingService.current_version);
@@ -69,11 +111,12 @@ public class HintMessageTest
assertEquals(serializedSize, dob.getLength());
// deserialize
- DataInputPlus di = new DataInputBuffer(dob.buffer(), true);
- HintMessage deserializedMessage = HintMessage.serializer.deserialize(di, MessagingService.current_version);
+ DataInputPlus dip = new DataInputBuffer(dob.buffer(), true);
+ HintMessage deserializedMessage = HintMessage.serializer.deserialize(dip, MessagingService.current_version);
// compare before/after
assertEquals(hostId, deserializedMessage.hostId);
- assertHintsEqual(message.hint, deserializedMessage.hint);
+ assertNotNull(deserializedMessage.hint);
+ assertHintsEqual(hint, deserializedMessage.hint);
}
}
diff --git a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
index 3960bd0..8ab1bfa 100644
--- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
@@ -31,7 +31,9 @@ import com.google.common.collect.Multimap;
import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,14 +47,18 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.TokenMetadata.Topology;
+import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import static org.apache.cassandra.locator.NetworkTopologyStrategy.REPLICATION_FACTOR;
import static org.apache.cassandra.locator.Replica.fullReplica;
import static org.apache.cassandra.locator.Replica.transientReplica;
+import static org.junit.Assert.assertTrue;
public class NetworkTopologyStrategyTest
{
- private String keyspaceName = "Keyspace1";
+ private static final String KEYSPACE = "Keyspace1";
private static final Logger logger = LoggerFactory.getLogger(NetworkTopologyStrategyTest.class);
@BeforeClass
@@ -76,7 +82,7 @@ public class NetworkTopologyStrategyTest
configOptions.put("DC3", "1");
// Set the localhost to the tokenmetadata. Embedded cassandra way?
- NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions);
+ NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(KEYSPACE, metadata, snitch, configOptions);
assert strategy.getReplicationFactor("DC1").allReplicas == 3;
assert strategy.getReplicationFactor("DC2").allReplicas == 2;
assert strategy.getReplicationFactor("DC3").allReplicas == 1;
@@ -101,7 +107,7 @@ public class NetworkTopologyStrategyTest
configOptions.put("DC3", "0");
// Set the localhost to the tokenmetadata. Embedded cassandra way?
- NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions);
+ NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(KEYSPACE, metadata, snitch, configOptions);
assert strategy.getReplicationFactor("DC1").allReplicas == 3;
assert strategy.getReplicationFactor("DC2").allReplicas == 3;
assert strategy.getReplicationFactor("DC3").allReplicas == 0;
@@ -144,7 +150,7 @@ public class NetworkTopologyStrategyTest
}
metadata.updateNormalTokens(tokens);
- NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions);
+ NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(KEYSPACE, metadata, snitch, configOptions);
for (String testToken : new String[]{"123456", "200000", "000402", "ffffff", "400200"})
{
@@ -418,7 +424,7 @@ public class NetworkTopologyStrategyTest
Map<String, String> configOptions = new HashMap<String, String>();
configOptions.put(snitch.getDatacenter((InetAddressAndPort) null), "3/1");
- NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions);
+ NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(KEYSPACE, metadata, snitch, configOptions);
Util.assertRCEquals(EndpointsForRange.of(fullReplica(endpoints.get(0), range(400, 100)),
fullReplica(endpoints.get(1), range(400, 100)),
@@ -431,4 +437,50 @@ public class NetworkTopologyStrategyTest
transientReplica(endpoints.get(3), range(100, 200))),
strategy.getNaturalReplicasForToken(tk(101)));
}
+
+ @Rule
+ public ExpectedException expectedEx = ExpectedException.none();
+
+ @Test
+ public void shouldRejectReplicationFactorOption() throws ConfigurationException
+ {
+ expectedEx.expect(ConfigurationException.class);
+ expectedEx.expectMessage(REPLICATION_FACTOR + " should not appear");
+
+ IEndpointSnitch snitch = new SimpleSnitch();
+ TokenMetadata metadata = new TokenMetadata();
+
+ Map<String, String> configOptions = new HashMap<>();
+ configOptions.put(REPLICATION_FACTOR, "1");
+
+ @SuppressWarnings("unused")
+ NetworkTopologyStrategy strategy = new NetworkTopologyStrategy("ks", metadata, snitch, configOptions);
+ }
+
+ @Test
+ public void shouldWarnOnHigherReplicationFactorThanNodesInDC()
+ {
+ HashMap<String, String> configOptions = new HashMap<>();
+ configOptions.put("DC1", "2");
+
+ IEndpointSnitch snitch = new AbstractNetworkTopologySnitch()
+ {
+ public String getRack(InetAddressAndPort endpoint)
+ {
+ return "rack1";
+ }
+
+ public String getDatacenter(InetAddressAndPort endpoint)
+ {
+ return "DC1";
+ }
+ };
+
+ NetworkTopologyStrategy strategy = new NetworkTopologyStrategy("ks", new TokenMetadata(), snitch, configOptions);
+ StorageService.instance.getTokenMetadata().updateHostId(UUID.randomUUID(), FBUtilities.getBroadcastAddressAndPort());
+
+ ClientWarn.instance.captureWarnings();
+ strategy.maybeWarnOnOptions();
+ assertTrue(ClientWarn.instance.getWarnings().stream().anyMatch(s -> s.contains("Your replication factor")));
+ }
}
diff --git a/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java b/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java
index 7d85b44..55f54ad 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicationFactorTest.java
@@ -24,10 +24,12 @@ import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.Gossiper;
+import org.assertj.core.api.Assertions;
+
+import static org.junit.Assert.assertEquals;
public class ReplicationFactorTest
{
-
@BeforeClass
public static void setupClass()
{
@@ -36,7 +38,44 @@ public class ReplicationFactorTest
Gossiper.instance.start(1);
}
- private static void assertRfParseFailure(String s)
+ @Test
+ public void shouldParseValidRF()
+ {
+ assertRfParse("0", 0, 0);
+ assertRfParse("3", 3, 0);
+ assertRfParse("3/1", 3, 1);
+ assertRfParse("5", 5, 0);
+ assertRfParse("5/2", 5, 2);
+ }
+
+ @Test
+ public void shouldFailOnInvalidRF()
+ {
+ assertRfParseFailure("-1", "Replication factor must be non-negative");
+ assertRfParseFailure("3/3", "Transient replicas must be zero, or less than total replication factor");
+ assertRfParseFailure("3/-1", "Amount of transient nodes should be strictly positive");
+ assertRfParseFailure("3/4", "Transient replicas must be zero, or less than total replication factor");
+ assertRfParseFailure("3/", "Replication factor format is <replicas> or <replicas>/<transient>");
+ assertRfParseFailure("1/a", "For input string");
+ assertRfParseFailure("a/1", "For input string");
+ assertRfParseFailure("", "For input string");
+ }
+
+ @Test
+ public void shouldRoundTripParseSimpleRF()
+ {
+ String rf = "3";
+ assertEquals(rf, ReplicationFactor.fromString(rf).toParseableString());
+ }
+
+ @Test
+ public void shouldRoundTripParseTransientRF()
+ {
+ String rf = "3/1";
+ assertEquals(rf, ReplicationFactor.fromString(rf).toParseableString());
+ }
+
+ private static void assertRfParseFailure(String s, String error)
{
try
{
@@ -45,39 +84,15 @@ public class ReplicationFactorTest
}
catch (IllegalArgumentException e)
{
- // expected
+ Assertions.assertThat(e.getMessage()).contains(error);
}
}
private static void assertRfParse(String s, int expectedReplicas, int expectedTrans)
{
ReplicationFactor rf = ReplicationFactor.fromString(s);
- Assert.assertEquals(expectedReplicas, rf.allReplicas);
- Assert.assertEquals(expectedTrans, rf.transientReplicas());
- Assert.assertEquals(expectedReplicas - expectedTrans, rf.fullReplicas);
- }
-
- @Test
- public void parseTest()
- {
- assertRfParse("3", 3, 0);
- assertRfParse("3/1", 3, 1);
-
- assertRfParse("5", 5, 0);
- assertRfParse("5/2", 5, 2);
-
- assertRfParseFailure("-1");
- assertRfParseFailure("3/3");
- assertRfParseFailure("3/4");
- }
-
- @Test
- public void roundTripParseTest()
- {
- String input = "3";
- Assert.assertEquals(input, ReplicationFactor.fromString(input).toParseableString());
-
- String transientInput = "3/1";
- Assert.assertEquals(transientInput, ReplicationFactor.fromString(transientInput).toParseableString());
+ assertEquals(expectedReplicas, rf.allReplicas);
+ assertEquals(expectedTrans, rf.transientReplicas());
+ assertEquals(expectedReplicas - expectedTrans, rf.fullReplicas);
}
}
diff --git a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
index 2e9e32d..6ccfcde 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicationStrategyEndpointCacheTest.java
@@ -17,9 +17,6 @@
*/
package org.apache.cassandra.locator;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.commons.lang3.StringUtils;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -40,52 +37,33 @@ public class ReplicationStrategyEndpointCacheTest
public static final String KEYSPACE = "ReplicationStrategyEndpointCacheTest";
@BeforeClass
- public static void defineSchema() throws Exception
+ public static void defineSchema()
{
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(5));
}
- public void setup(Class stratClass, Map<String, String> strategyOptions) throws Exception
+ public void setup() throws Exception
{
tmd = new TokenMetadata();
searchToken = new BigIntegerToken(String.valueOf(15));
-
strategy = getStrategyWithNewTokenMetadata(Keyspace.open(KEYSPACE).getReplicationStrategy(), tmd);
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(10)), InetAddressAndPort.getByName("127.0.0.1"));
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(20)), InetAddressAndPort.getByName("127.0.0.2"));
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(30)), InetAddressAndPort.getByName("127.0.0.3"));
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(40)), InetAddressAndPort.getByName("127.0.0.4"));
- //tmd.updateNormalToken(new BigIntegerToken(String.valueOf(50)), InetAddressAndPort.getByName("127.0.0.5", null, null));
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(60)), InetAddressAndPort.getByName("127.0.0.6"));
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(70)), InetAddressAndPort.getByName("127.0.0.7"));
- tmd.updateNormalToken(new BigIntegerToken(String.valueOf(80)), InetAddressAndPort.getByName("127.0.0.8"));
+ for (int i = 1; i <= 8; i++)
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(i * 10)), InetAddressAndPort.getByName("127.0.0." + i));
}
@Test
public void testEndpointsWereCached() throws Exception
{
- runEndpointsWereCachedTest(FakeSimpleStrategy.class, null);
- runEndpointsWereCachedTest(FakeNetworkTopologyStrategy.class, new HashMap<String, String>());
- }
-
- public void runEndpointsWereCachedTest(Class stratClass, Map<String, String> configOptions) throws Exception
- {
- setup(stratClass, configOptions);
+ setup();
Util.assertRCEquals(strategy.getNaturalReplicasForToken(searchToken), strategy.getNaturalReplicasForToken(searchToken));
}
@Test
public void testCacheRespectsTokenChanges() throws Exception
{
- runCacheRespectsTokenChangesTest(SimpleStrategy.class, null);
- runCacheRespectsTokenChangesTest(NetworkTopologyStrategy.class, new HashMap<String, String>());
- }
-
- public void runCacheRespectsTokenChangesTest(Class stratClass, Map<String, String> configOptions) throws Exception
- {
- setup(stratClass, configOptions);
+ setup();
EndpointsForToken initial;
EndpointsForToken replicas;
@@ -116,40 +94,6 @@ public class ReplicationStrategyEndpointCacheTest
Util.assertNotRCEquals(replicas, initial);
}
- protected static class FakeSimpleStrategy extends SimpleStrategy
- {
- private boolean called = false;
-
- public FakeSimpleStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
- {
- super(keyspaceName, tokenMetadata, snitch, configOptions);
- }
-
- public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
- {
- assert !called : "calculateNaturalReplicas was already called, result should have been cached";
- called = true;
- return super.calculateNaturalReplicas(token, metadata);
- }
- }
-
- protected static class FakeNetworkTopologyStrategy extends NetworkTopologyStrategy
- {
- private boolean called = false;
-
- public FakeNetworkTopologyStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) throws ConfigurationException
- {
- super(keyspaceName, tokenMetadata, snitch, configOptions);
- }
-
- public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
- {
- assert !called : "calculateNaturalReplicas was already called, result should have been cached";
- called = true;
- return super.calculateNaturalReplicas(token, metadata);
- }
- }
-
private AbstractReplicationStrategy getStrategyWithNewTokenMetadata(AbstractReplicationStrategy strategy, TokenMetadata newTmd) throws ConfigurationException
{
return AbstractReplicationStrategy.createReplicationStrategy(
diff --git a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
index 9e24de7..4c1ff26 100644
--- a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
@@ -48,9 +49,12 @@ import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.StorageServiceAccessor;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -316,8 +320,37 @@ public class SimpleStrategyTest
Map<String, String> configOptions = new HashMap<>();
+ @SuppressWarnings("unused")
SimpleStrategy strategy = new SimpleStrategy("ks", metadata, snitch, configOptions);
}
+
+ @Test
+ public void shouldReturnNoEndpointsForEmptyRing()
+ {
+ TokenMetadata metadata = new TokenMetadata();
+
+ HashMap<String, String> configOptions = new HashMap<>();
+ configOptions.put("replication_factor", "1");
+
+ SimpleStrategy strategy = new SimpleStrategy("ks", metadata, new SimpleSnitch(), configOptions);
+
+ EndpointsForRange replicas = strategy.calculateNaturalReplicas(null, metadata);
+ assertTrue(replicas.endpoints().isEmpty());
+ }
+
+ @Test
+ public void shouldWarnOnHigherReplicationFactorThanNodes()
+ {
+ HashMap<String, String> configOptions = new HashMap<>();
+ configOptions.put("replication_factor", "2");
+
+ SimpleStrategy strategy = new SimpleStrategy("ks", new TokenMetadata(), new SimpleSnitch(), configOptions);
+ StorageService.instance.getTokenMetadata().updateHostId(UUID.randomUUID(), FBUtilities.getBroadcastAddressAndPort());
+
+ ClientWarn.instance.captureWarnings();
+ strategy.maybeWarnOnOptions();
+ assertTrue(ClientWarn.instance.getWarnings().stream().anyMatch(s -> s.contains("Your replication factor")));
+ }
private AbstractReplicationStrategy getStrategy(String keyspaceName, TokenMetadata tmd, IEndpointSnitch snitch)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org