You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2022/12/09 20:01:43 UTC

[cassandra] 05/05: Support CAS and serial read on Accord

This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a commit to branch cas-accord-v2
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9b7cb56c4e368c08e5927c239724d6236d1699c3
Author: Ariel Weisberg <aw...@apple.com>
AuthorDate: Fri Dec 2 14:05:06 2022 -0500

    Support CAS and serial read on Accord
    
    patch by Ariel Weisberg; Reviewed by ? and ? for CASSANDRA-18100
---
 src/java/org/apache/cassandra/config/Config.java   |  31 +-
 .../cassandra/config/DatabaseDescriptor.java       |  11 +-
 src/java/org/apache/cassandra/cql3/Lists.java      |  24 +-
 src/java/org/apache/cassandra/cql3/Operator.java   |  38 ++
 .../cassandra/cql3/conditions/ColumnCondition.java | 379 ++++++++++++++-
 .../cassandra/cql3/statements/CQL3CasRequest.java  | 134 +++++-
 .../org/apache/cassandra/db/ClusteringPrefix.java  |   2 +
 .../org/apache/cassandra/db/marshal/ListType.java  |   4 +-
 .../org/apache/cassandra/db/marshal/MapType.java   |  11 +-
 .../org/apache/cassandra/db/marshal/SetType.java   |   9 +-
 .../apache/cassandra/db/marshal/ValueAccessor.java |   9 +-
 .../serializers/CollectionSerializer.java          |   2 +-
 .../org/apache/cassandra/service/CASRequest.java   |   7 +
 .../org/apache/cassandra/service/StorageProxy.java |  51 ++-
 .../service/accord/AccordPartialCommand.java       |   4 +-
 .../service/accord/AccordSerializers.java          |  92 +++-
 .../serializers/BeginInvalidationSerializers.java  |   8 +-
 .../accord/serializers/CheckStatusSerializers.java |  16 +-
 .../accord/serializers/CommitSerializers.java      |   8 +-
 .../accord/serializers/PreacceptSerializers.java   |  13 +-
 .../accord/serializers/RecoverySerializers.java    |  11 +-
 .../cassandra/service/accord/txn/TxnCondition.java |  75 ++-
 .../cassandra/service/accord/txn/TxnData.java      |   1 +
 .../cassandra/service/accord/txn/TxnDataName.java  |  33 +-
 .../cassandra/service/accord/txn/TxnNamedRead.java |   2 +-
 .../cassandra/service/accord/txn/TxnQuery.java     |  44 +-
 .../cassandra/service/accord/txn/TxnRead.java      |  14 +-
 .../cassandra/service/accord/txn/TxnUpdate.java    |  23 +-
 .../service/paxos/PaxosPrepareRefresh.java         |   5 +-
 .../cassandra/service/paxos/PaxosRepair.java       |  29 +-
 .../apache/cassandra/service/paxos/PaxosState.java |   1 +
 .../org/apache/cassandra/utils/ByteBufferUtil.java |  85 +++-
 .../apache/cassandra/utils/NullableSerializer.java |   4 +-
 .../distributed/test/ByteBuddyExamples.java        |   1 -
 .../distributed/test/accord/AccordCQLTest.java     |  61 ++-
 .../distributed/test/accord/AccordTestBase.java    |  63 ++-
 .../cql3/conditions/ColumnConditionTest.java       | 506 ++++++++++++++++++---
 .../cassandra/service/accord/txn/TxnBuilder.java   |  17 +-
 38 files changed, 1550 insertions(+), 278 deletions(-)

diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index db1ed12c8c..eed967d68f 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -21,19 +21,17 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.function.Supplier;
-
 import javax.annotation.Nullable;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1054,6 +1052,8 @@ public class Config
      */
     public volatile int paxos_repair_parallelism = -1;
 
+    public volatile LegacyPaxosStrategy legacy_paxos_strategy = LegacyPaxosStrategy.migration;
+
     public volatile int max_top_size_partition_count = 10;
     public volatile int max_top_tombstone_partition_count = 10;
     public volatile DataStorageSpec.LongBytesBound min_tracked_partition_size = new DataStorageSpec.LongBytesBound("1MiB");
@@ -1149,6 +1149,31 @@ public class Config
         exception
     }
 
+    /*
+     * How to pick a consensus protocol for CAS
+     * and serial read operations. Transaction statements
+     * will always run on Accord. Legacy in this context includes PaxosV2.
+     */
+    public enum LegacyPaxosStrategy
+    {
+        /*
+         * Default setting
+         *
+         * Allow both Accord and PaxosV1/V2 to run on the same cluster
+         * Some keys and ranges might be running on Accord if they
+         * have been migrated and the rest will run on Paxos until
+         * they are migrated.
+         */
+        migration,
+
+        /*
+         * Everything will be run on Accord. Useful for new deployments
+         * that don't want to accidentally start using legacy Paxos
+         * requiring migration to Accord.
+         */
+        accord
+    }
+
     private static final Set<String> SENSITIVE_KEYS = new HashSet<String>() {{
         add("client_encryption_options");
         add("server_encryption_options");
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 2955635694..7015382aca 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
-
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -2838,6 +2837,16 @@ public class DatabaseDescriptor
         return conf.paxos_topology_repair_strict_each_quorum;
     }
 
+    public static Config.LegacyPaxosStrategy getLegacyPaxosStrategy()
+    {
+        return conf.legacy_paxos_strategy;
+    }
+
+    public static void setLegacyPaxosStrategy(Config.LegacyPaxosStrategy strategy)
+    {
+        conf.legacy_paxos_strategy = strategy;
+    }
+
     public static void setNativeTransportMaxRequestDataInFlightPerIpInBytes(long maxRequestDataInFlightInBytes)
     {
         if (maxRequestDataInFlightInBytes == -1)
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java
index f24e94aac3..2ad2790006 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -17,10 +17,6 @@
  */
 package org.apache.cassandra.cql3;
 
-import static org.apache.cassandra.cql3.Constants.UNSET_VALUE;
-import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
-import static org.apache.cassandra.utils.TimeUUID.Generator.atUnixMillisAsBytes;
-
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -30,22 +26,30 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import org.apache.cassandra.db.guardrails.Guardrails;
-import org.apache.cassandra.db.marshal.ByteBufferAccessor;
-import org.apache.cassandra.schema.ColumnMetadata;
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.cql3.functions.Function;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.guardrails.Guardrails;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.apache.cassandra.cql3.Constants.UNSET_VALUE;
+import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
+import static org.apache.cassandra.utils.TimeUUID.Generator.atUnixMillisAsBytes;
+
 /**
  * Static helper methods and classes for lists.
  */
@@ -210,8 +214,10 @@ public abstract class Lists
                 List<?> l = type.getSerializer().deserializeForNativeProtocol(value, ByteBufferAccessor.instance, version);
                 List<ByteBuffer> elements = new ArrayList<>(l.size());
                 for (Object element : l)
+                {
                     // elements can be null in lists that represent a set of IN values
                     elements.add(element == null ? null : type.getElementsType().decompose(element));
+                }
                 return new Value(elements);
             }
             catch (MarshalException e)
diff --git a/src/java/org/apache/cassandra/cql3/Operator.java b/src/java/org/apache/cassandra/cql3/Operator.java
index bcb5f63be6..8985a33f71 100644
--- a/src/java/org/apache/cassandra/cql3/Operator.java
+++ b/src/java/org/apache/cassandra/cql3/Operator.java
@@ -26,9 +26,14 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.serializers.ListSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static com.google.common.primitives.Ints.checkedCast;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+
 public enum Operator
 {
     EQ(0)
@@ -285,6 +290,17 @@ public enum Operator
         output.writeInt(b);
     }
 
+    /**
+     * Write this <code>Operator</code> to the specified output, encoded as an unsigned vint.
+     *
+     * @param output the output to write to
+     * @throws IOException if an I/O problem occurs while writing to the specified output
+     */
+    public void writeToUnsignedVInt(DataOutputPlus output) throws IOException
+    {
+        output.writeUnsignedVInt(b);
+    }
+
     public int getValue()
     {
         return b;
@@ -307,6 +323,23 @@ public enum Operator
           throw new IOException(String.format("Cannot resolve Relation.Type from binary representation: %s", b));
     }
 
+    /**
+     * Deserializes an <code>Operator</code> instance from its unsigned vint encoding in the specified input.
+     *
+     * @param input the input to read from
+     * @return the <code>Operator</code> instance deserialized
+     * @throws IOException if a problem occurs while deserializing the <code>Operator</code> instance.
+     */
+    public static Operator readFromUnsignedVInt(DataInputPlus input) throws IOException
+    {
+        int b = checkedCast(input.readUnsignedVInt());
+        for (Operator operator : values())
+            if (operator.b == b)
+                return operator;
+
+        throw new IOException(String.format("Cannot resolve Relation.Type from binary representation: %s", b));
+    }
+
     /**
      * Whether 2 values satisfy this operator (given the type they should be compared with).
      */
@@ -358,4 +391,9 @@ public enum Operator
     {
         return this == CONTAINS_KEY;
     }
+
+    public long sizeAsUnsignedVInt()
+    {
+        return sizeofUnsignedVInt(b);
+    }
 }
diff --git a/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java b/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
index 9d9506ff53..ca45c2adf4 100644
--- a/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
+++ b/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
@@ -17,24 +17,73 @@
  */
 package org.apache.cassandra.cql3.conditions;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
-
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import com.google.common.base.Suppliers;
 import com.google.common.collect.Iterators;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
 
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.AbstractMarker;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.Constants;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
 import org.apache.cassandra.cql3.Term.Terminal;
+import org.apache.cassandra.cql3.Terms;
+import org.apache.cassandra.cql3.UserTypes;
+import org.apache.cassandra.cql3.VariableSpecifications;
 import org.apache.cassandra.cql3.functions.Function;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
 
-import static org.apache.cassandra.cql3.statements.RequestValidations.*;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.primitives.Ints.checkedCast;
+import static java.util.stream.Collectors.toList;
+import static org.apache.cassandra.cql3.Constants.UNSET_VALUE;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+import static org.apache.cassandra.service.accord.AccordSerializers.columnMetadataSerializer;
+import static org.apache.cassandra.service.accord.AccordSerializers.deserializeCqlCollectionAsTerm;
+import static org.apache.cassandra.utils.ByteBufferUtil.UNSET_BYTE_BUFFER;
+import static org.apache.cassandra.utils.ByteBufferUtil.vintNullableSerializer;
+import static org.apache.cassandra.utils.ByteBufferUtil.vintSerializer;
+import static org.apache.cassandra.utils.CollectionSerializers.deserializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.serializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.serializedListSize;
 
 /**
  * A CQL3 condition on the value of a column or collection element.  For example, "UPDATE .. IF a = 0".
@@ -73,7 +122,7 @@ public abstract class ColumnCondition
         terms.collectMarkerSpecification(boundNames);
     }
 
-    public abstract ColumnCondition.Bound bind(QueryOptions options);
+    public abstract Bound bind(QueryOptions options);
 
     protected final List<ByteBuffer> bindAndGetTerms(QueryOptions options)
     {
@@ -107,7 +156,7 @@ public abstract class ColumnCondition
         {
             T value = values.get(i);
             // The value can be ByteBuffer or Constants.Value so we need to check the 2 type of UNSET
-            if (value != ByteBufferUtil.UNSET_BYTE_BUFFER && value != Constants.UNSET_VALUE)
+            if (value != UNSET_BYTE_BUFFER && value != UNSET_VALUE)
                 filtered.add(value);
         }
         return filtered;
@@ -210,13 +259,51 @@ public abstract class ColumnCondition
         return new UDTFieldCondition(column, udtField, op, terms);
     }
 
+    /*
+     * Don't reorder, change or remove entries (append new ones at the end) or it will break cross version compatibility
+     * as Bound.serializer writes the ordinals, which are sent in messages and stored by Accord for in flight transactions
+     */
+    enum BoundKind
+    {
+        ELEMENT_ACCESS,
+        MULTI_CELL_COLLECTION,
+        MULTI_CELL_UDT,
+        SIMPLE,
+        UDT_FIELD_ACCESS;
+
+        @SuppressWarnings("rawtypes")
+        BoundSerializer serializer()
+        {
+            switch (this)
+            {
+                case ELEMENT_ACCESS:
+                    return ElementAccessBound.serializer;
+                case MULTI_CELL_COLLECTION:
+                    return MultiCellCollectionBound.serializer;
+                case MULTI_CELL_UDT:
+                    return MultiCellUdtBound.serializer;
+                case SIMPLE:
+                    return SimpleBound.serializer;
+                case UDT_FIELD_ACCESS:
+                    return UDTFieldAccessBound.serializer;
+                default:
+                    throw new AssertionError("Shouldn't have an enum with no serializer");
+            }
+        }
+    }
+
     public static abstract class Bound
     {
+        @Nonnull
         public final ColumnMetadata column;
+
+        @Nonnull
         public final Operator comparisonOperator;
 
         protected Bound(ColumnMetadata column, Operator operator)
         {
+            checkNotNull(column);
+            checkNotNull(operator);
             this.column = column;
             // If the operator is an IN we want to compare the value using an EQ.
             this.comparisonOperator = operator.isIN() ? Operator.EQ : operator;
@@ -235,7 +322,7 @@ public abstract class ColumnCondition
         /** Returns true if the operator is satisfied (i.e. "otherValue operator value == true"), false otherwise. */
         public static boolean compareWithOperator(Operator operator, AbstractType<?> type, ByteBuffer value, ByteBuffer otherValue)
         {
-            if (value == ByteBufferUtil.UNSET_BYTE_BUFFER)
+            if (value == UNSET_BYTE_BUFFER)
                 throw invalidRequest("Invalid 'unset' value in condition");
 
             if (value == null)
@@ -257,6 +344,55 @@ public abstract class ColumnCondition
             }
             return operator.isSatisfiedBy(type, otherValue, value);
         }
+
+        void checkForUnsetValues(List<ByteBuffer> values)
+        {
+            for (ByteBuffer buffer : values)
+                if (buffer == UNSET_BYTE_BUFFER)
+                    throw invalidRequest("Invalid 'unset' value in condition");
+        }
+        protected abstract BoundKind kind();
+
+        public static final IVersionedSerializer<Bound> serializer = new IVersionedSerializer<Bound>()
+        {
+            @Override
+            @SuppressWarnings("unchecked")
+            public void serialize(Bound bound, DataOutputPlus out, int version) throws IOException
+            {
+                columnMetadataSerializer.serialize(bound.column, out, version);
+                bound.comparisonOperator.writeToUnsignedVInt(out);
+                BoundKind kind = bound.kind();
+                out.writeUnsignedVInt(kind.ordinal());
+                kind.serializer().serialize(bound, out, version);
+            }
+
+            @Override
+            public Bound deserialize(DataInputPlus in, int version) throws IOException
+            {
+                ColumnMetadata column = columnMetadataSerializer.deserialize(in, version);
+                Operator comparisonOperator = Operator.readFromUnsignedVInt(in);
+                BoundKind kind = BoundKind.values()[checkedCast(in.readUnsignedVInt())];
+                return kind.serializer().deserialize(in, version, column, comparisonOperator);
+            }
+
+            @Override
+            @SuppressWarnings("unchecked")
+            public long serializedSize(Bound bound, int version)
+            {
+                BoundKind kind = bound.kind();
+                return columnMetadataSerializer.serializedSize(bound.column, version)
+                       + bound.comparisonOperator.sizeAsUnsignedVInt()
+                       + sizeofUnsignedVInt(kind.ordinal())
+                       + kind.serializer().serializedSize(bound, version);
+            }
+        };
+    }
+
+    private interface BoundSerializer<T extends Bound>
+    {
+        void serialize(T bound, DataOutputPlus out, int version) throws IOException;
+        Bound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException;
+        long serializedSize(T condition, int version);
     }
 
     protected static final Cell<?> getCell(Row row, ColumnMetadata column)
@@ -317,6 +453,7 @@ public abstract class ColumnCondition
         private SimpleBound(ColumnMetadata column, Operator operator, List<ByteBuffer> values)
         {
             super(column, operator);
+            checkForUnsetValues(values);
             this.values = values;
         }
 
@@ -341,6 +478,34 @@ public abstract class ColumnCondition
             }
             return false;
         }
+
+        @Override
+        protected BoundKind kind()
+        {
+            return BoundKind.SIMPLE;
+        }
+
+        private static final BoundSerializer<SimpleBound> serializer = new BoundSerializer<SimpleBound>()
+        {
+            @Override
+            public void serialize(SimpleBound bound, DataOutputPlus out, int version) throws IOException
+            {
+                serializeList(bound.values, out, version, vintNullableSerializer);
+            }
+
+            @Override
+            public SimpleBound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException
+            {
+                List<ByteBuffer> values = deserializeList(in, version, vintNullableSerializer);
+                return new SimpleBound(column, operator, values);
+            }
+
+            @Override
+            public long serializedSize(SimpleBound bound, int version)
+            {
+                return serializedListSize(bound.values, version, vintNullableSerializer);
+            }
+        };
     }
 
     /**
@@ -351,11 +516,13 @@ public abstract class ColumnCondition
         /**
          * The collection element
          */
+        @Nonnull
         private final ByteBuffer collectionElement;
 
         /**
          * The conditions values.
          */
+        @Nonnull
         private final List<ByteBuffer> values;
 
         public ElementAccessBound(ColumnMetadata column,
@@ -364,6 +531,11 @@ public abstract class ColumnCondition
                                    List<ByteBuffer> values)
         {
             super(column, operator);
+            checkForUnsetValues(values);
+
+            boolean isMap = column.type instanceof MapType;
+            if (collectionElement == null)
+                throw invalidRequest("Invalid null value for %s element access", isMap ? "map" : "list");
 
             this.collectionElement = collectionElement;
             this.values = values;
@@ -374,9 +546,6 @@ public abstract class ColumnCondition
         {
             boolean isMap = column.type instanceof MapType;
 
-            if (collectionElement == null)
-                throw invalidRequest("Invalid null value for %s element access", isMap ? "map" : "list");
-
             if (isMap)
             {
                 MapType<?, ?> mapType = (MapType<?, ?>) column.type;
@@ -445,6 +614,36 @@ public abstract class ColumnCondition
             checkFalse(idx < 0, "Invalid negative list index %d", idx);
             return idx;
         }
+
+        @Override
+        protected BoundKind kind()
+        {
+            return BoundKind.ELEMENT_ACCESS;
+        }
+
+        private static final BoundSerializer<ElementAccessBound> serializer = new BoundSerializer<ElementAccessBound>()
+        {
+            @Override
+            public void serialize(ElementAccessBound bound, DataOutputPlus out, int version) throws IOException
+            {
+                vintSerializer.serialize(bound.collectionElement, out, version);
+                serializeList(bound.values, out, version, vintNullableSerializer);
+            }
+
+            @Override
+            public ElementAccessBound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException
+            {
+                ByteBuffer collectionElement = vintSerializer.deserialize(in, version);
+                List<ByteBuffer> values = deserializeList(in, version, vintNullableSerializer);
+                return new ElementAccessBound(column, collectionElement, operator, values);
+            }
+
+            @Override
+            public long serializedSize(ElementAccessBound bound , int version)
+            {
+                return vintSerializer.serializedSize(bound.collectionElement, version) + serializedListSize(bound.values, version, vintNullableSerializer);
+            }
+        };
     }
 
     /**
@@ -452,26 +651,44 @@ public abstract class ColumnCondition
      */
     public static final class MultiCellCollectionBound extends Bound
     {
-        private final List<Term.Terminal> values;
+        @Nonnull
+        private final List<Terminal> values;
 
-        public MultiCellCollectionBound(ColumnMetadata column, Operator operator, List<Term.Terminal> values)
+        @Nullable
+        private Supplier<List<ByteBuffer>> serializedValues;
+
+        public MultiCellCollectionBound(ColumnMetadata column, Operator operator, List<Terminal> values)
         {
             super(column, operator);
+            checkNotNull(values);
             assert column.type.isMultiCell();
             this.values = values;
         }
 
+        public Supplier<List<ByteBuffer>> serializedValues()
+        {
+            if (serializedValues != null)
+                return serializedValues;
+
+            serializedValues = Suppliers.memoize(() ->
+                                                 values.stream()
+                                                       .map(v -> v == null ? null : v.get(ProtocolVersion.CURRENT))
+                                                       .collect(toList()));
+
+            return serializedValues;
+        }
+
         public boolean appliesTo(Row row)
         {
             return appliesTo(column, comparisonOperator, values, row);
         }
 
-        public static boolean appliesTo(ColumnMetadata column, Operator operator, List<Term.Terminal> values, Row row)
+        public static boolean appliesTo(ColumnMetadata column, Operator operator, List<Terminal> values, Row row)
         {
             CollectionType<?> type = (CollectionType<?>) column.type;
 
             // copy iterator contents so that we can properly reuse them for each comparison with an IN value
-            for (Term.Terminal value : values)
+            for (Terminal value : values)
             {
                 Iterator<Cell<?>> iter = getCells(row, column);
                 if (value == null)
@@ -495,7 +712,7 @@ public abstract class ColumnCondition
             return false;
         }
 
-        private static boolean valueAppliesTo(CollectionType<?> type, Iterator<Cell<?>> iter, Term.Terminal value, Operator operator)
+        private static boolean valueAppliesTo(CollectionType<?> type, Iterator<Cell<?>> iter, Terminal value, Operator operator)
         {
             if (value == null)
                 return !iter.hasNext();
@@ -527,7 +744,10 @@ public abstract class ColumnCondition
 
                 // for lists we use the cell value; for sets we use the cell name
                 ByteBuffer cellValue = isSet ? iter.next().path().get(0) : iter.next().buffer();
-                int comparison = type.compare(cellValue, conditionIter.next());
+                ByteBuffer conditionValue = conditionIter.next();
+                if (conditionValue == null)
+                    conditionValue = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+                int comparison = type.compare(cellValue, conditionValue);
                 if (comparison != 0)
                     return evaluateComparisonWithOperator(comparison, operator);
             }
@@ -546,6 +766,7 @@ public abstract class ColumnCondition
 
         private static boolean setAppliesTo(SetType<?> type, Iterator<Cell<?>> iter, Set<ByteBuffer> elements, Operator operator)
         {
+            // TODO(review): this copy-and-sort may be redundant if callers always pass a set already sorted by the elements type - confirm
             ArrayList<ByteBuffer> sortedElements = new ArrayList<>(elements);
             Collections.sort(sortedElements, type.getElementsType());
             return setOrListAppliesTo(type.getElementsType(), iter, sortedElements.iterator(), operator, true);
@@ -562,13 +783,17 @@ public abstract class ColumnCondition
                 Map.Entry<ByteBuffer, ByteBuffer> conditionEntry = conditionIter.next();
                 Cell<?> c = iter.next();
 
+                ByteBuffer conditionEntryKey = conditionEntry.getKey();
+
                 // compare the keys
-                int comparison = type.getKeysType().compare(c.path().get(0), conditionEntry.getKey());
+                int comparison = type.getKeysType().compare(c.path().get(0), conditionEntryKey);
                 if (comparison != 0)
                     return evaluateComparisonWithOperator(comparison, operator);
 
+                ByteBuffer conditionEntryValue = conditionEntry.getValue();
+
                 // compare the values
-                comparison = type.getValuesType().compare(c.buffer(), conditionEntry.getValue());
+                comparison = type.getValuesType().compare(c.buffer(), conditionEntryValue);
                 if (comparison != 0)
                     return evaluateComparisonWithOperator(comparison, operator);
             }
@@ -579,6 +804,43 @@ public abstract class ColumnCondition
             // they're equal
             return operator == Operator.EQ || operator == Operator.LTE || operator == Operator.GTE;
         }
+
+        @Override
+        protected BoundKind kind()
+        {
+            return BoundKind.MULTI_CELL_COLLECTION;
+        }
+
+        private static final BoundSerializer<MultiCellCollectionBound> serializer = new BoundSerializer<MultiCellCollectionBound>()
+        {
+            @Override
+            public void serialize(MultiCellCollectionBound bound, DataOutputPlus out, int version) throws IOException
+            {
+                serializeList(bound.serializedValues().get(), out, version, vintNullableSerializer);
+            }
+
+            @Override
+            public MultiCellCollectionBound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException
+            {
+                List<ByteBuffer> values = deserializeList(in, version, vintNullableSerializer);
+                List<Terminal> terminals;
+                if (operator.isContains() || operator.isContainsKey())
+                {
+                    terminals = values.stream().map(Constants.Value::new).collect(toList());
+                }
+                else
+                {
+                    terminals = values.stream().map(b -> deserializeCqlCollectionAsTerm(b, column.type)).collect(toList());
+                }
+                return new MultiCellCollectionBound(column, operator, terminals);
+            }
+
+            @Override
+            public long serializedSize(MultiCellCollectionBound bound, int version)
+            {
+                return serializedListSize(bound.serializedValues().get(), version, vintNullableSerializer);
+            }
+        };
     }
 
     private static boolean containsAppliesTo(CollectionType<?> type, Iterator<Cell<?>> iter, ByteBuffer value, Operator operator)
@@ -622,16 +884,21 @@ public abstract class ColumnCondition
         /**
          * The UDT field.
          */
+        @Nonnull
         private final FieldIdentifier field;
 
         /**
          * The conditions values.
          */
+        @Nonnull
         private final List<ByteBuffer> values;
 
         private UDTFieldAccessBound(ColumnMetadata column, FieldIdentifier field, Operator operator, List<ByteBuffer> values)
         {
             super(column, operator);
+            checkNotNull(field);
+            checkNotNull(values);
+            checkForUnsetValues(values);
             assert column.type.isUDT() && field != null;
             this.field = field;
             this.values = values;
@@ -647,6 +914,7 @@ public abstract class ColumnCondition
         {
             UserType userType = (UserType) column.type;
 
+
             if (column.type.isMultiCell())
             {
                 Cell<?> cell = getCell(row, column, userType.cellPathForField(field));
@@ -677,6 +945,37 @@ public abstract class ColumnCondition
         {
             return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
         }
+
+        /**
+         * Reports which {@link BoundKind} this bound is.
+         *
+         * @return always {@code BoundKind.UDT_FIELD_ACCESS}
+         */
+        @Override
+        protected BoundKind kind()
+        {
+            return BoundKind.UDT_FIELD_ACCESS;
+        }
+
+        /**
+         * Serializer for {@link UDTFieldAccessBound}: the field identifier's bytes followed by the
+         * list of condition values, both written with {@code vintNullableSerializer}. The column and
+         * operator are supplied to {@code deserialize} by the caller.
+         */
+        private static final BoundSerializer<UDTFieldAccessBound> serializer = new BoundSerializer<UDTFieldAccessBound>()
+        {
+            @Override
+            public void serialize(UDTFieldAccessBound bound, DataOutputPlus out, int version) throws IOException
+            {
+                // Field name first, then the values; deserialize() reads them back in the same order.
+                ByteBuffer fieldName = bound.field.bytes;
+                vintNullableSerializer.serialize(fieldName, out, version);
+                serializeList(bound.values, out, version, vintNullableSerializer);
+            }
+
+            @Override
+            public UDTFieldAccessBound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException
+            {
+                ByteBuffer fieldName = vintNullableSerializer.deserialize(in, version);
+                FieldIdentifier fieldId = new FieldIdentifier(fieldName);
+                List<ByteBuffer> conditionValues = deserializeList(in, version, vintNullableSerializer);
+                return new UDTFieldAccessBound(column, fieldId, operator, conditionValues);
+            }
+
+            @Override
+            public long serializedSize(UDTFieldAccessBound bound, int version)
+            {
+                long fieldSize = vintNullableSerializer.serializedSize(bound.field.bytes, version);
+                return fieldSize + serializedListSize(bound.values, version, vintNullableSerializer);
+            }
+        };
     }
 
     /**
@@ -687,16 +986,21 @@ public abstract class ColumnCondition
         /**
          * The conditions values.
          */
+        @Nonnull
         private final List<ByteBuffer> values;
 
         /**
          * The protocol version
          */
+        @Nonnull
         private final ProtocolVersion protocolVersion;
 
         public MultiCellUdtBound(ColumnMetadata column, Operator op, List<ByteBuffer> values, ProtocolVersion protocolVersion)
         {
             super(column, op);
+            checkNotNull(values);
+            checkNotNull(protocolVersion);
+            checkForUnsetValues(values);
             assert column.type.isMultiCell();
             this.values = values;
             this.protocolVersion = protocolVersion;
@@ -730,6 +1034,37 @@ public abstract class ColumnCondition
         {
             return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
         }
+
+        /**
+         * Reports which {@link BoundKind} this bound is.
+         * <p>
+         * NOTE(review): declared {@code public} here, whereas the {@code kind()} overrides on
+         * MultiCellCollectionBound and UDTFieldAccessBound are {@code protected} - confirm which
+         * visibility is intended.
+         *
+         * @return always {@code BoundKind.MULTI_CELL_UDT}
+         */
+        @Override
+        public BoundKind kind()
+        {
+            return BoundKind.MULTI_CELL_UDT;
+        }
+
+        /**
+         * Serializer for {@link MultiCellUdtBound}: the list of condition values (written with
+         * {@code vintNullableSerializer}) followed by the protocol version as an unsigned vint.
+         * The column and operator are supplied to {@code deserialize} by the caller.
+         */
+        private static final BoundSerializer<MultiCellUdtBound> serializer = new BoundSerializer<MultiCellUdtBound>()
+        {
+            @Override
+            public void serialize(MultiCellUdtBound bound, DataOutputPlus out, int version) throws IOException
+            {
+                serializeList(bound.values, out, version, vintNullableSerializer);
+                int protocol = bound.protocolVersion.asInt();
+                out.writeUnsignedVInt(protocol);
+            }
+
+            @Override
+            public MultiCellUdtBound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException
+            {
+                List<ByteBuffer> conditionValues = deserializeList(in, version, vintNullableSerializer);
+                // Open question carried over from the patch author: does decode(..., true) actually do what we want?
+                ProtocolVersion decoded = ProtocolVersion.decode(in.readUnsignedVIntChecked(), true);
+                return new MultiCellUdtBound(column, operator, conditionValues, decoded);
+            }
+
+            @Override
+            public long serializedSize(MultiCellUdtBound bound, int version)
+            {
+                // Must mirror serialize(): the value list, then the protocol version vint.
+                return serializedListSize(bound.values, version, vintNullableSerializer)
+                       + sizeofUnsignedVInt(bound.protocolVersion.asInt());
+            }
+        };
     }
 
     public static class Raw
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 02931005a7..fe68304598 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -18,30 +18,61 @@
 package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
 
-import org.apache.cassandra.db.marshal.TimeUUIDType;
-import org.apache.cassandra.index.IndexRegistry;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.cql3.*;
+import accord.api.Update;
+import accord.primitives.Txn;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.UpdateParameters;
 import org.apache.cassandra.cql3.conditions.ColumnCondition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.IndexRegistry;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.CASRequest;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.accord.txn.TxnCondition;
+import org.apache.cassandra.service.accord.txn.TxnData;
+import org.apache.cassandra.service.accord.txn.TxnDataName;
+import org.apache.cassandra.service.accord.txn.TxnQuery;
+import org.apache.cassandra.service.accord.txn.TxnRead;
+import org.apache.cassandra.service.accord.txn.TxnReference;
+import org.apache.cassandra.service.accord.txn.TxnUpdate;
+import org.apache.cassandra.service.accord.txn.TxnWrite;
 import org.apache.cassandra.service.paxos.Ballot;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.TimeUUID;
 
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.cassandra.service.accord.txn.TxnDataName.Kind.USER;
 
 /**
  * Processed CAS conditions and update on potentially multiple rows of the same partition.
@@ -343,6 +374,8 @@ public class CQL3CasRequest implements CASRequest
         }
 
         public abstract boolean appliesTo(FilteredPartition current) throws InvalidRequestException;
+
+        public abstract TxnCondition asTxnCondition();
     }
 
     private static class NotExistCondition extends RowCondition
@@ -356,6 +389,14 @@ public class CQL3CasRequest implements CASRequest
         {
             return current.getRow(clustering) == null;
         }
+
+        /**
+         * Translates this condition into a {@code TxnCondition.Exists} of kind {@code IS_NULL} over a
+         * reference named by {@code USER}, this condition's clustering and {@code TxnRead.DUMMY_NAME}.
+         */
+        @Override
+        public TxnCondition asTxnCondition()
+        {
+            TxnReference rowReference = new TxnReference(new TxnDataName(USER, clustering, TxnRead.DUMMY_NAME), null);
+            return new TxnCondition.Exists(rowReference, TxnCondition.Kind.IS_NULL);
+        }
     }
 
     private static class ExistCondition extends RowCondition
@@ -369,6 +410,14 @@ public class CQL3CasRequest implements CASRequest
         {
             return current.getRow(clustering) != null;
         }
+
+        /**
+         * Translates this condition into a {@code TxnCondition.Exists} of kind {@code IS_NOT_NULL} over a
+         * reference named by {@code USER}, this condition's clustering and {@code TxnRead.DUMMY_NAME}.
+         */
+        @Override
+        public TxnCondition asTxnCondition()
+        {
+            TxnReference rowReference = new TxnReference(new TxnDataName(USER, clustering, TxnRead.DUMMY_NAME), null);
+            return new TxnCondition.Exists(rowReference, TxnCondition.Kind.IS_NOT_NULL);
+        }
     }
 
     private static class ColumnsConditions extends RowCondition
@@ -399,6 +448,12 @@ public class CQL3CasRequest implements CASRequest
             }
             return true;
         }
+
+        /**
+         * Wraps this condition's clustering and its column conditions ({@code conditions.values()})
+         * in a {@code TxnCondition.ColumnConditionsAdapter}.
+         */
+        @Override
+        public TxnCondition asTxnCondition()
+        {
+            return new TxnCondition.ColumnConditionsAdapter(clustering, conditions.values());
+        }
     }
     
     @Override
@@ -406,4 +461,59 @@ public class CQL3CasRequest implements CASRequest
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    /**
+     * Builds the Accord transaction for this CAS request from its read command, its conditional
+     * update and the {@code TxnQuery.CONDITION} query.
+     *
+     * @param clientState passed through to the creation of the write fragments
+     * @param nowInSecs   passed to {@code readCommand} when building the read
+     */
+    @Override
+    public Txn toAccordTxn(ClientState clientState, int nowInSecs)
+    {
+        SinglePartitionReadCommand partitionRead = readCommand(nowInSecs);
+        Update conditionalUpdate = createUpdate(clientState);
+        // A CAS request supports only one key, and its writes cannot depend on the data that is
+        // read (only the conditions can), so the keys of the read are the keys of the transaction.
+        TxnRead txnRead = TxnRead.createRead(partitionRead);
+        return new Txn.InMemory(txnRead.keys(), txnRead, TxnQuery.CONDITION, conditionalUpdate);
+    }
+
+    /**
+     * Pairs this request's write fragments with its combined condition in a {@link TxnUpdate}.
+     */
+    private Update createUpdate(ClientState clientState)
+    {
+        List<TxnWrite.Fragment> fragments = createWriteFragments(clientState);
+        TxnCondition condition = createCondition();
+        return new TxnUpdate(fragments, condition);
+    }
+
+    /**
+     * Folds the static-row condition (when present) and every per-row condition into a single
+     * {@link TxnCondition}.
+     *
+     * @return the sole condition when there is exactly one, otherwise an AND group over all of them
+     * @throws IllegalStateException if there are no conditions at all
+     */
+    private TxnCondition createCondition()
+    {
+        List<TxnCondition> txnConditions = new ArrayList<>();
+        if (staticConditions != null)
+            txnConditions.add(staticConditions.asTxnCondition());
+        for (RowCondition rowCondition : conditions.values())
+            txnConditions.add(rowCondition.asTxnCondition());
+        // CAS forbids empty conditions
+        checkState(!txnConditions.isEmpty());
+        // The single-condition shortcut has to look at the combined list: a static condition plus one
+        // row condition means conditions.size() == 1 but two entries here, and returning only get(0)
+        // would silently drop the row condition.
+        return txnConditions.size() == 1
+               ? txnConditions.get(0)
+               : new TxnCondition.BooleanGroup(TxnCondition.Kind.AND, txnConditions);
+    }
+
+    /**
+     * Creates one write fragment per row update, in iteration order of {@code updates}.
+     *
+     * @param state client state handed to each statement's {@code getTxnWriteFragment}
+     * @return the fragments, each created with its zero-based position as index
+     */
+    private List<TxnWrite.Fragment> createWriteFragments(ClientState state)
+    {
+        List<TxnWrite.Fragment> fragments = new ArrayList<>();
+        for (RowUpdate rowUpdate : updates)
+        {
+            // The number of fragments collected so far is exactly the running index of this one.
+            int position = fragments.size();
+            fragments.add(rowUpdate.stmt.getTxnWriteFragment(position, state, rowUpdate.options));
+        }
+        return fragments;
+    }
+
+    /**
+     * Extracts the CAS result from the transaction data.
+     *
+     * @return the row iterator of the partition stored under {@code TxnRead.DUMMY}, or {@code null}
+     *         when the data holds no such partition
+     */
+    @Override
+    public RowIterator toCasResult(TxnData txnData)
+    {
+        FilteredPartition partition = txnData.get(TxnRead.DUMMY);
+        return partition == null ? null : partition.rowIterator();
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index c7a2782ece..7c6fdd04bd 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -38,6 +38,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable.Version;
 import org.apache.cassandra.utils.bytecomparable.ByteSource;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * A clustering prefix is the unit of what a {@link ClusteringComparator} can compare.
  * <p>
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 99ae73cc8f..0f39180448 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -32,8 +32,8 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.serializers.ListSerializer;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.TimeUUID;
 import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.TimeUUID;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable.Version;
 import org.apache.cassandra.utils.bytecomparable.ByteSource;
 
@@ -123,7 +123,7 @@ public class ListType<T> extends CollectionType<List<T>>
     }
 
     @Override
-    public AbstractType<?> freeze()
+    public ListType<T> freeze()
     {
         if (isMultiCell)
             return getInstance(this.elements, false);
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java
index be74ff1626..89b6b3423e 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -18,7 +18,12 @@
 package org.apache.cassandra.db.marshal;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.cassandra.cql3.Json;
@@ -31,11 +36,11 @@ import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MapSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable.Version;
 import org.apache.cassandra.utils.bytecomparable.ByteSource;
 import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
-import org.apache.cassandra.utils.Pair;
 
 public class MapType<K, V> extends CollectionType<Map<K, V>>
 {
@@ -141,7 +146,7 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
     }
 
     @Override
-    public AbstractType<?> freeze()
+    public MapType<K, V> freeze()
     {
         if (isMultiCell)
             return getInstance(this.keys, this.values, false);
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java
index 67699ac3da..c831e0d5e5 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -18,7 +18,12 @@
 package org.apache.cassandra.db.marshal;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.cassandra.cql3.Json;
@@ -114,7 +119,7 @@ public class SetType<T> extends CollectionType<Set<T>>
     }
 
     @Override
-    public AbstractType<?> freeze()
+    public SetType<T> freeze()
     {
         if (isMultiCell)
             return getInstance(this.elements, false);
diff --git a/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java b/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java
index d454c5e188..5f46e45c0f 100644
--- a/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java
+++ b/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java
@@ -40,7 +40,12 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.service.paxos.Ballot;
 import org.apache.cassandra.utils.TimeUUID;
 
-import static org.apache.cassandra.db.ClusteringPrefix.Kind.*;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_START_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_END_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_START_BOUND;
 
 /**
  * ValueAccessor allows serializers and other code dealing with raw bytes to operate on different backing types
@@ -130,6 +135,8 @@ public interface ValueAccessor<V>
      */
     default boolean isEmpty(V value)
     {
+        // No special handling for null: the value is passed straight to size(value), so whether
+        // a null is tolerated is up to the accessor implementation.
         return size(value) == 0;
     }
 
diff --git a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
index 36e346c3e7..a9cc57e75d 100644
--- a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
@@ -25,10 +25,10 @@ import java.util.List;
 import com.google.common.collect.Range;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.db.marshal.ValueAccessor;
 import org.apache.cassandra.transport.ProtocolVersion;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public abstract class CollectionSerializer<T> extends TypeSerializer<T>
diff --git a/src/java/org/apache/cassandra/service/CASRequest.java b/src/java/org/apache/cassandra/service/CASRequest.java
index 19966c883c..b61f7ab79c 100644
--- a/src/java/org/apache/cassandra/service/CASRequest.java
+++ b/src/java/org/apache/cassandra/service/CASRequest.java
@@ -17,10 +17,13 @@
  */
 package org.apache.cassandra.service;
 
+import accord.primitives.Txn;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.accord.txn.TxnData;
 import org.apache.cassandra.service.paxos.Ballot;
 
 /**
@@ -44,4 +47,8 @@ public interface CASRequest
      * are passed as argument.
      */
     public PartitionUpdate makeUpdates(FilteredPartition current, ClientState clientState, Ballot ballot) throws InvalidRequestException;
+
+    public Txn toAccordTxn(ClientState clientState, int nowInSecs);
+
+    RowIterator toCasResult(TxnData data);
 }
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0e873506d4..f417b0344b 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -46,6 +46,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.primitives.Txn;
 import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask;
@@ -122,6 +123,8 @@ import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.service.accord.txn.TxnData;
+import org.apache.cassandra.service.accord.txn.TxnQuery;
+import org.apache.cassandra.service.accord.txn.TxnRead;
 import org.apache.cassandra.service.paxos.Ballot;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.service.paxos.ContentionStrategy;
@@ -145,12 +148,10 @@ import org.apache.cassandra.utils.TimeUUID;
 import org.apache.cassandra.utils.concurrent.CountDownLatch;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
+import static com.google.common.collect.Iterables.concat;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-
-import static com.google.common.collect.Iterables.concat;
-import static org.apache.commons.lang3.StringUtils.join;
-
+import static org.apache.cassandra.config.Config.LegacyPaxosStrategy.accord;
 import static org.apache.cassandra.db.ConsistencyLevel.SERIAL;
 import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casReadMetrics;
 import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casWriteMetrics;
@@ -178,6 +179,7 @@ import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
 import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch;
+import static org.apache.commons.lang3.StringUtils.join;
 
 public class StorageProxy implements StorageProxyMBean
 {
@@ -318,9 +320,17 @@ public class StorageProxy implements StorageProxyMBean
                                                             key.toString(), keyspaceName, cfName));
         }
 
-        return Paxos.useV2()
-                ? Paxos.cas(key, request, consistencyForPaxos, consistencyForCommit, clientState)
-                : legacyCas(keyspaceName, cfName, key, request, consistencyForPaxos, consistencyForCommit, clientState, nowInSeconds, queryStartNanoTime);
+        if (DatabaseDescriptor.getLegacyPaxosStrategy() == accord)
+        {
+            TxnData data = AccordService.instance().coordinate(request.toAccordTxn(clientState, nowInSeconds));
+            return request.toCasResult(data);
+        }
+        else
+        {
+            return Paxos.useV2()
+                   ? Paxos.cas(key, request, consistencyForPaxos, consistencyForCommit, clientState)
+                   : legacyCas(keyspaceName, cfName, key, request, consistencyForPaxos, consistencyForCommit, clientState, nowInSeconds, queryStartNanoTime);
+        }
     }
 
     public static RowIterator legacyCas(String keyspaceName,
@@ -1847,7 +1857,7 @@ public class StorageProxy implements StorageProxyMBean
         }
 
         return consistencyLevel.isSerialConsistency()
-             ? readWithPaxos(group, consistencyLevel, queryStartNanoTime)
+             ? readWithConsensus(group, consistencyLevel, queryStartNanoTime)
              : readRegular(group, consistencyLevel, queryStartNanoTime);
     }
 
@@ -1861,12 +1871,29 @@ public class StorageProxy implements StorageProxyMBean
         return !StorageService.instance.isBootstrapMode();
     }
 
-    private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
+    private static PartitionIterator readWithConsensus(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException
     {
-        return Paxos.useV2()
-                ? Paxos.read(group, consistencyLevel)
-                : legacyReadWithPaxos(group, consistencyLevel, queryStartNanoTime);
+        if (DatabaseDescriptor.getLegacyPaxosStrategy() == accord)
+        {
+            return readWithAccord(group);
+        }
+        else
+        {
+            return Paxos.useV2()
+                   ? Paxos.read(group, consistencyLevel)
+                   : legacyReadWithPaxos(group, consistencyLevel, queryStartNanoTime);
+        }
+    }
+
+    /**
+     * Performs a serial read of a single partition by coordinating an Accord transaction built
+     * from the group's only query with {@code TxnQuery.ALL} and no update.
+     *
+     * @param group the read group; must contain at most one query
+     * @return an iterator over the partition that was read, or an empty iterator when the
+     *         transaction data holds no partition for the read
+     * @throws InvalidRequestException if the group contains more than one query
+     */
+    private static PartitionIterator readWithAccord(SinglePartitionReadCommand.Group group)
+    {
+        if (group.queries.size() > 1)
+            throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time");
+        TxnRead read = TxnRead.createRead(group.queries.get(0));
+        Txn txn = new Txn.InMemory(read.keys(), read, TxnQuery.ALL, null);
+        TxnData data = AccordService.instance().coordinate(txn);
+        // The lookup may yield null (CQL3CasRequest.toCasResult guards the same call); answer with an
+        // empty result rather than failing with a NullPointerException.
+        org.apache.cassandra.db.partitions.FilteredPartition partition = data.get(TxnRead.DUMMY);
+        if (partition == null)
+            return org.apache.cassandra.db.EmptyIterators.partition();
+        return PartitionIterators.singletonIterator(partition.rowIterator());
+    }
 
     private static PartitionIterator legacyReadWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
index 5036e2b57c..b8da523ebf 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
@@ -43,7 +43,7 @@ import static org.apache.cassandra.utils.CollectionSerializers.serializeCollecti
 import static org.apache.cassandra.utils.CollectionSerializers.serializedCollectionSize;
 import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
 import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
 public class AccordPartialCommand extends CommandsForKey.TxnIdWithExecuteAt
 {
@@ -180,7 +180,7 @@ public class AccordPartialCommand extends CommandsForKey.TxnIdWithExecuteAt
         {
             int size = Math.toIntExact(AccordSerializerVersion.serializer.serializedSize(version));
             size += CommandSerializers.txnId.serializedSize();
-            size += serializedSizeNullable(command.executeAt(), version.msgVersion, CommandSerializers.timestamp);
+            size += serializedNullableSize(command.executeAt(), version.msgVersion, CommandSerializers.timestamp);
             size += CommandSerializers.status.serializedSize(command.status(), version.msgVersion);
             size += CommandSerializers.kind.serializedSize(command.kind(), version.msgVersion);
             size += serializedCollectionSize(command.deps, version.msgVersion, CommandSerializers.txnId);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordSerializers.java b/src/java/org/apache/cassandra/service/accord/AccordSerializers.java
index a0faaef0ff..a41759a911 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSerializers.java
@@ -20,18 +20,22 @@ package org.apache.cassandra.service.accord;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 
 import org.apache.cassandra.cql3.Lists;
 import org.apache.cassandra.cql3.Maps;
 import org.apache.cassandra.cql3.Sets;
 import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.db.ArrayClustering;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringPrefix;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.ValueAccessor;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.DeserializationHelper;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -47,6 +51,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.apache.cassandra.db.TypeSizes.sizeof;
 import static com.google.common.primitives.Ints.checkedCast;
 import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
 import static org.apache.cassandra.db.marshal.CollectionType.Kind.LIST;
@@ -71,6 +76,14 @@ public class AccordSerializers
         }
     }
 
+    /**
+     * Serializes every element of {@code items} with {@code serializer}.
+     *
+     * @return an array with one buffer per element, in list order
+     */
+    public static <T> ByteBuffer[] serialize(List<T> items, IVersionedSerializer<T> serializer)
+    {
+        int count = items.size();
+        ByteBuffer[] buffers = new ByteBuffer[count];
+        for (int idx = 0; idx < count; idx++)
+        {
+            T item = items.get(idx);
+            buffers[idx] = serialize(item, serializer);
+        }
+        return buffers;
+    }
+
     public static <T> T deserialize(ByteBuffer bytes, IVersionedSerializer<T> serializer)
     {
         try (DataInputBuffer in = new DataInputBuffer(bytes, true))
@@ -157,7 +170,7 @@ public class AccordSerializers
             String table = in.readUTF();
             ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
             TableMetadata metadata = Schema.instance.getTableMetadata(keyspace, table);
-            
+
             // TODO: Can the metadata be null if the table has been dropped?
             return metadata.getColumn(name);
         }
@@ -166,8 +179,8 @@ public class AccordSerializers
         public long serializedSize(ColumnMetadata column, int version)
         {
             long size = 0;
-            size += TypeSizes.sizeof(column.ksName);
-            size += TypeSizes.sizeof(column.cfName);
+            size += sizeof(column.ksName);
+            size += sizeof(column.cfName);
             size += ByteBufferUtil.serializedSizeWithShortLength(column.name.bytes);
             return size;
         }
@@ -193,4 +206,73 @@ public class AccordSerializers
             return TableId.serializedSize();
         }
     };
-}
+
+    /**
+     * Serializer for a {@link Clustering}. The encoding starts with a boolean: true marks the static clustering and
+     * nothing follows; false is followed by the component count as an unsigned vint and then each component's bytes
+     * preceded by their length.
+     */
+    public static final IVersionedSerializer<Clustering<?>> clusteringPrefixSerializer = new IVersionedSerializer<Clustering<?>>()
+    {
+        @Override
+        public void serialize(Clustering<?> clustering, DataOutputPlus out, int version) throws IOException
+        {
+            doSerialize(clustering, out);
+        }
+
+        // Generic helper capturing the wildcard so that the accessor and the values share the type V
+        private <V> void doSerialize(Clustering<V> clustering, DataOutputPlus out) throws IOException
+        {
+            boolean isStatic = clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING;
+            out.writeBoolean(isStatic);
+            if (isStatic)
+                return;
+
+            out.writeUnsignedVInt(clustering.size());
+            ValueAccessor<V> accessor = clustering.accessor();
+            for (int i = 0; i < clustering.size(); i++)
+                accessor.writeWithVIntLength(clustering.get(i), out);
+        }
+
+        @Override
+        public Clustering<?> deserialize(DataInputPlus in, int version) throws IOException
+        {
+            if (in.readBoolean())
+                return Clustering.STATIC_CLUSTERING;
+
+            int numComponents = in.readUnsignedVIntChecked();
+            byte[][] components = new byte[numComponents][];
+            for (int ci = 0; ci < numComponents; ci++)
+            {
+                int componentLength = in.readUnsignedVIntChecked();
+                components[ci] = new byte[componentLength];
+                in.readFully(components[ci]);
+            }
+            return new ArrayClustering(components);
+        }
+
+        @Override
+        public long serializedSize(Clustering<?> clustering, int version)
+        {
+            return computeSerializedSize(clustering);
+        }
+
+        private <V> long computeSerializedSize(Clustering<V> clustering)
+        {
+            long size = sizeof(true);
+            // The static clustering is written as the boolean marker alone, so no component count is charged for it
+            if (clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING)
+                return size;
+
+            size += sizeofUnsignedVInt(clustering.size());
+            ValueAccessor<V> accessor = clustering.accessor();
+            for (int i = 0; i < clustering.size(); i++)
+            {
+                int valueSize = accessor.size(clustering.get(i));
+                size += valueSize;
+                size += sizeofUnsignedVInt(valueSize);
+            }
+            return size;
+        }
+    };
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
index e793b14eec..5568b9b01e 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 
 import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
 import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
 public class BeginInvalidationSerializers
 {
@@ -92,12 +92,12 @@ public class BeginInvalidationSerializers
         @Override
         public long serializedSize(InvalidateReply reply, int version)
         {
-            return serializedSizeNullable(reply.supersededBy, version, CommandSerializers.ballot)
+            return serializedNullableSize(reply.supersededBy, version, CommandSerializers.ballot)
                     + CommandSerializers.ballot.serializedSize(reply.accepted, version)
                     + CommandSerializers.status.serializedSize(reply.status, version)
                     + TypeSizes.BOOL_SIZE
-                    + serializedSizeNullable(reply.route, version, KeySerializers.route)
-                    + serializedSizeNullable(reply.homeKey, version, KeySerializers.routingKey);
+                    + serializedNullableSize(reply.route, version, KeySerializers.route)
+                    + serializedNullableSize(reply.homeKey, version, KeySerializers.routingKey);
         }
     };
 }
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
index 0741c1e662..ce8ef96b48 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
@@ -47,7 +47,7 @@ import org.apache.cassandra.service.accord.txn.TxnData;
 import static accord.messages.CheckStatus.SerializationSupport.createOk;
 import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
 import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
 public class CheckStatusSerializers
 {
@@ -166,20 +166,20 @@ public class CheckStatusSerializers
             size += CommandSerializers.saveStatus.serializedSize(ok.saveStatus, version);
             size += CommandSerializers.ballot.serializedSize(ok.promised, version);
             size += CommandSerializers.ballot.serializedSize(ok.accepted, version);
-            size += serializedSizeNullable(ok.executeAt, version, CommandSerializers.timestamp);
+            size += serializedNullableSize(ok.executeAt, version, CommandSerializers.timestamp);
             size += TypeSizes.BOOL_SIZE;
             size += CommandSerializers.durability.serializedSize(ok.durability, version);
-            size += serializedSizeNullable(ok.homeKey, version, KeySerializers.routingKey);
-            size += serializedSizeNullable(ok.route, version, KeySerializers.route);
+            size += serializedNullableSize(ok.homeKey, version, KeySerializers.routingKey);
+            size += serializedNullableSize(ok.route, version, KeySerializers.route);
 
             if (!(reply instanceof CheckStatusOkFull))
                 return size;
 
             CheckStatusOkFull okFull = (CheckStatusOkFull) ok;
-            size += serializedSizeNullable(okFull.partialTxn, version, CommandSerializers.partialTxn);
-            size += serializedSizeNullable(okFull.committedDeps, version, DepsSerializer.partialDeps);
-            size += serializedSizeNullable(okFull.writes, version, CommandSerializers.writes);
-            size += serializedSizeNullable((TxnData) okFull.result, version, TxnData.serializer);
+            size += serializedNullableSize(okFull.partialTxn, version, CommandSerializers.partialTxn);
+            size += serializedNullableSize(okFull.committedDeps, version, DepsSerializer.partialDeps);
+            size += serializedNullableSize(okFull.writes, version, CommandSerializers.writes);
+            size += serializedNullableSize((TxnData) okFull.result, version, TxnData.serializer);
             return size;
         }
     };
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
index b49b898804..ed251016dd 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 
 import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
 import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
 public class CommitSerializers
 {
@@ -64,10 +64,10 @@ public class CommitSerializers
         public long serializedBodySize(Commit msg, int version)
         {
             return CommandSerializers.timestamp.serializedSize(msg.executeAt, version)
-                   + serializedSizeNullable(msg.partialTxn, version, CommandSerializers.partialTxn)
+                   + serializedNullableSize(msg.partialTxn, version, CommandSerializers.partialTxn)
                    + DepsSerializer.partialDeps.serializedSize(msg.partialDeps, version)
-                   + serializedSizeNullable(msg.route, version, KeySerializers.fullRoute)
-                   + serializedSizeNullable(msg.read, version, ReadDataSerializers.request);
+                   + serializedNullableSize(msg.route, version, KeySerializers.fullRoute)
+                   + serializedNullableSize(msg.read, version, ReadDataSerializers.request);
         }
     };
 
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
index 251b281621..71d3d69ba1 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
@@ -18,10 +18,6 @@
 
 package org.apache.cassandra.service.accord.serializers;
 
-import java.io.IOException;
-
-import javax.annotation.Nullable;
-
 import accord.messages.PreAccept;
 import accord.messages.PreAccept.PreAcceptOk;
 import accord.messages.PreAccept.PreAcceptReply;
@@ -35,9 +31,10 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.accord.serializers.TxnRequestSerializer.WithUnsyncedSerializer;
 
-import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+import static org.apache.cassandra.utils.NullableSerializer.*;
 
 public class PreacceptSerializers
 {
@@ -67,7 +64,7 @@ public class PreacceptSerializers
         public long serializedBodySize(PreAccept msg, int version)
         {
             return CommandSerializers.partialTxn.serializedSize(msg.partialTxn, version)
-                   + serializedSizeNullable(msg.route, version, KeySerializers.fullRoute)
+                   + serializedNullableSize(msg.route, version, KeySerializers.fullRoute)
                    + TypeSizes.sizeofUnsignedVInt(msg.maxEpoch - msg.minEpoch);
         }
     };
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
index e9907911a9..fbd00f637e 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.service.accord.serializers;
 
 import java.io.IOException;
-
 import javax.annotation.Nullable;
 
 import accord.api.Result;
@@ -45,7 +44,7 @@ import org.apache.cassandra.service.accord.txn.TxnData;
 
 import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
 import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
 public class RecoverySerializers
 {
@@ -73,7 +72,7 @@ public class RecoverySerializers
         {
             return CommandSerializers.partialTxn.serializedSize(recover.partialTxn, version)
                    + CommandSerializers.ballot.serializedSize(recover.ballot, version)
-                   + serializedSizeNullable(recover.route, version, KeySerializers.fullRoute);
+                   + serializedNullableSize(recover.route, version, KeySerializers.fullRoute);
         }
     };
 
@@ -149,13 +148,13 @@ public class RecoverySerializers
             long size = CommandSerializers.txnId.serializedSize(recoverOk.txnId, version);
             size += CommandSerializers.status.serializedSize(recoverOk.status, version);
             size += CommandSerializers.ballot.serializedSize(recoverOk.accepted, version);
-            size += serializedSizeNullable(recoverOk.executeAt, version, CommandSerializers.timestamp);
+            size += serializedNullableSize(recoverOk.executeAt, version, CommandSerializers.timestamp);
             size += DepsSerializer.partialDeps.serializedSize(recoverOk.deps, version);
             size += DepsSerializer.deps.serializedSize(recoverOk.earlierCommittedWitness, version);
             size += DepsSerializer.deps.serializedSize(recoverOk.earlierAcceptedNoWitness, version);
             size += TypeSizes.sizeof(recoverOk.rejectsFastPath);
-            size += serializedSizeNullable(recoverOk.writes, version, CommandSerializers.writes);
-            size += serializedSizeNullable((TxnData) recoverOk.result, version, TxnData.serializer);
+            size += serializedNullableSize(recoverOk.writes, version, CommandSerializers.writes);
+            size += serializedNullableSize((TxnData) recoverOk.result, version, TxnData.serializer);
             return size;
         }
 
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java b/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
index 120a04fbbe..244efe167e 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
@@ -20,10 +20,13 @@ package org.apache.cassandra.service.accord.txn;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
@@ -32,6 +35,8 @@ import com.google.common.collect.Iterables;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.cql3.Term;
 import org.apache.cassandra.cql3.conditions.ColumnCondition;
+import org.apache.cassandra.cql3.conditions.ColumnCondition.Bound;
+import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
@@ -47,11 +52,17 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.cassandra.service.accord.AccordSerializers.clusteringPrefixSerializer;
 import static org.apache.cassandra.service.accord.AccordSerializers.deserializeCqlCollectionAsTerm;
+import static org.apache.cassandra.service.accord.txn.TxnRead.DUMMY;
 import static org.apache.cassandra.utils.CollectionSerializers.deserializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.serializeCollection;
 import static org.apache.cassandra.utils.CollectionSerializers.serializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.serializedCollectionSize;
 import static org.apache.cassandra.utils.CollectionSerializers.serializedListSize;
 
+
 public abstract class TxnCondition
 {
     private interface ConditionSerializer<T extends TxnCondition>
@@ -73,9 +84,12 @@ public abstract class TxnCondition
         GREATER_THAN(">", Operator.GT),
         GREATER_THAN_OR_EQUAL(">=", Operator.GTE),
         LESS_THAN("<", Operator.LT),
-        LESS_THAN_OR_EQUAL("<=", Operator.LTE);
+        LESS_THAN_OR_EQUAL("<=", Operator.LTE),
+        COLUMN_CONDITIONS("COLUMN_CONDITIONS", null);
 
+        @Nonnull
         private final String symbol;
+        @Nullable
         private final Operator operator;
 
         Kind(String symbol, Operator operator)
@@ -104,6 +118,8 @@ public abstract class TxnCondition
                     return BooleanGroup.serializer;
                 case NONE:
                     return None.serializer;
+                case COLUMN_CONDITIONS:
+                    return ColumnConditionsAdapter.serializer;
                 default:
                     throw new IllegalArgumentException();
             }
@@ -300,6 +316,62 @@ public abstract class TxnCondition
         };
     }
 
+    /** Adapts the {@link Bound}s of column conditions on a single row so they can be evaluated as a {@link TxnCondition}. */
+    public static class ColumnConditionsAdapter extends TxnCondition
+    {
+        @Nonnull
+        public final Collection<Bound> bounds;
+
+        @Nonnull
+        public final Clustering<?> clustering;
+
+        public ColumnConditionsAdapter(Clustering<?> clustering, Collection<Bound> bounds)
+        {
+            super(Kind.COLUMN_CONDITIONS);
+            this.bounds = checkNotNull(bounds);
+            this.clustering = checkNotNull(clustering);
+        }
+
+        /** Returns true only if every bound applies to the row at {@link #clustering} of the partition stored under DUMMY. */
+        @Override
+        public boolean applies(@Nonnull TxnData data)
+        {
+            checkNotNull(data);
+            FilteredPartition partition = data.get(DUMMY);
+            // Presumably null when no partition was read (confirm against TxnData.get); evaluate against a null row then
+            Row row = partition != null ? partition.getRow(clustering) : null;
+            for (Bound bound : bounds)
+                if (!bound.appliesTo(row))
+                    return false;
+            return true;
+        }
+
+        private static final ConditionSerializer<ColumnConditionsAdapter> serializer = new ConditionSerializer<ColumnConditionsAdapter>()
+        {
+            @Override
+            public void serialize(ColumnConditionsAdapter condition, DataOutputPlus out, int version) throws IOException
+            {
+                clusteringPrefixSerializer.serialize(condition.clustering, out, version);
+                serializeCollection(condition.bounds, out, version, Bound.serializer);
+            }
+
+            @Override
+            public ColumnConditionsAdapter deserialize(DataInputPlus in, int version, Kind ignored) throws IOException
+            {
+                Clustering<?> clustering = clusteringPrefixSerializer.deserialize(in, version);
+                List<Bound> bounds = deserializeList(in, version, Bound.serializer);
+                return new ColumnConditionsAdapter(clustering, bounds);
+            }
+
+            @Override
+            public long serializedSize(ColumnConditionsAdapter condition, int version)
+            {
+                return clusteringPrefixSerializer.serializedSize(condition.clustering, version)
+                    + serializedCollectionSize(condition.bounds, version, Bound.serializer);
+            }
+        };
+    }
+
     public static class Value extends TxnCondition
     {
         private static final Set<Kind> KINDS = ImmutableSet.of(Kind.EQUAL, Kind.NOT_EQUAL,
@@ -503,7 +575,6 @@ public abstract class TxnCondition
 
     public static final IVersionedSerializer<TxnCondition> serializer = new IVersionedSerializer<TxnCondition>()
     {
-        @SuppressWarnings("unchecked")
         @Override
         public void serialize(TxnCondition condition, DataOutputPlus out, int version) throws IOException
         {
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
index 50bdfbbada..fee1361a85 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
@@ -75,6 +75,7 @@ public class TxnData implements Data, Result, Iterable<FilteredPartition>
         return data.entrySet();
     }
 
+    // This is inadequate
     @Override
     public Data merge(Data data)
     {
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
index e4526beabd..57b33eed61 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
@@ -24,7 +24,10 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
+import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -37,6 +40,13 @@ import org.apache.cassandra.utils.ObjectSizes;
 import static com.google.common.primitives.Ints.checkedCast;
 import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+import static org.apache.cassandra.service.accord.AccordSerializers.clusteringPrefixSerializer;
+import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
+
 public class TxnDataName implements Comparable<TxnDataName>
 {
     private static final TxnDataName RETURNING = new TxnDataName(Kind.RETURNING);
@@ -71,13 +81,27 @@ public class TxnDataName implements Comparable<TxnDataName>
         }
     }
 
+    @Nonnull
     private final Kind kind;
+
+    @Nonnull
     private final String[] parts;
 
-    public TxnDataName(Kind kind, String... parts)
+    @Nullable
+    private final Clustering<?> clustering;
+
+    public TxnDataName(@Nonnull Kind kind, @Nonnull String... parts)
+    {
+        this(kind, null, parts); // delegate to the full constructor with no clustering
+    }
+
+    public TxnDataName(@Nonnull Kind kind, @Nullable Clustering<?> clustering, @Nonnull String... parts)
     {
+        checkNotNull(kind); // fail fast: kind is declared @Nonnull
+        checkNotNull(parts); // fail fast: parts is declared @Nonnull
         this.kind = kind;
         this.parts = parts;
+        this.clustering = clustering; // may be null; the two-argument constructor passes null here
     }
 
     public static TxnDataName user(String name)
@@ -204,17 +228,19 @@ public class TxnDataName implements Comparable<TxnDataName>
             out.writeUnsignedVInt(t.parts.length);
             for (String part : t.parts)
                 out.writeUTF(part);
+            serializeNullable(t.clustering, out, version, clusteringPrefixSerializer);
         }
 
         @Override
         public TxnDataName deserialize(DataInputPlus in, int version) throws IOException
         {
             Kind kind = Kind.from(in.readByte());
-            int length = checkedCast(in.readUnsignedVInt());
+            int length = in.readUnsignedVIntChecked();
             String[] parts = new String[length];
             for (int i = 0; i < length; i++)
                 parts[i] = in.readUTF();
-            return new TxnDataName(kind, parts);
+            Clustering<?> clustering = deserializeNullable(in, version, clusteringPrefixSerializer);
+            return new TxnDataName(kind, clustering, parts);
         }
 
         @Override
@@ -223,6 +249,7 @@ public class TxnDataName implements Comparable<TxnDataName>
             int size = Byte.BYTES + sizeofUnsignedVInt(t.parts.length);
             for (String part : t.parts)
                 size += TypeSizes.sizeof(part);
+            size += serializedNullableSize(t.clustering, version, clusteringPrefixSerializer);
             return size;
         }
     };
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index 667df37d8c..93100eba3e 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -100,7 +100,7 @@ public class TxnNamedRead extends AbstractSerialized<SinglePartitionReadCommand>
         return "TxnNamedRead{name='" + name + '\'' + ", key=" + key + ", update=" + get() + '}';
     }
 
-    public TxnDataName name()
+    public TxnDataName txnDataName()
     {
         return name;
     }
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java b/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
index d00da39735..3ceb6e79af 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.service.accord.txn;
 
 import java.io.IOException;
-
 import javax.annotation.Nullable;
 
 import com.google.common.base.Preconditions;
@@ -40,6 +39,12 @@ public abstract class TxnQuery implements Query
 {
     public static final TxnQuery ALL = new TxnQuery()
     {
+        @Override
+        protected byte type()
+        {
+            return 1; // byte the serializer writes for ALL; deserialize maps 1 back to ALL
+        }
+
         @Override
         public Result compute(TxnId txnId, Data data, @Nullable Read read, @Nullable Update update)
         {
@@ -49,6 +54,12 @@ public abstract class TxnQuery implements Query
 
     public static final TxnQuery NONE = new TxnQuery()
     {
+        @Override
+        protected byte type()
+        {
+            return 2; // byte the serializer writes for NONE; deserialize maps 2 back to NONE
+        }
+
         @Override
         public Result compute(TxnId txnId, Data data, @Nullable Read read, @Nullable Update update)
         {
@@ -56,10 +67,34 @@ public abstract class TxnQuery implements Query
         }
     };
 
+    /** Query for conditional updates: an empty result when the condition applied, otherwise the data that was read. */
+    public static final TxnQuery CONDITION = new TxnQuery()
+    {
+        @Override
+        protected byte type()
+        {
+            return 3;
+        }
+
+        @Override
+        public Result compute(TxnId txnId, Data data, @Nullable Read read, @Nullable Update update)
+        {
+            boolean applied = ((TxnUpdate) update).checkCondition(data);
+            // Condition failed: hand back the partition contents that were read (if present) to indicate failure
+            if (!applied)
+                return (TxnData) data;
+
+            // Condition applied: an empty result indicates success
+            return new TxnData();
+        }
+    };
+
     private static final long SIZE = ObjectSizes.measure(ALL);
 
     private TxnQuery() {}
 
+    protected abstract byte type(); // byte the serializer writes to identify this query instance
+
     public long estimatedSizeOnHeap()
     {
         return SIZE;
@@ -70,8 +105,8 @@ public abstract class TxnQuery implements Query
         @Override
         public void serialize(TxnQuery query, DataOutputPlus out, int version) throws IOException
         {
-            Preconditions.checkArgument(query == null || query == ALL || query == NONE);
-            out.writeByte(query == null ? 0 : query == ALL ? 1 : 2);
+            Preconditions.checkArgument(query == null || query == ALL || query == NONE || query == CONDITION);
+            out.writeByte(query == null ? 0 : query.type());
         }
 
         @Override
@@ -83,13 +118,14 @@ public abstract class TxnQuery implements Query
                 case 0: return null;
                 case 1: return ALL;
                 case 2: return NONE;
+                case 3: return CONDITION;
             }
         }
 
         @Override
         public long serializedSize(TxnQuery query, int version)
         {
-            Preconditions.checkArgument(query == null || query == ALL || query == NONE);
+            Preconditions.checkArgument(query == null || query == ALL || query == NONE || query == CONDITION);
             return TypeSizes.sizeof((byte)2);
         }
     };
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
index 2aa91be7e6..5811c8787d 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
@@ -24,6 +24,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.function.BiConsumer;
 
+import com.google.common.collect.ImmutableList;
+
 import accord.api.Data;
 import accord.api.DataStore;
 import accord.api.Key;
@@ -33,6 +35,7 @@ import accord.primitives.Keys;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -49,6 +52,9 @@ import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
 
 public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read
 {
+    // A CAS or SERIAL/LOCAL_SERIAL read touches at most one partition, so a single fixed name identifies its data
+    public static final String DUMMY_NAME = "";
+    public static final TxnDataName DUMMY = TxnDataName.user(DUMMY_NAME);
     private static final long EMPTY_SIZE = ObjectSizes.measure(new TxnRead(new TxnNamedRead[0], null));
 
     private final Keys txnKeys;
@@ -65,6 +71,12 @@ public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read
         this.txnKeys = txnKeys;
     }
 
+    public static TxnRead createRead(SinglePartitionReadCommand readCommand)
+    {
+        TxnNamedRead namedRead = new TxnNamedRead(DUMMY, readCommand); // the only read, registered under DUMMY
+        return new TxnRead(ImmutableList.of(namedRead), Keys.of(namedRead.key()));
+    }
+
     public long estimatedSizeOnHeap()
     {
         long size = EMPTY_SIZE;
@@ -76,7 +88,7 @@ public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read
     @Override
     int compareNonKeyFields(TxnNamedRead left, TxnNamedRead right)
     {
-        return left.name().compareTo(right.name());
+        return left.txnDataName().compareTo(right.txnDataName());
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
index 7b26b2acdd..723dbb30df 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
@@ -47,14 +47,13 @@ import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
 
-import static java.lang.Math.toIntExact;
 import static org.apache.cassandra.service.accord.AccordSerializers.serialize;
 import static org.apache.cassandra.utils.ArraySerializers.deserializeArray;
 import static org.apache.cassandra.utils.ArraySerializers.serializeArray;
+import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
 import static org.apache.cassandra.utils.ByteBufferUtil.readWithVIntLength;
 import static org.apache.cassandra.utils.ByteBufferUtil.serializedSizeWithVIntLength;
 import static org.apache.cassandra.utils.ByteBufferUtil.writeWithVIntLength;
-import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
 
 public class TxnUpdate implements Update
 {
@@ -64,6 +63,9 @@ public class TxnUpdate implements Update
     private final ByteBuffer[] fragments;
     private final ByteBuffer condition;
 
+    // Memoize computation of condition
+    private Boolean conditionResult;
+
     public TxnUpdate(List<TxnWrite.Fragment> fragments, TxnCondition condition)
     {
         // TODO: Figure out a way to shove keys into TxnCondition, and have it implement slice/merge.
@@ -169,9 +171,7 @@ public class TxnUpdate implements Update
     @Override
     public Write apply(Data data)
     {
-        TxnCondition condition = AccordSerializers.deserialize(this.condition, TxnCondition.serializer);
-
-        if (!condition.applies((TxnData) data))
+        if (!checkCondition(data))
             return TxnWrite.EMPTY;
 
         List<TxnWrite.Fragment> fragments = deserialize(this.fragments, TxnWrite.Fragment.serializer);
@@ -185,6 +185,7 @@ public class TxnUpdate implements Update
         return new TxnWrite(updates);
     }
 
+    // TODO: decide whether the memoized conditionResult should be serialized; serializedSize below counts only keys, condition and fragments.
     public static final IVersionedSerializer<TxnUpdate> serializer = new IVersionedSerializer<TxnUpdate>()
     {
         @Override
@@ -210,6 +211,7 @@ public class TxnUpdate implements Update
             long size = KeySerializers.keys.serializedSize(update.keys, version);
             size += serializedSizeWithVIntLength(update.condition);
             size += serializedArraySize(update.fragments, version, ByteBufferUtil.vintSerializer);
+            assert(ByteBufferUtil.serialized(this, update, version).remaining() == size);
             return size;
         }
     };
@@ -288,4 +290,15 @@ public class TxnUpdate implements Update
             result.addAll(deserialize(bytes, serializer));
         return result;
     }
+
+    // Evaluates the serialized condition against data and memoizes the outcome in conditionResult. (Rename to maybeCheckCondition / checkConditionMemoized?)
+    public boolean checkCondition(Data data)
+    {
+        // A memoized result is returned without looking at data. TODO: assert data matches what was memoized?
+        if (conditionResult != null)
+            return conditionResult;
+        TxnCondition condition = AccordSerializers.deserialize(this.condition, TxnCondition.serializer); // first call only: decode the condition bytes
+        conditionResult = condition.applies((TxnData) data); // NOTE(review): plain (non-volatile) field write - confirm single-threaded use
+        return conditionResult;
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java b/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java
index d25ca2fdd5..dcbd8be587 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.RequestCallback;
 import org.apache.cassandra.net.RequestCallbackWithFailure;
 import org.apache.cassandra.service.paxos.Commit.Agreed;
 import org.apache.cassandra.service.paxos.Commit.Committed;
@@ -47,7 +46,7 @@ import static org.apache.cassandra.service.paxos.PaxosRequestCallback.shouldExec
 import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
 import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
 import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
 /**
  * Nodes that have promised in response to our prepare, may be missing the latestCommit, meaning we cannot be sure the
@@ -238,7 +237,7 @@ public class PaxosPrepareRefresh implements RequestCallbackWithFailure<PaxosPrep
 
         public long serializedSize(Response response, int version)
         {
-            return serializedSizeNullable(response.isSupersededBy, version, Ballot.Serializer.instance);
+            return serializedNullableSize(response.isSupersededBy, version, Ballot.Serializer.instance);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java b/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
index ab757cbdc5..94fa9dc3a4 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
@@ -20,7 +20,12 @@ package org.apache.cassandra.service.paxos;
 
 
 import java.io.IOException;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
@@ -63,20 +68,30 @@ import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MonotonicClock;
 
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 import static org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_REPAIR_RETRY_TIMEOUT_IN_MS;
 import static org.apache.cassandra.exceptions.RequestFailureReason.UNKNOWN;
 import static org.apache.cassandra.net.Verb.PAXOS2_REPAIR_REQ;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.cassandra.service.paxos.Commit.*;
+import static org.apache.cassandra.service.paxos.Commit.Accepted;
+import static org.apache.cassandra.service.paxos.Commit.Committed;
+import static org.apache.cassandra.service.paxos.Commit.Proposal;
+import static org.apache.cassandra.service.paxos.Commit.isAfter;
+import static org.apache.cassandra.service.paxos.Commit.latest;
+import static org.apache.cassandra.service.paxos.Commit.timestampsClash;
 import static org.apache.cassandra.service.paxos.ContentionStrategy.Type.REPAIR;
 import static org.apache.cassandra.service.paxos.ContentionStrategy.waitUntilForContention;
-import static org.apache.cassandra.service.paxos.Paxos.*;
-import static org.apache.cassandra.service.paxos.PaxosPrepare.*;
+import static org.apache.cassandra.service.paxos.Paxos.Participants;
+import static org.apache.cassandra.service.paxos.Paxos.isInRangeAndShouldProcess;
+import static org.apache.cassandra.service.paxos.Paxos.staleBallotNewerThan;
+import static org.apache.cassandra.service.paxos.PaxosPrepare.FoundIncompleteAccepted;
+import static org.apache.cassandra.service.paxos.PaxosPrepare.FoundIncompleteCommitted;
+import static org.apache.cassandra.service.paxos.PaxosPrepare.Status;
+import static org.apache.cassandra.service.paxos.PaxosPrepare.prepareWithBallot;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
 import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
 /**
  * Facility to finish any in-progress paxos transaction, and ensure that a quorum of nodes agree on the most recent operation.
@@ -629,7 +644,7 @@ public class PaxosRepair extends AbstractPaxosRepair
         public long serializedSize(Response response, int version)
         {
             return Ballot.sizeInBytes()
-                   + serializedSizeNullable(response.acceptedButNotCommitted, version, Accepted.serializer)
+                   + serializedNullableSize(response.acceptedButNotCommitted, version, Accepted.serializer)
                    + Committed.serializer.serializedSize(response.committed, version);
         }
     }
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index e802cd070f..0afb7cae01 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -723,6 +723,7 @@ public class PaxosState implements PaxosOperationLock
         {
             synchronized (unsafeState.key)
             {
+                // Unused return value?
                 unsafeState.maybeLoad();
                 assert unsafeState.current != null;
 
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 062d65f6e1..6a05b0ef7f 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -23,37 +23,28 @@ package org.apache.cassandra.utils;
  * afterward, and ensure the tests still pass.
  */
 
-import java.io.*;
+import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileUtils;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
 import java.util.stream.Collectors;
 
-import net.nicoulaj.compilecommand.annotations.Inline;
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.marshal.BooleanType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.DateType;
-import org.apache.cassandra.db.marshal.ListType;
-import org.apache.cassandra.db.marshal.MapType;
-import org.apache.cassandra.db.marshal.SetType;
-import org.apache.cassandra.db.marshal.TimestampType;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.compress.BufferType;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileUtils;
+import static com.google.common.primitives.Ints.checkedCast;
 
 /**
  * Utility methods to make ByteBuffers less painful
@@ -359,6 +350,17 @@ public class ByteBufferUtil
         out.writeUnsignedVInt(bytes.remaining());
         out.write(bytes);
     }
+    public static void writeWithVIntLengthAndNull(ByteBuffer bytes, DataOutputPlus out) throws IOException
+    {
+        if (bytes == null)
+        {
+            out.writeVInt(-1); // a length of -1 marks a null buffer; no payload follows
+            return;
+        }
+        
+        out.writeVInt(bytes.remaining()); // length prefix, written with the same signed vint encoding as the -1 marker
+        out.write(bytes);
+    }
 
     public static void writeWithShortLength(ByteBuffer buffer, DataOutputPlus out) throws IOException
     {
@@ -389,6 +391,17 @@ public class ByteBufferUtil
         return ByteBufferUtil.read(in, length);
     }
 
+    public static ByteBuffer readWithVIntLengthAndNull(DataInputPlus in) throws IOException
+    {
+        int length = checkedCast(in.readVInt()); // signed vint; Guava's checkedCast throws IllegalArgumentException if it does not fit in an int
+        if (length < -1)
+            throw new IOException("Corrupt (negative) value length encountered");
+        if (length == -1) // -1 is the null marker written by writeWithVIntLengthAndNull
+            return null;
+
+        return ByteBufferUtil.read(in, length);
+    }
+    
     public static int serializedSizeWithLength(ByteBuffer buffer)
     {
         int size = buffer.remaining();
@@ -401,6 +414,15 @@ public class ByteBufferUtil
         return TypeSizes.sizeofUnsignedVInt(size) + size;
     }
 
+    public static int serializedSizeWithVIntLengthAndNull(ByteBuffer buffer) // byte size of what writeWithVIntLengthAndNull emits for buffer
+    {
+        if (buffer == null)
+            return TypeSizes.sizeofVInt(-1); // null is encoded as the signed vint -1 with no payload
+
+        int size = buffer.remaining(); // payload bytes that follow the length prefix
+        return TypeSizes.sizeofVInt(size) + size; // signed sizing, matching writeVInt in writeWithVIntLengthAndNull (was sizeofUnsignedVInt)
+    }
+
     public static long estimatedSizeOnHeap(ByteBuffer buffer)
     {
         return EMPTY_SIZE_ON_HEAP + buffer.remaining();
@@ -927,6 +949,19 @@ public class ByteBufferUtil
         return true;
     }
 
+    public static <T> ByteBuffer serialized(IVersionedSerializer<T> serializer, T value, int version) // serializes value with serializer at version and returns the bytes
+    {
+        try (DataOutputBuffer dob = new DataOutputBuffer())
+        {
+            serializer.serialize(value, dob, version);
+            return dob.buffer(); // the return expression is evaluated before try-with-resources closes dob
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e); // rethrown unchecked, keeping the IOException as the cause
+        }
+    }
+
     public static final IVersionedSerializer<ByteBuffer> vintSerializer = new IVersionedSerializer<ByteBuffer>()
     {
         @Override
@@ -947,4 +982,6 @@ public class ByteBufferUtil
             return serializedSizeWithVIntLength(bytes);
         }
     };
+
+    public static final IVersionedSerializer<ByteBuffer> vintNullableSerializer = NullableSerializer.wrap(vintSerializer);
 }
diff --git a/src/java/org/apache/cassandra/utils/NullableSerializer.java b/src/java/org/apache/cassandra/utils/NullableSerializer.java
index a286fe5765..7d834be995 100644
--- a/src/java/org/apache/cassandra/utils/NullableSerializer.java
+++ b/src/java/org/apache/cassandra/utils/NullableSerializer.java
@@ -39,7 +39,7 @@ public class NullableSerializer
         return in.readBoolean() ? serializer.deserialize(in, version) : null;
     }
 
-    public static <T> long serializedSizeNullable(T value, int version, IVersionedSerializer<T> serializer)
+    public static <T> long serializedNullableSize(T value, int version, IVersionedSerializer<T> serializer)
     {
         return value != null
                 ? TypeSizes.sizeof(true) + serializer.serializedSize(value, version)
@@ -61,7 +61,7 @@ public class NullableSerializer
 
             public long serializedSize(T t, int version)
             {
-                return serializedSizeNullable(t, version, wrap);
+                return serializedNullableSize(t, version, wrap);
             }
         };
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java b/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java
index b49572dc4b..e6d7ff8496 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java
@@ -119,5 +119,4 @@ public class ByteBuddyExamples extends TestBaseImpl
             return r.call();
         }
     }
-
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
index 1c981693e0..383800aa4c 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
@@ -18,38 +18,33 @@
 
 package org.apache.cassandra.distributed.test.accord;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.functions.types.utils.Bytes;
-import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.db.marshal.ListType;
-import org.apache.cassandra.db.marshal.MapType;
-import org.apache.cassandra.db.marshal.SetType;
-import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.service.accord.AccordService;
 import org.assertj.core.api.Assertions;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.api.SimpleQueryResult;
-import org.apache.cassandra.service.accord.AccordService;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
+import static org.apache.cassandra.distributed.util.QueryResultUtil.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
-import static org.apache.cassandra.distributed.util.QueryResultUtil.assertThat;
-
 public class AccordCQLTest extends AccordTestBase
 {
     private static final Logger logger = LoggerFactory.getLogger(AccordCQLTest.class);
@@ -197,7 +192,7 @@ public class AccordCQLTest extends AccordTestBase
              cluster ->
              {
                  cluster.coordinator(1).execute("INSERT INTO " + currentTable + " (k, c, v) VALUES (0, 0, " + lhs + ");", ConsistencyLevel.ALL);
-             
+
                  String query = "BEGIN TRANSACTION\n" +
                                 "  LET row1 = (SELECT v FROM " + currentTable + " WHERE k = ? LIMIT 1);\n" +
                                 "  SELECT row1.v;\n" +
@@ -206,7 +201,7 @@ public class AccordCQLTest extends AccordTestBase
                                 "  END IF\n" +
                                 "COMMIT TRANSACTION";
                  assertRowEqualsWithPreemptedRetry(cluster, new Object[] { lhs }, query, 0, rhs, 1, 0, 1);
-             
+
                  String check = "BEGIN TRANSACTION\n" +
                                 "  SELECT * FROM " + currentTable + " WHERE k = ? AND c = ?;\n" +
                                 "COMMIT TRANSACTION";
@@ -1990,4 +1985,30 @@ public class AccordCQLTest extends AccordTestBase
              }
         );
     }
+
+    @Test
+    public void testCASAndSerialRead() throws Exception
+    {
+        test("CREATE TABLE " + currentTable + " (id int, c int, v int, s int static, PRIMARY KEY ((id), c));",
+            cluster -> {
+                ICoordinator coordinator = cluster.coordinator(1);
+                int startingAccordCoordinateCount = getAccordCoordinateCount(); // baseline, read before any statement of this test runs
+                coordinator.execute("INSERT INTO " + currentTable + " (id, c, v, s) VALUES (1, 2, 3, 5);", ConsistencyLevel.ALL);
+                assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 3, 5);
+                assertRowEqualsWithPreemptedRetry(cluster, new Object[]{true}, "UPDATE " + currentTable + " SET v = 4 WHERE id = 1 AND c = 2 IF v = 3"); // v is 3: applied
+                assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 4, 5);
+                assertRowEqualsWithPreemptedRetry(cluster, new Object[]{ false, 4 }, "UPDATE " + currentTable + " SET v = 4 WHERE id = 1 AND c = 2 IF v = 3"); // v is now 4: not applied, current v expected back
+                assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 4, 5);
+
+                // Conditions on, and a conditional update of, the static column s
+                assertRowEqualsWithPreemptedRetry(cluster, new Object[]{ false, 5 }, "UPDATE " + currentTable + " SET v = 5 WHERE id = 1 AND c = 2 IF s = 4"); // s is 5: not applied, current s expected back
+                assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 4, 5);
+                assertRowEqualsWithPreemptedRetry(cluster, new Object[]{true}, "UPDATE " + currentTable + " SET v = 5 WHERE id = 1 AND c = 2 IF s = 5");
+                assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 5, 5);
+                assertRowEqualsWithPreemptedRetry(cluster, new Object[]{true}, "UPDATE " + currentTable + " SET s = 6 WHERE id = 1 IF s = 5");
+                assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 5, 6);
+                // Every consensus query above should have run on Accord: 6 SERIAL reads + 5 conditional UPDATEs = 11; the INSERT at ALL is not part of the expected count (assumes no preempted retries - TODO confirm)
+                assertEquals( 11, getAccordCoordinateCount() - startingAccordCoordinateCount);
+        });
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index 60b583b40a..d1d16b66fa 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -18,23 +18,13 @@
 
 package org.apache.cassandra.distributed.test.accord;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import accord.coordinate.Preempted;
+import accord.primitives.Txn;
 import net.bytebuddy.ByteBuddy;
 import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.implementation.bind.annotation.This;
-import org.assertj.core.api.Assertions;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-
-import accord.coordinate.Preempted;
 import org.apache.cassandra.cql3.statements.ModificationStatement;
 import org.apache.cassandra.cql3.statements.TransactionStatement;
 import org.apache.cassandra.distributed.Cluster;
@@ -43,10 +33,22 @@ import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.txn.TxnData;
 import org.apache.cassandra.utils.AssertionUtils;
 import org.apache.cassandra.utils.FailingConsumer;
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
 import static org.junit.Assert.assertArrayEquals;
 
 public abstract class AccordTestBase extends TestBaseImpl
@@ -76,10 +78,10 @@ public abstract class AccordTestBase extends TestBaseImpl
         currentTable = KEYSPACE + ".tbl" + COUNTER.getAndIncrement();
     }
 
-    protected static void assertRow(Cluster cluster, String query, int k, int c, int v)
+    protected static void assertRowSerial(Cluster cluster, String query, int k, int c, int v, int s)
     {
-        Object[][] result = cluster.coordinator(1).execute(query, ConsistencyLevel.QUORUM);
-        assertArrayEquals(new Object[]{new Object[] {k, c, v}}, result);
+        Object[][] result = cluster.coordinator(1).execute(query, ConsistencyLevel.SERIAL); // run through coordinator 1 at SERIAL
+        assertArrayEquals(new Object[]{new Object[] {k, c, v, s}}, result); // expects exactly one row: {k, c, v, s}
     }
 
     protected void test(String tableDDL, FailingConsumer<Cluster> fn) throws Exception
@@ -105,14 +107,20 @@ public abstract class AccordTestBase extends TestBaseImpl
         test("CREATE TABLE " + currentTable + " (k int, c int, v int, primary key (k, c))", fn);
     }
 
+    protected int getAccordCoordinateCount()
+    {
+        return sharedCluster.get(1).callOnInstance(() -> BBAccordCoordinateCountHelper.count.get()); // read on node 1, the only node BBAccordCoordinateCountHelper.install instruments
+    }
+
     private static Cluster createCluster() throws IOException
     {
         // need to up the timeout else tests get flaky
         // disable vnode for now, but should enable before trunk
         return init(Cluster.build(2)
                            .withoutVNodes()
-                           .withConfig(c -> c.with(Feature.NETWORK).set("write_request_timeout", "10s").set("transaction_timeout", "15s"))
+                           .withConfig(c -> c.with(Feature.NETWORK).set("write_request_timeout", "10s").set("transaction_timeout", "15s").set("legacy_paxos_strategy", "accord"))
                            .withInstanceInitializer(EnforceUpdateDoesNotPerformRead::install)
+                           .withInstanceInitializer(BBAccordCoordinateCountHelper::install)
                            .start());
     }
 
@@ -164,6 +172,27 @@ public abstract class AccordTestBase extends TestBaseImpl
             return map;
         }
     }
-    
+
+    public static class BBAccordCoordinateCountHelper // Byte Buddy helper that counts intercepted AccordService.coordinate calls
+    {
+        static final AtomicInteger count = new AtomicInteger(); // number of intercepted calls; never reassigned, hence final
+        static void install(ClassLoader cl, int nodeNumber)
+        {
+            if (nodeNumber != 1) // only node 1 is instrumented
+                return;
+            new ByteBuddy().rebase(AccordService.class)
+                           .method(named("coordinate").and(takesArguments(1))) // methods named coordinate that take exactly one argument
+                           .intercept(MethodDelegation.to(BBAccordCoordinateCountHelper.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        public static TxnData coordinate(Txn txn, @SuperCall Callable<TxnData> actual) throws Exception
+        {
+            count.incrementAndGet(); // record the call, then run the original method via the @SuperCall handle
+            return actual.call();
+        }
+    }
+
     protected abstract Logger logger();
 }
diff --git a/test/unit/org/apache/cassandra/cql3/conditions/ColumnConditionTest.java b/test/unit/org/apache/cassandra/cql3/conditions/ColumnConditionTest.java
index dd40cf81db..0dc6383637 100644
--- a/test/unit/org/apache/cassandra/cql3/conditions/ColumnConditionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/conditions/ColumnConditionTest.java
@@ -17,162 +17,421 @@
  */
 package org.apache.cassandra.cql3.conditions;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.Constants;
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.Terms;
+import org.apache.cassandra.cql3.UserTypes;
 import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.db.marshal.SetType;
-import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.serializers.ListSerializer;
+import org.apache.cassandra.serializers.MapSerializer;
+import org.apache.cassandra.serializers.SetSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.TimeUUID;
 
+import static org.apache.cassandra.cql3.Operator.CONTAINS;
+import static org.apache.cassandra.cql3.Operator.CONTAINS_KEY;
+import static org.apache.cassandra.cql3.Operator.EQ;
+import static org.apache.cassandra.cql3.Operator.GT;
+import static org.apache.cassandra.cql3.Operator.GTE;
+import static org.apache.cassandra.cql3.Operator.LT;
+import static org.apache.cassandra.cql3.Operator.LTE;
+import static org.apache.cassandra.cql3.Operator.NEQ;
+import static org.apache.cassandra.transport.ProtocolVersion.CURRENT;
+import static org.apache.cassandra.utils.ByteBufferUtil.EMPTY_BYTE_BUFFER;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import static org.apache.cassandra.cql3.Operator.*;
-import static org.apache.cassandra.utils.ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
 
-public class ColumnConditionTest
+public class ColumnConditionTest extends CQLTester
 {
     public static final ByteBuffer ZERO = Int32Type.instance.fromString("0");
     public static final ByteBuffer ONE = Int32Type.instance.fromString("1");
     public static final ByteBuffer TWO = Int32Type.instance.fromString("2");
 
-    private static Row newRow(ColumnMetadata definition, ByteBuffer value)
+    private static final ListType<Integer> listType = ListType.getInstance(Int32Type.instance, true);
+    private static final MapType<Integer, Integer> mapType = MapType.getInstance(Int32Type.instance, Int32Type.instance, true);
+    private static final SetType<Integer> setType = SetType.getInstance(Int32Type.instance, true);
+
+    private Row newRow(ColumnMetadata definition, ByteBuffer value) // builds a single-cell row for definition from value
     {
-        BufferCell cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, value, null);
+        BufferCell cell;
+        if (definition.type.isUDT() )
+        {
+            if (definition.type.isMultiCell()) {
+                CellPath cellPath = udtType.cellPathForField(udtType.fieldName(0)); // multi-cell UDT: value is stored under the path of the UDT's first field (udtType is declared outside this hunk)
+                cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, value, cellPath);
+            }
+            else
+            {
+                ByteBuffer udtValue = UserType.buildValue(value, TWO); // otherwise: one UDT value built from value and TWO, stored without a cell path
+                cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, udtValue, null);
+            }
+        }
+        else
+        {
+            cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, value, null); // non-UDT column: value is stored as-is
+        }
         return BTreeRow.singleCellRow(Clustering.EMPTY, cell);
     }
 
     private static Row newRow(ColumnMetadata definition, List<ByteBuffer> values)
     {
-        Row.Builder builder = BTreeRow.sortedBuilder();
-        builder.newRow(Clustering.EMPTY);
-        long now = System.currentTimeMillis();
-        if (values != null)
+        AbstractType<?> type = definition.type;
+        if (type.isFrozenCollection()) // frozen collection: a single cell holding the packed elements
+        {
+            ByteBuffer cellValue = ListSerializer.pack(values, values.size(), CURRENT); // NOTE(review): dereferences values, so unlike the branch below a null list is not tolerated here
+            Cell cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, cellValue, null);
+            return BTreeRow.singleCellRow(Clustering.EMPTY, cell);
+        }
+        else
         {
-            for (int i = 0, m = values.size(); i < m; i++)
+            Row.Builder builder = BTreeRow.sortedBuilder();
+            builder.newRow(Clustering.EMPTY);
+            long now = System.currentTimeMillis();
+            if (values != null) // a null list yields a row with no cells
             {
-                TimeUUID uuid = TimeUUID.Generator.atUnixMillis(now, i);
-                ByteBuffer key = uuid.toBytes();
-                ByteBuffer value = values.get(i);
-                BufferCell cell = new BufferCell(definition,
-                                                 0L,
-                                                 Cell.NO_TTL,
-                                                 Cell.NO_DELETION_TIME,
-                                                 value,
-                                                 CellPath.create(key));
-                builder.addCell(cell);
+                for (int i = 0, m = values.size(); i < m; i++)
+                {
+                    BufferCell cell;
+                    if (type.isUDT()) // UDT: element i is stored under the cell path of field i
+                    {
+                        cell = new BufferCell(definition,
+                                              0L,
+                                              Cell.NO_TTL,
+                                              Cell.NO_DELETION_TIME,
+                                              values.get(i),
+                                              ((UserType)type).cellPathForField(((UserType) type).fieldName(i)));
+                    }
+                    else
+                    {
+                        TimeUUID uuid = TimeUUID.Generator.atUnixMillis(now, i); // otherwise each element gets a cell path derived from a TimeUUID built from (now, i)
+                        ByteBuffer key = uuid.toBytes();
+                        ByteBuffer value = values.get(i);
+                        cell = new BufferCell(definition,
+                                              0L,
+                                              Cell.NO_TTL,
+                                              Cell.NO_DELETION_TIME,
+                                              value,
+                                              CellPath.create(key));
+                    }
+                    builder.addCell(cell);
+                }
             }
+            return builder.build();
         }
-        return builder.build();
     }
 
     private static Row newRow(ColumnMetadata definition, SortedSet<ByteBuffer> values)
     {
-        Row.Builder builder = BTreeRow.sortedBuilder();
-        builder.newRow(Clustering.EMPTY);
-        if (values != null)
+        if (definition.type.isFrozenCollection())
         {
-            for (ByteBuffer value : values)
+            ByteBuffer cellValue = SetSerializer.pack(values, values.size(), CURRENT);
+            Cell cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, cellValue, null);
+            return BTreeRow.singleCellRow(Clustering.EMPTY, cell);
+        }
+        else
+        {
+            Row.Builder builder = BTreeRow.sortedBuilder();
+            builder.newRow(Clustering.EMPTY);
+            if (values != null)
             {
-                BufferCell cell = new BufferCell(definition,
-                                                 0L,
-                                                 Cell.NO_TTL,
-                                                 Cell.NO_DELETION_TIME,
-                                                 ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                 CellPath.create(value));
-                builder.addCell(cell);
+                for (ByteBuffer value : values)
+                {
+                    BufferCell cell = new BufferCell(definition,
+                                                     0L,
+                                                     Cell.NO_TTL,
+                                                     Cell.NO_DELETION_TIME,
+                                                     ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                     CellPath.create(value));
+                    builder.addCell(cell);
+                }
             }
+            return builder.build();
         }
-        return builder.build();
     }
 
-    private static Row newRow(ColumnMetadata definition, Map<ByteBuffer, ByteBuffer> values)
+    private static Row newRow(ColumnMetadata definition, SortedMap<ByteBuffer, ByteBuffer> values)
     {
-        Row.Builder builder = BTreeRow.sortedBuilder();
-        builder.newRow(Clustering.EMPTY);
-        if (values != null)
+        if (definition.type.isFrozenCollection())
+        {
+            List<ByteBuffer> packableValues = values.entrySet().stream().flatMap(entry -> Stream.of(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+            ByteBuffer cellValue = MapSerializer.pack(packableValues, values.size(), CURRENT);
+            Cell cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, cellValue, null);
+            return BTreeRow.singleCellRow(Clustering.EMPTY, cell);
+        }
+        else
         {
-            for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet())
+            Row.Builder builder = BTreeRow.sortedBuilder();
+            builder.newRow(Clustering.EMPTY);
+            if (values != null)
             {
-                BufferCell cell = new BufferCell(definition,
-                                                 0L,
-                                                 Cell.NO_TTL,
-                                                 Cell.NO_DELETION_TIME,
-                                                 entry.getValue(),
-                                                 CellPath.create(entry.getKey()));
-                builder.addCell(cell);
+                for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet())
+                {
+                    BufferCell cell = new BufferCell(definition,
+                                                     0L,
+                                                     Cell.NO_TTL,
+                                                     Cell.NO_DELETION_TIME,
+                                                     entry.getValue(),
+                                                     CellPath.create(entry.getKey()));
+                    builder.addCell(cell);
+                }
             }
+            return builder.build();
+        }
+    }
+
+    private static boolean testRoundtripped(ColumnCondition.Bound bound, Row row)
+    {
+        DataOutputBuffer dab = new DataOutputBuffer();
+        IVersionedSerializer<ColumnCondition.Bound> serializer = ColumnCondition.Bound.serializer;
+        int version = MessagingService.current_version;
+        try
+        {
+            serializer.serialize(bound, dab, version);
+            assertEquals(serializer.serializedSize(bound, version), dab.position());
+            ColumnCondition.Bound deserializedBound = serializer.deserialize(new DataInputBuffer(dab.buffer(), false), version);
+            boolean originalResult = bound.appliesTo(row);
+            assertEquals(originalResult, deserializedBound.appliesTo(row));
+            return originalResult;
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
         }
-        return builder.build();
     }
 
-    private static boolean conditionApplies(ByteBuffer rowValue, Operator op, ByteBuffer conditionValue)
+    private boolean conditionApplies(ByteBuffer rowValue, Operator op, ByteBuffer conditionValue)
     {
-        ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", Int32Type.instance);
+        AbstractType<?> columnType = Int32Type.instance;
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, false), "c", columnType);
         ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Constants.Value(conditionValue)));
         ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
-        return bound.appliesTo(newRow(definition, rowValue));
+        boolean regularColumnResult = testRoundtripped(bound, newRow(definition, rowValue));
+        // Every simple bound test is also a valid test of the UDT access path
+        boolean conditionUDTApplies = conditionUDTApplies(rowValue, op, conditionValue);
+        assertEquals(regularColumnResult, conditionUDTApplies);
+        return regularColumnResult;
     }
 
-    private static boolean conditionApplies(List<ByteBuffer> rowValue, Operator op, List<ByteBuffer> conditionValue)
+    private boolean conditionApplies(List<ByteBuffer> rowValue, Operator op, List<ByteBuffer> conditionValue)
     {
-        ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", ListType.getInstance(Int32Type.instance, true));
+        boolean nonFrozenResult = conditionApplies(rowValue, op, conditionValue, listType);
+        boolean frozenResult = conditionApplies(rowValue, op, conditionValue, listType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return nonFrozenResult;
+    }
+
+    private boolean conditionApplies(List<ByteBuffer> rowValue, Operator op, List<ByteBuffer> conditionValue, ListType<Integer> columnType)
+    {
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
         ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Lists.Value(conditionValue)));
         ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
-        return bound.appliesTo(newRow(definition, rowValue));
+        return testRoundtripped(bound, newRow(definition, rowValue));
+    }
+
+    private boolean conditionContainsApplies(List<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
+    {
+        boolean nonFrozenResult = conditionContainsApplies(rowValue, op, conditionValue, listType);
+        boolean frozenResult = conditionContainsApplies(rowValue, op, conditionValue, listType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return frozenResult;
     }
 
-    private static boolean conditionContainsApplies(List<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
+    private boolean conditionContainsApplies(List<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue, ListType<Integer> columnType)
     {
-        ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", ListType.getInstance(Int32Type.instance, true));
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
         ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Constants.Value(conditionValue)));
         ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
-        return bound.appliesTo(newRow(definition, rowValue));
+        return testRoundtripped(bound, newRow(definition, rowValue));
     }
 
-    private static boolean conditionContainsApplies(Map<ByteBuffer, ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
+    private boolean conditionContainsApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
     {
-        ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", MapType.getInstance(Int32Type.instance, Int32Type.instance, true));
+        boolean nonFrozenResult = conditionContainsApplies(rowValue, op, conditionValue, mapType);
+        boolean frozenResult = conditionContainsApplies(rowValue, op, conditionValue, mapType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return frozenResult;
+    }
+
+    private boolean conditionContainsApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue, MapType<Integer, Integer> columnType)
+    {
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
         ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Constants.Value(conditionValue)));
         ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
-        return bound.appliesTo(newRow(definition, rowValue));
+        return testRoundtripped(bound, newRow(definition, rowValue));
+    }
+
+    private boolean conditionApplies(SortedSet<ByteBuffer> rowValue, Operator op, SortedSet<ByteBuffer> conditionValue)
+    {
+        boolean nonFrozenResult = conditionApplies(rowValue, op, conditionValue, setType);
+        boolean frozenResult = conditionApplies(rowValue, op, conditionValue, setType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return nonFrozenResult;
     }
 
-    private static boolean conditionApplies(SortedSet<ByteBuffer> rowValue, Operator op, SortedSet<ByteBuffer> conditionValue)
+    private boolean conditionApplies(SortedSet<ByteBuffer> rowValue, Operator op, SortedSet<ByteBuffer> conditionValue, SetType<Integer> columnType)
     {
-        ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", SetType.getInstance(Int32Type.instance, true));
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
         ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Sets.Value(conditionValue)));
         ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
-        return bound.appliesTo(newRow(definition, rowValue));
+        return testRoundtripped(bound, newRow(definition, rowValue));
     }
 
-    private static boolean conditionContainsApplies(SortedSet<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
+    private boolean conditionContainsApplies(SortedSet<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
     {
-        ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", SetType.getInstance(Int32Type.instance, true));
+        boolean nonFrozenResult = conditionContainsApplies(rowValue, op, conditionValue, setType);
+        boolean frozenResult = conditionContainsApplies(rowValue, op, conditionValue, setType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return frozenResult;
+    }
+
+    private boolean conditionContainsApplies(SortedSet<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue, SetType<Integer> columnType)
+    {
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
         ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Constants.Value(conditionValue)));
         ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
-        return bound.appliesTo(newRow(definition, rowValue));
+        return testRoundtripped(bound, newRow(definition, rowValue));
+    }
+
+    private boolean conditionApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, Operator op, SortedMap<ByteBuffer, ByteBuffer> conditionValue)
+    {
+        boolean nonFrozenResult = conditionApplies(rowValue, op, conditionValue, mapType);
+        boolean frozenResult = conditionApplies(rowValue, op, conditionValue, mapType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return nonFrozenResult;
     }
 
-    private static boolean conditionApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, Operator op, SortedMap<ByteBuffer, ByteBuffer> conditionValue)
+    private boolean conditionApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, Operator op, SortedMap<ByteBuffer, ByteBuffer> conditionValue, MapType<Integer, Integer> columnType)
     {
-        ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", MapType.getInstance(Int32Type.instance, Int32Type.instance, true));
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
         ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Maps.Value(conditionValue)));
         ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
-        return bound.appliesTo(newRow(definition, rowValue));
+        return testRoundtripped(bound, newRow(definition, rowValue));
+    }
+
+    private boolean conditionElementApplies(List<ByteBuffer> values, ByteBuffer elementIndex, Operator op, ByteBuffer elementValue)
+    {
+        boolean nonFrozenResult = conditionElementApplies(values, elementIndex, op, elementValue, listType);
+        boolean frozenResult = conditionElementApplies(values, elementIndex, op, elementValue, listType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return nonFrozenResult;
+    }
+
+    private boolean conditionElementApplies(List<ByteBuffer> values, ByteBuffer elementIndex, Operator op, ByteBuffer elementValue, ListType<Integer> columnType)
+    {
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
+        ColumnCondition condition = ColumnCondition.condition(definition, new Constants.Value(elementIndex), op, Terms.of(new Constants.Value(elementValue)));
+        ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
+        return testRoundtripped(bound, newRow(definition, values));
+    }
+
+    private boolean conditionElementApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, ByteBuffer elementKey, Operator op, ByteBuffer elementValue)
+    {
+        boolean nonFrozenResult = conditionElementApplies(rowValue, elementKey, op, elementValue, mapType);
+        boolean frozenResult = conditionElementApplies(rowValue, elementKey, op, elementValue, mapType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return nonFrozenResult;
+    }
+
+    private boolean conditionElementApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, ByteBuffer elementKey, Operator op, ByteBuffer elementValue, MapType<Integer, Integer> columnType)
+    {
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
+        ColumnCondition condition = ColumnCondition.condition(definition, new Constants.Value(elementKey), op, Terms.of(new Constants.Value(elementValue)));
+        ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
+        return testRoundtripped(bound, newRow(definition, rowValue));
+    }
+
+    private boolean conditionUDTApplies(ByteBuffer fieldValue, Operator op, ByteBuffer elementValue)
+    {
+        boolean nonFrozenResult = conditionUDTApplies(fieldValue, op, elementValue, udtType);
+        boolean frozenResult = conditionUDTApplies(fieldValue, op, elementValue, udtType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return nonFrozenResult;
+    }
+
+    private boolean conditionUDTApplies(ByteBuffer fieldValue, Operator op, ByteBuffer elementValue, UserType type)
+    {
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(type, !type.isMultiCell()), "c", type);
+        ColumnCondition condition = ColumnCondition.condition(definition, type.fieldName(0), op, Terms.of(new Constants.Value(elementValue)));
+        ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
+        return testRoundtripped(bound, newRow(definition, fieldValue));
+    }
+
+    private boolean conditionUDTApplies(List<ByteBuffer> rowValue, Operator op, List<ByteBuffer> conditionValue)
+    {
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(udtType, false), "c", udtType);
+        Term term = conditionValue == null ? Constants.NULL_VALUE : new UserTypes.Value(udtType, conditionValue.toArray(new ByteBuffer[0]));
+        ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(term));
+        ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
+        return testRoundtripped(bound, newRow(definition, rowValue));
+    }
+
+    public void beforeTest() throws Throwable
+    {
+        super.beforeTest();
+        String typeName = createType("CREATE TYPE %s (a int, b int);");
+        udtType = Schema.instance.getKeyspaceMetadata(KEYSPACE).types.get(ByteBufferUtil.bytes(typeName)).get();
+    }
+    @Override
+    public void afterTest() throws Throwable
+    {
+        super.afterTest();
+        typeToTable.clear();
+        udtType = null;
+    }
+
+    // Is this useful enough to have in CQL tester?
+    private UserType udtType;
+    private final Map<Pair<AbstractType<?>, Boolean>, String> typeToTable = new HashMap<>();
+
+    private String maybeCreateTable(AbstractType<?> columnType, boolean frozen)
+    {
+        // CQL3 text is used verbatim (presumably it already carries frozen<...> when frozen - confirm); `frozen` only keys the cache.
+        String columnTypeCQL = columnType.asCQL3Type().toString();
+        return typeToTable.computeIfAbsent(Pair.create(columnType, frozen), type -> createTable("CREATE TABLE %s (id int primary key, c " + columnTypeCQL + ")"));
     }
 
     @FunctionalInterface
@@ -194,7 +453,7 @@ public class ColumnConditionTest
     }
 
     @Test
-    public void testSimpleBoundIsSatisfiedByValue() throws InvalidRequestException
+    public void testSimpleAndUDTBoundIsSatisfiedByValue() throws InvalidRequestException
     {
         // EQ
         assertTrue(conditionApplies(ONE, EQ, ONE));
@@ -359,6 +618,18 @@ public class ColumnConditionTest
         assertFalse(conditionContainsApplies(list(ZERO, ONE, TWO), CONTAINS, ByteBufferUtil.EMPTY_BYTE_BUFFER));
         assertFalse(conditionContainsApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), CONTAINS, ONE));
         assertTrue(conditionContainsApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), CONTAINS, ByteBufferUtil.EMPTY_BYTE_BUFFER));
+
+        //ELEMENT
+        assertTrue(conditionElementApplies(list(ZERO, ONE, TWO), ZERO, EQ, ZERO));
+        assertTrue(conditionElementApplies(list(ZERO, ONE, TWO), ONE, EQ, ONE));
+        assertTrue(conditionElementApplies(list(ZERO, ONE, TWO), TWO, EQ, TWO));
+
+        assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), ZERO, EQ, ONE));
+        assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), ZERO, EQ, TWO));
+        assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), ONE, EQ, ZERO));
+        assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), ONE, EQ, TWO));
+        assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), TWO, EQ, ZERO));
+        assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), TWO, EQ, ONE));
     }
 
     private static SortedSet<ByteBuffer> set(ByteBuffer... values)
@@ -611,5 +882,96 @@ public class ColumnConditionTest
         assertTrue(conditionContainsApplies(map(ONE, ByteBufferUtil.EMPTY_BYTE_BUFFER), CONTAINS_KEY, ONE));
         assertFalse(conditionContainsApplies(map(ONE, ByteBufferUtil.EMPTY_BYTE_BUFFER), CONTAINS_KEY, ByteBufferUtil.EMPTY_BYTE_BUFFER));
 
+        // Element access
+        assertTrue(conditionElementApplies(map(ZERO, ONE), ZERO, EQ, ONE));
+        assertFalse(conditionElementApplies(map(ZERO, ONE), ZERO, EQ, TWO));
+        assertFalse(conditionElementApplies(map(ZERO, ONE), ONE, EQ, TWO));
+        assertFalse(conditionElementApplies(map(), ZERO, EQ, TWO));
+    }
+
+    @Test
+    public void testMultiCellUDTBound() throws InvalidRequestException
+    {
+        // EQ
+        assertTrue(conditionUDTApplies(list(ONE), EQ, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ONE), EQ, list(ZERO)));
+        assertFalse(conditionUDTApplies(list(ZERO), EQ, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ONE, ONE), EQ, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ONE), EQ, list(ONE, ONE)));
+        assertFalse(conditionUDTApplies(list(ONE), EQ, list()));
+
+        assertFalse(conditionUDTApplies(list(ONE), EQ, (List<ByteBuffer>)null));
+        assertFalse(conditionUDTApplies(null, EQ, list(ONE)));
+        assertTrue(conditionUDTApplies((List<ByteBuffer>) null, EQ, (List<ByteBuffer>) null));
+
+        assertFalse(conditionUDTApplies(list(ONE), EQ, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+        assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), EQ, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), EQ, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+
+        // NEQ
+        assertFalse(conditionUDTApplies(list(ONE), NEQ, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), NEQ, list(ZERO)));
+        assertTrue(conditionUDTApplies(list(ZERO), NEQ, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE, ONE), NEQ, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), NEQ, list(ONE, ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), NEQ, list()));
+        assertTrue(conditionUDTApplies(list(), NEQ, list(ONE)));
+
+        assertTrue(conditionUDTApplies(list(ONE), NEQ, (List<ByteBuffer>)null));
+        assertTrue(conditionUDTApplies(null, NEQ, list(ONE)));
+        assertFalse(conditionUDTApplies((List<ByteBuffer>) null, NEQ, (List<ByteBuffer>) null));
+
+        assertTrue(conditionUDTApplies(list(ONE), NEQ, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+        assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), NEQ, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), NEQ, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+
+        // LT
+        assertFalse(conditionUDTApplies(list(ONE), LT, list(ONE)));
+        assertFalse(conditionUDTApplies(list(), LT, list()));
+        assertFalse(conditionUDTApplies(list(ONE), LT, list(ZERO)));
+        assertTrue(conditionUDTApplies(list(ZERO), LT, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ONE, ONE), LT, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), LT, list(ONE, ONE)));
+        assertFalse(conditionUDTApplies(list(ONE), LT, list()));
+
+        assertFalse(conditionUDTApplies(list(ONE), LT, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+        assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), LT, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), LT, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+
+        // LTE
+        assertTrue(conditionUDTApplies(list(ONE), LTE, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ONE), LTE, list(ZERO)));
+        assertTrue(conditionUDTApplies(list(ZERO), LTE, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ONE, ONE), LTE, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), LTE, list(ONE, ONE)));
+        assertFalse(conditionUDTApplies(list(ONE), LTE, list()));
+
+        assertFalse(conditionUDTApplies(list(ONE), LTE, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+        assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), LTE, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), LTE, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+
+        // GT
+        assertFalse(conditionUDTApplies(list(ONE), GT, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), GT, list(ZERO)));
+        assertFalse(conditionUDTApplies(list(ZERO), GT, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE, ONE), GT, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ONE), GT, list(ONE, ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), GT, list()));
+
+        assertTrue(conditionUDTApplies(list(ONE), GT, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+        assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), GT, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), GT, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+
+        // GTE
+        assertTrue(conditionUDTApplies(list(ONE), GTE, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), GTE, list(ZERO)));
+        assertFalse(conditionUDTApplies(list(ZERO), GTE, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE, ONE), GTE, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ONE), GTE, list(ONE, ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), GTE, list()));
+
+        assertTrue(conditionUDTApplies(list(ONE), GTE, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+        assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), GTE, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), GTE, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
     }
 }
diff --git a/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java b/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java
index 8edb06afd2..17088c8ad2 100644
--- a/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java
+++ b/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java
@@ -18,19 +18,11 @@
 
 package org.apache.cassandra.service.accord.txn;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-
 import accord.api.Key;
 import accord.primitives.Keys;
 import accord.primitives.Txn;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -48,6 +40,9 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 
+import java.nio.ByteBuffer;
+import java.util.*;
+
 public class TxnBuilder
 {
     private final List<TxnNamedRead> reads = new ArrayList<>();
@@ -107,7 +102,7 @@ public class TxnBuilder
     private TxnReference reference(TxnDataName name, String column)
     {
         // do any reads match the name?
-        Optional<TxnNamedRead> match = reads.stream().filter(n -> n.name().equals(name)).findFirst();
+        Optional<TxnNamedRead> match = reads.stream().filter(n -> n.txnDataName().equals(name)).findFirst();
         if (!match.isPresent())
             throw new IllegalArgumentException("Attempted to create a reference for " + name + " but no read exists with that name");
         TxnNamedRead read = match.get();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org