Posted to commits@cassandra.apache.org by sl...@apache.org on 2020/11/27 16:10:37 UTC

[cassandra] branch trunk updated (c26269b -> 9a3ca00)

This is an automated email from the ASF dual-hosted git repository.

slebresne pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from c26269b  ninja fixing wrongly committed circle-ci
     new 2d0b168  Fix serial read/non-applying CAS linearizability
     new 080280d  Merge commit '2d0b16804785660e8515aca9944784fb3733c619' into cassandra-3.11
     new 9a3ca00  Merge commit '080280dc0177da6176dd4ba970e5a35aa7e2a729' into trunk

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   8 +
 .../org/apache/cassandra/service/StorageProxy.java | 352 +++++++----
 .../org/apache/cassandra/service/paxos/Commit.java |   6 +
 .../cassandra/service/paxos/PrepareCallback.java   |  12 +-
 .../cassandra/distributed/impl/Instance.java       |  83 +++
 .../apache/cassandra/distributed/test/CASTest.java | 688 +++++++++++++++++++++
 7 files changed, 1034 insertions(+), 116 deletions(-)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/CASTest.java
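
For readers skimming this change: the NEWS.txt entry in the diff introduces an opt-in
system property, -Dcassandra.unsafe.disable-serial-reads-linearizability=true, and the
StorageProxy hunk logs a warning when a node starts with it. The sketch below only
illustrates that pattern of reading a boolean flag and warning at startup. The class
and method names (SerialReadFlagSketch, linearizabilityDisabled, startupWarning) are
hypothetical, and how Cassandra actually reads the property is not shown in this
excerpt.

```java
import java.util.Properties;

// Hypothetical illustration only; not the code from StorageProxy.java.
public final class SerialReadFlagSketch {
    // Property name as given in the NEWS.txt entry of this change.
    static final String KEY = "cassandra.unsafe.disable-serial-reads-linearizability";

    // True only when the property is explicitly set to "true" (case-insensitive).
    static boolean linearizabilityDisabled(Properties props) {
        return Boolean.parseBoolean(props.getProperty(KEY, "false"));
    }

    // Returns a warning for the operator, or null when the fix stays enabled.
    static String startupWarning(Properties props) {
        if (!linearizabilityDisabled(props)) {
            return null;
        }
        return "This node was started with -D" + KEY + ". SERIAL (and LOCAL_SERIAL) reads "
             + "coordinated by this node will not offer linearizability (see CASSANDRA-12126).";
    }

    public static void main(String[] args) {
        String warning = startupWarning(System.getProperties());
        System.out.println(warning == null
                           ? "SERIAL reads keep the linearizability fix (default)."
                           : warning);
    }
}
```

Passing a Properties object instead of calling System.getProperty directly is a choice
made here so that the check can be exercised without restarting the JVM.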




[cassandra] 01/01: Merge commit '080280dc0177da6176dd4ba970e5a35aa7e2a729' into trunk

Posted by sl...@apache.org.

slebresne pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9a3ca008bad2a7bfa887a8ba2d119fdc4369ba08
Merge: c26269b 080280d
Author: Sylvain Lebresne <le...@gmail.com>
AuthorDate: Fri Nov 27 17:07:15 2020 +0100

    Merge commit '080280dc0177da6176dd4ba970e5a35aa7e2a729' into trunk

 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   8 +
 .../org/apache/cassandra/service/StorageProxy.java | 352 +++++++----
 .../org/apache/cassandra/service/paxos/Commit.java |   6 +
 .../cassandra/service/paxos/PrepareCallback.java   |  12 +-
 .../cassandra/distributed/impl/Instance.java       |  83 +++
 .../apache/cassandra/distributed/test/CASTest.java | 688 +++++++++++++++++++++
 7 files changed, 1034 insertions(+), 116 deletions(-)

diff --cc CHANGES.txt
index 92094dc,c3c5f02..56843d7
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,8 +1,21 @@@
 -3.11.10
 - * Rate limit validation compactions using compaction_throughput_mb_per_sec (CASSANDRA-16161)
 +4.0-beta4
 + * Update jctools dependency to 3.1.0 (CASSANDRA-16255)
 + * 'SSLEngine closed already' exception on failed outbound connection (CASSANDRA-16277)
 + * Drain and/or shutdown might throw because of slow messaging service shutdown (CASSANDRA-16276)
 + * Upgrade JNA to 5.6.0, dropping support for <=glibc-2.6 systems (CASSANDRA-16212)
 + * Add saved Host IDs to TokenMetadata at startup (CASSANDRA-16246)
 + * Ensure that CacheMetrics.requests is picked up by the metric reporter (CASSANDRA-16228)
 + * Add a ratelimiter to snapshot creation and deletion (CASSANDRA-13019)
 + * Produce consistent tombstone for reads to avoid digest mismatch (CASSANDRA-15369)
 + * Fix SSTableloader issue when restoring a table named backups (CASSANDRA-16235)
 + * Invalid serialized size for responses caused by increasing message time by 1ms which caused extra bytes in size calculation (CASSANDRA-16103)
 + * Throw BufferOverflowException from DataOutputBuffer for better visibility (CASSANDRA-16214)
 + * TLS connections to the storage port on a node without server encryption configured causes java.io.IOException accessing missing keystore (CASSANDRA-16144)
 + * Internode messaging catches OOMs and does not rethrow (CASSANDRA-15214)
 +Merged from 3.11:
   * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071)
  Merged from 3.0:
+  * Fix serial read/non-applying CAS linearizability (CASSANDRA-12126)
   * Avoid potential NPE in JVMStabilityInspector (CASSANDRA-16294)
   * Improved check of num_tokens against the length of initial_token (CASSANDRA-14477)
   * Fix a race condition on ColumnFamilyStore and TableMetrics (CASSANDRA-16228)
diff --cc NEWS.txt
index 9774d00,c5a3439..d02f2f0
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -33,256 -42,21 +33,264 @@@ restore snapshots created with the prev
  'sstableloader' tool. You can upgrade the file format of your snapshots
  using the provided 'sstableupgrade' tool.
  
 +4.0
 +===
 +
 +New features
 +------------
 +    - Nodes will now bootstrap all intra-cluster connections at startup by default and wait
 +      10 seconds for all but one node in the local data center to be connected and marked
 +      UP in gossip. This prevents nodes from coordinating requests and failing because they
 +      aren't able to connect to the cluster fast enough. block_for_peers_timeout_in_secs in
 +      cassandra.yaml can be used to configure how long to wait (or whether to wait at all)
 +      and block_for_peers_in_remote_dcs can be used to also block on all but one node in
 +      each remote DC as well. See CASSANDRA-14297 and CASSANDRA-13993 for more information.
 +    - *Experimental* support for Transient Replication and Cheap Quorums introduced by CASSANDRA-14404
 +      The intended audience for this functionality is expert users of Cassandra who are prepared
 +      to validate every aspect of the database for their application and deployment practices. Future
 +      releases of Cassandra will make this feature suitable for a wider audience.
 +    - *Experimental* support for Java 11 has been added. JVM options that differ between or are
 +      specific for Java 8 and 11 have been moved from jvm.options into jvm8.options and jvm11.options.
 +      IMPORTANT: Running C* on Java 11 is *experimental*; do so at your own risk.
 +    - LCS now respects the max_threshold parameter when compacting - this was hard coded to 32
 +      before, but now it is possible to do bigger compactions when compacting from L0 to L1.
 +      This also applies to STCS-compactions in L0 - if there are more than 32 sstables in L0
 +      we will compact at most max_threshold sstables in an L0 STCS compaction. See CASSANDRA-14388
 +      for more information.
 +    - There is now an option to automatically upgrade sstables after Cassandra upgrade, enable
 +      either in `cassandra.yaml:automatic_sstable_upgrade` or via JMX during runtime. See
 +      CASSANDRA-14197.
 +    - `nodetool refresh` has been deprecated in favour of `nodetool import` - see CASSANDRA-6719
 +      for details
 +    - An experimental option to compare all merkle trees together has been added - for example, in
 +      a 3 node cluster with 2 replicas identical and 1 out-of-date, with this option enabled, the
 +      out-of-date replica will only stream a single copy from up-to-date replica. Enable it by adding
 +      "-os" to nodetool repair. See CASSANDRA-3200.
 +    - The currentTimestamp, currentDate, currentTime and currentTimeUUID functions have been added.
 +      See CASSANDRA-13132
 +    - Support for arithmetic operations between `timestamp`/`date` and `duration` has been added.
 +      See CASSANDRA-11936
 +    - Support for arithmetic operations on number has been added. See CASSANDRA-11935
 +    - Preview expected streaming required for a repair (nodetool repair --preview), and validate the
 +      consistency of repaired data between nodes (nodetool repair --validate). See CASSANDRA-13257
 +    - Support for selecting Map values and Set elements has been added for SELECT queries. See CASSANDRA-7396
 +    - Change-Data-Capture has been modified to make CommitLogSegments available
 +      immediately upon creation via hard-linking the files. This means that incomplete
 +      segments will be available in cdc_raw rather than fully flushed. See documentation
 +      and CASSANDRA-12148 for more detail.
 +    - The initial build of materialized views can be parallelized. The number of concurrent builder
 +      threads is specified by the property `cassandra.yaml:concurrent_materialized_view_builders`.
 +      This property can be modified at runtime through both JMX and the new `setconcurrentviewbuilders`
 +      and `getconcurrentviewbuilders` nodetool commands. See CASSANDRA-12245 for more details.
 +    - There is now a binary full query log based on Chronicle Queue that can be controlled using
 +      nodetool enablefullquerylog, disablefullquerylog, and resetfullquerylog. The log
 +      contains all queries invoked, approximate time they were invoked, any parameters necessary
 +      to bind wildcard values, and all query options. A human readable version of the log can be
 +      dumped or tailed using the new bin/fqltool utility. The full query log is designed to be safe
 +      to use in production and limits utilization of heap memory and disk space with limits
 +      you can specify when enabling the log.
 +      See nodetool and fqltool help text for more information.
 +    - SSTableDump now supports the -l option to output each partition as its own JSON object
 +      See CASSANDRA-13848 for more detail
 +    - Metric for coordinator writes per table has been added. See CASSANDRA-14232
 +    - Nodetool cfstats now has options to sort by various metrics as well as limit results.
 +    - Operators can restrict login user activity to one or more datacenters. See `network_authorizer`
 +      in cassandra.yaml, and the docs for create and alter role statements. CASSANDRA-13985
 +    - Roles altered from login=true to login=false will prevent existing connections from executing any
 +      statements after the cache has been refreshed. CASSANDRA-13985
 +    - Support for audit logging of database activity. If enabled, logs every incoming
 +      CQL command request, Authentication (successful as well as unsuccessful login) to a node.
 +    - Faster streaming of entire SSTables using ZeroCopy APIs. If enabled, Cassandra will stream
 +      entire SSTables, significantly speeding up transfers. Any streaming related operations will see
 +      corresponding improvement. See CASSANDRA-14556.
 +    - NetworkTopologyStrategy now supports auto-expanding the replication_factor
 +      option into all available datacenters at CREATE or ALTER time. For example,
 +      specifying replication_factor: 3 translates to three replicas in every
 +      datacenter. This auto-expansion will _only add_ datacenters for safety.
 +      See CASSANDRA-14303 for more details.
 +    - Added Python 3 support so cqlsh and cqlshlib are now compatible with Python 2.7 and Python 3.6.
 +      Added --python option to cqlsh so users can specify the path to their chosen Python interpreter.
 +      See CASSANDRA-10190 for details.
 +    - Support for server side DESCRIBE statements has been added. See CASSANDRA-14825
 +    - It is now possible to rate limit snapshot creation/clearing. See CASSANDRA-13019
 +
 +Upgrading
 +---------
 +    - Cassandra removed support for the OldNetworkTopologyStrategy. Before upgrading you will need to change the 
 +      replication strategy for the keyspaces using this strategy to the NetworkTopologyStrategy. (CASSANDRA-13990)
 +    - Sstables for tables using a frozen UDT written by C* 3.0 appear as corrupted.
 +
 +      Background: The serialization-header in the -Statistics.db sstable component contains the type information
 +      of the table columns. C* 3.0 writes incorrect type information for frozen UDTs by omitting the
 +      "frozen" information. Non-frozen UDTs were introduced by CASSANDRA-7423 in C* 3.6. Since then, the missing
 +      "frozen" information leads to deserialization issues that result in CorruptSSTableExceptions, potentially other
 +      exceptions as well.
 +
 +      As a mitigation, the sstable serialization-headers are rewritten to contain the missing "frozen" information for
 +      UDTs once, when an upgrade from C* 3.0 is detected. This migration does not touch snapshots or backups.
 +
 +      The sstablescrub tool now performs a check of the sstable serialization-header against the schema. A mismatch of
 +      the types in the serialization-header and the schema will cause sstablescrub to error out and stop by default.
 +      See the new `-e` option. `-e off` disables the new validation code. `-e fix` or `-e fix-only`, e.g.
 +      `sstablescrub -e fix keyspace table`, will validate the serialization-header, rewrite the non-frozen UDTs
 +      in the serialization-header to frozen UDTs, if that matches the schema, and continue with scrub.
 +      See `sstablescrub -h`.
 +      (CASSANDRA-15035)
 +    - CASSANDRA-13241 lowered the default chunk_length_in_kb for compressed tables from
 +      64kb to 16kb. For highly compressible data this can have a noticeable impact
 +      on space utilization. You may want to consider manually specifying this value.
 +    - Additional columns have been added to system_distributed.repair_history,
 +      system_traces.sessions and system_traces.events. As a result select queries
 +      against these tables - including queries against tracing tables performed
 +      automatically by the drivers and cqlsh - will fail and generate an error in the log
 +      during upgrade when the cluster is mixed version. On 3.x side this will also lead
 +      to broken internode connections and lost messages.
 +      Cassandra versions 3.0.20 and 3.11.6 pre-add these columns (see CASSANDRA-15385),
 +      so please make sure to upgrade to those versions or higher before upgrading to
 +      4.0 for query tracing to not cause any issues during the upgrade to 4.0.
 +    - Timestamp ties between values resolve differently: if either value has a TTL,
 +      this value always wins. This is to provide consistent reconciliation before
 +      and after the value expires into a tombstone.
 +    - Support for legacy auth tables in the system_auth keyspace (users,
 +      permissions, credentials) and the migration code has been removed. Migration
 +      of these legacy auth tables must have been completed before the upgrade to
 +      4.0 and the legacy tables must have been removed. See the 'Upgrading' section
 +      for version 2.2 for migration instructions.
 +    - Cassandra 4.0 removed support for the deprecated Thrift interface. Amongst
 +      other things, this implies the removal of all yaml options related to thrift
 +      ('start_rpc', rpc_port, ...).
 +    - Cassandra 4.0 removed support for any pre-3.0 format. This means you
 +      cannot upgrade from a 2.x version to 4.0 directly, you have to upgrade to
 +      a 3.0.x/3.x version first (and run upgradesstables). In particular, this
 +      means Cassandra 4.0 cannot load or read pre-3.0 sstables in any way: you
 +      will need to upgrade those sstables in 3.0.x/3.x first.
 +    - Upgrades from 3.0.x or 3.x are supported since 3.0.13 or 3.11.0, previous
 +      versions will cause issues during rolling upgrades (CASSANDRA-13274).
 +    - Cassandra will no longer allow invalid keyspace replication options, such
 +      as invalid datacenter names for NetworkTopologyStrategy. Operators MUST
 +      add new nodes to a datacenter before they can set ALTER or CREATE
 +      keyspace replication policies using that datacenter. Existing keyspaces
 +      will continue to operate, but CREATE and ALTER will validate that all
 +      datacenters specified exist in the cluster.
 +    - Cassandra 4.0 fixes a problem with incremental repair which caused repaired
 +      data to be inconsistent between nodes. The fix changes the behavior of both
 +      full and incremental repairs. For full repairs, data is no longer marked
 +      repaired. For incremental repairs, anticompaction is run at the beginning
 +      of the repair, instead of at the end. If incremental repair was being used
 +      prior to upgrading, a full repair should be run after upgrading to resolve
 +      any inconsistencies.
 +    - Config option index_interval has been removed (it was deprecated since 2.0)
 +    - Deprecated repair JMX APIs are removed.
 +    - The version of snappy-java has been upgraded to 1.1.2.6
 +    - The minimum value for internode message timeouts is 10ms. Previously, any
 +      positive value was allowed. See cassandra.yaml entries like
 +      read_request_timeout_in_ms for more details.
 +    - Cassandra 4.0 allows a single port to be used for both secure and insecure
 +      connections between cassandra nodes (CASSANDRA-10404). See the yaml for
 +      specific property changes, and see the security doc for full details.
 +    - Due to the parallelization of the initial build of materialized views,
 +      the per token range view building status is stored in the new table
 +      `system.view_builds_in_progress`. The old table `system.views_builds_in_progress`
 +      is no longer used and can be removed. See CASSANDRA-12245 for more details.
 +    - Config option commitlog_sync_batch_window_in_ms has been deprecated as its
 +      documentation has been incorrect and the setting itself near useless.
 +      Batch mode remains a valid commit log mode, however.
 +    - There is a new commit log mode, group, which is similar to batch mode
 +      but blocks for up to a configurable number of milliseconds between disk flushes.
 +    - nodetool clearsnapshot now requires the --all flag to remove all snapshots.
 +      Previous behavior would delete all snapshots by default.
 +    - Nodes are now identified by a combination of IP, and storage port.
 +      Existing JMX APIs, nodetool, and system tables continue to work
 +      and accept/return just an IP, but there is a new
 +      version of each that works with the full unambiguous identifier.
 +      You should prefer these over the deprecated ambiguous versions that only
 +      work with an IP. This was done to support multiple instances per IP.
 +      Additionally we are moving to only using a single port for encrypted and
 +      unencrypted traffic and if you want multiple instances per IP you must
 +      first switch encrypted traffic to the storage port and not a separate
 +      encrypted port. If you want to use multiple instances per IP
 +      with SSL you will need to use StartTLS on storage_port and set
 +      outgoing_encrypted_port_source to gossip so that outbound connections
 +      know what port to connect to for each instance. Before changing
 +      storage port or native port at nodes you must first upgrade the entire cluster
 +      and clients to 4.0 so they can handle the port not being consistent across
 +      the cluster.
 +    - Names of AWS regions/availability zones have been cleaned up to more correctly
 +      match the Amazon names. There is now a new option in conf/cassandra-rackdc.properties
 +      that lets users enable the correct names for new clusters, or use the legacy
 +      names for existing clusters. See conf/cassandra-rackdc.properties for details.
 +    - Background repair has been removed. dclocal_read_repair_chance and
 +      read_repair_chance table options have been removed and are now rejected.
 +      See CASSANDRA-13910 for details.
 +    - Internode TCP connections that do not ack segments for 30s will now
 +      be automatically detected and closed via the Linux TCP_USER_TIMEOUT
 +      socket option. This should be exceedingly rare, but AWS networks (and
 +      other stateful firewalls) apparently suffer from this issue. You can
 +      tune the timeouts on TCP connection and segment ack via the
 +      `cassandra.yaml:internode_tcp_connect_timeout_in_ms` and
 +      `cassandra.yaml:internode_tcp_user_timeout_in_ms` options respectively.
 +      See CASSANDRA-14358 for details.
 +    - repair_session_space_in_mb setting has been added to cassandra.yaml to allow operators to reduce
 +      merkle tree size if repair is creating too much heap pressure. The repair_session_max_tree_depth
 +      setting added in 3.0.19 and 3.11.5 is deprecated in favor of this setting. See CASSANDRA-14096
 +    - The flags 'enable_materialized_views' and 'enable_sasi_indexes' in cassandra.yaml
 +      have been set as false by default. Operators should modify them to allow the
 +      creation of new views and SASI indexes, the existing ones will continue working.
 +      See CASSANDRA-14866 for details.
 +    - CASSANDRA-15216 - The flag 'cross_node_timeout' has been set as true by default.
 +      This change is done under the assumption that users have setup NTP on
 +      their clusters or otherwise synchronize their clocks, and that clocks are
 +      mostly in sync, since this is a requirement for general correctness of
 +      last write wins.
 +    - CASSANDRA-15257 removed the joda time dependency.  Any time formats
 +      passed will now need to conform to java.time.format.DateTimeFormatter.
 +      Most notably, days and months must be two digits, and years exceeding
 +      four digits need to be prefixed with a plus or minus sign.
 +    - cqlsh now returns a non-zero code in case of errors. This is a backward incompatible change so it may
 +      break existing scripts that rely on the current behavior. See CASSANDRA-15623 for more details.
 +    - Updated the default compaction_throughput_mb_per_sec to 64. The original
 +      default (16) was meant for spinning disk volumes.  See CASSANDRA-14902 for details.
 +    - Custom compaction strategies must now handle getting sstables added/removed notifications for
 +      sstables already added/removed - see CASSANDRA-14103 for details.
 +    - Support for JNA with glibc 2.6 and earlier has been removed. Centos 5, Debian 4, and Ubuntu 7.10 operating systems
 +      must be first upgraded. See CASSANDRA-16212 for more.
 +    - In cassandra.yaml, when using vnodes num_tokens must be defined if initial_token is defined.
 +      If it is not defined, or not equal to the numbers of tokens defined in initial_tokens,
 +      the node will not start. See CASSANDRA-14477 for details.
 +
 +
 +Deprecation
 +-----------
 +
 +    - The JMX MBean org.apache.cassandra.db:type=BlacklistedDirectories has been
 +      deprecated in favor of org.apache.cassandra.db:type=DisallowedDirectories
 +      and will be removed in a subsequent major version.
 +
 +
 +Materialized Views
 +-------------------
 +    - Following a discussion regarding concerns about the design and safety of Materialized Views, the C* development
 +      community no longer recommends them for production use, and considers them experimental. Warning messages will
 +      now be logged when they are created. (See https://www.mail-archive.com/dev@cassandra.apache.org/msg11511.html)
 +    - An 'enable_materialized_views' flag has been added to cassandra.yaml to allow operators to prevent creation of
 +      views
 +    - CREATE MATERIALIZED VIEW syntax has become stricter. Partition key columns are no longer implicitly considered
 +      to be NOT NULL, and no base primary key columns get automatically included in view definition. You have to
 +      specify them explicitly now.
 +
  3.11.10
 -=====
 +======
 +
  Upgrading
  ---------
+     - This release fixes a correctness issue with SERIAL reads, and LWT writes that do not apply.
+       Unfortunately, this fix has a performance impact on read performance at the SERIAL or
+       LOCAL_SERIAL consistency levels. For heavy users of such SERIAL reads, the performance
+       impact may be noticeable and may also result in an increase in timeouts. For that
+       reason, an opt-in system property has been added to disable the fix:
+         -Dcassandra.unsafe.disable-serial-reads-linearizability=true
+       Use this flag at your own risk as it reverts SERIAL reads to the incorrect behavior of
 -      previous versions. See CASSANDRA-12126 for details.
 -    - In cassandra.yaml, when using vnodes num_tokens must be defined if initial_token is defined.
 -      If it is not defined, or not equal to the numbers of tokens defined in initial_tokens,
 -      the node will not start. See CASSANDRA-14477 for details.
++      previous versions. See CASSANDRA-12126 for details. 
      - SASI's `max_compaction_flush_memory_in_mb` setting was previously getting interpreted in bytes. From 3.11.8
        it is correctly interpreted in megabytes, but prior to 3.11.10 previous configurations of this setting will
        lead to nodes OOM during compaction. From 3.11.10 previous configurations will be detected as incorrect,
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index c3715dd,d6f713e..df3a6f5
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -17,40 -17,24 +17,40 @@@
   */
  package org.apache.cassandra.service;
  
 -import java.io.IOException;
 -import java.net.InetAddress;
  import java.nio.ByteBuffer;
- import java.nio.file.Paths;
 -import java.util.*;
 -import java.util.concurrent.*;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.EnumMap;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Objects;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.ThreadLocalRandom;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TimeoutException;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.concurrent.atomic.AtomicLong;
+ import java.util.function.Supplier;
  
 -import com.google.common.base.Predicate;
  import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
  import com.google.common.cache.CacheLoader;
 -import com.google.common.collect.*;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.PeekingIterator;
  import com.google.common.primitives.Ints;
  import com.google.common.util.concurrent.Uninterruptibles;
 -
  import org.apache.commons.lang3.StringUtils;
 -
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -241,10 -177,18 +245,20 @@@ public class StorageProxy implements St
          for(ConsistencyLevel level : ConsistencyLevel.values())
          {
              readMetricsMap.put(level, new ClientRequestMetrics("Read-" + level.name()));
 -            writeMetricsMap.put(level, new ClientRequestMetrics("Write-" + level.name()));
 +            writeMetricsMap.put(level, new ClientWriteRequestMetrics("Write-" + level.name()));
          }
  
 +        ReadRepairMetrics.init();
++
+         if (disableSerialReadLinearizability)
+         {
+             logger.warn("This node was started with -D{}. SERIAL (and LOCAL_SERIAL) reads coordinated by this node " +
+                         "will not offer linearizability (see CASSANDRA-12126 for details on what this means) with " +
+                         "respect to other SERIAL operations. Please note that, with this flag, SERIAL reads will be " +
+                         "slower than QUORUM reads, yet offer no more guarantee. This flag should only be used in " +
+                         "the restricted case of upgrading from a pre-CASSANDRA-12126 version, and only if you " +
+                         "understand the tradeoff.", DISABLE_SERIAL_READ_LINEARIZABILITY_KEY);
+         }
      }
  
      /**
@@@ -295,31 -239,19 +309,20 @@@
                                    ConsistencyLevel consistencyForPaxos,
                                    ConsistencyLevel consistencyForCommit,
                                    ClientState state,
 +                                  int nowInSeconds,
                                    long queryStartNanoTime)
 -    throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException
 +    throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException, CasWriteUnknownResultException
      {
          final long startTimeForMetrics = System.nanoTime();
-         TableMetadata metadata = Schema.instance.getTableMetadata(keyspaceName, cfName);
-         int contentions = 0;
          try
          {
-             consistencyForPaxos.validateForCas();
-             consistencyForCommit.validateForCasCommit(keyspaceName);
 -            CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
++            TableMetadata metadata = Schema.instance.validateTable(keyspaceName, cfName);
  
-             long timeoutNanos = DatabaseDescriptor.getCasContentionTimeout(NANOSECONDS);
-             while (System.nanoTime() - queryStartNanoTime < timeoutNanos)
+             Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer = () ->
              {
-                 // for simplicity, we'll do a single liveness check at the start of each attempt
-                 ReplicaPlan.ForPaxosWrite replicaPlan = ReplicaPlans.forPaxos(Keyspace.open(keyspaceName), key, consistencyForPaxos);
- 
-                 final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, replicaPlan, consistencyForPaxos, consistencyForCommit, true, state);
-                 final UUID ballot = pair.ballot;
-                 contentions += pair.contentions;
- 
                  // read the current values and check they validate the conditions
                  Tracing.trace("Reading existing values for CAS precondition");
 -                SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds());
 +                SinglePartitionReadCommand readCommand = (SinglePartitionReadCommand) request.readCommand(nowInSeconds);
                  ConsistencyLevel readConsistency = consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
  
                  FilteredPartition current;
@@@ -332,17 -264,12 +335,16 @@@
                  {
                      Tracing.trace("CAS precondition does not match current values {}", current);
                      casWriteMetrics.conditionNotMet.inc();
-                     return current.rowIterator();
+                     return Pair.create(PartitionUpdate.emptyUpdate(metadata, key), current.rowIterator());
                  }
  
-                 // finish the paxos round w/ the desired updates
-                 // TODO turn null updates into delete?
+                 // Create the desired updates
                  PartitionUpdate updates = request.makeUpdates(current);
  
 +                long size = updates.dataSize();
 +                casWriteMetrics.mutationSize.update(size);
 +                writeMetricsMap.get(consistencyForPaxos).mutationSize.update(size);
 +
                  // Apply triggers to cas updates. A consideration here is that
                  // triggers emit Mutations, and so a given trigger implementation
                  // may generate mutations for partitions other than the one this
@@@ -352,36 -279,21 +354,32 @@@
                  // InvalidRequestException) any which aren't.
                  updates = TriggerExecutor.instance.execute(updates);
  
+                 return Pair.create(updates, null);
+             };
  
-                 Commit proposal = Commit.newProposal(ballot, updates);
-                 Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
-                 if (proposePaxos(proposal, replicaPlan, true, queryStartNanoTime))
-                 {
-                     commitPaxos(proposal, consistencyForCommit, true, queryStartNanoTime);
-                     Tracing.trace("CAS successful");
-                     return null;
-                 }
- 
-                 Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)");
-                 contentions++;
-                 Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), MILLISECONDS);
-                 // continue to retry
-             }
+             return doPaxos(metadata,
+                            key,
+                            consistencyForPaxos,
+                            consistencyForCommit,
+                            consistencyForCommit,
+                            state,
+                            queryStartNanoTime,
+                            casWriteMetrics,
+                            updateProposer);
  
-             throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(keyspaceName)));
          }
 -        catch (WriteTimeoutException | ReadTimeoutException e)
 +        catch (CasWriteUnknownResultException e)
 +        {
 +            casWriteMetrics.unknownResult.mark();
 +            throw e;
 +        }
-         catch (WriteTimeoutException wte)
++        catch (CasWriteTimeoutException wte)
 +        {
 +            casWriteMetrics.timeouts.mark();
 +            writeMetricsMap.get(consistencyForPaxos).timeouts.mark();
-             throw new CasWriteTimeoutException(wte.writeType, wte.consistency, wte.received, wte.blockFor, contentions);
++            throw new CasWriteTimeoutException(wte.writeType, wte.consistency, wte.received, wte.blockFor, wte.contentions);
 +        }
 +        catch (ReadTimeoutException e)
          {
              casWriteMetrics.timeouts.mark();
              writeMetricsMap.get(consistencyForPaxos).timeouts.mark();
@@@ -409,10 -319,154 +405,131 @@@
          }
      }
  
-     private static void recordCasContention(int contentions)
 -    private static void recordCasContention(CASClientRequestMetrics casMetrics, int contentions)
++    private static void recordCasContention(TableMetadata table,
++                                            DecoratedKey key,
++                                            CASClientRequestMetrics casMetrics,
++                                            int contentions)
      {
--        if(contentions > 0)
-             casWriteMetrics.contention.update(contentions);
 -            casMetrics.contention.update(contentions);
++        if (contentions == 0)
++            return;
++
++        casMetrics.contention.update(contentions);
++        Keyspace.open(table.keyspace)
++                .getColumnFamilyStore(table.name)
++                .metric
++                .topCasPartitionContention
++                .addSample(key.getKey(), contentions);
+     }
+ 
+     /**
+      * Performs the Paxos rounds for a given proposal, retrying when preempted until the timeout.
+      *
+      * <p>The main 'configurable' of this method is the {@code createUpdateProposal} method: it is called by the method
+      * once a ballot has been successfully 'prepared' to generate the update to 'propose' (and commit if the proposal is
+      * successful). That method also generates the result that the whole method will return. Note that due to retrying,
+      * this method may be called multiple times and does not have to return the same results.
+      *
+      * @param metadata the table to update with Paxos.
+      * @param key the partition updated.
+      * @param consistencyForPaxos the serial consistency of the operation (either {@link ConsistencyLevel#SERIAL} or
+      *     {@link ConsistencyLevel#LOCAL_SERIAL}).
+      * @param consistencyForReplayCommits the consistency for the commit phase of "replayed" in-progress operations.
+      * @param consistencyForCommit the consistency for the commit phase of _this_ operation update.
+      * @param state the client state.
+      * @param queryStartNanoTime the nano time for the start of the query this is part of. This is the base time for
+      *     timeouts.
+      * @param casMetrics the metrics to update for this operation.
+      * @param createUpdateProposal method called after a successful 'prepare' phase to obtain 1) the actual update of
+      *     this operation and 2) the result that the whole method should return. This can return {@code null} in the
+      *     special case where, after having "prepared" (and thus potentially replayed in-progress updates), we don't
+      *     want to propose anything (the whole method then returns {@code null}).
+      * @return the second element of the pair returned by {@code createUpdateProposal} (for the last call of that method
+      *     if that method is called multiple times due to retries).
+      */
 -    private static RowIterator doPaxos(CFMetaData metadata,
++    private static RowIterator doPaxos(TableMetadata metadata,
+                                        DecoratedKey key,
+                                        ConsistencyLevel consistencyForPaxos,
+                                        ConsistencyLevel consistencyForReplayCommits,
+                                        ConsistencyLevel consistencyForCommit,
+                                        ClientState state,
+                                        long queryStartNanoTime,
+                                        CASClientRequestMetrics casMetrics,
+                                        Supplier<Pair<PartitionUpdate, RowIterator>> createUpdateProposal)
+     throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException
+     {
+         int contentions = 0;
++        Keyspace keyspace = Keyspace.open(metadata.keyspace);
+         try
+         {
+             consistencyForPaxos.validateForCas();
 -            consistencyForReplayCommits.validateForCasCommit(metadata.ksName);
 -            consistencyForCommit.validateForCasCommit(metadata.ksName);
++            consistencyForReplayCommits.validateForCasCommit(metadata.keyspace);
++            consistencyForCommit.validateForCasCommit(metadata.keyspace);
+ 
 -            long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
 -            while (System.nanoTime() - queryStartNanoTime < timeout)
++            long timeoutNanos = DatabaseDescriptor.getCasContentionTimeout(NANOSECONDS);
++            while (System.nanoTime() - queryStartNanoTime < timeoutNanos)
+             {
+                 // for simplicity, we'll do a single liveness check at the start of each attempt
 -                Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos);
 -                List<InetAddress> liveEndpoints = p.left;
 -                int requiredParticipants = p.right;
 -
 -                final Pair<UUID, Integer> pair = beginAndRepairPaxos(queryStartNanoTime,
 -                                                                     key,
 -                                                                     metadata,
 -                                                                     liveEndpoints,
 -                                                                     requiredParticipants,
 -                                                                     consistencyForPaxos,
 -                                                                     consistencyForReplayCommits,
 -                                                                     casMetrics,
 -                                                                     state);
 -                final UUID ballot = pair.left;
 -                contentions += pair.right;
++                ReplicaPlan.ForPaxosWrite replicaPlan = ReplicaPlans.forPaxos(keyspace, key, consistencyForPaxos);
++                PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime,
++                                                                    key,
++                                                                    metadata,
++                                                                    replicaPlan,
++                                                                    consistencyForPaxos,
++                                                                    consistencyForReplayCommits,
++                                                                    casMetrics,
++                                                                    state);
++
++                final UUID ballot = pair.ballot;
++                contentions += pair.contentions;
+ 
+                 Pair<PartitionUpdate, RowIterator> proposalPair = createUpdateProposal.get();
+                 // See method javadoc: null here is code for "stop here and return null".
+                 if (proposalPair == null)
+                     return null;
+ 
+                 Commit proposal = Commit.newProposal(ballot, proposalPair.left);
+                 Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
 -                if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos, queryStartNanoTime))
++                if (proposePaxos(proposal, replicaPlan, true, queryStartNanoTime))
+                 {
+                     // We skip committing accepted updates when they are empty. This is an optimization which works
+                     // because we also skip replaying those same empty updates in beginAndRepairPaxos (see the longer
+                     // comment there). As empty updates are somewhat common (serial reads and non-applying CAS propose
+                     // them), this is worth the bother.
+                     if (!proposal.update.isEmpty())
+                         commitPaxos(proposal, consistencyForCommit, true, queryStartNanoTime);
+                     RowIterator result = proposalPair.right;
+                     if (result != null)
+                         Tracing.trace("CAS did not apply");
+                     else
+                         Tracing.trace("CAS applied successfully");
+                     return result;
+                 }
+ 
+                 Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)");
+                 contentions++;
+                 Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS);
+                 // continue to retry
+             }
 -
 -            throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(metadata.ksName)));
+         }
 -        finally
++        catch (CasWriteTimeoutException e)
+         {
 -            if(contentions > 0)
 -                casMetrics.contention.update(contentions);
++            // Might be thrown by beginAndRepairPaxos. In that case, any contention that happened within the method and
++            // led up to the timeout was not accounted for in our local 'contentions' variable, so we add it now so that
++            // the contention recorded in the finally block is correct.
++            contentions += e.contentions;
++            throw e;
+         }
 -    }
 -
 -    private static Predicate<InetAddress> sameDCPredicateFor(final String dc)
 -    {
 -        final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
 -        return new Predicate<InetAddress>()
++        catch (WriteTimeoutException e)
+         {
 -            public boolean apply(InetAddress host)
 -            {
 -                return dc.equals(snitch.getDatacenter(host));
 -            }
 -        };
 -    }
 -
 -    private static Pair<List<InetAddress>, Integer> getPaxosParticipants(CFMetaData cfm, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
 -    {
 -        Token tk = key.getToken();
 -        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(cfm.ksName, tk);
 -        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, cfm.ksName);
 -        if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
++            // Might be thrown by proposePaxos or commitPaxos
++            throw new CasWriteTimeoutException(e.writeType, e.consistency, e.received, e.blockFor, contentions);
++        }
++        finally
+         {
 -            // Restrict naturalEndpoints and pendingEndpoints to node in the local DC only
 -            String localDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
 -            Predicate<InetAddress> isLocalDc = sameDCPredicateFor(localDc);
 -            naturalEndpoints = ImmutableList.copyOf(Iterables.filter(naturalEndpoints, isLocalDc));
 -            pendingEndpoints = ImmutableList.copyOf(Iterables.filter(pendingEndpoints, isLocalDc));
++            recordCasContention(metadata, key, casMetrics, contentions);
+         }
 -        int participants = pendingEndpoints.size() + naturalEndpoints.size();
 -        int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833
 -        List<InetAddress> liveEndpoints = ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, pendingEndpoints), IAsyncCallback.isAlive));
 -        if (liveEndpoints.size() < requiredParticipants)
 -            throw new UnavailableException(consistencyForPaxos, requiredParticipants, liveEndpoints.size());
 -
 -        // We cannot allow CAS operations with 2 or more pending endpoints, see #8346.
 -        // Note that we fake an impossible number of required nodes in the unavailable exception
 -        // to nail home the point that it's an impossible operation no matter how many nodes are live.
 -        if (pendingEndpoints.size() > 1)
 -            throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", pendingEndpoints.size()),
 -                                           consistencyForPaxos,
 -                                           participants + 1,
 -                                           liveEndpoints.size());
+ 
 -        return Pair.create(liveEndpoints, requiredParticipants);
++        throw new CasWriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(keyspace), contentions);
      }
  
      /**
@@@ -421,17 -475,18 +538,17 @@@
       * @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of
       * nodes have seen the mostRecentCommit.  Otherwise, return null.
       */
 -    private static Pair<UUID, Integer> beginAndRepairPaxos(long queryStartNanoTime,
 -                                                           DecoratedKey key,
 -                                                           CFMetaData metadata,
 -                                                           List<InetAddress> liveEndpoints,
 -                                                           int requiredParticipants,
 -                                                           ConsistencyLevel consistencyForPaxos,
 -                                                           ConsistencyLevel consistencyForCommit,
 -                                                           CASClientRequestMetrics casMetrics,
 -                                                           ClientState state)
 +    private static PaxosBallotAndContention beginAndRepairPaxos(long queryStartNanoTime,
 +                                                                DecoratedKey key,
 +                                                                TableMetadata metadata,
 +                                                                ReplicaPlan.ForPaxosWrite paxosPlan,
 +                                                                ConsistencyLevel consistencyForPaxos,
 +                                                                ConsistencyLevel consistencyForCommit,
-                                                                 final boolean isWrite,
++                                                                CASClientRequestMetrics casMetrics,
 +                                                                ClientState state)
      throws WriteTimeoutException, WriteFailureException
      {
 -        long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
 +        long timeoutNanos = DatabaseDescriptor.getCasContentionTimeout(NANOSECONDS);
  
          PrepareCallback summary = null;
          int contentions = 0;
@@@ -448,76 -503,89 +565,87 @@@
              UUID ballot = UUIDGen.getRandomTimeUUIDFromMicros(ballotMicros);
  
              // prepare
--            Tracing.trace("Preparing {}", ballot);
--            Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
-             summary = preparePaxos(toPrepare, paxosPlan, queryStartNanoTime);
 -            summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants, consistencyForPaxos, queryStartNanoTime);
--            if (!summary.promised)
++            try
              {
--                Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
--                contentions++;
--                // sleep a random amount to give the other proposer a chance to finish
-                 Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), MILLISECONDS);
 -                Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS);
--                continue;
--            }
- 
-             Commit inProgress = summary.mostRecentInProgressCommitWithUpdate;
-             Commit mostRecent = summary.mostRecentCommit;
++                Tracing.trace("Preparing {}", ballot);
++                Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
++                summary = preparePaxos(toPrepare, paxosPlan, queryStartNanoTime);
++                if (!summary.promised)
++                {
++                    Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
++                    contentions++;
++                    // sleep a random amount to give the other proposer a chance to finish
++                    Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), MILLISECONDS);
++                    continue;
++                }
  
 -            Commit inProgress = summary.mostRecentInProgressCommit;
 -            Commit mostRecent = summary.mostRecentCommit;
 -
--            // If we have an in-progress ballot greater than the MRC we know, then it's an in-progress round that
--            // needs to be completed, so do it.
 -            // One special case we make is for update that are empty (which are proposed by serial reads and
 -            // non-applying CAS). While we could handle those as any other updates, we can optimize this somewhat by
 -            // neither committing those empty updates, nor replaying in-progress ones. The reasoning is this: as the
 -            // update is empty, we have nothing to apply to storage in the commit phase, so the only reason to commit
 -            // would be to update the MRC. However, if we skip replaying those empty updates, then we don't need to
 -            // update the MRC for following updates to make progress (that is, if we didn't had the empty update skip
 -            // below _but_ skipped updating the MRC on empty updates, then we'd be stuck always proposing that same
 -            // empty update). And the reason skipping that replay is safe is that when an operation tries to propose
 -            // an empty value, there can be only 2 cases:
 -            //  1) the propose succeed, meaning a quorum of nodes accept it, in which case we are guaranteed no earlier
 -            //     pending operation can ever be replayed (which is what we want to guarantee with the empty update).
 -            //  2) the propose does not succeed. But then the operation proposing the empty update will not succeed
 -            //     either (it will retry or ultimately timeout), and we're actually ok if earlier pending operation gets
 -            //     replayed in that case.
 -            // Tl;dr, it is safe to skip committing empty updates _as long as_ we also skip replying them below. And
 -            // doing is more efficient, so we do so.
--            if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent))
--            {
--                Tracing.trace("Finishing incomplete paxos round {}", inProgress);
-                 if(isWrite)
-                     casWriteMetrics.unfinishedCommit.inc();
-                 else
-                     casReadMetrics.unfinishedCommit.inc();
 -                casMetrics.unfinishedCommit.inc();
--                Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update);
-                 if (proposePaxos(refreshedInProgress, paxosPlan, false, queryStartNanoTime))
 -                if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos, queryStartNanoTime))
++                Commit inProgress = summary.mostRecentInProgressCommit;
++                Commit mostRecent = summary.mostRecentCommit;
++
++                // If we have an in-progress ballot greater than the MRC we know, then it's an in-progress round that
++                // needs to be completed, so do it.
++                // One special case we make is for updates that are empty (which are proposed by serial reads and
++                // non-applying CAS). While we could handle those as any other updates, we can optimize this somewhat by
++                // neither committing those empty updates, nor replaying in-progress ones. The reasoning is this: as the
++                // update is empty, we have nothing to apply to storage in the commit phase, so the only reason to commit
++                // would be to update the MRC. However, if we skip replaying those empty updates, then we don't need to
++                // update the MRC for following updates to make progress (that is, if we didn't have the empty update skip
++                // below _but_ skipped updating the MRC on empty updates, then we'd be stuck always proposing that same
++                // empty update). And the reason skipping that replay is safe is that when an operation tries to propose
++                // an empty value, there can be only 2 cases:
++                //  1) the propose succeeds, meaning a quorum of nodes accept it, in which case we are guaranteed no earlier
++                //     pending operation can ever be replayed (which is what we want to guarantee with the empty update).
++                //  2) the propose does not succeed. But then the operation proposing the empty update will not succeed
++                //     either (it will retry or ultimately time out), and we're actually ok if an earlier pending operation
++                //     gets replayed in that case.
++                // Tl;dr, it is safe to skip committing empty updates _as long as_ we also skip replaying them below. And
++                // doing so is more efficient, so we do so.
++                if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent))
                  {
--                    try
++                    Tracing.trace("Finishing incomplete paxos round {}", inProgress);
++                    casMetrics.unfinishedCommit.inc();
++                    Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update);
++                    if (proposePaxos(refreshedInProgress, paxosPlan, false, queryStartNanoTime))
                      {
                          commitPaxos(refreshedInProgress, consistencyForCommit, false, queryStartNanoTime);
                      }
--                    catch (WriteTimeoutException e)
++                    else
                      {
-                         recordCasContention(contentions);
 -                        recordCasContention(casMetrics, contentions);
--                        // We're still doing preparation for the paxos rounds, so we want to use the CAS (see CASSANDRA-8672)
--                        throw new WriteTimeoutException(WriteType.CAS, e.consistency, e.received, e.blockFor);
++                        Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
++                        // sleep a random amount to give the other proposer a chance to finish
++                        contentions++;
++                        Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), MILLISECONDS);
                      }
++                    continue;
                  }
--                else
++
++                // To be able to propose our value on a new round, we need a quorum of replicas to have learned the previous one. Why is explained at:
++                // https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810)
++                // Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also
++                // mean we lost messages), we pro-actively "repair" those nodes, and retry.
++                int nowInSec = Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(ballotMicros));
++                Iterable<InetAddressAndPort> missingMRC = summary.replicasMissingMostRecentCommit(metadata, nowInSec);
++                if (Iterables.size(missingMRC) > 0)
                  {
--                    Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
--                    // sleep a random amount to give the other proposer a chance to finish
--                    contentions++;
-                     Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), MILLISECONDS);
 -                    Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS);
++                    Tracing.trace("Repairing replicas that missed the most recent commit");
++                    sendCommit(mostRecent, missingMRC);
++                    // TODO: provided commits don't invalidate the prepare we just did above (which they don't), we could just wait
++                    // for all the missingMRC to acknowledge this commit and then move on with proposing our value. But that means
++                    // adding the ability to have commitPaxos block, which is exactly what CASSANDRA-5442 will do. So once we have
++                    // that latter ticket, we can pass CL.ALL to the commit above and remove the 'continue'.
++                    continue;
                  }
--                continue;
--            }
  
--            // To be able to propose our value on a new round, we need a quorum of replica to have learn the previous one. Why is explained at:
--            // https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810)
--            // Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also
--            // mean we lost messages), we pro-actively "repair" those nodes, and retry.
--            int nowInSec = Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(ballotMicros));
-             Iterable<InetAddressAndPort> missingMRC = summary.replicasMissingMostRecentCommit(metadata, nowInSec);
 -            Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(metadata, nowInSec);
--            if (Iterables.size(missingMRC) > 0)
++                return new PaxosBallotAndContention(ballot, contentions);
++            }
++            catch (WriteTimeoutException e)
              {
--                Tracing.trace("Repairing replicas that missed the most recent commit");
--                sendCommit(mostRecent, missingMRC);
--                // TODO: provided commits don't invalid the prepare we just did above (which they don't), we could just wait
--                // for all the missingMRC to acknowledge this commit and then move on with proposing our value. But that means
--                // adding the ability to have commitPaxos block, which is exactly CASSANDRA-5442 will do. So once we have that
--                // latter ticket, we can pass CL.ALL to the commit above and remove the 'continue'.
--                continue;
++                // We're still doing preparation for the paxos rounds, so we want to use the CAS write type (see CASSANDRA-8672)
++                throw new CasWriteTimeoutException(WriteType.CAS, e.consistency, e.received, e.blockFor, contentions);
              }
--
-             return new PaxosBallotAndContention(ballot, contentions);
 -            return Pair.create(ballot, contentions);
          }
  
-         recordCasContention(contentions);
-         throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(metadata.keyspace)));
 -        recordCasContention(casMetrics, contentions);
 -        throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(metadata.ksName)));
++        throw new CasWriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(metadata.keyspace)), contentions);
      }
  
      /**
@@@ -1666,7 -1755,7 +1806,7 @@@
                  throw new ReadFailureException(consistencyLevel, e.received, e.blockFor, false, e.failureReasonByEndpoint);
              }
  
-             result = fetchRows(group.queries, consistencyForCommitOrFetch, queryStartNanoTime);
 -            result = fetchRows(group.commands, consistencyForReplayCommitsOrFetch, queryStartNanoTime);
++            result = fetchRows(group.queries, consistencyForReplayCommitsOrFetch, queryStartNanoTime);
          }
          catch (UnavailableException e)
          {
diff --cc src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index 26890a9,ea069f7..93941e9
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@@ -46,27 -45,24 +46,24 @@@ public class PrepareCallback extends Ab
      public boolean promised = true;
      public Commit mostRecentCommit;
      public Commit mostRecentInProgressCommit;
-     public Commit mostRecentInProgressCommitWithUpdate;
  
 -    private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>();
 +    private final Map<InetAddressAndPort, Commit> commitsByReplica = new ConcurrentHashMap<>();
  
 -    public PrepareCallback(DecoratedKey key, CFMetaData metadata, int targets, ConsistencyLevel consistency, long queryStartNanoTime)
 +    public PrepareCallback(DecoratedKey key, TableMetadata metadata, int targets, ConsistencyLevel consistency, long queryStartNanoTime)
      {
          super(targets, consistency, queryStartNanoTime);
 -        // need to inject the right key in the empty commit so comparing with empty commits in the reply works as expected
 +        // need to inject the right key in the empty commit so comparing with empty commits in the response works as expected
          mostRecentCommit = Commit.emptyCommit(key, metadata);
          mostRecentInProgressCommit = Commit.emptyCommit(key, metadata);
-         mostRecentInProgressCommitWithUpdate = Commit.emptyCommit(key, metadata);
      }
  
 -    public synchronized void response(MessageIn<PrepareResponse> message)
 +    public synchronized void onResponse(Message<PrepareResponse> message)
      {
          PrepareResponse response = message.payload;
 -        logger.trace("Prepare response {} from {}", response, message.from);
 +        logger.trace("Prepare response {} from {}", response, message.from());
  
-         // In case of clock skew, another node could be proposing with ballot that are quite a bit
-         // older than our own. In that case, we record the more recent commit we've received to make
-         // sure we re-prepare on an older ballot.
+         // We set the mostRecentInProgressCommit even if we're not promised as, in that case, the ballot of that commit
+         // will be used to avoid generating a ballot that has no chance to win on retry (think clock skew).
          if (response.inProgressCommit.isAfter(mostRecentInProgressCommit))
              mostRecentInProgressCommit = response.inProgressCommit;
  
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 546f318,dc8c604..2a71ec2
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -59,14 -59,14 +59,16 @@@ import org.apache.cassandra.cql3.QueryP
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.Memtable;
 +import org.apache.cassandra.db.ReadResponse;
  import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.db.SystemKeyspaceMigrator40;
  import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.db.compaction.CompactionManager;
 -import org.apache.cassandra.db.monitoring.ApproximateTime;
+ import org.apache.cassandra.dht.IPartitioner;
+ import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.Cluster;
  import org.apache.cassandra.distributed.Constants;
 +import org.apache.cassandra.distributed.action.GossipHelper;
  import org.apache.cassandra.distributed.api.ICluster;
  import org.apache.cassandra.distributed.api.ICoordinator;
  import org.apache.cassandra.distributed.api.IInstance;
@@@ -79,10 -79,12 +81,12 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.distributed.api.SimpleQueryResult;
  import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbe;
  import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbeFactory;
 -import org.apache.cassandra.distributed.shared.NetworkTopology;
+ import org.apache.cassandra.gms.ApplicationState;
  import org.apache.cassandra.gms.Gossiper;
+ import org.apache.cassandra.gms.VersionedValue;
  import org.apache.cassandra.hints.HintsService;
  import org.apache.cassandra.index.SecondaryIndexManager;
 +import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
  import org.apache.cassandra.io.sstable.IndexSummaryManager;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.io.util.DataInputBuffer;
@@@ -538,6 -602,88 +542,85 @@@ public class Instance extends IsolatedE
          return YamlConfigurationLoader.fromMap(params, check, Config.class);
      }
  
+     public static void addToRing(boolean bootstrapping, IInstance peer)
+     {
+         try
+         {
+             IInstanceConfig config = peer.config();
+             IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner"));
+             Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token"));
 -            InetAddress address = config.broadcastAddress().getAddress();
++            InetAddressAndPort addressAndPort = toCassandraInetAddressAndPort(peer.broadcastAddress());
+ 
+             UUID hostId = config.hostId();
+             Gossiper.runInGossipStageBlocking(() -> {
 -                Gossiper.instance.initializeNodeUnsafe(address, hostId, 1);
 -                Gossiper.instance.injectApplicationState(address,
 -                        ApplicationState.TOKENS,
 -                        new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
 -                StorageService.instance.onChange(address,
 -                        ApplicationState.STATUS,
 -                        bootstrapping
 -                                ? new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(Collections.singleton(token))
 -                                : new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
 -                Gossiper.instance.realMarkAlive(address, Gossiper.instance.getEndpointStateForEndpoint(address));
++                Gossiper.instance.initializeNodeUnsafe(addressAndPort, hostId, 1);
++                Gossiper.instance.injectApplicationState(addressAndPort,
++                                                         ApplicationState.TOKENS,
++                                                         new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
++                StorageService.instance.onChange(addressAndPort,
++                                                 ApplicationState.STATUS,
++                                                 bootstrapping
++                                                 ? new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(Collections.singleton(token))
++                                                 : new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
++                Gossiper.instance.realMarkAlive(addressAndPort, Gossiper.instance.getEndpointStateForEndpoint(addressAndPort));
+             });
+             int messagingVersion = peer.isShutdown()
+                     ? MessagingService.current_version
+                     : Math.min(MessagingService.current_version, peer.getMessagingVersion());
 -            MessagingService.instance().setVersion(address, messagingVersion);
++            MessagingService.instance().versions.set(addressAndPort, messagingVersion);
+ 
 -            if (!bootstrapping)
 -                assert StorageService.instance.getTokenMetadata().isMember(address);
++            assert bootstrapping || StorageService.instance.getTokenMetadata().isMember(addressAndPort);
+             PendingRangeCalculatorService.instance.blockUntilFinished();
+         }
+         catch (Throwable e) // UnknownHostException
+         {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     public static void removeFromRing(IInstance peer)
+     {
+         try
+         {
+             IInstanceConfig config = peer.config();
+             IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner"));
+             Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token"));
 -            InetAddress address = config.broadcastAddress().getAddress();
++            InetAddressAndPort addressAndPort = toCassandraInetAddressAndPort(peer.broadcastAddress());
+ 
+             Gossiper.runInGossipStageBlocking(() -> {
 -                StorageService.instance.onChange(address,
++                StorageService.instance.onChange(addressAndPort,
+                         ApplicationState.STATUS,
+                         new VersionedValue.VersionedValueFactory(partitioner).left(Collections.singleton(token), 0L));
 -                Gossiper.instance.removeEndpoint(address);
++                Gossiper.instance.removeEndpoint(addressAndPort);
+             });
+             PendingRangeCalculatorService.instance.blockUntilFinished();
+         }
+         catch (Throwable e) // UnknownHostException
+         {
+             throw new RuntimeException(e);
+         }
+     }
+ 
+     public static void addToRingNormal(IInstance peer)
+     {
+         addToRing(false, peer);
 -        assert StorageService.instance.getTokenMetadata().isMember(peer.broadcastAddress().getAddress());
++        assert StorageService.instance.getTokenMetadata().isMember(toCassandraInetAddressAndPort(peer.broadcastAddress()));
+     }
+ 
+     public static void addToRingBootstrapping(IInstance peer)
+     {
+         addToRing(true, peer);
+     }
+ 
+     private static void initializeRing(ICluster cluster)
+     {
+         for (int i = 1 ; i <= cluster.size() ; ++i)
+             addToRing(false, cluster.get(i));
+ 
+         for (int i = 1; i <= cluster.size(); ++i)
 -            assert StorageService.instance.getTokenMetadata().isMember(cluster.get(i).broadcastAddress().getAddress());
 -
 -        StorageService.instance.setNormalModeUnsafe();
++            assert StorageService.instance.getTokenMetadata().isMember(toCassandraInetAddressAndPort(cluster.get(i).broadcastAddress()));
+     }
+ 
      public Future<Void> shutdown()
      {
          return shutdown(true);
diff --cc test/distributed/org/apache/cassandra/distributed/test/CASTest.java
index 0000000,473f56c..4cefbf0
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
@@@ -1,0 -1,684 +1,688 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.io.IOException;
+ import java.util.UUID;
+ import java.util.function.BiConsumer;
+ 
++
+ import org.junit.Assert;
+ import org.junit.Ignore;
+ import org.junit.Test;
+ 
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.marshal.Int32Type;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
+ import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.api.ICoordinator;
+ import org.apache.cassandra.distributed.api.IInstance;
+ import org.apache.cassandra.distributed.api.IMessageFilters;
+ import org.apache.cassandra.distributed.impl.Instance;
 -import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.schema.TableId;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.UUIDGen;
+ 
+ import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.fail;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.row;
 -import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_COMMIT;
 -import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_PREPARE;
 -import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_PROPOSE;
 -import static org.apache.cassandra.net.MessagingService.Verb.READ;
++import static org.apache.cassandra.net.Verb.PAXOS_COMMIT_REQ;
++import static org.apache.cassandra.net.Verb.PAXOS_PREPARE_REQ;
++import static org.apache.cassandra.net.Verb.PAXOS_PROPOSE_REQ;
++import static org.apache.cassandra.net.Verb.READ_REQ;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+ 
+ public class CASTest extends TestBaseImpl
+ {
++    private static final Logger logger = LoggerFactory.getLogger(CASTest.class);
++
+     @Test
+     public void simpleUpdate() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(3)))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ 
+             cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                     row(1, 1, 1));
+             cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 3 WHERE pk = 1 and ck = 1 IF v = 2", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                     row(1, 1, 1));
+             cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                     row(1, 1, 2));
+         }
+     }
+ 
+     @Test
+     public void incompletePrepare() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L).set("cas_contention_timeout_in_ms", 200L))))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ 
 -            IMessageFilters.Filter drop = cluster.filters().verbs(PAXOS_PREPARE.ordinal()).from(1).to(2, 3).drop();
++            IMessageFilters.Filter drop = cluster.filters().verbs(PAXOS_PREPARE_REQ.id).from(1).to(2, 3).drop();
+             try
+             {
+                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                 Assert.fail();
+             }
 -            catch (RuntimeException wrapped)
++            catch (RuntimeException e)
+             {
 -                Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage());
++                Assert.assertEquals("CAS operation timed out - encountered contentions: 0", e.getMessage());
+             }
+             drop.off();
+             cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL));
+         }
+     }
+ 
+     @Test
+     public void incompletePropose() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L).set("cas_contention_timeout_in_ms", 200L))))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ 
 -            IMessageFilters.Filter drop1 = cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(2, 3).drop();
++            IMessageFilters.Filter drop1 = cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(1).to(2, 3).drop();
+             try
+             {
+                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                 Assert.fail();
+             }
 -            catch (RuntimeException wrapped)
++            catch (RuntimeException e)
+             {
 -                Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage());
++                Assert.assertEquals("CAS operation timed out - encountered contentions: 0", e.getMessage());
+             }
+             drop1.off();
+             // make sure we encounter one of the in-progress proposals so we complete it
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal()).from(1).to(2).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id).from(1).to(2).drop();
+             cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                     row(1, 1, 2));
+         }
+     }
+ 
+     @Test
+     public void incompleteCommit() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L).set("cas_contention_timeout_in_ms", 200L))))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ 
 -            IMessageFilters.Filter drop1 = cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
++            IMessageFilters.Filter drop1 = cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(1).to(2, 3).drop();
+             try
+             {
+                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                 Assert.fail();
+             }
 -            catch (RuntimeException wrapped)
++            catch (RuntimeException e)
+             {
 -                Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage());
++                Assert.assertEquals("CAS operation timed out - encountered contentions: 0", e.getMessage());
+             }
+             drop1.off();
+             // make sure we see one of the successful commits
 -            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(2).drop();
++            cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(1).to(2).drop();
+             cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                     row(1, 1, 2));
+         }
+     }
+ 
+     private int[] paxosAndReadVerbs() {
 -        return new int[] {
 -            MessagingService.Verb.PAXOS_PREPARE.ordinal(),
 -            MessagingService.Verb.PAXOS_PROPOSE.ordinal(),
 -            MessagingService.Verb.PAXOS_COMMIT.ordinal(),
 -            MessagingService.Verb.READ.ordinal()
 -        };
++        return new int[] { PAXOS_PREPARE_REQ.id, PAXOS_PROPOSE_REQ.id, PAXOS_COMMIT_REQ.id, READ_REQ.id };
+     }
+ 
+     /**
+      * Base test to ensure that if a write times out with a proposal accepted by some nodes (fewer than a quorum), and
+      * a following SERIAL operation does not observe that write (the nodes having accepted it do not participate in that
+      * following operation), then that write is never applied, even when the nodes having accepted the original proposal
+      * participate.
+      *
+      * <p>In other words, if an operation times out, it may or may not be applied, but that "fate" is persistently decided
+      * by the first SERIAL operation that "succeeds" (in the sense of 'not timing out or throwing some other exception').
+      *
+      * @param postTimeoutOperation1 a SERIAL operation executed after an initial write that inserts the row [0, 0] times
+      *                              out. It is executed with a QUORUM of nodes that have _not_ seen the timed out
+      *                              proposal, and so that operation should expect that the [0, 0] write has not taken
+      *                              place.
+      * @param postTimeoutOperation2 a 2nd SERIAL operation executed _after_ {@code postTimeoutOperation1}, with no
+      *                              write executed between the 2 operations. Contrary to the 1st operation, the QUORUM
+      *                              for this operation _will_ include the node that got the proposal for the [0, 0]
+      *                              insert but didn't participate in {@code postTimeoutOperation1}. That operation
+      *                              should also not witness that [0, 0] write (since {@code postTimeoutOperation1}
+      *                              didn't).
+      * @param loseCommitOfOperation1 if {@code true}, the test will also drop the "commits" messages for
+      *                               {@code postTimeoutOperation1}. In general, the test should behave the same with or
+      *                               without that flag since a value is decided as soon as it has been "accepted by
+      *                               quorum" and the commits should always be properly replayed.
+      */
+     private void consistencyAfterWriteTimeoutTest(BiConsumer<String, ICoordinator> postTimeoutOperation1,
+                                                   BiConsumer<String, ICoordinator> postTimeoutOperation2,
+                                                   boolean loseCommitOfOperation1) throws IOException
+     {
+         // It's unclear why (haven't dug), but in some of the instances of this test method, there is a consistent 2+
+         // second pause between the prepare and propose phases during the execution of 'postTimeoutOperation2'. This
+         // does not happen on 3.0 and there is no report of such long pauses otherwise, so a hypothesis is that this
+         // is due to the in-jvm dtest framework. This is why we use a 4 second timeout here. Given this test is
+         // not about performance, this is probably ok, even if we ideally should dig into the underlying reason.
+         try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 4000L)
+                                                                       .set("cas_contention_timeout_in_ms", 200L))))
+         {
+             String table = KEYSPACE + ".t";
+             cluster.schemaChange("CREATE TABLE " + table + " (k int PRIMARY KEY, v int)");
+ 
+             // We do a CAS insertion, but with the PROPOSE message dropped on nodes 1 and 2. The CAS will not get
+             // through and should time out. Importantly, node 3 does receive and answer the PROPOSE.
+             IMessageFilters.Filter dropProposeFilter = cluster.filters()
+                                                               .inbound()
 -                                                              .verbs(MessagingService.Verb.PAXOS_PROPOSE.ordinal())
++                                                              .verbs(PAXOS_PROPOSE_REQ.id)
++                                                              .from(3)
+                                                               .to(1, 2)
+                                                               .drop();
+             try
+             {
+                 // NOTE: the consistency below is the "commit" one, so it doesn't matter at all here.
 -                cluster.coordinator(1)
++                // NOTE 2: we use node 3 as coordinator because message filters don't currently work for locally
++                //   delivered messages and as we want to drop messages to 1 and 2, we can't use them.
++                cluster.coordinator(3)
+                        .execute("INSERT INTO " + table + "(k, v) VALUES (0, 0) IF NOT EXISTS", ConsistencyLevel.ONE);
+                 fail("The insertion should have timed-out");
+             }
+             catch (Exception e)
+             {
+                 // We expect a write timeout. If we get one, the test can continue, otherwise, we rethrow. Note that we
+                 // look at the root cause because the dtest framework effectively wraps the exception in a RuntimeException
+                 // (we could just look at the immediate cause, but this feels a bit more resilient this way).
+                 // TODO: we can't use an instanceof below because the WriteTimeoutException we get is from a different class
+                 //  loader than the one the test runs under, and that's our poor-man's work-around. This kind of thing should
+                 //  be improved at the dtest API level.
 -                if (!e.getCause().getClass().getSimpleName().equals("WriteTimeoutException"))
++                if (!e.getClass().getSimpleName().equals("CasWriteTimeoutException"))
+                     throw e;
+             }
+             finally
+             {
+                 dropProposeFilter.off();
+             }
+ 
+             // Isolates node 3 and executes the SERIAL operation. As neither node 1 nor 2 got the initial insert proposal,
+             // there is nothing to "replay" and the operation should assert the table is still empty.
+             IMessageFilters.Filter ignoreNode3Filter = cluster.filters().verbs(paxosAndReadVerbs()).to(3).drop();
+             IMessageFilters.Filter dropCommitFilter = null;
+             if (loseCommitOfOperation1)
+             {
 -                dropCommitFilter = cluster.filters().verbs(PAXOS_COMMIT.ordinal()).to(1, 2).drop();
++                dropCommitFilter = cluster.filters().verbs(PAXOS_COMMIT_REQ.id).to(1, 2).drop();
+             }
+             try
+             {
+                 postTimeoutOperation1.accept(table, cluster.coordinator(1));
+             }
+             finally
+             {
+                 ignoreNode3Filter.off();
+                 if (dropCommitFilter != null)
+                     dropCommitFilter.off();
+             }
+ 
+             // Node 3 is now back and we isolate node 2 to ensure the next read hits node 1 and 3.
+             // What we want to ensure is that despite node 3 having the initial insert in its paxos state in a position of
+             // being replayed, that insert is _not_ replayed (it would contradict serializability since the previous
+             // operation asserted nothing was inserted). It is this execution that failed before CASSANDRA-12126.
+             IMessageFilters.Filter ignoreNode2Filter = cluster.filters().verbs(paxosAndReadVerbs()).to(2).drop();
+             try
+             {
+                 postTimeoutOperation2.accept(table, cluster.coordinator(1));
+             }
+             finally
+             {
+                 ignoreNode2Filter.off();
+             }
+         }
+     }
+ 
+     /**
+      * Tests that if a write times out and a following serial read does not see that write, then no following read sees
+      * it, even if some nodes still have the write in their paxos state.
+      *
+      * <p>This specifically tests for the inconsistency described/fixed by CASSANDRA-12126.
+      */
+     @Test
+     public void readConsistencyAfterWriteTimeoutTest() throws IOException
+     {
+         BiConsumer<String, ICoordinator> operation =
+             (table, coordinator) -> assertRows(coordinator.execute("SELECT * FROM " + table + " WHERE k=0",
+                                                                    ConsistencyLevel.SERIAL));
+ 
+         consistencyAfterWriteTimeoutTest(operation, operation, false);
+         consistencyAfterWriteTimeoutTest(operation, operation, true);
+     }
+ 
+     /**
+      * Tests that if a write times out, then a following CAS succeeds but does not apply in a way that indicates the write
+      * has not applied, then no following CAS can see that initial insert, even if some nodes still have the write in
+      * their paxos state.
+      *
+      * <p>This specifically tests for the inconsistency described/fixed by CASSANDRA-12126.
+      */
+     @Test
+     public void nonApplyingCasConsistencyAfterWriteTimeout() throws IOException
+     {
+         // Note: we use CL.ANY so that the operation doesn't time out in the case where we "lost" the operation1 commits.
+         // The commit CL shouldn't have an impact on this test anyway, so this doesn't diminish the test.
+         BiConsumer<String, ICoordinator> operation =
+             (table, coordinator) -> assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0",
+                                                                             ConsistencyLevel.ANY));
+         consistencyAfterWriteTimeoutTest(operation, operation, false);
+         consistencyAfterWriteTimeoutTest(operation, operation, true);
+     }
+ 
+     /**
+      * Tests that if a write times out and a following serial read does not see that write, then no following CAS sees
+      * that initial insert, even if some nodes still have the write in their paxos state.
+      *
+      * <p>This specifically tests for the inconsistency described/fixed by CASSANDRA-12126.
+      */
+     @Test
+     public void mixedReadAndNonApplyingCasConsistencyAfterWriteTimeout() throws IOException
+     {
+         BiConsumer<String, ICoordinator> operation1 =
+             (table, coordinator) -> assertRows(coordinator.execute("SELECT * FROM " + table + " WHERE k=0",
+                                                                    ConsistencyLevel.SERIAL));
+         BiConsumer<String, ICoordinator> operation2 =
+             (table, coordinator) -> assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0",
+                                                                             ConsistencyLevel.QUORUM));
+         consistencyAfterWriteTimeoutTest(operation1, operation2, false);
+         consistencyAfterWriteTimeoutTest(operation1, operation2, true);
+     }
+ 
+     /**
+      * Tests that if a write times out and a following CAS succeeds but does not apply in a way that indicates the write
+      * has not applied, then following serial reads do not see that write, even if some nodes still have the write in
+      * their paxos state.
+      *
+      * <p>This specifically tests for the inconsistency described/fixed by CASSANDRA-12126.
+      */
+     @Test
+     public void mixedNonApplyingCasAndReadConsistencyAfterWriteTimeout() throws IOException
+     {
+         // Note: we use CL.ANY so that the operation doesn't time out in the case where we "lost" the operation1 commits.
+         // The commit CL shouldn't have an impact on this test anyway, so this doesn't diminish the test.
+         BiConsumer<String, ICoordinator> operation1 =
+             (table, coordinator) -> assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0",
+                                                                             ConsistencyLevel.ANY));
+         BiConsumer<String, ICoordinator> operation2 =
+             (table, coordinator) -> assertRows(coordinator.execute("SELECT * FROM " + table + " WHERE k=0",
+                                                                    ConsistencyLevel.SERIAL));
+         consistencyAfterWriteTimeoutTest(operation1, operation2, false);
+         consistencyAfterWriteTimeoutTest(operation1, operation2, true);
+     }
+ 
+     // TODO: this should probably be moved into the dtest API.
+     private void assertCasNotApplied(Object[][] resultSet)
+     {
+         assertFalse("Expected a CAS resultSet (with at least application result) but got an empty one.",
+                     resultSet.length == 0);
+         assertFalse("Invalid empty first row in CAS resultSet.", resultSet[0].length == 0);
+         Object wasApplied = resultSet[0][0];
+         assertTrue("Expected 1st column of CAS resultSet to be a boolean, but got a " + wasApplied.getClass(),
+                    wasApplied instanceof Boolean);
+         assertFalse("Expected CAS to not be applied, but was applied.", (Boolean)wasApplied);
+     }
+ 
+     /**
+      * Failed write (by node that did not yet witness a range movement via gossip) is witnessed later as successful
+      * conflicting with another successful write performed by a node that did witness the range movement
+      * Prepare, Propose and Commit A to {1, 2}
+      * Range moves to {2, 3, 4}
+      * Prepare and Propose B (=> !A) to {3, 4}
+      */
+     @Ignore
+     @Test
+     public void testSuccessfulWriteBeforeRangeMovement() throws Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {1} is unaware (yet) that {4} is an owner of the token
+             cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+ 
+             int pk = pk(cluster, 1, 2);
+ 
+             // {1} promises and accepts on !{3} => {1, 2}; commits on !{2,3} => {1}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(1).to(2, 3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             for (int i = 1 ; i <= 3 ; ++i)
+                 cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+ 
+             // {4} reads from !{2} => {3, 4}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(2).drop();
 -            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(2).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(4).to(2).drop();
++            cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(4).to(2).drop();
+             assertRows(cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                     row(false, pk, 1, 1, null));
+         }
+     }
+ 
+     /**
+      * Failed write (by node that did not yet witness a range movement via gossip) is witnessed later as successful
+      * conflicting with another successful write performed by a node that did witness the range movement
+      *  - Range moves from {1, 2, 3} to {2, 3, 4}, witnessed by X (not by !X)
+      *  -  X: Prepare, Propose and Commit A to {3, 4}
+      *  - !X: Prepare and Propose B (=>!A) to {1, 2}
+      */
+     @Ignore
+     @Test
+     public void testConflictingWritesWithStaleRingInformation() throws Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {1} is unaware (yet) that {4} is an owner of the token
+             cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+ 
+             // {4} promises, accepts and commits on !{2} => {3, 4}
+             int pk = pk(cluster, 1, 2);
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(2).drop();
 -            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(2).drop();
 -            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(4).to(2).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(4).to(2).drop();
++            cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(4).to(2).drop();
++            cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(4).to(2).drop();
+             assertRows(cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             // {1} promises, accepts and commits on !{3} => {1, 2}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(1).to(3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                     row(false, pk, 1, 1, null));
+         }
+     }
+ 
+     /**
+      * Successful write during range movement, not witnessed by read after range movement.
+      * Very similar to {@link #testConflictingWritesWithStaleRingInformation}.
+      *
+      *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+      *  -  !X: Prepare and Propose to {1, 2}
+      *  - Range movement witnessed by !X
+      *  - Any: Prepare and Read from {3, 4}
+      */
+     @Ignore
+     @Test
+     public void testSucccessfulWriteDuringRangeMovementFollowedByRead() throws Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
+             for (int i = 1 ; i <= 4 ; ++i)
+                 cluster.get(i).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+             cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+ 
+             int pk = pk(cluster, 1, 2);
+ 
+             // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(1).to(2, 3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             // finish topology change
+             for (int i = 1 ; i <= 4 ; ++i)
+                 cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+ 
+             // {3} reads from !{2} => {3, 4}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(3).to(2).drop();
+             assertRows(cluster.coordinator(3).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", ConsistencyLevel.SERIAL, pk),
+                     row(pk, 1, 1));
+         }
+     }
+ 
+     /**
+      * Successful write during range movement not witnessed by write after range movement
+      *
+      *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+      *  -  !X: Prepare and Propose to {1, 2}
+      *  - Range movement witnessed by !X
+      *  - Any: Prepare and Propose to {3, 4}
+      */
+     @Ignore
+     @Test
+     public void testSuccessfulWriteDuringRangeMovementFollowedByConflicting() throws Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
+             for (int i = 1 ; i <= 4 ; ++i)
+                 cluster.get(i).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+             cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+ 
+             int pk = pk(cluster, 1, 2);
+ 
+             // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(1).to(2, 3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             // finish topology change
+             for (int i = 1 ; i <= 4 ; ++i)
+                 cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+ 
+             // {3} reads from !{2} => {3, 4}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(3).to(2).drop();
+             assertRows(cluster.coordinator(3).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                     row(false, pk, 1, 1, null));
+ 
+             // TODO: repair and verify base table state
+         }
+     }
+ 
+     /**
+      * During a range movement, a CAS may fail leaving side effects that are not witnessed by another operation
+      * being performed with stale ring information.
+      * This is a particular special case of stale ring information sequencing, which probably would be resolved
+      * by fixing each of the more isolated cases (but is unique, so deserving of its own test case).
+      * See CASSANDRA-15745
+      *
+      *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+      *  -   X: Prepare to {2, 3, 4}
+      *  -   X: Propose to {4}
+      *  -  !X: Prepare and Propose to {1, 2}
+      *  - Range move visible by !X
+      *  - Any: Prepare and Read from {3, 4}
+      */
+     @Ignore
+     @Test
+     public void testIncompleteWriteFollowedBySuccessfulWriteWithStaleRingDuringRangeMovementFollowedByRead() throws Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
+             for (int i = 1 ; i <= 4 ; ++i)
+                 cluster.get(i).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+             cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+ 
+             int pk = pk(cluster, 1, 2);
+ 
+             // {4} promises on !{1} => {2, 3, 4}; accepts on !{1, 2, 3} => {4}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(1).drop();
 -            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(1, 2, 3).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(4).to(1).drop();
++            cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(4).to(1, 2, 3).drop();
+             try
+             {
+                 cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM, pk);
+                 Assert.fail("Expected the CAS insert to time out");
+             }
+             catch (RuntimeException wrapped)
+             {
+                 Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage());
+             }
+ 
+             // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(1).to(2, 3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             // finish topology change
+             for (int i = 1 ; i <= 4 ; ++i)
+                 cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+ 
+             // {3} reads from !{2} => {3, 4}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop();
 -            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(3).to(2).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(3).to(2).drop();
++            cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(3).to(2).drop();
+             assertRows(cluster.coordinator(3).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", ConsistencyLevel.SERIAL, pk),
+                     row(pk, 1, null, 2));
+         }
+     }
+ 
+     /**
+      * During a range movement, a CAS may fail leaving side effects that are not witnessed by another operation
+      * being performed with stale ring information.
+      * This is a particular special case of stale ring information sequencing, which probably would be resolved
+      * by fixing each of the more isolated cases (but is unique, so deserving of its own test case).
+      * See CASSANDRA-15745
+      *
+      *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+      *  -   X: Prepare to {2, 3, 4}
+      *  -   X: Propose to {4}
+      *  -  !X: Prepare and Propose to {1, 2}
+      *  - Range move visible by !X
+      *  - Any: Prepare and Propose to {3, 4}
+      */
+     @Ignore
+     @Test
+     public void testIncompleteWriteFollowedBySuccessfulWriteWithStaleRingDuringRangeMovementFollowedByWrite() throws Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
+             for (int i = 1 ; i <= 4 ; ++i)
+                 cluster.get(i).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+             cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+ 
+             int pk = pk(cluster, 1, 2);
+ 
+             // {4} promises on !{1} => {2, 3, 4}; accepts on !{1, 2, 3} => {4}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(1).drop();
 -            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(1, 2, 3).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(4).to(1).drop();
++            cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(4).to(1, 2, 3).drop();
+             try
+             {
+                 cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM, pk);
+                 Assert.fail("Expected the CAS insert to time out");
+             }
+             catch (RuntimeException wrapped)
+             {
+                 Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage());
+             }
+ 
+             // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
 -            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(1).to(3).drop();
++            cluster.filters().verbs(PAXOS_COMMIT_REQ.id).from(1).to(2, 3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             // finish topology change
+             for (int i = 1 ; i <= 4 ; ++i)
+                 cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+ 
+             // {3} reads from !{2} => {3, 4}
 -            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop();
 -            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(3).to(2).drop();
++            cluster.filters().verbs(PAXOS_PREPARE_REQ.id, READ_REQ.id).from(3).to(2).drop();
++            cluster.filters().verbs(PAXOS_PROPOSE_REQ.id).from(3).to(2).drop();
+             assertRows(cluster.coordinator(3).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                     row(false, pk, 1, null, 2));
+         }
+     }
+ 
+     private static int pk(Cluster cluster, int lb, int ub)
+     {
+         return pk(cluster.get(lb), cluster.get(ub));
+     }
+ 
+     private static int pk(IInstance lb, IInstance ub)
+     {
+         return pk(Murmur3Partitioner.instance.getTokenFactory().fromString(lb.config().getString("initial_token")),
+                 Murmur3Partitioner.instance.getTokenFactory().fromString(ub.config().getString("initial_token")));
+     }
+ 
+     private static int pk(Token lb, Token ub)
+     {
+         int pk = 0;
+         Token pkt;
+         while (lb.compareTo(pkt = Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(pk))) >= 0 || ub.compareTo(pkt) < 0)
+             ++pk;
+         return pk;
+     }
+ 
+     private static void debugOwnership(Cluster cluster, int pk)
+     {
+         for (int i = 1 ; i <= cluster.size() ; ++i)
 -            System.out.println(i + ": " + cluster.get(i).appliesOnInstance((Integer v) -> StorageService.instance.getNaturalAndPendingEndpoints(KEYSPACE, Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(v))))
++            System.out.println(i + ": " + cluster.get(i).appliesOnInstance((Integer v) -> StorageService.instance.getNaturalEndpointsWithPort(KEYSPACE, Int32Type.instance.decompose(v)))
+                     .apply(pk));
+     }
+ 
+     private static void debugPaxosState(Cluster cluster, int pk)
+     {
 -        UUID cfid = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metadata.cfId);
++        TableId tableId = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metadata.id);
+         for (int i = 1 ; i <= cluster.size() ; ++i)
 -            for (Object[] row : cluster.get(i).executeInternal("select in_progress_ballot, proposal_ballot, most_recent_commit_at from system.paxos where row_key = ? and cf_id = ?", Int32Type.instance.decompose(pk), cfid))
++            for (Object[] row : cluster.get(i).executeInternal("select in_progress_ballot, proposal_ballot, most_recent_commit_at from system.paxos where row_key = ? and cf_id = ?", Int32Type.instance.decompose(pk), tableId))
+                 System.out.println(i + ": " + (row[0] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[0])) + ", " + (row[1] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[1])) + ", " + (row[2] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[2])));
+     }
+ 
+ }
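
Note on the pk(Token, Token) helper above: it scans integer keys from 0 upward and returns the first one whose token lies in the half-open range (lb, ub]. The following is a minimal standalone sketch of that search, not the committed code. It assumes plain long tokens, and demoToken is a hypothetical stand-in for Murmur3Partitioner.getToken so the example runs without Cassandra on the classpath. Like the helper, it does not handle a range that wraps around the ring and would not terminate if no key hashes into the range.

```java
import java.util.function.IntToLongFunction;

final class PkRangeSketch
{
    // Hypothetical token function used only for this illustration;
    // the real test hashes the serialized key with Murmur3Partitioner.
    static long demoToken(int pk)
    {
        return (pk * 37L) % 100;
    }

    // Returns the smallest non-negative key whose token is in (lb, ub].
    static int firstKeyInRange(long lb, long ub, IntToLongFunction tokenOf)
    {
        int pk = 0;
        long token;
        // Skip keys whose token is at or below the lower bound, or above the upper bound.
        while ((token = tokenOf.applyAsLong(pk)) <= lb || token > ub)
            ++pk;
        return pk;
    }

    public static void main(String[] args)
    {
        // Tokens for keys 0..4 are 0, 37, 74, 11, 48; only 48 falls in (40, 50].
        System.out.println(firstKeyInRange(40, 50, PkRangeSketch::demoToken)); // prints 4
    }
}
```

In the tests, the bounds are the initial tokens of two adjacent nodes, so the chosen key is one whose primary replica is the node at the upper bound.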
