Posted to commits@cassandra.apache.org by sl...@apache.org on 2020/11/27 16:10:34 UTC

[cassandra] branch cassandra-3.0 updated: Fix serial read/non-applying CAS linearizability

This is an automated email from the ASF dual-hosted git repository.

slebresne pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 2d0b168  Fix serial read/non-applying CAS linearizability
2d0b168 is described below

commit 2d0b16804785660e8515aca9944784fb3733c619
Author: Sylvain Lebresne <le...@gmail.com>
AuthorDate: Wed May 20 14:56:02 2020 +0200

    Fix serial read/non-applying CAS linearizability
    
    Before this patch, a SERIAL read or a non-applying CAS replays any
    in-progress commit by calling `beginAndRepairPaxos`, but only a quorum
    of nodes is contacted, so a minority of nodes could still hold an
    unfinished in-progress proposal in their Paxos state. If such an
    in-progress proposal is not replayed by a SERIAL read/non-applying CAS,
    it should never be replayed by any following operation either, as that
    would break serializability, but nothing was done to enforce this.
    
    This patch ensures that both SERIAL reads and non-applying CAS operations
    commit an empty update before succeeding. This ensures that no prior
    incomplete in-progress proposal can be replayed afterwards (such a
    proposal will be discarded as older than the last committed ballot).
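
    For illustration only (this example is not part of the patch; the table
    and values are made up), the pre-patch anomaly can be sketched on a
    3-node, RF=3 cluster as the following CQL sequence:

        -- Conditional insert times out; its proposal was accepted by node 3 only.
        INSERT INTO ks.t (k, v) VALUES (0, 0) IF NOT EXISTS;

        -- SERIAL read answered by nodes 1 and 2: sees no row, so the timed-out
        -- insert must be treated as "not applied" from now on.
        SELECT * FROM ks.t WHERE k = 0;

        -- Later SERIAL read answered by nodes 1 and 3: before this patch, the
        -- stale proposal on node 3 could be replayed here and the row would
        -- appear, contradicting the previous read.
        SELECT * FROM ks.t WHERE k = 0;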
    
    As this fix has a performance impact on SERIAL reads, a flag is provided
    to disable the new code (even though doing so is discouraged by a warning).
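
    For example, assuming the stock conf/cassandra-env.sh is used to pass JVM
    options (the exact mechanism may differ per deployment), the fix can be
    disabled by adding:

        JVM_OPTS="$JVM_OPTS -Dcassandra.unsafe.disable-serial-reads-linearizability=true"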
    
    Patch by Sylvain Lebresne, reviewed by Benjamin Lerer for CASSANDRA-12126
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   8 +
 .../org/apache/cassandra/service/StorageProxy.java | 231 +++++--
 .../org/apache/cassandra/service/paxos/Commit.java |   6 +
 .../apache/cassandra/service/paxos/PaxosState.java |   3 -
 .../cassandra/service/paxos/PrepareCallback.java   |  12 +-
 .../cassandra/distributed/impl/Instance.java       | 112 ++--
 .../apache/cassandra/distributed/test/CASTest.java | 679 +++++++++++++++++++++
 8 files changed, 939 insertions(+), 113 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 546fd98..d6f406d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.24:
+ * Fix serial read/non-applying CAS linearizability (CASSANDRA-12126)
  * Avoid potential NPE in JVMStabilityInspector (CASSANDRA-16294)
  * Improved check of num_tokens against the length of initial_token (CASSANDRA-14477)
  * Fix a race condition on ColumnFamilyStore and TableMetrics (CASSANDRA-16228)
diff --git a/NEWS.txt b/NEWS.txt
index 7034c2c..6cc5e84 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -47,6 +47,14 @@ using the provided 'sstableupgrade' tool.
 
 Upgrading
 ---------
+    - This release fixes a correctness issue with SERIAL reads and with LWT writes that do not apply.
+      Unfortunately, this fix has a performance impact on reads at the SERIAL or LOCAL_SERIAL
+      consistency levels. For heavy users of such SERIAL reads, the performance impact may be
+      noticeable and may also result in an increase in timeouts. For that reason, an opt-in
+      system property has been added to disable the fix:
+        -Dcassandra.unsafe.disable-serial-reads-linearizability=true
+      Use this flag at your own risk as it reverts SERIAL reads to the incorrect behavior of
+      previous versions. See CASSANDRA-12126 for details.
     - In cassandra.yaml, when using vnodes num_tokens must be defined if initial_token is defined.
       If it is not defined, or not equal to the numbers of tokens defined in initial_tokens,
       the node will not start. See CASSANDRA-14477 for details.
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index c7888c4..91dd991 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -24,6 +24,7 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 import com.google.common.base.Predicate;
 import com.google.common.annotations.VisibleForTesting;
@@ -109,6 +110,10 @@ public class StorageProxy implements StorageProxyMBean
      */
     private static final int MAX_CONCURRENT_RANGE_REQUESTS = Math.max(1, Integer.getInteger("cassandra.max_concurrent_range_requests", FBUtilities.getAvailableProcessors() * 10));
 
+    private static final String DISABLE_SERIAL_READ_LINEARIZABILITY_KEY = "cassandra.unsafe.disable-serial-reads-linearizability";
+    private static final boolean disableSerialReadLinearizability =
+        Boolean.parseBoolean(System.getProperty(DISABLE_SERIAL_READ_LINEARIZABILITY_KEY, "false"));
+
     private StorageProxy()
     {
     }
@@ -163,6 +168,16 @@ public class StorageProxy implements StorageProxyMBean
                             .execute(counterWriteTask(mutation, targets, responseHandler, localDataCenter));
             }
         };
+
+        if (disableSerialReadLinearizability)
+        {
+            logger.warn("This node was started with -D{}. SERIAL (and LOCAL_SERIAL) reads coordinated by this node " +
+                        "will not offer linearizability (see CASSANDRA-12126 for details on what this mean) with " +
+                        "respect to other SERIAL operations. Please note that, with this flag, SERIAL reads will be " +
+                        "slower than QUORUM reads, yet offer no more guarantee. This flag should only be used in " +
+                        "the restricted case of upgrading from a pre-CASSANDRA-12126 version, and only if you " +
+                        "understand the tradeoff.", DISABLE_SERIAL_READ_LINEARIZABILITY_KEY);
+        }
     }
 
     /**
@@ -216,26 +231,12 @@ public class StorageProxy implements StorageProxyMBean
     throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException
     {
         final long start = System.nanoTime();
-        int contentions = 0;
         try
         {
-            consistencyForPaxos.validateForCas();
-            consistencyForCommit.validateForCasCommit(keyspaceName);
-
             CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
 
-            long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
-            while (System.nanoTime() - start < timeout)
+            Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer = () ->
             {
-                // for simplicity, we'll do a single liveness check at the start of each attempt
-                Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos);
-                List<InetAddress> liveEndpoints = p.left;
-                int requiredParticipants = p.right;
-
-                final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state);
-                final UUID ballot = pair.left;
-                contentions += pair.right;
-
                 // read the current values and check they validate the conditions
                 Tracing.trace("Reading existing values for CAS precondition");
                 SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds());
@@ -251,11 +252,10 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     Tracing.trace("CAS precondition does not match current values {}", current);
                     casWriteMetrics.conditionNotMet.inc();
-                    return current.rowIterator();
+                    return Pair.create(PartitionUpdate.emptyUpdate(metadata, key), current.rowIterator());
                 }
 
-                // finish the paxos round w/ the desired updates
-                // TODO turn null updates into delete?
+                // Create the desired updates
                 PartitionUpdate updates = request.makeUpdates(current);
 
                 // Apply triggers to cas updates. A consideration here is that
@@ -267,47 +267,141 @@ public class StorageProxy implements StorageProxyMBean
                 // InvalidRequestException) any which aren't.
                 updates = TriggerExecutor.instance.execute(updates);
 
+                return Pair.create(updates, null);
+            };
 
-                Commit proposal = Commit.newProposal(ballot, updates);
-                Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
-                if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos))
-                {
-                    commitPaxos(proposal, consistencyForCommit, true);
-                    Tracing.trace("CAS successful");
-                    return null;
-                }
+            return doPaxos(metadata,
+                           key,
+                           consistencyForPaxos,
+                           consistencyForCommit,
+                           consistencyForCommit,
+                           state,
+                           start,
+                           casWriteMetrics,
+                           updateProposer);
 
-                Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)");
-                contentions++;
-                Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS);
-                // continue to retry
-            }
-
-            throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(keyspaceName)));
         }
-        catch (WriteTimeoutException|ReadTimeoutException e)
+        catch (WriteTimeoutException | ReadTimeoutException e)
         {
             casWriteMetrics.timeouts.mark();
             throw e;
         }
-        catch (WriteFailureException|ReadFailureException e)
+        catch (WriteFailureException | ReadFailureException e)
         {
             casWriteMetrics.failures.mark();
             throw e;
         }
-        catch(UnavailableException e)
+        catch (UnavailableException e)
         {
             casWriteMetrics.unavailables.mark();
             throw e;
         }
         finally
         {
-            if(contentions > 0)
-                casWriteMetrics.contention.update(contentions);
             casWriteMetrics.addNano(System.nanoTime() - start);
         }
     }
 
+    /**
+     * Performs the Paxos rounds for a given proposal, retrying when preempted until the timeout.
+     *
+     * <p>The main 'configurable' of this method is the {@code createUpdateProposal} supplier: it is called once a
+     * ballot has been successfully 'prepared' to generate the update to 'propose' (and commit if the proposal is
+     * successful). That supplier also generates the result that the whole method will return. Note that due to
+     * retries, the supplier may be called multiple times and does not have to return the same result each time.
+     *
+     * @param metadata the table to update with Paxos.
+     * @param key the partition updated.
+     * @param consistencyForPaxos the serial consistency of the operation (either {@link ConsistencyLevel#SERIAL} or
+     *     {@link ConsistencyLevel#LOCAL_SERIAL}).
+     * @param consistencyForReplayCommits the consistency for the commit phase of "replayed" in-progress operations.
+     * @param consistencyForCommit the consistency for the commit phase of _this_ operation's update.
+     * @param state the client state.
+     * @param queryStartNanoTime the nano time for the start of the query this is part of.
+     * @param casMetrics the metrics to update for this operation.
+     * @param createUpdateProposal method called after a successful 'prepare' phase to obtain 1) the actual update of
+     *     this operation and 2) the result that the whole method should return. This can return {@code null} in the
+     *     special case where, after having "prepared" (and thus potentially replayed in-progress updates), we don't
+     *     want to propose anything (the whole method then returns {@code null}).
+     * @return the second element of the pair returned by {@code createUpdateProposal} (for the last call of that method
+     *     if that method is called multiple times due to retries).
+     */
+    private static RowIterator doPaxos(CFMetaData metadata,
+                                       DecoratedKey key,
+                                       ConsistencyLevel consistencyForPaxos,
+                                       ConsistencyLevel consistencyForReplayCommits,
+                                       ConsistencyLevel consistencyForCommit,
+                                       ClientState state,
+                                       long queryStartNanoTime,
+                                       CASClientRequestMetrics casMetrics,
+                                       Supplier<Pair<PartitionUpdate, RowIterator>> createUpdateProposal)
+    throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException
+    {
+        int contentions = 0;
+        try
+        {
+            consistencyForPaxos.validateForCas();
+            consistencyForReplayCommits.validateForCasCommit(metadata.ksName);
+            consistencyForCommit.validateForCasCommit(metadata.ksName);
+
+            long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
+            while (System.nanoTime() - queryStartNanoTime < timeout)
+            {
+                // for simplicity, we'll do a single liveness check at the start of each attempt
+                Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos);
+                List<InetAddress> liveEndpoints = p.left;
+                int requiredParticipants = p.right;
+
+                final Pair<UUID, Integer> pair = beginAndRepairPaxos(queryStartNanoTime,
+                                                                     key,
+                                                                     metadata,
+                                                                     liveEndpoints,
+                                                                     requiredParticipants,
+                                                                     consistencyForPaxos,
+                                                                     consistencyForReplayCommits,
+                                                                     casMetrics,
+                                                                     state);
+                final UUID ballot = pair.left;
+                contentions += pair.right;
+
+                Pair<PartitionUpdate, RowIterator> proposalPair = createUpdateProposal.get();
+                // See method javadoc: null here is code for "stop here and return null".
+                if (proposalPair == null)
+                    return null;
+
+                Commit proposal = Commit.newProposal(ballot, proposalPair.left);
+                Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
+                if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos))
+                {
+                    // We skip committing accepted updates when they are empty. This is an optimization which works
+                    // because we also skip replaying those same empty updates in beginAndRepairPaxos (see the longer
+                    // comment there). As empty updates are somewhat common (serial reads and non-applying CAS propose
+                    // them), this is worth doing.
+                    if (!proposal.update.isEmpty())
+                        commitPaxos(proposal, consistencyForCommit, true);
+                    RowIterator result = proposalPair.right;
+                    if (result != null)
+                        Tracing.trace("CAS did not apply");
+                    else
+                        Tracing.trace("CAS applied successfully");
+                    return result;
+                }
+
+                Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)");
+                contentions++;
+                Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS);
+                // continue to retry
+            }
+
+            throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(metadata.ksName)));
+        }
+        finally
+        {
+            if(contentions > 0)
+                casMetrics.contention.update(contentions);
+        }
+    }
+
     private static Predicate<InetAddress> sameDCPredicateFor(final String dc)
     {
         final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
@@ -364,7 +458,7 @@ public class StorageProxy implements StorageProxyMBean
                                                            int requiredParticipants,
                                                            ConsistencyLevel consistencyForPaxos,
                                                            ConsistencyLevel consistencyForCommit,
-                                                           final boolean isWrite,
+                                                           CASClientRequestMetrics casMetrics,
                                                            ClientState state)
     throws WriteTimeoutException, WriteFailureException
     {
@@ -397,18 +491,31 @@ public class StorageProxy implements StorageProxyMBean
                 continue;
             }
 
-            Commit inProgress = summary.mostRecentInProgressCommitWithUpdate;
+            Commit inProgress = summary.mostRecentInProgressCommit;
             Commit mostRecent = summary.mostRecentCommit;
 
             // If we have an in-progress ballot greater than the MRC we know, then it's an in-progress round that
             // needs to be completed, so do it.
+            // One special case we make is for updates that are empty (which are proposed by serial reads and
+            // non-applying CAS). While we could handle those as any other updates, we can optimize this somewhat by
+            // neither committing those empty updates, nor replaying in-progress ones. The reasoning is this: as the
+            // update is empty, we have nothing to apply to storage in the commit phase, so the only reason to commit
+            // would be to update the MRC. However, if we skip replaying those empty updates, then we don't need to
+            // update the MRC for following updates to make progress (that is, if we didn't have the empty-update skip
+            // below _but_ skipped updating the MRC on empty updates, then we'd be stuck always proposing that same
+            // empty update). And the reason skipping that replay is safe is that when an operation tries to propose
+            // an empty value, there can be only 2 cases:
+            //  1) the propose succeeds, meaning a quorum of nodes accepts it, in which case we are guaranteed no
+            //     earlier pending operation can ever be replayed (which is what we want to guarantee with the empty
+            //     update).
+            //  2) the propose does not succeed. But then the operation proposing the empty update will not succeed
+            //     either (it will retry or ultimately time out), and we're actually ok if an earlier pending operation
+            //     gets replayed in that case.
+            // Tl;dr, it is safe to skip committing empty updates _as long as_ we also skip replaying them below. And
+            // doing so is more efficient, so we do so.
             if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent))
             {
                 Tracing.trace("Finishing incomplete paxos round {}", inProgress);
-                if(isWrite)
-                    casWriteMetrics.unfinishedCommit.inc();
-                else
-                    casReadMetrics.unfinishedCommit.inc();
+                casMetrics.unfinishedCommit.inc();
                 Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update);
                 if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos))
                 {
@@ -1535,21 +1642,31 @@ public class StorageProxy implements StorageProxyMBean
         PartitionIterator result = null;
         try
         {
-            // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read
-            Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyLevel);
-            List<InetAddress> liveEndpoints = p.left;
-            int requiredParticipants = p.right;
-
-            // does the work of applying in-progress writes; throws UAE or timeout if it can't
-            final ConsistencyLevel consistencyForCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
-                                                                                   ? ConsistencyLevel.LOCAL_QUORUM
-                                                                                   : ConsistencyLevel.QUORUM;
+            final ConsistencyLevel consistencyForReplayCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
+                                                                       ? ConsistencyLevel.LOCAL_QUORUM
+                                                                       : ConsistencyLevel.QUORUM;
 
             try
             {
-                final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyLevel, consistencyForCommitOrFetch, false, state);
-                if (pair.right > 0)
-                    casReadMetrics.contention.update(pair.right);
+                // Commit an empty update to make sure all in-progress updates that should be finished first are, _and_
+                // that no other in-progress update can get resurrected.
+                Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer =
+                    disableSerialReadLinearizability
+                    ? () -> null
+                    : () -> Pair.create(PartitionUpdate.emptyUpdate(metadata, key), null);
+                // When replaying, we commit at quorum/local quorum, as we want to be sure the following read (done at
+                // quorum/local_quorum) sees any replayed updates. Our own update is however empty, and those don't even
+                // get committed due to an optimization described in doPaxos/beginAndRepairPaxos, so the commit
+                // consistency is irrelevant (we use ANY just to emphasize that we don't wait on our commit).
+                doPaxos(metadata,
+                        key,
+                        consistencyLevel,
+                        consistencyForReplayCommitOrFetch,
+                        ConsistencyLevel.ANY,
+                        state,
+                        start,
+                        casReadMetrics,
+                        updateProposer);
             }
             catch (WriteTimeoutException e)
             {
@@ -1560,7 +1677,7 @@ public class StorageProxy implements StorageProxyMBean
                 throw new ReadFailureException(consistencyLevel, e.received, e.failures, e.blockFor, false);
             }
 
-            result = fetchRows(group.commands, consistencyForCommitOrFetch);
+            result = fetchRows(group.commands, consistencyForReplayCommitOrFetch);
         }
         catch (UnavailableException e)
         {
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 95bd464..a3f491b 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -82,6 +82,12 @@ public class Commit
         return this.ballot.equals(ballot);
     }
 
+    /** Whether this is an empty commit, that is one with no updates. */
+    public boolean isEmpty()
+    {
+        return update.isEmpty();
+    }
+
     public Mutation makeMutation()
     {
         return new Mutation(update);
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index ee1ba6a..8ab9a98 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -20,12 +20,9 @@
  */
 package org.apache.cassandra.service.paxos;
 
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.Lock;
 
-import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.Striped;
-import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index ff81803..26e292e 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -46,7 +46,6 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
     public boolean promised = true;
     public Commit mostRecentCommit;
     public Commit mostRecentInProgressCommit;
-    public Commit mostRecentInProgressCommitWithUpdate;
 
     private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>();
 
@@ -56,7 +55,6 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
         // need to inject the right key in the empty commit so comparing with empty commits in the reply works as expected
         mostRecentCommit = Commit.emptyCommit(key, metadata);
         mostRecentInProgressCommit = Commit.emptyCommit(key, metadata);
-        mostRecentInProgressCommitWithUpdate = Commit.emptyCommit(key, metadata);
     }
 
     public synchronized void response(MessageIn<PrepareResponse> message)
@@ -64,9 +62,8 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
         PrepareResponse response = message.payload;
         logger.trace("Prepare response {} from {}", response, message.from);
 
-        // In case of clock skew, another node could be proposing with ballot that are quite a bit
-        // older than our own. In that case, we record the more recent commit we've received to make
-        // sure we re-prepare on an older ballot.
+        // We set the mostRecentInProgressCommit even if we're not promised as, in that case, the ballot of that commit
+        // will be used to avoid generating a ballot that has no chance to win on retry (think clock skew).
         if (response.inProgressCommit.isAfter(mostRecentInProgressCommit))
             mostRecentInProgressCommit = response.inProgressCommit;
 
@@ -82,11 +79,6 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
         if (response.mostRecentCommit.isAfter(mostRecentCommit))
             mostRecentCommit = response.mostRecentCommit;
 
-        // If some response has an update, then we should replay the update with the highest ballot. So find
-        // the the highest commit that actually have an update
-        if (response.inProgressCommit.isAfter(mostRecentInProgressCommitWithUpdate) && !response.inProgressCommit.update.isEmpty())
-            mostRecentInProgressCommitWithUpdate = response.inProgressCommit;
-
         latch.countDown();
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index ff0095d..0d2eb4b 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -647,55 +647,59 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         return YamlConfigurationLoader.fromMap(params, check, Config.class);
     }
 
-    private void initializeRing(ICluster cluster)
+    public static void addToRing(boolean bootstrapping, IInstance peer)
     {
-        // This should be done outside instance in order to avoid serializing config
-        String partitionerName = config.getString("partitioner");
-        List<String> initialTokens = new ArrayList<>();
-        List<InetSocketAddress> hosts = new ArrayList<>();
-        List<UUID> hostIds = new ArrayList<>();
-        for (int i = 1 ; i <= cluster.size() ; ++i)
+        try
         {
-            IInstanceConfig config = cluster.get(i).config();
-            initialTokens.add(config.getString("initial_token"));
-            hosts.add(config.broadcastAddress());
-            hostIds.add(config.hostId());
+            IInstanceConfig config = peer.config();
+            IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner"));
+            Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token"));
+            InetAddress address = config.broadcastAddress().getAddress();
+
+            UUID hostId = config.hostId();
+            Gossiper.runInGossipStageBlocking(() -> {
+                Gossiper.instance.initializeNodeUnsafe(address, hostId, 1);
+                Gossiper.instance.injectApplicationState(address,
+                        ApplicationState.TOKENS,
+                        new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
+                StorageService.instance.onChange(address,
+                        ApplicationState.STATUS,
+                        bootstrapping
+                                ? new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(Collections.singleton(token))
+                                : new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
+                Gossiper.instance.realMarkAlive(address, Gossiper.instance.getEndpointStateForEndpoint(address));
+            });
+            int messagingVersion = peer.isShutdown()
+                    ? MessagingService.current_version
+                    : Math.min(MessagingService.current_version, peer.getMessagingVersion());
+            MessagingService.instance().setVersion(address, messagingVersion);
+
+            if (!bootstrapping)
+                assert StorageService.instance.getTokenMetadata().isMember(address);
+            PendingRangeCalculatorService.instance.blockUntilFinished();
         }
+        catch (Throwable e) // UnknownHostException
+        {
+            throw new RuntimeException(e);
+        }
+    }
 
+    public static void removeFromRing(IInstance peer)
+    {
         try
         {
-            IPartitioner partitioner = FBUtilities.newPartitioner(partitionerName);
-            StorageService storageService = StorageService.instance;
-            List<Token> tokens = new ArrayList<>();
-            for (String token : initialTokens)
-                tokens.add(partitioner.getTokenFactory().fromString(token));
-
-            for (int i = 0; i < tokens.size(); i++)
-            {
-                InetSocketAddress ep = hosts.get(i);
-                UUID hostId = hostIds.get(i);
-                Token token = tokens.get(i);
-                Gossiper.runInGossipStageBlocking(() -> {
-                    Gossiper.instance.initializeNodeUnsafe(ep.getAddress(), hostId, 1);
-                    Gossiper.instance.injectApplicationState(ep.getAddress(),
-                                                             ApplicationState.TOKENS,
-                                                             new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
-                    storageService.onChange(ep.getAddress(),
-                                            ApplicationState.STATUS,
-                                            new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
-                    Gossiper.instance.realMarkAlive(ep.getAddress(), Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress()));
-                });
-                int messagingVersion = cluster.get(ep).isShutdown()
-                                       ? MessagingService.current_version
-                                       : Math.min(MessagingService.current_version, cluster.get(ep).getMessagingVersion());
-                MessagingService.instance().setVersion(ep.getAddress(), messagingVersion);
-            }
-
-            // check that all nodes are in token metadata
-            for (int i = 0; i < tokens.size(); ++i)
-                assert storageService.getTokenMetadata().isMember(hosts.get(i).getAddress());
-
-            storageService.setNormalModeUnsafe();
+            IInstanceConfig config = peer.config();
+            IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner"));
+            Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token"));
+            InetAddress address = config.broadcastAddress().getAddress();
+
+            Gossiper.runInGossipStageBlocking(() -> {
+                StorageService.instance.onChange(address,
+                        ApplicationState.STATUS,
+                        new VersionedValue.VersionedValueFactory(partitioner).left(Collections.singleton(token), 0L));
+                Gossiper.instance.removeEndpoint(address);
+            });
+            PendingRangeCalculatorService.instance.blockUntilFinished();
         }
         catch (Throwable e) // UnknownHostException
         {
@@ -703,6 +707,28 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         }
     }
 
+    public static void addToRingNormal(IInstance peer)
+    {
+        addToRing(false, peer);
+        assert StorageService.instance.getTokenMetadata().isMember(peer.broadcastAddress().getAddress());
+    }
+
+    public static void addToRingBootstrapping(IInstance peer)
+    {
+        addToRing(true, peer);
+    }
+
+    private static void initializeRing(ICluster cluster)
+    {
+        for (int i = 1 ; i <= cluster.size() ; ++i)
+            addToRing(false, cluster.get(i));
+
+        for (int i = 1; i <= cluster.size(); ++i)
+            assert StorageService.instance.getTokenMetadata().isMember(cluster.get(i).broadcastAddress().getAddress());
+
+        StorageService.instance.setNormalModeUnsafe();
+    }
+
     public Future<Void> shutdown()
     {
         return shutdown(true);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
new file mode 100644
index 0000000..0b1dce6
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.impl.Instance;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.fail;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_COMMIT;
+import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_PREPARE;
+import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_PROPOSE;
+import static org.apache.cassandra.net.MessagingService.Verb.READ;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CASTest extends TestBaseImpl
+{
+    @Test
+    public void simpleUpdate() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(3)))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                    row(1, 1, 1));
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 3 WHERE pk = 1 and ck = 1 IF v = 2", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                    row(1, 1, 1));
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                    row(1, 1, 2));
+        }
+    }
+
+    @Test
+    public void incompletePrepare() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L).set("cas_contention_timeout_in_ms", 200L))))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            IMessageFilters.Filter drop = cluster.filters().verbs(PAXOS_PREPARE.ordinal()).from(1).to(2, 3).drop();
+            try
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                Assert.fail();
+            }
+            catch (RuntimeException wrapped)
+            {
+                Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage());
+            }
+            drop.off();
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL));
+        }
+    }
+
+    @Test
+    public void incompletePropose() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L).set("cas_contention_timeout_in_ms", 200L))))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            IMessageFilters.Filter drop1 = cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(2, 3).drop();
+            try
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                Assert.fail();
+            }
+            catch (RuntimeException wrapped)
+            {
+                Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage());
+            }
+            drop1.off();
+            // make sure we encounter one of the in-progress proposals so we complete it
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal()).from(1).to(2).drop();
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                    row(1, 1, 2));
+        }
+    }
+
+    @Test
+    public void incompleteCommit() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L).set("cas_contention_timeout_in_ms", 200L))))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            IMessageFilters.Filter drop1 = cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+            try
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                Assert.fail();
+            }
+            catch (RuntimeException wrapped)
+            {
+                Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage());
+            }
+            drop1.off();
+            // make sure we see one of the successful commits
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(2).drop();
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                    row(1, 1, 2));
+        }
+    }
+
+    private int[] paxosAndReadVerbs() {
+        return new int[] {
+            MessagingService.Verb.PAXOS_PREPARE.ordinal(),
+            MessagingService.Verb.PAXOS_PROPOSE.ordinal(),
+            MessagingService.Verb.PAXOS_COMMIT.ordinal(),
+            MessagingService.Verb.READ.ordinal()
+        };
+    }
+
+    /**
+     * Base test to ensure that if a write times out but with a proposal accepted by some nodes (less than a quorum),
+     * and a following SERIAL operation does not observe that write (the nodes having accepted it do not participate
+     * in that following operation), then that write is never applied, even when the nodes having accepted the
+     * original proposal participate.
+     *
+     * <p>In other words, if an operation times out, it may or may not be applied, but that "fate" is persistently
+     * decided by the very SERIAL operation that "succeeds" (in the sense of 'not timing out or throwing some other
+     * exception').
+     *
+     * @param postTimeoutOperation1 a SERIAL operation executed after an initial write (inserting the row [0, 0]) has
+     *                              timed out. It is executed with a QUORUM of nodes that have _not_ seen the timed-out
+     *                              proposal, and so that operation should expect that the [0, 0] write has not taken
+     *                              place.
+     * @param postTimeoutOperation2 a 2nd SERIAL operation executed _after_ {@code postTimeoutOperation1}, with no
+     *                              write executed between the 2 operations. Contrary to the 1st operation, the QUORUM
+     *                              for this operation _will_ include the node that got the proposal for the [0, 0]
+     *                              insert but didn't participate in {@code postTimeoutOperation1}. That operation
+     *                              should also not witness the [0, 0] write (since {@code postTimeoutOperation1}
+     *                              didn't).
+     * @param loseCommitOfOperation1 if {@code true}, the test will also drop the "commits" messages for
+     *                               {@code postTimeoutOperation1}. In general, the test should behave the same with or
+     *                               without that flag since a value is decided as soon as it has been "accepted by
+     *                               quorum" and the commits should always be properly replayed.
+     */
+    private void consistencyAfterWriteTimeoutTest(BiConsumer<String, ICoordinator> postTimeoutOperation1,
+                                                  BiConsumer<String, ICoordinator> postTimeoutOperation2,
+                                                  boolean loseCommitOfOperation1) throws IOException
+    {
+        try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L)
+                                                                      .set("cas_contention_timeout_in_ms", 200L))))
+        {
+            String table = KEYSPACE + ".t";
+            cluster.schemaChange("CREATE TABLE " + table + " (k int PRIMARY KEY, v int)");
+
+            // We do a CAS insertion, but with the PROPOSE message dropped on nodes 1 and 2. The CAS will not get
+            // through and should time out. Importantly, node 3 does receive and answer the PROPOSE.
+            IMessageFilters.Filter dropProposeFilter = cluster.filters()
+                                                              .inbound()
+                                                              .verbs(MessagingService.Verb.PAXOS_PROPOSE.ordinal())
+                                                              .to(1, 2)
+                                                              .drop();
+            try
+            {
+                // NOTE: the consistency below is the "commit" one, so it doesn't matter at all here.
+                cluster.coordinator(1)
+                       .execute("INSERT INTO " + table + "(k, v) VALUES (0, 0) IF NOT EXISTS", ConsistencyLevel.ONE);
+                fail("The insertion should have timed-out");
+            }
+            catch (Exception e)
+            {
+                // We expect a write timeout. If we get one, the test can continue; otherwise, we rethrow. Note that we
+                // look at the root cause because the dtest framework effectively wraps the exception in a RuntimeException
+                // (we could just look at the immediate cause, but this feels a bit more resilient this way).
+                // TODO: we can't use an instanceof below because the WriteTimeoutException we get is from a different class
+                //  loader than the one the test runs under, and that's our poor man's workaround. This kind of thing should
+                //  be improved at the dtest API level.
+                if (!e.getCause().getClass().getSimpleName().equals("WriteTimeoutException"))
+                    throw e;
+            }
+            finally
+            {
+                dropProposeFilter.off();
+            }
+
+            // Isolates node 3 and executes the SERIAL operation. As neither node 1 nor node 2 got the initial insert
+            // proposal, there is nothing to "replay" and the operation should assert the table is still empty.
+            IMessageFilters.Filter ignoreNode3Filter = cluster.filters().verbs(paxosAndReadVerbs()).to(3).drop();
+            IMessageFilters.Filter dropCommitFilter = null;
+            if (loseCommitOfOperation1)
+            {
+                dropCommitFilter = cluster.filters().verbs(PAXOS_COMMIT.ordinal()).to(1, 2).drop();
+            }
+            try
+            {
+                postTimeoutOperation1.accept(table, cluster.coordinator(1));
+            }
+            finally
+            {
+                ignoreNode3Filter.off();
+                if (dropCommitFilter != null)
+                    dropCommitFilter.off();
+            }
+
+            // Node 3 is now back and we isolate node 2 to ensure the next read hits nodes 1 and 3.
+            // What we want to ensure is that despite node 3 having the initial insert in its paxos state in a position of
+            // being replayed, that insert is _not_ replayed (it would contradict serializability since the previous
+            // operation asserted nothing was inserted). It is this execution that failed before CASSANDRA-12126.
+            IMessageFilters.Filter ignoreNode2Filter = cluster.filters().verbs(paxosAndReadVerbs()).to(2).drop();
+            try
+            {
+                postTimeoutOperation2.accept(table, cluster.coordinator(1));
+            }
+            finally
+            {
+                ignoreNode2Filter.off();
+            }
+        }
+    }
+
+    /**
+     * Tests that if a write times out and a following serial read does not see that write, then no following read sees
+     * it, even if some nodes still have the write in their paxos state.
+     *
+     * <p>This specifically tests for the inconsistency described/fixed by CASSANDRA-12126.
+     */
+    @Test
+    public void readConsistencyAfterWriteTimeoutTest() throws IOException
+    {
+        BiConsumer<String, ICoordinator> operation =
+            (table, coordinator) -> assertRows(coordinator.execute("SELECT * FROM " + table + " WHERE k=0",
+                                                                   ConsistencyLevel.SERIAL));
+
+        consistencyAfterWriteTimeoutTest(operation, operation, false);
+        consistencyAfterWriteTimeoutTest(operation, operation, true);
+    }
+
+    /**
+     * Tests that if a write times out, and a following CAS succeeds but does not apply in a way that indicates the
+     * write has not applied, then no following CAS can see that initial insert, even if some nodes still have the
+     * write in their paxos state.
+     *
+     * <p>This specifically tests for the inconsistency described/fixed by CASSANDRA-12126.
+     */
+    @Test
+    public void nonApplyingCasConsistencyAfterWriteTimeout() throws IOException
+    {
+        // Note: we use CL.ANY so that the operation doesn't time out in the case where we "lost" the operation1 commits.
+        // The commit CL shouldn't have an impact on this test anyway, so this doesn't diminish the test.
+        BiConsumer<String, ICoordinator> operation =
+            (table, coordinator) -> assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0",
+                                                                            ConsistencyLevel.ANY));
+        consistencyAfterWriteTimeoutTest(operation, operation, false);
+        consistencyAfterWriteTimeoutTest(operation, operation, true);
+    }
+
+    /**
+     * Tests that if a write times out and a following serial read does not see that write, then no following CAS sees
+     * that initial insert, even if some nodes still have the write in their paxos state.
+     *
+     * <p>This specifically tests for the inconsistency described/fixed by CASSANDRA-12126.
+     */
+    @Test
+    public void mixedReadAndNonApplyingCasConsistencyAfterWriteTimeout() throws IOException
+    {
+        BiConsumer<String, ICoordinator> operation1 =
+            (table, coordinator) -> assertRows(coordinator.execute("SELECT * FROM " + table + " WHERE k=0",
+                                                                   ConsistencyLevel.SERIAL));
+        BiConsumer<String, ICoordinator> operation2 =
+            (table, coordinator) -> assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0",
+                                                                            ConsistencyLevel.QUORUM));
+        consistencyAfterWriteTimeoutTest(operation1, operation2, false);
+        consistencyAfterWriteTimeoutTest(operation1, operation2, true);
+    }
+
+    /**
+     * Tests that if a write times out and a following CAS succeeds but does not apply in a way that indicates the
+     * write has not applied, then following serial reads do not see that write, even if some nodes still have the
+     * write in their paxos state.
+     *
+     * <p>This specifically tests for the inconsistency described/fixed by CASSANDRA-12126.
+     */
+    @Test
+    public void mixedNonApplyingCasAndReadConsistencyAfterWriteTimeout() throws IOException
+    {
+        // Note: we use CL.ANY so that the operation doesn't time out in the case where we "lost" the operation1 commits.
+        // The commit CL shouldn't have an impact on this test anyway, so this doesn't diminish the test.
+        BiConsumer<String, ICoordinator> operation1 =
+            (table, coordinator) -> assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0",
+                                                                            ConsistencyLevel.ANY));
+        BiConsumer<String, ICoordinator> operation2 =
+            (table, coordinator) -> assertRows(coordinator.execute("SELECT * FROM " + table + " WHERE k=0",
+                                                                   ConsistencyLevel.SERIAL));
+        consistencyAfterWriteTimeoutTest(operation1, operation2, false);
+        consistencyAfterWriteTimeoutTest(operation1, operation2, true);
+    }
+
+    // TODO: this should probably be moved into the dtest API.
+    private void assertCasNotApplied(Object[][] resultSet)
+    {
+        assertFalse("Expected a CAS resultSet (with at least application result) but got an empty one.",
+                    resultSet.length == 0);
+        assertFalse("Invalid empty first row in CAS resultSet.", resultSet[0].length == 0);
+        Object wasApplied = resultSet[0][0];
+        assertTrue("Expected 1st column of CAS resultSet to be a boolean, but got a " + wasApplied.getClass(),
+                   wasApplied instanceof Boolean);
+        assertFalse("Expected CAS to not be applied, but was applied.", (Boolean)wasApplied);
+    }
+
+    /**
+     * Failed write (by a node that did not yet witness a range movement via gossip) is witnessed later as successful,
+     * conflicting with another successful write performed by a node that did witness the range movement.
+     * Prepare, Propose and Commit A to {1, 2}
+     * Range moves to {2, 3, 4}
+     * Prepare and Propose B (=> !A) to {3, 4}
+     */
+    @Ignore
+    @Test
+    public void testSuccessfulWriteBeforeRangeMovement() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config
+                .set("write_request_timeout_in_ms", 200L)
+                .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+
+            // make it so {1} is unaware (yet) that {4} is an owner of the token
+            cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+
+            int pk = pk(cluster, 1, 2);
+
+            // {1} promises and accepts on !{3} => {1, 2}; commits on !{2,3} => {1}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+            assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(true));
+
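+            // make {4} visible as a normal ring member on {1, 2, 3}, completing the range movement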
+            for (int i = 1 ; i <= 3 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+
+            // {4} reads from !{2} => {3, 4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(2).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(2).drop();
+            assertRows(cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(false, pk, 1, 1, null));
+        }
+    }
+
+    /**
+     * Failed write (by a node that did not yet witness a range movement via gossip) is witnessed later as successful,
+     * conflicting with another successful write performed by a node that did witness the range movement.
+     *  - Range moves from {1, 2, 3} to {2, 3, 4}, witnessed by X (not by !X)
+     *  -  X: Prepare, Propose and Commit A to {3, 4}
+     *  - !X: Prepare and Propose B (=>!A) to {1, 2}
+     */
+    @Ignore
+    @Test
+    public void testConflictingWritesWithStaleRingInformation() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config
+                .set("write_request_timeout_in_ms", 200L)
+                .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+
+            // make it so {1} is unaware (yet) that {4} is an owner of the token
+            cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+
+            // {4} promises, accepts and commits on !{2} => {3, 4}
+            int pk = pk(cluster, 1, 2);
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(2).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(2).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(4).to(2).drop();
+            assertRows(cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(true));
+
+            // {1} promises, accepts and commits on !{3} => {1, 2}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(3).drop();
+            assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(false, pk, 1, 1, null));
+        }
+    }
+
+    /**
+     * Successful write during range movement, not witnessed by read after range movement.
+     * Very similar to {@link #testConflictingWritesWithStaleRingInformation}.
+     *
+     *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+     *  -  !X: Prepare and Propose to {1, 2}
+     *  - Range movement witnessed by !X
+     *  - Any: Prepare and Read from {3, 4}
+     */
+    @Ignore
+    @Test
+    public void testSucccessfulWriteDuringRangeMovementFollowedByRead() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config
+                .set("write_request_timeout_in_ms", 200L)
+                .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+            cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+
+            int pk = pk(cluster, 1, 2);
+
+            // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+            assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(true));
+
+            // finish topology change
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+
+            // {3} reads from !{2} => {3, 4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop();
+            assertRows(cluster.coordinator(3).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", ConsistencyLevel.SERIAL, pk),
+                    row(pk, 1, 1));
+        }
+    }
+
+    /**
+     * Successful write during range movement not witnessed by write after range movement
+     *
+     *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+     *  -  !X: Prepare and Propose to {1, 2}
+     *  - Range movement witnessed by !X
+     *  - Any: Prepare and Propose to {3, 4}
+     */
+    @Ignore
+    @Test
+    public void testSuccessfulWriteDuringRangeMovementFollowedByConflicting() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config
+                .set("write_request_timeout_in_ms", 200L)
+                .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+
+            // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+            cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+
+            int pk = pk(cluster, 1, 2);
+
+            // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+            assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(true));
+
+            // finish topology change
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+
+            // {3} prepares and reads on !{2} => {3, 4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop();
+            assertRows(cluster.coordinator(3).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(false, pk, 1, 1, null));
+
+            // TODO: repair and verify base table state
+        }
+    }
+
+    /**
+     * During a range movement, a CAS may fail leaving side effects that are not witnessed by another operation
+     * being performed with stale ring information.
+     * This is a particular special case of stale ring information sequencing, which probably would be resolved
+     * by fixing each of the more isolated cases (but is unique, so deserving of its own test case).
+     * See CASSANDRA-15745
+     *
+     *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+     *  -   X: Prepare to {2, 3, 4}
+     *  -   X: Propose to {4}
+     *  -  !X: Prepare and Propose to {1, 2}
+     *  - Range move visible by !X
+     *  - Any: Prepare and Read from {3, 4}
+     */
+    @Ignore
+    @Test
+    public void testIncompleteWriteFollowedBySuccessfulWriteWithStaleRingDuringRangeMovementFollowedByRead() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config
+                .set("write_request_timeout_in_ms", 200L)
+                .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+
+            // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+            cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+
+            int pk = pk(cluster, 1, 2);
+
+            // {4} promises on !{1} => {2, 3, 4}; accepts on !{1, 2, 3} => {4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(1).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(1, 2, 3).drop();
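+            // with the accept reaching only {4} itself, the proposal cannot gather a quorum and the CAS is expected to time out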
+            try
+            {
+                cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM, pk);
+                Assert.fail();
+            }
+            catch (RuntimeException wrapped)
+            {
+                Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage());
+            }
+
+            // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+            assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(true));
+
+            // finish topology change
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+
+            // {3} reads from !{2} => {3, 4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(3).to(2).drop();
+            assertRows(cluster.coordinator(3).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", ConsistencyLevel.SERIAL, pk),
+                    row(pk, 1, null, 2));
+        }
+    }
+
+    /**
+     * During a range movement, a CAS may fail leaving side effects that are not witnessed by another operation
+     * being performed with stale ring information.
+     * This is a particular special case of stale ring information sequencing, which probably would be resolved
+     * by fixing each of the more isolated cases (but is unique, so deserving of its own test case).
+     * See CASSANDRA-15745
+     *
+     *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+     *  -   X: Prepare to {2, 3, 4}
+     *  -   X: Propose to {4}
+     *  -  !X: Prepare and Propose to {1, 2}
+     *  - Range move visible by !X
+     *  - Any: Prepare and Propose to {3, 4}
+     */
+    @Ignore
+    @Test
+    public void testIncompleteWriteFollowedBySuccessfulWriteWithStaleRingDuringRangeMovementFollowedByWrite() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config
+                .set("write_request_timeout_in_ms", 200L)
+                .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+
+            // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+            cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+
+            int pk = pk(cluster, 1, 2);
+
+            // {4} promises on !{1} => {2, 3, 4}; accepts on !{1, 2, 3} => {4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(1).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(1, 2, 3).drop();
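+            // with the accept reaching only {4} itself, the proposal cannot gather a quorum and the CAS is expected to time out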
+            try
+            {
+                cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM, pk);
+                Assert.fail();
+            }
+            catch (RuntimeException wrapped)
+            {
+                Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage());
+            }
+
+            // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+            assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(true));
+
+            // finish topology change
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+
+            // {3} prepares and reads on !{2} => {3, 4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(3).to(2).drop();
+            assertRows(cluster.coordinator(3).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                    row(false, pk, 1, null, 2));
+        }
+    }
+
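+    // Helpers to pick a partition key for the tests above: the chosen key is the smallest non-negative
+    // int whose Murmur3 token falls in the (lb, ub] range delimited by the two nodes' initial tokens.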
+    private static int pk(Cluster cluster, int lb, int ub)
+    {
+        return pk(cluster.get(lb), cluster.get(ub));
+    }
+
+    private static int pk(IInstance lb, IInstance ub)
+    {
+        return pk(Murmur3Partitioner.instance.getTokenFactory().fromString(lb.config().getString("initial_token")),
+                Murmur3Partitioner.instance.getTokenFactory().fromString(ub.config().getString("initial_token")));
+    }
+
+    private static int pk(Token lb, Token ub)
+    {
+        int pk = 0;
+        Token pkt;
+        while (lb.compareTo(pkt = Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(pk))) >= 0 || ub.compareTo(pkt) < 0)
+            ++pk;
+        return pk;
+    }
+
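+    // Debugging helper: prints, for each node, its view of the natural and pending endpoints for the
+    // given partition key's token.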
+    private static void debugOwnership(Cluster cluster, int pk)
+    {
+        for (int i = 1 ; i <= cluster.size() ; ++i)
+            System.out.println(i + ": " + cluster.get(i).appliesOnInstance((Integer v) -> StorageService.instance.getNaturalAndPendingEndpoints(KEYSPACE, Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(v))))
+                    .apply(pk));
+    }
+
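+    // Debugging helper: dumps each node's system.paxos row for the given partition key (in-progress
+    // ballot, proposal ballot and most recent commit, shown as microsecond timestamps).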
+    private static void debugPaxosState(Cluster cluster, int pk)
+    {
+        UUID cfid = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metadata.cfId);
+        for (int i = 1 ; i <= cluster.size() ; ++i)
+            for (Object[] row : cluster.get(i).executeInternal("select in_progress_ballot, proposal_ballot, most_recent_commit_at from system.paxos where row_key = ? and cf_id = ?", Int32Type.instance.decompose(pk), cfid))
+                System.out.println(i + ": " + (row[0] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[0])) + ", " + (row[1] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[1])) + ", " + (row[2] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[2])));
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org