You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2020/11/27 16:10:34 UTC
[cassandra] branch cassandra-3.0 updated: Fix serial
read/non-applying CAS linearizability
This is an automated email from the ASF dual-hosted git repository.
slebresne pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new 2d0b168 Fix serial read/non-applying CAS linearizability
2d0b168 is described below
commit 2d0b16804785660e8515aca9944784fb3733c619
Author: Sylvain Lebresne <le...@gmail.com>
AuthorDate: Wed May 20 14:56:02 2020 +0200
Fix serial read/non-applying CAS linearizability
Before this patch, a SERIAL read or a non-applying CAS replays any
in-progress commit by calling `beginAndRepairPaxos`, but only a quorum
of nodes is contacted, so a minority of nodes could have an unfinished
in-progress proposal in their Paxos state. If such an in-progress proposal
is not replayed by a SERIAL read/non-applying CAS, it must never be
replayed by any following operation, as that would break linearizability,
but nothing was done to prevent this.
This patch ensures that both a SERIAL read and a non-applying CAS get an
empty update accepted by a quorum (a full Paxos propose round) before succeeding.
This ensures that no prior incomplete in-progress proposal can be replayed (such a
proposal is superseded by the accepted empty proposal, which has a more recent ballot).
As this fix has a performance impact on SERIAL reads, a flag is provided
to disable the new code (its use is discouraged, and a warning is logged when it is set).
Patch by Sylvain Lebresne, reviewed by Benjamin Lerer for CASSANDRA-12126
---
CHANGES.txt | 1 +
NEWS.txt | 8 +
.../org/apache/cassandra/service/StorageProxy.java | 231 +++++--
.../org/apache/cassandra/service/paxos/Commit.java | 6 +
.../apache/cassandra/service/paxos/PaxosState.java | 3 -
.../cassandra/service/paxos/PrepareCallback.java | 12 +-
.../cassandra/distributed/impl/Instance.java | 112 ++--
.../apache/cassandra/distributed/test/CASTest.java | 679 +++++++++++++++++++++
8 files changed, 939 insertions(+), 113 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 546fd98..d6f406d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.24:
+ * Fix serial read/non-applying CAS linearizability (CASSANDRA-12126)
* Avoid potential NPE in JVMStabilityInspector (CASSANDRA-16294)
* Improved check of num_tokens against the length of initial_token (CASSANDRA-14477)
* Fix a race condition on ColumnFamilyStore and TableMetrics (CASSANDRA-16228)
diff --git a/NEWS.txt b/NEWS.txt
index 7034c2c..6cc5e84 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -47,6 +47,14 @@ using the provided 'sstableupgrade' tool.
Upgrading
---------
+ - This release fixes a correctness issue with SERIAL reads and with LWT writes that do not apply.
+ Unfortunately, this fix has an impact on read performance at the SERIAL or
+ LOCAL_SERIAL consistency levels. For heavy users of such SERIAL reads, the performance
+ impact may be noticeable and may also result in an increase in timeouts. For that
+ reason, an opt-in system property has been added to disable the fix:
+ -Dcassandra.unsafe.disable-serial-reads-linearizability=true
+ Use this flag at your own risk, as it reverts SERIAL reads to the incorrect behavior of
+ previous versions. See CASSANDRA-12126 for details.
- In cassandra.yaml, when using vnodes num_tokens must be defined if initial_token is defined.
If it is not defined, or not equal to the numbers of tokens defined in initial_tokens,
the node will not start. See CASSANDRA-14477 for details.
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index c7888c4..91dd991 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -24,6 +24,7 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import com.google.common.base.Predicate;
import com.google.common.annotations.VisibleForTesting;
@@ -109,6 +110,10 @@ public class StorageProxy implements StorageProxyMBean
*/
private static final int MAX_CONCURRENT_RANGE_REQUESTS = Math.max(1, Integer.getInteger("cassandra.max_concurrent_range_requests", FBUtilities.getAvailableProcessors() * 10));
+ private static final String DISABLE_SERIAL_READ_LINEARIZABILITY_KEY = "cassandra.unsafe.disable-serial-reads-linearizability";
+ private static final boolean disableSerialReadLinearizability =
+ Boolean.parseBoolean(System.getProperty(DISABLE_SERIAL_READ_LINEARIZABILITY_KEY, "false"));
+
private StorageProxy()
{
}
@@ -163,6 +168,16 @@ public class StorageProxy implements StorageProxyMBean
.execute(counterWriteTask(mutation, targets, responseHandler, localDataCenter));
}
};
+
+ if (disableSerialReadLinearizability)
+ {
+ logger.warn("This node was started with -D{}. SERIAL (and LOCAL_SERIAL) reads coordinated by this node " +
+ "will not offer linearizability (see CASSANDRA-12126 for details on what this mean) with " +
+ "respect to other SERIAL operations. Please note that, with this flag, SERIAL reads will be " +
+ "slower than QUORUM reads, yet offer no more guarantee. This flag should only be used in " +
+ "the restricted case of upgrading from a pre-CASSANDRA-12126 version, and only if you " +
+ "understand the tradeoff.", DISABLE_SERIAL_READ_LINEARIZABILITY_KEY);
+ }
}
/**
@@ -216,26 +231,12 @@ public class StorageProxy implements StorageProxyMBean
throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException
{
final long start = System.nanoTime();
- int contentions = 0;
try
{
- consistencyForPaxos.validateForCas();
- consistencyForCommit.validateForCasCommit(keyspaceName);
-
CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
- long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
- while (System.nanoTime() - start < timeout)
+ Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer = () ->
{
- // for simplicity, we'll do a single liveness check at the start of each attempt
- Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos);
- List<InetAddress> liveEndpoints = p.left;
- int requiredParticipants = p.right;
-
- final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state);
- final UUID ballot = pair.left;
- contentions += pair.right;
-
// read the current values and check they validate the conditions
Tracing.trace("Reading existing values for CAS precondition");
SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds());
@@ -251,11 +252,10 @@ public class StorageProxy implements StorageProxyMBean
{
Tracing.trace("CAS precondition does not match current values {}", current);
casWriteMetrics.conditionNotMet.inc();
- return current.rowIterator();
+ return Pair.create(PartitionUpdate.emptyUpdate(metadata, key), current.rowIterator());
}
- // finish the paxos round w/ the desired updates
- // TODO turn null updates into delete?
+ // Create the desired updates
PartitionUpdate updates = request.makeUpdates(current);
// Apply triggers to cas updates. A consideration here is that
@@ -267,47 +267,141 @@ public class StorageProxy implements StorageProxyMBean
// InvalidRequestException) any which aren't.
updates = TriggerExecutor.instance.execute(updates);
+ return Pair.create(updates, null);
+ };
- Commit proposal = Commit.newProposal(ballot, updates);
- Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
- if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos))
- {
- commitPaxos(proposal, consistencyForCommit, true);
- Tracing.trace("CAS successful");
- return null;
- }
+ return doPaxos(metadata,
+ key,
+ consistencyForPaxos,
+ consistencyForCommit,
+ consistencyForCommit,
+ state,
+ start,
+ casWriteMetrics,
+ updateProposer);
- Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)");
- contentions++;
- Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS);
- // continue to retry
- }
-
- throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(keyspaceName)));
}
- catch (WriteTimeoutException|ReadTimeoutException e)
+ catch (WriteTimeoutException | ReadTimeoutException e)
{
casWriteMetrics.timeouts.mark();
throw e;
}
- catch (WriteFailureException|ReadFailureException e)
+ catch (WriteFailureException | ReadFailureException e)
{
casWriteMetrics.failures.mark();
throw e;
}
- catch(UnavailableException e)
+ catch (UnavailableException e)
{
casWriteMetrics.unavailables.mark();
throw e;
}
finally
{
- if(contentions > 0)
- casWriteMetrics.contention.update(contentions);
casWriteMetrics.addNano(System.nanoTime() - start);
}
}
+ /**
+ * Performs the Paxos rounds for a given proposal, retrying when preempted until the timeout.
+ *
+ * <p>The main 'configurable' part of this method is the {@code createUpdateProposal} supplier: it is called
+ * once a ballot has been successfully 'prepared' to generate the update to 'propose' (and commit if the proposal is
+ * successful). That supplier also generates the result that the whole method will return. Note that due to retries,
+ * it may be called multiple times and does not have to return the same results.
+ *
+ * @param metadata the table to update with Paxos.
+ * @param key the partition updated.
+ * @param consistencyForPaxos the serial consistency of the operation (either {@link ConsistencyLevel#SERIAL} or
+ * {@link ConsistencyLevel#LOCAL_SERIAL}).
+ * @param consistencyForReplayCommits the consistency for the commit phase of "replayed" in-progress operations.
+ * @param consistencyForCommit the consistency for the commit phase of _this_ operation update.
+ * @param state the client state.
+ * @param queryStartNanoTime the nano time for the start of the query this is part of.
+ * @param casMetrics the metrics to update for this operation.
+ * @param createUpdateProposal supplier called after a successful 'prepare' phase to obtain 1) the actual update of
+ * this operation and 2) the result that the whole method should return. This can return {@code null} in the
+ * special case where, after having "prepared" (and thus potentially replayed in-progress updates), we don't want
+ * to propose anything (the whole method then returns {@code null}).
+ * @return the second element of the pair returned by {@code createUpdateProposal} (for the last call of that method
+ * if that method is called multiple times due to retries).
+ */
+ private static RowIterator doPaxos(CFMetaData metadata,
+ DecoratedKey key,
+ ConsistencyLevel consistencyForPaxos,
+ ConsistencyLevel consistencyForReplayCommits,
+ ConsistencyLevel consistencyForCommit,
+ ClientState state,
+ long queryStartNanoTime,
+ CASClientRequestMetrics casMetrics,
+ Supplier<Pair<PartitionUpdate, RowIterator>> createUpdateProposal)
+ throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException
+ {
+ int contentions = 0;
+ try
+ {
+ consistencyForPaxos.validateForCas();
+ consistencyForReplayCommits.validateForCasCommit(metadata.ksName);
+ consistencyForCommit.validateForCasCommit(metadata.ksName);
+
+ long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
+ while (System.nanoTime() - queryStartNanoTime < timeout)
+ {
+ // for simplicity, we'll do a single liveness check at the start of each attempt
+ Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos);
+ List<InetAddress> liveEndpoints = p.left;
+ int requiredParticipants = p.right;
+
+ final Pair<UUID, Integer> pair = beginAndRepairPaxos(queryStartNanoTime,
+ key,
+ metadata,
+ liveEndpoints,
+ requiredParticipants,
+ consistencyForPaxos,
+ consistencyForReplayCommits,
+ casMetrics,
+ state);
+ final UUID ballot = pair.left;
+ contentions += pair.right;
+
+ Pair<PartitionUpdate, RowIterator> proposalPair = createUpdateProposal.get();
+ // See method javadoc: null here is code for "stop here and return null".
+ if (proposalPair == null)
+ return null;
+
+ Commit proposal = Commit.newProposal(ballot, proposalPair.left);
+ Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
+ if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos))
+ {
+ // We skip committing accepted updates when they are empty. This is an optimization which works
+ // because we also skip replaying those same empty updates in beginAndRepairPaxos (see the longer
+ // comment there). As empty updates are somewhat common (serial reads and non-applying CAS propose
+ // them), this optimization is worthwhile.
+ if (!proposal.update.isEmpty())
+ commitPaxos(proposal, consistencyForCommit, true);
+ RowIterator result = proposalPair.right;
+ if (result != null)
+ Tracing.trace("CAS did not apply");
+ else
+ Tracing.trace("CAS applied successfully");
+ return result;
+ }
+
+ Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)");
+ contentions++;
+ Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS);
+ // continue to retry
+ }
+
+ throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(metadata.ksName)));
+ }
+ finally
+ {
+ if(contentions > 0)
+ casMetrics.contention.update(contentions);
+ }
+ }
+
private static Predicate<InetAddress> sameDCPredicateFor(final String dc)
{
final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
@@ -364,7 +458,7 @@ public class StorageProxy implements StorageProxyMBean
int requiredParticipants,
ConsistencyLevel consistencyForPaxos,
ConsistencyLevel consistencyForCommit,
- final boolean isWrite,
+ CASClientRequestMetrics casMetrics,
ClientState state)
throws WriteTimeoutException, WriteFailureException
{
@@ -397,18 +491,31 @@ public class StorageProxy implements StorageProxyMBean
continue;
}
- Commit inProgress = summary.mostRecentInProgressCommitWithUpdate;
+ Commit inProgress = summary.mostRecentInProgressCommit;
Commit mostRecent = summary.mostRecentCommit;
// If we have an in-progress ballot greater than the MRC we know, then it's an in-progress round that
// needs to be completed, so do it.
+ // One special case we make is for updates that are empty (which are proposed by serial reads and
+ // non-applying CAS). While we could handle those as any other updates, we can optimize this somewhat by
+ // neither committing those empty updates, nor replaying in-progress ones. The reasoning is this: as the
+ // update is empty, we have nothing to apply to storage in the commit phase, so the only reason to commit
+ // would be to update the MRC. However, if we skip replaying those empty updates, then we don't need to
+ // update the MRC for following updates to make progress (that is, if we didn't have the empty update skip
+ // below _but_ skipped updating the MRC on empty updates, then we'd be stuck always proposing that same
+ // empty update). And the reason skipping that replay is safe is that when an operation tries to propose
+ // an empty value, there can be only 2 cases:
+ // 1) the proposal succeeds, meaning a quorum of nodes accept it, in which case we are guaranteed no earlier
+ // pending operation can ever be replayed (which is what we want to guarantee with the empty update).
+ // 2) the proposal does not succeed. But then the operation proposing the empty update will not succeed
+ // either (it will retry or ultimately time out), and we're actually ok if earlier pending operations get
+ // replayed in that case.
+ // Tl;dr, it is safe to skip committing empty updates _as long as_ we also skip replaying them below. And
+ // doing so is more efficient, so we do so.
if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent))
{
Tracing.trace("Finishing incomplete paxos round {}", inProgress);
- if(isWrite)
- casWriteMetrics.unfinishedCommit.inc();
- else
- casReadMetrics.unfinishedCommit.inc();
+ casMetrics.unfinishedCommit.inc();
Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update);
if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos))
{
@@ -1535,21 +1642,31 @@ public class StorageProxy implements StorageProxyMBean
PartitionIterator result = null;
try
{
- // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read
- Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyLevel);
- List<InetAddress> liveEndpoints = p.left;
- int requiredParticipants = p.right;
-
- // does the work of applying in-progress writes; throws UAE or timeout if it can't
- final ConsistencyLevel consistencyForCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
- ? ConsistencyLevel.LOCAL_QUORUM
- : ConsistencyLevel.QUORUM;
+ final ConsistencyLevel consistencyForReplayCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
+ ? ConsistencyLevel.LOCAL_QUORUM
+ : ConsistencyLevel.QUORUM;
try
{
- final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyLevel, consistencyForCommitOrFetch, false, state);
- if (pair.right > 0)
- casReadMetrics.contention.update(pair.right);
+ // Run a Paxos round with an empty update to make sure all in-progress updates that should be finished first
+ // are, _and_ that no other in-progress update can get resurrected.
+ Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer =
+ disableSerialReadLinearizability
+ ? () -> null
+ : () -> Pair.create(PartitionUpdate.emptyUpdate(metadata, key), null);
+ // When replaying, we commit at quorum/local quorum, as we want to be sure the following read (done at
+ // quorum/local_quorum) sees any replayed updates. Our own update is however empty, and those don't even
+ // get committed due to an optimization described in doPaxos/beginAndRepairPaxos, so the commit
+ // consistency is irrelevant (we use ANY just to emphasize that we don't wait on our commit).
+ doPaxos(metadata,
+ key,
+ consistencyLevel,
+ consistencyForReplayCommitOrFetch,
+ ConsistencyLevel.ANY,
+ state,
+ start,
+ casReadMetrics,
+ updateProposer);
}
catch (WriteTimeoutException e)
{
@@ -1560,7 +1677,7 @@ public class StorageProxy implements StorageProxyMBean
throw new ReadFailureException(consistencyLevel, e.received, e.failures, e.blockFor, false);
}
- result = fetchRows(group.commands, consistencyForCommitOrFetch);
+ result = fetchRows(group.commands, consistencyForReplayCommitOrFetch);
}
catch (UnavailableException e)
{
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 95bd464..a3f491b 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -82,6 +82,12 @@ public class Commit
return this.ballot.equals(ballot);
}
+ /** Whether this is an empty commit, that is one with no updates. */
+ public boolean isEmpty()
+ {
+ return update.isEmpty();
+ }
+
public Mutation makeMutation()
{
return new Mutation(update);
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index ee1ba6a..8ab9a98 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -20,12 +20,9 @@
*/
package org.apache.cassandra.service.paxos;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Lock;
-import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Striped;
-import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index ff81803..26e292e 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -46,7 +46,6 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
public boolean promised = true;
public Commit mostRecentCommit;
public Commit mostRecentInProgressCommit;
- public Commit mostRecentInProgressCommitWithUpdate;
private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>();
@@ -56,7 +55,6 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
// need to inject the right key in the empty commit so comparing with empty commits in the reply works as expected
mostRecentCommit = Commit.emptyCommit(key, metadata);
mostRecentInProgressCommit = Commit.emptyCommit(key, metadata);
- mostRecentInProgressCommitWithUpdate = Commit.emptyCommit(key, metadata);
}
public synchronized void response(MessageIn<PrepareResponse> message)
@@ -64,9 +62,8 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
PrepareResponse response = message.payload;
logger.trace("Prepare response {} from {}", response, message.from);
- // In case of clock skew, another node could be proposing with ballot that are quite a bit
- // older than our own. In that case, we record the more recent commit we've received to make
- // sure we re-prepare on an older ballot.
+ // We set the mostRecentInProgressCommit even if we're not promised as, in that case, the ballot of that commit
+ // will be used to avoid generating a ballot that has no chance to win on retry (think clock skew).
if (response.inProgressCommit.isAfter(mostRecentInProgressCommit))
mostRecentInProgressCommit = response.inProgressCommit;
@@ -82,11 +79,6 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
if (response.mostRecentCommit.isAfter(mostRecentCommit))
mostRecentCommit = response.mostRecentCommit;
- // If some response has an update, then we should replay the update with the highest ballot. So find
- // the the highest commit that actually have an update
- if (response.inProgressCommit.isAfter(mostRecentInProgressCommitWithUpdate) && !response.inProgressCommit.update.isEmpty())
- mostRecentInProgressCommitWithUpdate = response.inProgressCommit;
-
latch.countDown();
}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index ff0095d..0d2eb4b 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -647,55 +647,59 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
return YamlConfigurationLoader.fromMap(params, check, Config.class);
}
- private void initializeRing(ICluster cluster)
+ public static void addToRing(boolean bootstrapping, IInstance peer)
{
- // This should be done outside instance in order to avoid serializing config
- String partitionerName = config.getString("partitioner");
- List<String> initialTokens = new ArrayList<>();
- List<InetSocketAddress> hosts = new ArrayList<>();
- List<UUID> hostIds = new ArrayList<>();
- for (int i = 1 ; i <= cluster.size() ; ++i)
+ try
{
- IInstanceConfig config = cluster.get(i).config();
- initialTokens.add(config.getString("initial_token"));
- hosts.add(config.broadcastAddress());
- hostIds.add(config.hostId());
+ IInstanceConfig config = peer.config();
+ IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner"));
+ Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token"));
+ InetAddress address = config.broadcastAddress().getAddress();
+
+ UUID hostId = config.hostId();
+ Gossiper.runInGossipStageBlocking(() -> {
+ Gossiper.instance.initializeNodeUnsafe(address, hostId, 1);
+ Gossiper.instance.injectApplicationState(address,
+ ApplicationState.TOKENS,
+ new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
+ StorageService.instance.onChange(address,
+ ApplicationState.STATUS,
+ bootstrapping
+ ? new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(Collections.singleton(token))
+ : new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
+ Gossiper.instance.realMarkAlive(address, Gossiper.instance.getEndpointStateForEndpoint(address));
+ });
+ int messagingVersion = peer.isShutdown()
+ ? MessagingService.current_version
+ : Math.min(MessagingService.current_version, peer.getMessagingVersion());
+ MessagingService.instance().setVersion(address, messagingVersion);
+
+ if (!bootstrapping)
+ assert StorageService.instance.getTokenMetadata().isMember(address);
+ PendingRangeCalculatorService.instance.blockUntilFinished();
}
+ catch (Throwable e) // UnknownHostException
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ public static void removeFromRing(IInstance peer)
+ {
try
{
- IPartitioner partitioner = FBUtilities.newPartitioner(partitionerName);
- StorageService storageService = StorageService.instance;
- List<Token> tokens = new ArrayList<>();
- for (String token : initialTokens)
- tokens.add(partitioner.getTokenFactory().fromString(token));
-
- for (int i = 0; i < tokens.size(); i++)
- {
- InetSocketAddress ep = hosts.get(i);
- UUID hostId = hostIds.get(i);
- Token token = tokens.get(i);
- Gossiper.runInGossipStageBlocking(() -> {
- Gossiper.instance.initializeNodeUnsafe(ep.getAddress(), hostId, 1);
- Gossiper.instance.injectApplicationState(ep.getAddress(),
- ApplicationState.TOKENS,
- new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
- storageService.onChange(ep.getAddress(),
- ApplicationState.STATUS,
- new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
- Gossiper.instance.realMarkAlive(ep.getAddress(), Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress()));
- });
- int messagingVersion = cluster.get(ep).isShutdown()
- ? MessagingService.current_version
- : Math.min(MessagingService.current_version, cluster.get(ep).getMessagingVersion());
- MessagingService.instance().setVersion(ep.getAddress(), messagingVersion);
- }
-
- // check that all nodes are in token metadata
- for (int i = 0; i < tokens.size(); ++i)
- assert storageService.getTokenMetadata().isMember(hosts.get(i).getAddress());
-
- storageService.setNormalModeUnsafe();
+ IInstanceConfig config = peer.config();
+ IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner"));
+ Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token"));
+ InetAddress address = config.broadcastAddress().getAddress();
+
+ Gossiper.runInGossipStageBlocking(() -> {
+ StorageService.instance.onChange(address,
+ ApplicationState.STATUS,
+ new VersionedValue.VersionedValueFactory(partitioner).left(Collections.singleton(token), 0L));
+ Gossiper.instance.removeEndpoint(address);
+ });
+ PendingRangeCalculatorService.instance.blockUntilFinished();
}
catch (Throwable e) // UnknownHostException
{
@@ -703,6 +707,28 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
}
}
+ public static void addToRingNormal(IInstance peer)
+ {
+ addToRing(false, peer);
+ assert StorageService.instance.getTokenMetadata().isMember(peer.broadcastAddress().getAddress());
+ }
+
+ public static void addToRingBootstrapping(IInstance peer)
+ {
+ addToRing(true, peer);
+ }
+
+ private static void initializeRing(ICluster cluster)
+ {
+ for (int i = 1 ; i <= cluster.size() ; ++i)
+ addToRing(false, cluster.get(i));
+
+ for (int i = 1; i <= cluster.size(); ++i)
+ assert StorageService.instance.getTokenMetadata().isMember(cluster.get(i).broadcastAddress().getAddress());
+
+ StorageService.instance.setNormalModeUnsafe();
+ }
+
public Future<Void> shutdown()
{
return shutdown(true);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
new file mode 100644
index 0000000..0b1dce6
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.impl.Instance;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.fail;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_COMMIT;
+import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_PREPARE;
+import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_PROPOSE;
+import static org.apache.cassandra.net.MessagingService.Verb.READ;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CASTest extends TestBaseImpl
+{
+    @Test
+    public void simpleUpdate() throws Throwable // CAS insert / conditional updates, each observed through a SERIAL read
+    {
+        try (Cluster cluster = init(Cluster.create(3)))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                    row(1, 1, 1));
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 3 WHERE pk = 1 and ck = 1 IF v = 2", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                    row(1, 1, 1));
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                    row(1, 1, 2));
+        }
+    }
+
+    @Test
+    public void incompletePrepare() throws Throwable // a CAS whose PREPARE never leaves its coordinator must leave no trace
+    {
+        try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L).set("cas_contention_timeout_in_ms", 200L))))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            IMessageFilters.Filter drop = cluster.filters().verbs(PAXOS_PREPARE.ordinal()).from(1).to(2, 3).drop();
+            try
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                Assert.fail();
+            }
+            catch (RuntimeException wrapped)
+            {
+                Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage());
+            }
+            drop.off();
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL));
+        }
+    }
+
+    @Test
+    public void incompletePropose() throws Throwable // a proposal accepted only by node 1 is completed by the next CAS whose quorum includes node 1
+    {
+        try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L).set("cas_contention_timeout_in_ms", 200L))))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            IMessageFilters.Filter drop1 = cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(2, 3).drop();
+            try
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                Assert.fail();
+            }
+            catch (RuntimeException wrapped)
+            {
+                Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage());
+            }
+            drop1.off();
+            // make sure we encounter one of the in-progress proposals so we complete it
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal()).from(1).to(2).drop();
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                    row(1, 1, 2));
+        }
+    }
+
+    @Test
+    public void incompleteCommit() throws Throwable // a value whose COMMIT only reached node 1 is still seen by the next CAS
+    {
+        try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L).set("cas_contention_timeout_in_ms", 200L))))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            IMessageFilters.Filter drop1 = cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+            try
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                Assert.fail();
+            }
+            catch (RuntimeException wrapped)
+            {
+                Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage());
+            }
+            drop1.off();
+            // make sure we see one of the successful commits
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(2).drop();
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                    row(1, 1, 2));
+        }
+    }
+
+    /**
+     * Verbs to drop in order to isolate a node from a SERIAL operation: the Paxos prepare, propose and commit
+     * messages, as well as the data reads.
+     */
+    private static int[] paxosAndReadVerbs()
+    {
+        return new int[]{ PAXOS_PREPARE.ordinal(), PAXOS_PROPOSE.ordinal(), PAXOS_COMMIT.ordinal(), READ.ordinal() };
+    }
+
+    /**
+     * Base test to ensure that if a write times out with its proposal accepted by some nodes (less than a quorum), and
+     * a following SERIAL operation does not observe that write (the nodes having accepted it do not participate in that
+     * following operation), then that write is never applied, even when the nodes having accepted the original proposal
+     * participate later.
+     *
+     * <p>In other words, if an operation times out, it may or may not be applied, but that "fate" is persistently decided
+     * by the first SERIAL operation that "succeeds" (in the sense of 'not timing out or throwing some other exception').
+     *
+     * @param postTimeoutOperation1 a SERIAL operation executed after an initial write, inserting the row [0, 0], times
+     *                              out. It is executed with a QUORUM of nodes that have _not_ seen the timed-out
+     *                              proposal, and so that operation should expect that the [0, 0] write has not taken
+     *                              place.
+     * @param postTimeoutOperation2 a 2nd SERIAL operation executed _after_ {@code postTimeoutOperation1}, with no
+     *                              write executed between the 2 operations. Contrarily to the 1st operation, the QUORUM
+     *                              for this operation _will_ include the node that got the proposal for the [0, 0]
+     *                              insert but didn't participate in {@code postTimeoutOperation1}. That operation
+     *                              should also not witness that [0, 0] write (since {@code postTimeoutOperation1}
+     *                              didn't).
+     * @param loseCommitOfOperation1 if {@code true}, the test will also drop the "commit" messages of
+     *                               {@code postTimeoutOperation1}. In general, the test should behave the same with or
+     *                               without that flag since a value is decided as soon as it has been "accepted by a
+     *                               quorum" and the commits should always be properly replayed.
+     */
+    private void consistencyAfterWriteTimeoutTest(BiConsumer<String, ICoordinator> postTimeoutOperation1,
+                                                  BiConsumer<String, ICoordinator> postTimeoutOperation2,
+                                                  boolean loseCommitOfOperation1) throws IOException
+    {
+        try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L)
+                                                                      .set("cas_contention_timeout_in_ms", 200L))))
+        {
+            String table = KEYSPACE + ".t";
+            cluster.schemaChange("CREATE TABLE " + table + " (k int PRIMARY KEY, v int)");
+
+            // We do a CAS insertion, but with the PROPOSE messages dropped on nodes 1 and 2. The CAS will not get
+            // through and should time out. Importantly, node 3 does receive and answer the PROPOSE.
+            IMessageFilters.Filter dropProposeFilter = cluster.filters()
+                                                              .inbound()
+                                                              .verbs(MessagingService.Verb.PAXOS_PROPOSE.ordinal())
+                                                              .to(1, 2)
+                                                              .drop();
+            try
+            {
+                // NOTE: the consistency below is the "commit" one, so it doesn't matter at all here.
+                cluster.coordinator(1)
+                       .execute("INSERT INTO " + table + "(k, v) VALUES (0, 0) IF NOT EXISTS", ConsistencyLevel.ONE);
+                fail("The insertion should have timed-out");
+            }
+            catch (Exception e)
+            {
+                // We expect a write timeout: if one is found anywhere in the cause chain (the dtest framework wraps it in
+                // a RuntimeException) the test continues, otherwise we rethrow. We match on the simple class name because
+                // the exception comes from a different class loader than the test's (TODO: improve at the dtest API level).
+                Throwable cause = e;
+                while (cause != null && !cause.getClass().getSimpleName().equals("WriteTimeoutException"))
+                    cause = cause.getCause();
+                if (cause == null)
+                    throw e;
+            }
+            finally
+            {
+                dropProposeFilter.off();
+            }
+
+            // Isolates node 3 and executes the SERIAL operation. As neither node 1 nor 2 got the initial insert proposal,
+            // there is nothing to "replay" and the operation should assert the table is still empty.
+            IMessageFilters.Filter ignoreNode3Filter = cluster.filters().verbs(paxosAndReadVerbs()).to(3).drop();
+            IMessageFilters.Filter dropCommitFilter = null;
+            if (loseCommitOfOperation1)
+            {
+                dropCommitFilter = cluster.filters().verbs(PAXOS_COMMIT.ordinal()).to(1, 2).drop();
+            }
+            try
+            {
+                postTimeoutOperation1.accept(table, cluster.coordinator(1));
+            }
+            finally
+            {
+                ignoreNode3Filter.off();
+                if (dropCommitFilter != null)
+                    dropCommitFilter.off();
+            }
+
+            // Node 3 is now back and we isolate node 2 to ensure the next read hits nodes 1 and 3.
+            // What we want to ensure is that despite node 3 having the initial insert in its paxos state in a position of
+            // being replayed, that insert is _not_ replayed (it would contradict serializability since the previous
+            // operation asserted nothing was inserted). It is this execution that failed before CASSANDRA-12126.
+            IMessageFilters.Filter ignoreNode2Filter = cluster.filters().verbs(paxosAndReadVerbs()).to(2).drop();
+            try
+            {
+                postTimeoutOperation2.accept(table, cluster.coordinator(1));
+            }
+            finally
+            {
+                ignoreNode2Filter.off();
+            }
+        }
+    }
+
+    /**
+     * Tests that if a write times out and a following serial read does not see that write, then no following read sees
+     * it, even if some nodes still have the write in their paxos state.
+     *
+     * <p>This specifically tests for the inconsistency described/fixed by CASSANDRA-12126.
+     */
+    @Test
+    public void readConsistencyAfterWriteTimeoutTest() throws IOException
+    {
+        BiConsumer<String, ICoordinator> operation =
+            (table, coordinator) -> assertRows(coordinator.execute("SELECT * FROM " + table + " WHERE k=0",
+                                                                   ConsistencyLevel.SERIAL));
+
+        consistencyAfterWriteTimeoutTest(operation, operation, false);
+        consistencyAfterWriteTimeoutTest(operation, operation, true);
+    }
+
+    /**
+     * Tests that if a write times out, then a following CAS succeeds but does not apply in a way that indicates the write
+     * has not applied, then no following CAS can see that initial insert, even if some nodes still have the write in
+     * their paxos state.
+     *
+     * <p>This specifically tests for the inconsistency described/fixed by CASSANDRA-12126.
+     */
+    @Test
+    public void nonApplyingCasConsistencyAfterWriteTimeout() throws IOException
+    {
+        // Note: we use CL.ANY so that the operation doesn't time out in the case where we "lost" the operation1 commits.
+        // The commit CL shouldn't have an impact on this test anyway, so this doesn't diminish the test.
+        BiConsumer<String, ICoordinator> operation =
+            (table, coordinator) -> assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0",
+                                                                            ConsistencyLevel.ANY));
+        consistencyAfterWriteTimeoutTest(operation, operation, false);
+        consistencyAfterWriteTimeoutTest(operation, operation, true);
+    }
+
+    /**
+     * Tests that if a write times out and a following serial read does not see that write, then no following CAS sees
+     * that initial insert, even if some nodes still have the write in their paxos state.
+     *
+     * <p>This specifically tests for the inconsistency described/fixed by CASSANDRA-12126.
+     */
+    @Test
+    public void mixedReadAndNonApplyingCasConsistencyAfterWriteTimeout() throws IOException
+    {
+        BiConsumer<String, ICoordinator> operation1 =
+            (table, coordinator) -> assertRows(coordinator.execute("SELECT * FROM " + table + " WHERE k=0",
+                                                                   ConsistencyLevel.SERIAL));
+        BiConsumer<String, ICoordinator> operation2 =
+            (table, coordinator) -> assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0",
+                                                                            ConsistencyLevel.QUORUM));
+        consistencyAfterWriteTimeoutTest(operation1, operation2, false);
+        consistencyAfterWriteTimeoutTest(operation1, operation2, true);
+    }
+
+    /**
+     * Tests that if a write times out and a following CAS succeeds but does not apply in a way that indicates the write
+     * has not applied, then following serial reads do not see that write, even if some nodes still have the write in
+     * their paxos state.
+     *
+     * <p>This specifically tests for the inconsistency described/fixed by CASSANDRA-12126.
+     */
+    @Test
+    public void mixedNonApplyingCasAndReadConsistencyAfterWriteTimeout() throws IOException
+    {
+        // Note: we use CL.ANY so that the operation doesn't time out in the case where we "lost" the operation1 commits.
+        // The commit CL shouldn't have an impact on this test anyway, so this doesn't diminish the test.
+        BiConsumer<String, ICoordinator> operation1 =
+            (table, coordinator) -> assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0",
+                                                                            ConsistencyLevel.ANY));
+        BiConsumer<String, ICoordinator> operation2 =
+            (table, coordinator) -> assertRows(coordinator.execute("SELECT * FROM " + table + " WHERE k=0",
+                                                                   ConsistencyLevel.SERIAL));
+        consistencyAfterWriteTimeoutTest(operation1, operation2, false);
+        consistencyAfterWriteTimeoutTest(operation1, operation2, true);
+    }
+
+    // TODO: this should probably be moved into the dtest API.
+    private void assertCasNotApplied(Object[][] resultSet)
+    {
+        assertFalse("Expected a CAS resultSet (with at least application result) but got an empty one.",
+                    resultSet.length == 0);
+        assertFalse("Invalid empty first row in CAS resultSet.", resultSet[0].length == 0);
+        Object wasApplied = resultSet[0][0];
+        assertTrue("Expected 1st column of CAS resultSet to be a boolean, but got a " + wasApplied.getClass(),
+                   wasApplied instanceof Boolean);
+        assertFalse("Expected CAS to not be applied, but was applied.", (Boolean)wasApplied);
+    }
+
+    /**
+     * Failed write (by node that did not yet witness a range movement via gossip) is witnessed later as successful
+     * conflicting with another successful write performed by a node that did witness the range movement
+     * Prepare, Propose and Commit A to {1, 2}
+     * Range moves to {2, 3, 4}
+     * Prepare and Propose B (=> !A) to {3, 4}
+     */
+    @Ignore
+    @Test
+    public void testSuccessfulWriteBeforeRangeMovement() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config
+                .set("write_request_timeout_in_ms", 200L)
+                .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+
+            // make it so {1} is unaware (yet) that {4} is an owner of the token
+            cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+
+            int pk = pk(cluster, 1, 2);
+
+            // {1} promises and accepts on !{3} => {1, 2}; commits on !{2,3} => {1}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+            assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(true));
+
+            for (int i = 1 ; i <= 3 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+
+            // {4} reads from !{2} => {3, 4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(2).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(2).drop();
+            assertRows(cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(false, pk, 1, 1, null));
+        }
+    }
+
+    /**
+     * Failed write (by node that did not yet witness a range movement via gossip) is witnessed later as successful
+     * conflicting with another successful write performed by a node that did witness the range movement
+     *  - Range moves from {1, 2, 3} to {2, 3, 4}, witnessed by X (not by !X)
+     *  -  X: Prepare, Propose and Commit A to {3, 4}
+     *  - !X: Prepare and Propose B (=>!A) to {1, 2}
+     */
+    @Ignore
+    @Test
+    public void testConflictingWritesWithStaleRingInformation() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config
+                .set("write_request_timeout_in_ms", 200L)
+                .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+
+            // make it so {1} is unaware (yet) that {4} is an owner of the token
+            cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+
+            // {4} promises, accepts and commits on !{2} => {3, 4}
+            int pk = pk(cluster, 1, 2);
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(2).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(2).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(4).to(2).drop();
+            assertRows(cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(true));
+
+            // {1} promises, accepts and commits on !{3} => {1, 2}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(3).drop();
+            assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(false, pk, 1, 1, null));
+        }
+    }
+
+    /**
+     * Successful write during range movement, not witnessed by read after range movement.
+     * Very similar to {@link #testConflictingWritesWithStaleRingInformation}.
+     *
+     *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+     *  -  !X: Prepare and Propose to {1, 2}
+     *  - Range movement witnessed by !X
+     *  - Any: Prepare and Read from {3, 4}
+     */
+    @Ignore
+    @Test
+    public void testSucccessfulWriteDuringRangeMovementFollowedByRead() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config
+                .set("write_request_timeout_in_ms", 200L)
+                .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+            cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+
+            int pk = pk(cluster, 1, 2);
+
+            // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+            assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(true));
+
+            // finish topology change
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+
+            // {3} reads from !{2} => {3, 4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop();
+            assertRows(cluster.coordinator(3).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", ConsistencyLevel.SERIAL, pk),
+                    row(pk, 1, 1));
+        }
+    }
+
+    /**
+     * Successful write during range movement not witnessed by write after range movement
+     *
+     *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+     *  -  !X: Prepare and Propose to {1, 2}
+     *  - Range movement witnessed by !X
+     *  - Any: Prepare and Propose to {3, 4}
+     */
+    @Ignore
+    @Test
+    public void testSuccessfulWriteDuringRangeMovementFollowedByConflicting() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config
+                .set("write_request_timeout_in_ms", 200L)
+                .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+
+            // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+            cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+
+            int pk = pk(cluster, 1, 2);
+
+            // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+            assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(true));
+
+            // finish topology change
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+
+            // {3} reads from !{2} => {3, 4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop();
+            assertRows(cluster.coordinator(3).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(false, pk, 1, 1, null));
+
+            // TODO: repair and verify base table state
+        }
+    }
+
+    /**
+     * During a range movement, a CAS may fail leaving side effects that are not witnessed by another operation
+     * being performed with stale ring information.
+     * This is a particular special case of stale ring information sequencing, which probably would be resolved
+     * by fixing each of the more isolated cases (but is unique, so deserving of its own test case).
+     * See CASSANDRA-15745
+     *
+     *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+     *  -   X: Prepare to {2, 3, 4}
+     *  -   X: Propose to {4}
+     *  -  !X: Prepare and Propose to {1, 2}
+     *  - Range move visible by !X
+     *  - Any: Prepare and Read from {3, 4}
+     */
+    @Ignore
+    @Test
+    public void testIncompleteWriteFollowedBySuccessfulWriteWithStaleRingDuringRangeMovementFollowedByRead() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config
+                .set("write_request_timeout_in_ms", 200L)
+                .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+
+            // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+            cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+
+            int pk = pk(cluster, 1, 2);
+
+            // {4} promises on !{1} => {2, 3, 4}; accepts on !{1, 2, 3} => {4}, so the CAS times out without committing
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(1).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(1, 2, 3).drop();
+            try
+            {
+                cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM, pk);
+                Assert.fail("The insertion should have timed out");
+            }
+            catch (RuntimeException wrapped)
+            {
+                Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage());
+            }
+
+            // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+            assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(true));
+
+            // finish topology change
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+
+            // {3} reads from !{2} => {3, 4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(3).to(2).drop();
+            assertRows(cluster.coordinator(3).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", ConsistencyLevel.SERIAL, pk),
+                    row(pk, 1, null, 2));
+        }
+    }
+
+    /**
+     * During a range movement, a CAS may fail leaving side effects that are not witnessed by another operation
+     * being performed with stale ring information.
+     * This is a particular special case of stale ring information sequencing, which probably would be resolved
+     * by fixing each of the more isolated cases (but is unique, so deserving of its own test case).
+     * See CASSANDRA-15745
+     *
+     *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+     *  -   X: Prepare to {2, 3, 4}
+     *  -   X: Propose to {4}
+     *  -  !X: Prepare and Propose to {1, 2}
+     *  - Range move visible by !X
+     *  - Any: Prepare and Propose to {3, 4}
+     */
+    @Ignore
+    @Test
+    public void testIncompleteWriteFollowedBySuccessfulWriteWithStaleRingDuringRangeMovementFollowedByWrite() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config
+                .set("write_request_timeout_in_ms", 200L)
+                .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+
+            // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+            cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+
+            int pk = pk(cluster, 1, 2);
+
+            // {4} promises on !{1} => {2, 3, 4}; accepts on !{1, 2, 3} => {4}, so the CAS times out without committing
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(1).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(1, 2, 3).drop();
+            try
+            {
+                cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM, pk);
+                Assert.fail("The insertion should have timed out");
+            }
+            catch (RuntimeException wrapped)
+            {
+                Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage());
+            }
+
+            // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+            assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(true));
+
+            // finish topology change
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+
+            // {3} reads from !{2} => {3, 4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(3).to(2).drop();
+            assertRows(cluster.coordinator(3).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(false, pk, 1, null, 2));
+        }
+    }
+
+    private static int pk(Cluster cluster, int lb, int ub)
+    {
+        return pk(cluster.get(lb), cluster.get(ub));
+    }
+
+    private static int pk(IInstance lb, IInstance ub)
+    {
+        return pk(Murmur3Partitioner.instance.getTokenFactory().fromString(lb.config().getString("initial_token")),
+                Murmur3Partitioner.instance.getTokenFactory().fromString(ub.config().getString("initial_token")));
+    }
+
+    private static int pk(Token lb, Token ub)
+    {
+        int pk = 0;
+        Token pkt;
+        while (lb.compareTo(pkt = Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(pk))) >= 0 || ub.compareTo(pkt) < 0)
+            ++pk;
+        return pk;
+    }
+
+    private static void debugOwnership(Cluster cluster, int pk)
+    {
+        for (int i = 1 ; i <= cluster.size() ; ++i)
+            System.out.println(i + ": " + cluster.get(i).appliesOnInstance((Integer v) -> StorageService.instance.getNaturalAndPendingEndpoints(KEYSPACE, Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(v))))
+                                                    .apply(pk));
+    }
+
+    private static void debugPaxosState(Cluster cluster, int pk)
+    {
+        UUID cfid = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metadata.cfId);
+        for (int i = 1 ; i <= cluster.size() ; ++i)
+            for (Object[] row : cluster.get(i).executeInternal("select in_progress_ballot, proposal_ballot, most_recent_commit_at from system.paxos where row_key = ? and cf_id = ?", Int32Type.instance.decompose(pk), cfid))
+                System.out.println(i + ": " + (row[0] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[0])) + ", " + (row[1] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[1])) + ", " + (row[2] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[2])));
+    }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org