You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2022/12/09 20:01:38 UTC

[cassandra] branch cas-accord-v2 created (now 9b7cb56c4e)

This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a change to branch cas-accord-v2
in repository https://gitbox.apache.org/repos/asf/cassandra.git


      at 9b7cb56c4e Support CAS and serial read on Accord

This branch includes the following new commits:

     new 5acac48812 first batch of fixes after David's review
     new bc96d9565c fixes from David's feedback branch
     new 1c8f38796c post-rebase fix to AccordIntegrationTest
     new d4a8b5b39f removing TxnBuilder-based tests from TransactionStatementTest
     new 9b7cb56c4e Support CAS and serial read on Accord

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 03/05: post-rebase fix to AccordIntegrationTest

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a commit to branch cas-accord-v2
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 1c8f38796c35aa75cda755f732494c6156c6887c
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Tue Dec 6 14:26:25 2022 -0600

    post-rebase fix to AccordIntegrationTest
---
 .../cassandra/distributed/test/accord/AccordIntegrationTest.java  | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
index d8ad1ec6e1..b26f549516 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
@@ -32,7 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.messages.Commit;
-import accord.primitives.Keys;
+import accord.primitives.Unseekables;
 import accord.topology.Topologies;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
@@ -121,7 +121,7 @@ public class AccordIntegrationTest extends AccordTestBase
 	at accord.messages.PreAccept.process(PreAccept.java:90)
 	at accord.messages.TxnRequest.process(TxnRequest.java:145)
 	at org.apache.cassandra.service.accord.AccordVerbHandler.doVerb(AccordVerbHandler.java:46)
-	at org.apache.cassandra.net.InboundSink.lambda$new$0(InboundSink.java:78)    
+	at org.apache.cassandra.net.InboundSink.lambda$new$0(InboundSink.java:78)
      */
     @Ignore
     @Test
@@ -173,8 +173,8 @@ public class AccordIntegrationTest extends AccordTestBase
             for (int i = 0; i < keyStrings.size(); i++)
                 txn.withRead("row" + i, "SELECT * FROM " + keyspace + ".tbl WHERE k=" + keyStrings.get(i) + " and c=0");
 
-            Keys keySet = txn.build().keys();
-            Topologies topology = AccordService.instance().node.topology().withUnsyncedEpochs(keySet, 1);
+            Unseekables<?, ?> routables = txn.build().keys().toUnseekables();
+            Topologies topology = AccordService.instance().node.topology().withUnsyncedEpochs(routables, 1);
             // we don't detect out-of-bounds read/write yet, so use this to validate we reach different shards
             Assertions.assertThat(topology.totalShards()).isEqualTo(2);
         });


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/05: first batch of fixes after David's review

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a commit to branch cas-accord-v2
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 5acac4881225735b3e354ca376c15a49518feea6
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Mon Dec 5 17:14:19 2022 -0600

    first batch of fixes after David's review
---
 src/antlr/Parser.g                                 |  2 +-
 .../cassandra/audit/AuditLogEntryCategory.java     |  2 +-
 .../apache/cassandra/audit/AuditLogEntryType.java  |  4 +---
 .../org/apache/cassandra/audit/AuditLogFilter.java |  2 +-
 src/java/org/apache/cassandra/config/Config.java   |  3 +--
 .../cassandra/config/DatabaseDescriptor.java       | 25 ++++++++++++++++------
 .../apache/cassandra/service/StorageService.java   | 11 ++++++++++
 .../cassandra/service/StorageServiceMBean.java     |  5 +++++
 .../cassandra/service/accord/AccordService.java    |  4 ++--
 .../cassandra/service/accord/txn/TxnDataName.java  | 16 ++++++++------
 .../org/apache/cassandra/utils/ByteBufferUtil.java | 11 ++++++++--
 .../distributed/test/accord/AccordTestBase.java    | 25 ++++++++++++----------
 .../cassandra/config/DatabaseDescriptorTest.java   |  2 ++
 .../org/apache/cassandra/utils/Generators.java     | 18 +++++++++++-----
 14 files changed, 89 insertions(+), 41 deletions(-)

diff --git a/src/antlr/Parser.g b/src/antlr/Parser.g
index c91329e115..b17ffc6f1a 100644
--- a/src/antlr/Parser.g
+++ b/src/antlr/Parser.g
@@ -83,7 +83,7 @@ options {
     public RowDataReference.Raw newRowDataReference(Selectable.RawIdentifier tuple, Selectable.Raw selectable)
     {
         if (!isParsingTxn)
-            throw new IllegalStateException();
+            throw new SyntaxException("Cannot create a row data reference unless parsing a transaction");
 
         if (references == null)
             references = new ArrayList<>();
diff --git a/src/java/org/apache/cassandra/audit/AuditLogEntryCategory.java b/src/java/org/apache/cassandra/audit/AuditLogEntryCategory.java
index 9db4ce05e9..ae5efc6677 100644
--- a/src/java/org/apache/cassandra/audit/AuditLogEntryCategory.java
+++ b/src/java/org/apache/cassandra/audit/AuditLogEntryCategory.java
@@ -23,5 +23,5 @@ package org.apache.cassandra.audit;
  */
 public enum AuditLogEntryCategory
 {
-    QUERY, DML, DDL, DCL, OTHER, AUTH, ERROR, PREPARE
+    QUERY, DML, DDL, DCL, OTHER, AUTH, ERROR, PREPARE, TRANSACTION
 }
diff --git a/src/java/org/apache/cassandra/audit/AuditLogEntryType.java b/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
index ec3fe4d54c..9285429a5f 100644
--- a/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
+++ b/src/java/org/apache/cassandra/audit/AuditLogEntryType.java
@@ -60,9 +60,7 @@ public enum AuditLogEntryType
     CREATE_ROLE(AuditLogEntryCategory.DCL),
     USE_KEYSPACE(AuditLogEntryCategory.OTHER),
     DESCRIBE(AuditLogEntryCategory.OTHER),
-    
-    // TODO: Is DML the most appropriate classification, given a transaction can read, write, or both?
-    TRANSACTION(AuditLogEntryCategory.DML),
+    TRANSACTION(AuditLogEntryCategory.TRANSACTION),
 
     /*
      * Common Audit Log Entry Types
diff --git a/src/java/org/apache/cassandra/audit/AuditLogFilter.java b/src/java/org/apache/cassandra/audit/AuditLogFilter.java
index d240e78c83..d75e544752 100644
--- a/src/java/org/apache/cassandra/audit/AuditLogFilter.java
+++ b/src/java/org/apache/cassandra/audit/AuditLogFilter.java
@@ -28,7 +28,7 @@ final class AuditLogFilter
 {
     private static final Logger logger = LoggerFactory.getLogger(AuditLogFilter.class);
 
-    private static ImmutableSet<String> EMPTY_FILTERS = ImmutableSet.of();
+    private static final ImmutableSet<String> EMPTY_FILTERS = ImmutableSet.of();
 
     final ImmutableSet<String> excludedKeyspaces;
     final ImmutableSet<String> includedKeyspaces;
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 8a7dcacf44..db1ed12c8c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -1173,11 +1173,10 @@ public class Config
             String value;
             try
             {
-                // don't use exceptions for normal control flow!
                 Object obj = field.get(config);
                 value = obj != null ? obj.toString() : "null";
             }
-            catch (NullPointerException | IllegalAccessException npe)
+            catch (IllegalAccessException npe)
             {
                 value = "null";
             }
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 57c7db266b..2955635694 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1192,46 +1192,52 @@ public class DatabaseDescriptor
     @VisibleForTesting
     static void checkForLowestAcceptedTimeouts(Config conf)
     {
-        if(conf.read_request_timeout.toMilliseconds() < LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
+        if (conf.read_request_timeout.toMilliseconds() < LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
         {
             logInfo("read_request_timeout", conf.read_request_timeout, LOWEST_ACCEPTED_TIMEOUT);
             conf.read_request_timeout = new DurationSpec.LongMillisecondsBound("10ms");
         }
 
-        if(conf.range_request_timeout.toMilliseconds() < LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
+        if (conf.range_request_timeout.toMilliseconds() < LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
         {
             logInfo("range_request_timeout", conf.range_request_timeout, LOWEST_ACCEPTED_TIMEOUT);
             conf.range_request_timeout = new DurationSpec.LongMillisecondsBound("10ms");
         }
 
-        if(conf.request_timeout.toMilliseconds() < LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
+        if (conf.request_timeout.toMilliseconds() < LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
         {
             logInfo("request_timeout", conf.request_timeout, LOWEST_ACCEPTED_TIMEOUT);
             conf.request_timeout = new DurationSpec.LongMillisecondsBound("10ms");
         }
 
-        if(conf.write_request_timeout.toMilliseconds() < LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
+        if (conf.write_request_timeout.toMilliseconds() < LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
         {
             logInfo("write_request_timeout", conf.write_request_timeout, LOWEST_ACCEPTED_TIMEOUT);
             conf.write_request_timeout = new DurationSpec.LongMillisecondsBound("10ms");
         }
 
-        if(conf.cas_contention_timeout.toMilliseconds() < LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
+        if (conf.cas_contention_timeout.toMilliseconds() < LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
         {
             logInfo("cas_contention_timeout", conf.cas_contention_timeout, LOWEST_ACCEPTED_TIMEOUT);
             conf.cas_contention_timeout = new DurationSpec.LongMillisecondsBound("10ms");
         }
 
-        if(conf.counter_write_request_timeout.toMilliseconds()< LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
+        if (conf.counter_write_request_timeout.toMilliseconds()< LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
         {
             logInfo("counter_write_request_timeout", conf.counter_write_request_timeout, LOWEST_ACCEPTED_TIMEOUT);
             conf.counter_write_request_timeout = new DurationSpec.LongMillisecondsBound("10ms");
         }
-        if(conf.truncate_request_timeout.toMilliseconds() < LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
+        if (conf.truncate_request_timeout.toMilliseconds() < LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
         {
             logInfo("truncate_request_timeout", conf.truncate_request_timeout, LOWEST_ACCEPTED_TIMEOUT);
             conf.truncate_request_timeout = LOWEST_ACCEPTED_TIMEOUT;
         }
+
+        if (conf.transaction_timeout.toMilliseconds() < LOWEST_ACCEPTED_TIMEOUT.toMilliseconds())
+        {
+            logInfo("transaction_timeout", conf.transaction_timeout, LOWEST_ACCEPTED_TIMEOUT);
+            conf.transaction_timeout = LOWEST_ACCEPTED_TIMEOUT;
+        }
     }
 
     private static void logInfo(String property, DurationSpec.LongMillisecondsBound actualValue, DurationSpec.LongMillisecondsBound lowestAcceptedValue)
@@ -1860,6 +1866,11 @@ public class DatabaseDescriptor
         return conf.transaction_timeout.to(unit);
     }
 
+    public static void setTransactionTimeout(long timeOutInMillis)
+    {
+        conf.transaction_timeout = new DurationSpec.LongMillisecondsBound(timeOutInMillis);
+    }
+
     public static void setCasContentionTimeout(long timeOutInMillis)
     {
         conf.cas_contention_timeout = new DurationSpec.LongMillisecondsBound(timeOutInMillis);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index f20167569a..70f4308154 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1587,6 +1587,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return DatabaseDescriptor.getTruncateRpcTimeout(MILLISECONDS);
     }
 
+    public void setTransactionTimeout(long value)
+    {
+        DatabaseDescriptor.setTransactionTimeout(value);
+        logger.info("set transaction timeout to {} ms", value);
+    }
+
+    public long getTransactionTimeout()
+    {
+        return DatabaseDescriptor.getTransactionTimeout(MILLISECONDS);
+    }
+
     @Deprecated
     public void setStreamThroughputMbPerSec(int value)
     {
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index c4e143410e..c036bf15bc 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 import javax.management.NotificationEmitter;
@@ -33,6 +34,7 @@ import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
+import org.apache.cassandra.config.DurationSpec;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.utils.BreaksJMX;
@@ -630,6 +632,9 @@ public interface StorageServiceMBean extends NotificationEmitter
     public void setTruncateRpcTimeout(long value);
     public long getTruncateRpcTimeout();
 
+    public void setTransactionTimeout(long value);
+    public long getTransactionTimeout();
+
     public void setStreamThroughputMbitPerSec(int value);
     /**
      * @return stream_throughput_outbound in megabits
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 344f7829d0..6d21f652de 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -115,7 +115,7 @@ public class AccordService implements Shutdownable
         try
         {
             Future<Result> future = node.coordinate(txn);
-            Result result = future.get(DatabaseDescriptor.getTransactionTimeout(TimeUnit.SECONDS), TimeUnit.SECONDS);
+            Result result = future.get(DatabaseDescriptor.getTransactionTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
             return (TxnData) result;
         }
         catch (ExecutionException e)
@@ -123,7 +123,7 @@ public class AccordService implements Shutdownable
             Throwable cause = e.getCause();
             if (cause instanceof Timeout)
                 throw throwTimeout(txn);
-            throw new RuntimeException(e);
+            throw new RuntimeException(cause);
         }
         catch (InterruptedException e)
         {
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
index e073ee5d01..e4526beabd 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
@@ -32,10 +32,15 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+
+import static com.google.common.primitives.Ints.checkedCast;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
 
 public class TxnDataName implements Comparable<TxnDataName>
 {
     private static final TxnDataName RETURNING = new TxnDataName(Kind.RETURNING);
+    private static final long EMPTY_SIZE = ObjectSizes.measure(RETURNING);
 
     public enum Kind
     {
@@ -114,8 +119,7 @@ public class TxnDataName implements Comparable<TxnDataName>
     {
         if (kind != Kind.AUTO_READ)
             return false;
-        return metadata.keyspace.equals(parts[0])
-               && metadata.name.equals(parts[1]);
+        return metadata.keyspace.equals(parts[0]) && metadata.name.equals(parts[1]);
     }
 
     public DecoratedKey getDecoratedKey(TableMetadata metadata)
@@ -139,7 +143,7 @@ public class TxnDataName implements Comparable<TxnDataName>
 
     public long estimatedSizeOnHeap()
     {
-        long size = 0;
+        long size = EMPTY_SIZE;
         for (String part : parts)
             size += part.length();
         return size;
@@ -197,7 +201,7 @@ public class TxnDataName implements Comparable<TxnDataName>
         public void serialize(TxnDataName t, DataOutputPlus out, int version) throws IOException
         {
             out.writeByte(t.kind.value);
-            out.writeInt(t.parts.length);
+            out.writeUnsignedVInt(t.parts.length);
             for (String part : t.parts)
                 out.writeUTF(part);
         }
@@ -206,7 +210,7 @@ public class TxnDataName implements Comparable<TxnDataName>
         public TxnDataName deserialize(DataInputPlus in, int version) throws IOException
         {
             Kind kind = Kind.from(in.readByte());
-            int length = in.readInt();
+            int length = checkedCast(in.readUnsignedVInt());
             String[] parts = new String[length];
             for (int i = 0; i < length; i++)
                 parts[i] = in.readUTF();
@@ -216,7 +220,7 @@ public class TxnDataName implements Comparable<TxnDataName>
         @Override
         public long serializedSize(TxnDataName t, int version)
         {
-            int size = Byte.BYTES + Integer.BYTES;
+            int size = Byte.BYTES + sizeofUnsignedVInt(t.parts.length);
             for (String part : t.parts)
                 size += TypeSizes.sizeof(part);
             return size;
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 9310741580..062d65f6e1 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -565,7 +565,12 @@ public class ByteBufferUtil
             // convert subtypes to BB
             Map<ByteBuffer, ByteBuffer> bbs = new LinkedHashMap<>();
             for (Map.Entry<?, ?> e : map.entrySet())
-                bbs.put(objectToBytes(e.getKey()), objectToBytes(e.getValue()));
+            {
+                Object key = e.getKey();
+                ByteBuffer previousValue = bbs.put(objectToBytes(key), objectToBytes(e.getValue()));
+                if (previousValue != null)
+                    throw new IllegalStateException("Key " + key + " already maps to value " + previousValue);
+            }
             // decompose/serializer doesn't use the isMultiCell, so safe to do this
             return MapType.getInstance(BytesType.instance, BytesType.instance, false).decompose(bbs);
         }
@@ -574,7 +579,9 @@ public class ByteBufferUtil
             Set<?> set = (Set<?>) obj;
             // convert subtypes to BB
             Set<ByteBuffer> bbs = new LinkedHashSet<>();
-            set.forEach(o -> bbs.add(objectToBytes(o)));
+            for (Object o : set)
+                if (!bbs.add(objectToBytes(o)))
+                    throw new IllegalStateException("Object " + o + " maps to a buffer that already exists in the set");
             // decompose/serializer doesn't use the isMultiCell, so safe to do this
             return SetType.getInstance(BytesType.instance, false).decompose(bbs);
         }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index c0e40707f5..60b583b40a 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.base.Throwables;
 import net.bytebuddy.ByteBuddy;
 import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
@@ -44,6 +43,7 @@ import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.utils.AssertionUtils;
 import org.apache.cassandra.utils.FailingConsumer;
 
 import static net.bytebuddy.matcher.ElementMatchers.named;
@@ -90,10 +90,14 @@ public abstract class AccordTestBase extends TestBaseImpl
         // Evict commands from the cache immediately to expose problems loading from disk.
         sharedCluster.forEach(node -> node.runOnInstance(() -> AccordService.instance().setCacheSize(0)));
 
-        fn.accept(sharedCluster);
-
-        // Reset any messaging filters.
-        sharedCluster.filters().reset();
+        try
+        {
+            fn.accept(sharedCluster);
+        }
+        finally
+        {
+            sharedCluster.filters().reset();
+        }
     }
 
     protected void test(FailingConsumer<Cluster> fn) throws Exception
@@ -108,7 +112,7 @@ public abstract class AccordTestBase extends TestBaseImpl
         return init(Cluster.build(2)
                            .withoutVNodes()
                            .withConfig(c -> c.with(Feature.NETWORK).set("write_request_timeout", "10s").set("transaction_timeout", "15s"))
-                           .withInstanceInitializer(ByteBuddyHelper::install)
+                           .withInstanceInitializer(EnforceUpdateDoesNotPerformRead::install)
                            .start());
     }
 
@@ -128,20 +132,20 @@ public abstract class AccordTestBase extends TestBaseImpl
         }
         catch (Throwable t)
         {
-            if (Throwables.getRootCause(t).toString().contains(Preempted.class.getName()))
+            if (AssertionUtils.rootCauseIs(Preempted.class).matches(t))
                 return executeWithRetry(cluster, check, boundValues);
 
             throw t;
         }
     }
 
-    public static class ByteBuddyHelper
+    public static class EnforceUpdateDoesNotPerformRead
     {
         public static void install(ClassLoader classLoader, Integer num)
         {
             new ByteBuddy().rebase(ModificationStatement.class)
                            .method(named("readRequiredLists"))
-                           .intercept(MethodDelegation.to(ByteBuddyHelper.class))
+                           .intercept(MethodDelegation.to(EnforceUpdateDoesNotPerformRead.class))
                            .make()
                            .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
         }
@@ -152,8 +156,7 @@ public abstract class AccordTestBase extends TestBaseImpl
             Map<?, ?> map = fn.call();
             if (map != null)
             {
-                // if the call tree has a TransactionStatement, then fail as this violates
-                // the query
+                // if the call tree has a TransactionStatement, then fail as this violates the query
                 for (StackTraceElement e : Thread.currentThread().getStackTrace())
                     if (TransactionStatement.class.getCanonicalName().equals(e.getClassName()))
                         throw new IllegalStateException("Attempted to load required partition!");
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
index 2b72cd733b..39f9636a43 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
@@ -383,6 +383,7 @@ public class DatabaseDescriptorTest
         testConfig.cas_contention_timeout = lowerThanLowestTimeout;
         testConfig.counter_write_request_timeout = lowerThanLowestTimeout;
         testConfig.request_timeout = lowerThanLowestTimeout;
+        testConfig.transaction_timeout = lowerThanLowestTimeout;
 
         DatabaseDescriptor.checkForLowestAcceptedTimeouts(testConfig);
 
@@ -393,6 +394,7 @@ public class DatabaseDescriptorTest
         assertEquals(testConfig.cas_contention_timeout, DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
         assertEquals(testConfig.counter_write_request_timeout, DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
         assertEquals(testConfig.request_timeout, DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
+        assertEquals(testConfig.transaction_timeout, DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
     }
 
     @Test
diff --git a/test/unit/org/apache/cassandra/utils/Generators.java b/test/unit/org/apache/cassandra/utils/Generators.java
index 51b583e749..d16b7bcfee 100644
--- a/test/unit/org/apache/cassandra/utils/Generators.java
+++ b/test/unit/org/apache/cassandra/utils/Generators.java
@@ -50,9 +50,12 @@ public final class Generators
 
     private static final Constraint DNS_DOMAIN_PARTS_CONSTRAINT = Constraint.between(1, 127);
 
+    private static final char CHAR_UNDERSCORE = 95;
+
     private static final char[] LETTER_DOMAIN = createLetterDomain();
     private static final Constraint LETTER_CONSTRAINT = Constraint.between(0, LETTER_DOMAIN.length - 1).withNoShrinkPoint();
     private static final char[] LETTER_OR_DIGIT_DOMAIN = createLetterOrDigitDomain();
+    private static final char[] LETTER_OR_DIGIT_DOMAIN_WITH_UNDERSCORE = createLetterOrDigitDomainWithUnderscore();
     private static final Constraint LETTER_OR_DIGIT_CONSTRAINT = Constraint.between(0, LETTER_OR_DIGIT_DOMAIN.length - 1).withNoShrinkPoint();
     private static final char[] REGEX_WORD_DOMAIN = createRegexWordDomain();
     private static final Constraint REGEX_WORD_CONSTRAINT = Constraint.between(0, REGEX_WORD_DOMAIN.length - 1).withNoShrinkPoint();
@@ -69,13 +72,10 @@ public final class Generators
         // see CASSANDRA-17919
         return !("P".equals(value) || "PT".equals(value));
     }
-    private static final char CHAR_UNDERSCORE = 95;
+
     public static Gen<String> symbolGen(Gen<Integer> size)
     {
-        char[] domain = new char[LETTER_OR_DIGIT_DOMAIN.length + 1];
-        System.arraycopy(LETTER_OR_DIGIT_DOMAIN, 0, domain, 0, LETTER_OR_DIGIT_DOMAIN.length);
-        domain[domain.length - 1] = CHAR_UNDERSCORE;
-        return string(size, domain, (index, c) -> !(index == 0 && !Character.isLetter(c)));
+        return string(size, LETTER_OR_DIGIT_DOMAIN_WITH_UNDERSCORE, (index, c) -> !(index == 0 && !Character.isLetter(c)));
     }
 
     public static final Gen<UUID> UUID_RANDOM_GEN = rnd -> {
@@ -324,6 +324,14 @@ public final class Generators
         return domain;
     }
 
+    private static char[] createLetterOrDigitDomainWithUnderscore()
+    {
+        char[] domain = new char[LETTER_OR_DIGIT_DOMAIN.length + 1];
+        System.arraycopy(LETTER_OR_DIGIT_DOMAIN, 0, domain, 0, LETTER_OR_DIGIT_DOMAIN.length);
+        domain[domain.length - 1] = CHAR_UNDERSCORE;
+        return domain;
+    }
+
     private static char[] createRegexWordDomain()
     {
         // \w == [a-zA-Z_0-9] the only difference with letterOrDigit is the addition of _


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 05/05: Support CAS and serial read on Accord

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a commit to branch cas-accord-v2
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9b7cb56c4e368c08e5927c239724d6236d1699c3
Author: Ariel Weisberg <aw...@apple.com>
AuthorDate: Fri Dec 2 14:05:06 2022 -0500

    Support CAS and serial read on Accord
    
    patch by Ariel Weisberg; Reviewed by ? and ? for CASSANDRA-18100
---
 src/java/org/apache/cassandra/config/Config.java   |  31 +-
 .../cassandra/config/DatabaseDescriptor.java       |  11 +-
 src/java/org/apache/cassandra/cql3/Lists.java      |  24 +-
 src/java/org/apache/cassandra/cql3/Operator.java   |  38 ++
 .../cassandra/cql3/conditions/ColumnCondition.java | 379 ++++++++++++++-
 .../cassandra/cql3/statements/CQL3CasRequest.java  | 134 +++++-
 .../org/apache/cassandra/db/ClusteringPrefix.java  |   2 +
 .../org/apache/cassandra/db/marshal/ListType.java  |   4 +-
 .../org/apache/cassandra/db/marshal/MapType.java   |  11 +-
 .../org/apache/cassandra/db/marshal/SetType.java   |   9 +-
 .../apache/cassandra/db/marshal/ValueAccessor.java |   9 +-
 .../serializers/CollectionSerializer.java          |   2 +-
 .../org/apache/cassandra/service/CASRequest.java   |   7 +
 .../org/apache/cassandra/service/StorageProxy.java |  51 ++-
 .../service/accord/AccordPartialCommand.java       |   4 +-
 .../service/accord/AccordSerializers.java          |  92 +++-
 .../serializers/BeginInvalidationSerializers.java  |   8 +-
 .../accord/serializers/CheckStatusSerializers.java |  16 +-
 .../accord/serializers/CommitSerializers.java      |   8 +-
 .../accord/serializers/PreacceptSerializers.java   |  13 +-
 .../accord/serializers/RecoverySerializers.java    |  11 +-
 .../cassandra/service/accord/txn/TxnCondition.java |  75 ++-
 .../cassandra/service/accord/txn/TxnData.java      |   1 +
 .../cassandra/service/accord/txn/TxnDataName.java  |  33 +-
 .../cassandra/service/accord/txn/TxnNamedRead.java |   2 +-
 .../cassandra/service/accord/txn/TxnQuery.java     |  44 +-
 .../cassandra/service/accord/txn/TxnRead.java      |  14 +-
 .../cassandra/service/accord/txn/TxnUpdate.java    |  23 +-
 .../service/paxos/PaxosPrepareRefresh.java         |   5 +-
 .../cassandra/service/paxos/PaxosRepair.java       |  29 +-
 .../apache/cassandra/service/paxos/PaxosState.java |   1 +
 .../org/apache/cassandra/utils/ByteBufferUtil.java |  85 +++-
 .../apache/cassandra/utils/NullableSerializer.java |   4 +-
 .../distributed/test/ByteBuddyExamples.java        |   1 -
 .../distributed/test/accord/AccordCQLTest.java     |  61 ++-
 .../distributed/test/accord/AccordTestBase.java    |  63 ++-
 .../cql3/conditions/ColumnConditionTest.java       | 506 ++++++++++++++++++---
 .../cassandra/service/accord/txn/TxnBuilder.java   |  17 +-
 38 files changed, 1550 insertions(+), 278 deletions(-)

diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index db1ed12c8c..eed967d68f 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -21,19 +21,17 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.function.Supplier;
-
 import javax.annotation.Nullable;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1054,6 +1052,8 @@ public class Config
      */
     public volatile int paxos_repair_parallelism = -1;
 
+    public volatile LegacyPaxosStrategy legacy_paxos_strategy = LegacyPaxosStrategy.migration;
+
     public volatile int max_top_size_partition_count = 10;
     public volatile int max_top_tombstone_partition_count = 10;
     public volatile DataStorageSpec.LongBytesBound min_tracked_partition_size = new DataStorageSpec.LongBytesBound("1MiB");
@@ -1149,6 +1149,31 @@ public class Config
         exception
     }
 
+    /*
+     * How to pick a consensus protocol for CAS
+     * and serial read operations. Transaction statements
+     * will always run on Accord. Legacy in this context includes PaxosV2.
+     */
+    public enum LegacyPaxosStrategy
+    {
+        /*
+         * Default setting
+         *
+         * Allow both Accord and PaxosV1/V2 to run on the same cluster
+         * Some keys and ranges might be running on Accord if they
+         * have been migrated and the rest will run on Paxos until
+         * they are migrated.
+         */
+        migration,
+
+        /*
+         * Everything will be run on Accord. Useful for new deployments
+         * that don't want to accidentally start using legacy Paxos
+         * requiring migration to Accord.
+         */
+        accord
+    }
+
     private static final Set<String> SENSITIVE_KEYS = new HashSet<String>() {{
         add("client_encryption_options");
         add("server_encryption_options");
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 2955635694..7015382aca 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
-
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -2838,6 +2837,16 @@ public class DatabaseDescriptor
         return conf.paxos_topology_repair_strict_each_quorum;
     }
 
+    public static Config.LegacyPaxosStrategy getLegacyPaxosStrategy()
+    {
+        return conf.legacy_paxos_strategy;
+    }
+
+    public static void setLegacyPaxosStrategy(Config.LegacyPaxosStrategy strategy)
+    {
+        conf.legacy_paxos_strategy = strategy;
+    }
+
     public static void setNativeTransportMaxRequestDataInFlightPerIpInBytes(long maxRequestDataInFlightInBytes)
     {
         if (maxRequestDataInFlightInBytes == -1)
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java
index f24e94aac3..2ad2790006 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -17,10 +17,6 @@
  */
 package org.apache.cassandra.cql3;
 
-import static org.apache.cassandra.cql3.Constants.UNSET_VALUE;
-import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
-import static org.apache.cassandra.utils.TimeUUID.Generator.atUnixMillisAsBytes;
-
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -30,22 +26,30 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import org.apache.cassandra.db.guardrails.Guardrails;
-import org.apache.cassandra.db.marshal.ByteBufferAccessor;
-import org.apache.cassandra.schema.ColumnMetadata;
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.cql3.functions.Function;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.guardrails.Guardrails;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.apache.cassandra.cql3.Constants.UNSET_VALUE;
+import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
+import static org.apache.cassandra.utils.TimeUUID.Generator.atUnixMillisAsBytes;
+
 /**
  * Static helper methods and classes for lists.
  */
@@ -210,8 +214,10 @@ public abstract class Lists
                 List<?> l = type.getSerializer().deserializeForNativeProtocol(value, ByteBufferAccessor.instance, version);
                 List<ByteBuffer> elements = new ArrayList<>(l.size());
                 for (Object element : l)
+                {
                     // elements can be null in lists that represent a set of IN values
                     elements.add(element == null ? null : type.getElementsType().decompose(element));
+                }
                 return new Value(elements);
             }
             catch (MarshalException e)
diff --git a/src/java/org/apache/cassandra/cql3/Operator.java b/src/java/org/apache/cassandra/cql3/Operator.java
index bcb5f63be6..8985a33f71 100644
--- a/src/java/org/apache/cassandra/cql3/Operator.java
+++ b/src/java/org/apache/cassandra/cql3/Operator.java
@@ -26,9 +26,14 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.serializers.ListSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static com.google.common.primitives.Ints.checkedCast;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+
 public enum Operator
 {
     EQ(0)
@@ -285,6 +290,17 @@ public enum Operator
         output.writeInt(b);
     }
 
+    /**
+     * Write the serialized version of this <code>Operator</code> to the specified output.
+     *
+     * @param output the output to write to
+     * @throws IOException if an I/O problem occurs while writing to the specified output
+     */
+    public void writeToUnsignedVInt(DataOutputPlus output) throws IOException
+    {
+        output.writeUnsignedVInt(b);
+    }
+
     public int getValue()
     {
         return b;
@@ -307,6 +323,23 @@ public enum Operator
           throw new IOException(String.format("Cannot resolve Relation.Type from binary representation: %s", b));
     }
 
+    /**
+     * Deserializes an <code>Operator</code> instance from the specified input.
+     *
+     * @param input the input to read from
+     * @return the <code>Operator</code> instance deserialized
+     * @throws IOException if a problem occurs while deserializing the <code>Operator</code> instance.
+     */
+    public static Operator readFromUnsignedVInt(DataInputPlus input) throws IOException
+    {
+        int b = checkedCast(input.readUnsignedVInt());
+        for (Operator operator : values())
+            if (operator.b == b)
+                return operator;
+
+        throw new IOException(String.format("Cannot resolve Relation.Type from binary representation: %s", b));
+    }
+
     /**
      * Whether 2 values satisfy this operator (given the type they should be compared with).
      */
@@ -358,4 +391,9 @@ public enum Operator
     {
         return this == CONTAINS_KEY;
     }
+
+    public long sizeAsUnsignedVInt()
+    {
+        return sizeofUnsignedVInt(b);
+    }
 }
diff --git a/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java b/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
index 9d9506ff53..ca45c2adf4 100644
--- a/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
+++ b/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
@@ -17,24 +17,73 @@
  */
 package org.apache.cassandra.cql3.conditions;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
-
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import com.google.common.base.Suppliers;
 import com.google.common.collect.Iterators;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
 
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.AbstractMarker;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.Constants;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
 import org.apache.cassandra.cql3.Term.Terminal;
+import org.apache.cassandra.cql3.Terms;
+import org.apache.cassandra.cql3.UserTypes;
+import org.apache.cassandra.cql3.VariableSpecifications;
 import org.apache.cassandra.cql3.functions.Function;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
 
-import static org.apache.cassandra.cql3.statements.RequestValidations.*;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.primitives.Ints.checkedCast;
+import static java.util.stream.Collectors.toList;
+import static org.apache.cassandra.cql3.Constants.UNSET_VALUE;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+import static org.apache.cassandra.service.accord.AccordSerializers.columnMetadataSerializer;
+import static org.apache.cassandra.service.accord.AccordSerializers.deserializeCqlCollectionAsTerm;
+import static org.apache.cassandra.utils.ByteBufferUtil.UNSET_BYTE_BUFFER;
+import static org.apache.cassandra.utils.ByteBufferUtil.vintNullableSerializer;
+import static org.apache.cassandra.utils.ByteBufferUtil.vintSerializer;
+import static org.apache.cassandra.utils.CollectionSerializers.deserializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.serializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.serializedListSize;
 
 /**
  * A CQL3 condition on the value of a column or collection element.  For example, "UPDATE .. IF a = 0".
@@ -73,7 +122,7 @@ public abstract class ColumnCondition
         terms.collectMarkerSpecification(boundNames);
     }
 
-    public abstract ColumnCondition.Bound bind(QueryOptions options);
+    public abstract Bound bind(QueryOptions options);
 
     protected final List<ByteBuffer> bindAndGetTerms(QueryOptions options)
     {
@@ -107,7 +156,7 @@ public abstract class ColumnCondition
         {
             T value = values.get(i);
             // The value can be ByteBuffer or Constants.Value so we need to check the 2 type of UNSET
-            if (value != ByteBufferUtil.UNSET_BYTE_BUFFER && value != Constants.UNSET_VALUE)
+            if (value != UNSET_BYTE_BUFFER && value != UNSET_VALUE)
                 filtered.add(value);
         }
         return filtered;
@@ -210,13 +259,51 @@ public abstract class ColumnCondition
         return new UDTFieldCondition(column, udtField, op, terms);
     }
 
+    /*
+     * Don't reorder, change, or remove entries (append new ones at the end): the ordinals are sent in
+     * messages and stored by Accord for in-flight transactions, so altering them breaks cross-version compatibility.
+     */
+    enum BoundKind
+    {
+        ELEMENT_ACCESS,
+        MULTI_CELL_COLLECTION,
+        MULTI_CELL_UDT,
+        SIMPLE,
+        UDT_FIELD_ACCESS;
+
+        @SuppressWarnings("rawtypes")
+        BoundSerializer serializer()
+        {
+            switch(this)
+            {
+                case ELEMENT_ACCESS:
+                    return ElementAccessBound.serializer;
+                case MULTI_CELL_COLLECTION:
+                    return MultiCellCollectionBound.serializer;
+                case MULTI_CELL_UDT:
+                    return MultiCellUdtBound.serializer;
+                case SIMPLE:
+                     return SimpleBound.serializer;
+                case UDT_FIELD_ACCESS:
+                    return UDTFieldAccessBound.serializer;
+                default:
+                    throw new AssertionError("Shouldn't have an enum with no serializer");
+            }
+        }
+    }
+
     public static abstract class Bound
     {
+        @Nonnull
         public final ColumnMetadata column;
+
+        @Nonnull
         public final Operator comparisonOperator;
 
         protected Bound(ColumnMetadata column, Operator operator)
         {
+            checkNotNull(column);
+            checkNotNull(operator);
             this.column = column;
             // If the operator is an IN we want to compare the value using an EQ.
             this.comparisonOperator = operator.isIN() ? Operator.EQ : operator;
@@ -235,7 +322,7 @@ public abstract class ColumnCondition
         /** Returns true if the operator is satisfied (i.e. "otherValue operator value == true"), false otherwise. */
         public static boolean compareWithOperator(Operator operator, AbstractType<?> type, ByteBuffer value, ByteBuffer otherValue)
         {
-            if (value == ByteBufferUtil.UNSET_BYTE_BUFFER)
+            if (value == UNSET_BYTE_BUFFER)
                 throw invalidRequest("Invalid 'unset' value in condition");
 
             if (value == null)
@@ -257,6 +344,55 @@ public abstract class ColumnCondition
             }
             return operator.isSatisfiedBy(type, otherValue, value);
         }
+
+        void checkForUnsetValues(List<ByteBuffer> values)
+        {
+            for (ByteBuffer buffer : values)
+                if (buffer == UNSET_BYTE_BUFFER)
+                    throw invalidRequest("Invalid 'unset' value in condition");
+        }
+        protected abstract BoundKind kind();
+
+        public static final IVersionedSerializer<Bound> serializer = new IVersionedSerializer<Bound>()
+        {
+            @Override
+            @SuppressWarnings("unchecked")
+            public void serialize(Bound bound, DataOutputPlus out, int version) throws IOException
+            {
+                columnMetadataSerializer.serialize(bound.column, out, version);
+                bound.comparisonOperator.writeToUnsignedVInt(out);
+                BoundKind kind = bound.kind();
+                out.writeUnsignedVInt(kind.ordinal());
+                kind.serializer().serialize(bound, out, version);
+            }
+
+            @Override
+            public Bound deserialize(DataInputPlus in, int version) throws IOException
+            {
+                ColumnMetadata column = columnMetadataSerializer.deserialize(in, version);
+                Operator comparisonOperator = Operator.readFromUnsignedVInt(in);
+                BoundKind kind = BoundKind.values()[checkedCast(in.readUnsignedVInt())];
+                return kind.serializer().deserialize(in, version, column, comparisonOperator);
+            }
+
+            @Override
+            @SuppressWarnings("unchecked")
+            public long serializedSize(Bound bound, int version)
+            {
+                BoundKind kind = bound.kind();
+                return columnMetadataSerializer.serializedSize(bound.column, version)
+                       + bound.comparisonOperator.sizeAsUnsignedVInt()
+                       + sizeofUnsignedVInt(kind.ordinal())
+                       + kind.serializer().serializedSize(bound, version);
+            }
+        };
+    }
+
+    private interface BoundSerializer<T extends Bound>
+    {
+        void serialize(T bound, DataOutputPlus out, int version) throws IOException;
+        Bound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException;
+        long serializedSize(T condition, int version);
     }
 
     protected static final Cell<?> getCell(Row row, ColumnMetadata column)
@@ -317,6 +453,7 @@ public abstract class ColumnCondition
         private SimpleBound(ColumnMetadata column, Operator operator, List<ByteBuffer> values)
         {
             super(column, operator);
+            checkForUnsetValues(values);
             this.values = values;
         }
 
@@ -341,6 +478,34 @@ public abstract class ColumnCondition
             }
             return false;
         }
+
+        @Override
+        protected BoundKind kind()
+        {
+            return BoundKind.SIMPLE;
+        }
+
+        private static final BoundSerializer<SimpleBound> serializer = new BoundSerializer<SimpleBound>()
+        {
+            @Override
+            public void serialize(SimpleBound bound, DataOutputPlus out, int version) throws IOException
+            {
+                serializeList(bound.values, out, version, vintNullableSerializer);
+            }
+
+            @Override
+            public SimpleBound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException
+            {
+                List<ByteBuffer> values = deserializeList(in, version, vintNullableSerializer);
+                return new SimpleBound(column, operator, values);
+            }
+
+            @Override
+            public long serializedSize(SimpleBound bound, int version)
+            {
+                return serializedListSize(bound.values, version, vintNullableSerializer);
+            }
+        };
     }
 
     /**
@@ -351,11 +516,13 @@ public abstract class ColumnCondition
         /**
          * The collection element
          */
+        @Nonnull
         private final ByteBuffer collectionElement;
 
         /**
          * The conditions values.
          */
+        @Nonnull
         private final List<ByteBuffer> values;
 
         public ElementAccessBound(ColumnMetadata column,
@@ -364,6 +531,11 @@ public abstract class ColumnCondition
                                    List<ByteBuffer> values)
         {
             super(column, operator);
+            checkForUnsetValues(values);
+
+            boolean isMap = column.type instanceof MapType;
+            if (collectionElement == null)
+                throw invalidRequest("Invalid null value for %s element access", isMap ? "map" : "list");
 
             this.collectionElement = collectionElement;
             this.values = values;
@@ -374,9 +546,6 @@ public abstract class ColumnCondition
         {
             boolean isMap = column.type instanceof MapType;
 
-            if (collectionElement == null)
-                throw invalidRequest("Invalid null value for %s element access", isMap ? "map" : "list");
-
             if (isMap)
             {
                 MapType<?, ?> mapType = (MapType<?, ?>) column.type;
@@ -445,6 +614,36 @@ public abstract class ColumnCondition
             checkFalse(idx < 0, "Invalid negative list index %d", idx);
             return idx;
         }
+
+        @Override
+        protected BoundKind kind()
+        {
+            return BoundKind.ELEMENT_ACCESS;
+        }
+
+        private static final BoundSerializer<ElementAccessBound> serializer = new BoundSerializer<ElementAccessBound>()
+        {
+            @Override
+            public void serialize(ElementAccessBound bound, DataOutputPlus out, int version) throws IOException
+            {
+                vintSerializer.serialize(bound.collectionElement, out, version);
+                serializeList(bound.values, out, version, vintNullableSerializer);
+            }
+
+            @Override
+            public ElementAccessBound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException
+            {
+                ByteBuffer collectionElement = vintSerializer.deserialize(in, version);
+                List<ByteBuffer> values = deserializeList(in, version, vintNullableSerializer);
+                return new ElementAccessBound(column, collectionElement, operator, values);
+            }
+
+            @Override
+            public long serializedSize(ElementAccessBound bound , int version)
+            {
+                return vintSerializer.serializedSize(bound.collectionElement, version) + serializedListSize(bound.values, version, vintNullableSerializer);
+            }
+        };
     }
 
     /**
@@ -452,26 +651,44 @@ public abstract class ColumnCondition
      */
     public static final class MultiCellCollectionBound extends Bound
     {
-        private final List<Term.Terminal> values;
+        @Nonnull
+        private final List<Terminal> values;
 
-        public MultiCellCollectionBound(ColumnMetadata column, Operator operator, List<Term.Terminal> values)
+        @Nullable
+        private Supplier<List<ByteBuffer>> serializedValues;
+
+        public MultiCellCollectionBound(ColumnMetadata column, Operator operator, List<Terminal> values)
         {
             super(column, operator);
+            checkNotNull(values);
             assert column.type.isMultiCell();
             this.values = values;
         }
 
+        public Supplier<List<ByteBuffer>> serializedValues()
+        {
+            if (serializedValues != null)
+                return serializedValues;
+
+            serializedValues = Suppliers.memoize(() ->
+                                                 values.stream()
+                                                       .map(v -> v == null ? null : v.get(ProtocolVersion.CURRENT))
+                                                       .collect(toList()));
+
+            return serializedValues;
+        }
+
         public boolean appliesTo(Row row)
         {
             return appliesTo(column, comparisonOperator, values, row);
         }
 
-        public static boolean appliesTo(ColumnMetadata column, Operator operator, List<Term.Terminal> values, Row row)
+        public static boolean appliesTo(ColumnMetadata column, Operator operator, List<Terminal> values, Row row)
         {
             CollectionType<?> type = (CollectionType<?>) column.type;
 
             // copy iterator contents so that we can properly reuse them for each comparison with an IN value
-            for (Term.Terminal value : values)
+            for (Terminal value : values)
             {
                 Iterator<Cell<?>> iter = getCells(row, column);
                 if (value == null)
@@ -495,7 +712,7 @@ public abstract class ColumnCondition
             return false;
         }
 
-        private static boolean valueAppliesTo(CollectionType<?> type, Iterator<Cell<?>> iter, Term.Terminal value, Operator operator)
+        private static boolean valueAppliesTo(CollectionType<?> type, Iterator<Cell<?>> iter, Terminal value, Operator operator)
         {
             if (value == null)
                 return !iter.hasNext();
@@ -527,7 +744,10 @@ public abstract class ColumnCondition
 
                 // for lists we use the cell value; for sets we use the cell name
                 ByteBuffer cellValue = isSet ? iter.next().path().get(0) : iter.next().buffer();
-                int comparison = type.compare(cellValue, conditionIter.next());
+                ByteBuffer conditionValue = conditionIter.next();
+                if (conditionValue == null)
+                    conditionValue = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+                int comparison = type.compare(cellValue, conditionValue);
                 if (comparison != 0)
                     return evaluateComparisonWithOperator(comparison, operator);
             }
@@ -546,6 +766,7 @@ public abstract class ColumnCondition
 
         private static boolean setAppliesTo(SetType<?> type, Iterator<Cell<?>> iter, Set<ByteBuffer> elements, Operator operator)
         {
+            // NOTE(review): this copy-and-sort may be redundant if 'elements' is already a sorted set; confirm before removing.
             ArrayList<ByteBuffer> sortedElements = new ArrayList<>(elements);
             Collections.sort(sortedElements, type.getElementsType());
             return setOrListAppliesTo(type.getElementsType(), iter, sortedElements.iterator(), operator, true);
@@ -562,13 +783,17 @@ public abstract class ColumnCondition
                 Map.Entry<ByteBuffer, ByteBuffer> conditionEntry = conditionIter.next();
                 Cell<?> c = iter.next();
 
+                ByteBuffer conditionEntryKey = conditionEntry.getKey();
+
                 // compare the keys
-                int comparison = type.getKeysType().compare(c.path().get(0), conditionEntry.getKey());
+                int comparison = type.getKeysType().compare(c.path().get(0), conditionEntryKey);
                 if (comparison != 0)
                     return evaluateComparisonWithOperator(comparison, operator);
 
+                ByteBuffer conditionEntryValue = conditionEntry.getValue();
+
                 // compare the values
-                comparison = type.getValuesType().compare(c.buffer(), conditionEntry.getValue());
+                comparison = type.getValuesType().compare(c.buffer(), conditionEntryValue);
                 if (comparison != 0)
                     return evaluateComparisonWithOperator(comparison, operator);
             }
@@ -579,6 +804,43 @@ public abstract class ColumnCondition
             // they're equal
             return operator == Operator.EQ || operator == Operator.LTE || operator == Operator.GTE;
         }
+
+        @Override
+        protected BoundKind kind()
+        {
+            return BoundKind.MULTI_CELL_COLLECTION;
+        }
+
+        private static final BoundSerializer<MultiCellCollectionBound> serializer = new BoundSerializer<MultiCellCollectionBound>()
+        {
+            @Override
+            public void serialize(MultiCellCollectionBound bound, DataOutputPlus out, int version) throws IOException
+            {
+                serializeList(bound.serializedValues().get(), out, version, vintNullableSerializer);
+            }
+
+            @Override
+            public MultiCellCollectionBound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException
+            {
+                List<ByteBuffer> values = deserializeList(in, version, vintNullableSerializer);
+                List<Terminal> terminals;
+                if (operator.isContains() || operator.isContainsKey())
+                {
+                    terminals = values.stream().map(Constants.Value::new).collect(toList());
+                }
+                else
+                {
+                    terminals = values.stream().map(b -> deserializeCqlCollectionAsTerm(b, column.type)).collect(toList());
+                }
+                return new MultiCellCollectionBound(column, operator, terminals);
+            }
+
+            @Override
+            public long serializedSize(MultiCellCollectionBound bound, int version)
+            {
+                return serializedListSize(bound.serializedValues().get(), version, vintNullableSerializer);
+            }
+        };
     }
 
     private static boolean containsAppliesTo(CollectionType<?> type, Iterator<Cell<?>> iter, ByteBuffer value, Operator operator)
@@ -622,16 +884,21 @@ public abstract class ColumnCondition
         /**
          * The UDT field.
          */
+        @Nonnull
         private final FieldIdentifier field;
 
         /**
          * The conditions values.
          */
+        @Nonnull
         private final List<ByteBuffer> values;
 
         private UDTFieldAccessBound(ColumnMetadata column, FieldIdentifier field, Operator operator, List<ByteBuffer> values)
         {
             super(column, operator);
+            checkNotNull(field);
+            checkNotNull(values);
+            checkForUnsetValues(values);
             assert column.type.isUDT() && field != null;
             this.field = field;
             this.values = values;
@@ -647,6 +914,7 @@ public abstract class ColumnCondition
         {
             UserType userType = (UserType) column.type;
 
+
             if (column.type.isMultiCell())
             {
                 Cell<?> cell = getCell(row, column, userType.cellPathForField(field));
@@ -677,6 +945,37 @@ public abstract class ColumnCondition
         {
             return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
         }
+
+        @Override
+        protected BoundKind kind()
+        {
+            return BoundKind.UDT_FIELD_ACCESS;
+        }
+
+        private static final BoundSerializer<UDTFieldAccessBound> serializer = new BoundSerializer<UDTFieldAccessBound>()
+        {
+            @Override
+            public void serialize(UDTFieldAccessBound bound, DataOutputPlus out, int version) throws IOException
+            {
+                vintNullableSerializer.serialize(bound.field.bytes, out, version);
+                serializeList(bound.values, out, version, vintNullableSerializer);
+            }
+
+            @Override
+            public UDTFieldAccessBound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException
+            {
+                FieldIdentifier field = new FieldIdentifier(vintNullableSerializer.deserialize(in, version));
+                List<ByteBuffer> values = deserializeList(in, version, vintNullableSerializer);
+                return new UDTFieldAccessBound(column, field, operator, values);
+            }
+
+            @Override
+            public long serializedSize(UDTFieldAccessBound bound, int version)
+            {
+
+                return vintNullableSerializer.serializedSize(bound.field.bytes, version) + serializedListSize(bound.values, version, vintNullableSerializer);
+            }
+        };
     }
 
     /**
@@ -687,16 +986,21 @@ public abstract class ColumnCondition
         /**
          * The conditions values.
          */
+        @Nonnull
         private final List<ByteBuffer> values;
 
         /**
          * The protocol version
          */
+        @Nonnull
         private final ProtocolVersion protocolVersion;
 
         public MultiCellUdtBound(ColumnMetadata column, Operator op, List<ByteBuffer> values, ProtocolVersion protocolVersion)
         {
             super(column, op);
+            checkNotNull(values);
+            checkNotNull(protocolVersion);
+            checkForUnsetValues(values);
             assert column.type.isMultiCell();
             this.values = values;
             this.protocolVersion = protocolVersion;
@@ -730,6 +1034,37 @@ public abstract class ColumnCondition
         {
             return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
         }
+
+        @Override
+        public BoundKind kind()
+        {
+            return BoundKind.MULTI_CELL_UDT;
+        }
+
+        private static final BoundSerializer<MultiCellUdtBound> serializer = new BoundSerializer<MultiCellUdtBound>()
+        {
+            @Override
+            public void serialize(MultiCellUdtBound bound, DataOutputPlus out, int version) throws IOException
+            {
+                serializeList(bound.values, out, version, vintNullableSerializer);
+                out.writeUnsignedVInt(bound.protocolVersion.asInt());
+            }
+
+            @Override
+            public MultiCellUdtBound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException
+            {
+                List<ByteBuffer> values = deserializeList(in, version, vintNullableSerializer);
+                int protocolVersion = in.readUnsignedVIntChecked();
+                // TODO(review): confirm decode(..., true) maps the asInt() value written by serialize back to the same ProtocolVersion
+                return new MultiCellUdtBound(column, operator, values, ProtocolVersion.decode(protocolVersion, true));
+            }
+
+            @Override
+            public long serializedSize(MultiCellUdtBound bound, int version)
+            {
+                return serializedListSize(bound.values, version, vintNullableSerializer) + sizeofUnsignedVInt(bound.protocolVersion.asInt());
+            }
+        };
     }
 
     public static class Raw
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 02931005a7..fe68304598 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -18,30 +18,61 @@
 package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
 
-import org.apache.cassandra.db.marshal.TimeUUIDType;
-import org.apache.cassandra.index.IndexRegistry;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.cql3.*;
+import accord.api.Update;
+import accord.primitives.Txn;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.UpdateParameters;
 import org.apache.cassandra.cql3.conditions.ColumnCondition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.db.partitions.Partition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.IndexRegistry;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.CASRequest;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.accord.txn.TxnCondition;
+import org.apache.cassandra.service.accord.txn.TxnData;
+import org.apache.cassandra.service.accord.txn.TxnDataName;
+import org.apache.cassandra.service.accord.txn.TxnQuery;
+import org.apache.cassandra.service.accord.txn.TxnRead;
+import org.apache.cassandra.service.accord.txn.TxnReference;
+import org.apache.cassandra.service.accord.txn.TxnUpdate;
+import org.apache.cassandra.service.accord.txn.TxnWrite;
 import org.apache.cassandra.service.paxos.Ballot;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.TimeUUID;
 
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.cassandra.service.accord.txn.TxnDataName.Kind.USER;
 
 /**
  * Processed CAS conditions and update on potentially multiple rows of the same partition.
@@ -343,6 +374,8 @@ public class CQL3CasRequest implements CASRequest
         }
 
         public abstract boolean appliesTo(FilteredPartition current) throws InvalidRequestException;
+
+        public abstract TxnCondition asTxnCondition();
     }
 
     private static class NotExistCondition extends RowCondition
@@ -356,6 +389,14 @@ public class CQL3CasRequest implements CASRequest
         {
             return current.getRow(clustering) == null;
         }
+
+        @Override
+        public TxnCondition asTxnCondition()
+        {
+            // Refer to this clustering's data under the dummy read name and require it to be null.
+            TxnReference rowReference = new TxnReference(new TxnDataName(USER, clustering, TxnRead.DUMMY_NAME), null);
+            return new TxnCondition.Exists(rowReference, TxnCondition.Kind.IS_NULL);
+        }
     }
 
     private static class ExistCondition extends RowCondition
@@ -369,6 +410,14 @@ public class CQL3CasRequest implements CASRequest
         {
             return current.getRow(clustering) != null;
         }
+
+        @Override
+        public TxnCondition asTxnCondition()
+        {
+            // Refer to this clustering's data under the dummy read name and require it to be non-null.
+            TxnReference rowReference = new TxnReference(new TxnDataName(USER, clustering, TxnRead.DUMMY_NAME), null);
+            return new TxnCondition.Exists(rowReference, TxnCondition.Kind.IS_NOT_NULL);
+        }
     }
 
     private static class ColumnsConditions extends RowCondition
@@ -399,6 +448,12 @@ public class CQL3CasRequest implements CASRequest
             }
             return true;
         }
+
+        @Override
+        public TxnCondition asTxnCondition()
+        {
+            return new TxnCondition.ColumnConditionsAdapter(clustering, conditions.values());
+        }
     }
     
     @Override
@@ -406,4 +461,59 @@ public class CQL3CasRequest implements CASRequest
     {
         return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
+
+    @Override
+    public Txn toAccordTxn(ClientState clientState, int nowInSecs)
+    {
+        // A CAS request covers a single key and its writes depend on read data only via
+        // conditions, so the keys of the read are the keys of the whole transaction.
+        SinglePartitionReadCommand command = readCommand(nowInSecs);
+        Update casUpdate = createUpdate(clientState);
+        TxnRead txnRead = TxnRead.createRead(command);
+        return new Txn.InMemory(txnRead.keys(), txnRead, TxnQuery.CONDITION, casUpdate);
+    }
+
+    private Update createUpdate(ClientState clientState)
+    {
+        return new TxnUpdate(createWriteFragments(clientState), createCondition());
+    }
+
+    private TxnCondition createCondition()
+    {
+        // Collect staticConditions (when set) followed by every per-row condition.
+        List<TxnCondition> txnConditions = new ArrayList<>();
+        if (staticConditions != null)
+            txnConditions.add(staticConditions.asTxnCondition());
+        conditions.values()
+                  .stream()
+                  .map(RowCondition::asTxnCondition)
+                  .forEach(txnConditions::add);
+        // CAS forbids empty conditions
+        checkState(!txnConditions.isEmpty());
+        // Test the collected list, not the row map: a static condition plus one row condition is two entries to AND.
+        return txnConditions.size() == 1 ? txnConditions.get(0) : new TxnCondition.BooleanGroup(TxnCondition.Kind.AND, txnConditions);
+    }
+
+    /**
+     * Builds one write fragment per entry in {@code updates}.
+     * Fragments are numbered from zero in iteration order.
+     * @param state client state handed to each statement when building its fragment
+     */
+    private List<TxnWrite.Fragment> createWriteFragments(ClientState state)
+    {
+        List<TxnWrite.Fragment> fragments = new ArrayList<>();
+        int nextIndex = 0;
+        for (RowUpdate rowUpdate : updates)
+            fragments.add(rowUpdate.stmt.getTxnWriteFragment(nextIndex++, state, rowUpdate.options));
+        return fragments;
+    }
+
+    @Override
+    public RowIterator toCasResult(TxnData txnData)
+    {
+        // Returns the rows stored under the dummy read name, or null when
+        // the transaction data holds no partition for it.
+        FilteredPartition casPartition = txnData.get(TxnRead.DUMMY);
+        return casPartition == null ? null : casPartition.rowIterator();
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index c7a2782ece..7c6fdd04bd 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -38,6 +38,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable.Version;
 import org.apache.cassandra.utils.bytecomparable.ByteSource;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * A clustering prefix is the unit of what a {@link ClusteringComparator} can compare.
  * <p>
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 99ae73cc8f..0f39180448 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -32,8 +32,8 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.serializers.ListSerializer;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.TimeUUID;
 import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.TimeUUID;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable.Version;
 import org.apache.cassandra.utils.bytecomparable.ByteSource;
 
@@ -123,7 +123,7 @@ public class ListType<T> extends CollectionType<List<T>>
     }
 
     @Override
-    public AbstractType<?> freeze()
+    public ListType<T> freeze()
     {
         if (isMultiCell)
             return getInstance(this.elements, false);
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java
index be74ff1626..89b6b3423e 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -18,7 +18,12 @@
 package org.apache.cassandra.db.marshal;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.cassandra.cql3.Json;
@@ -31,11 +36,11 @@ import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MapSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable.Version;
 import org.apache.cassandra.utils.bytecomparable.ByteSource;
 import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
-import org.apache.cassandra.utils.Pair;
 
 public class MapType<K, V> extends CollectionType<Map<K, V>>
 {
@@ -141,7 +146,7 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
     }
 
     @Override
-    public AbstractType<?> freeze()
+    public MapType<K, V> freeze()
     {
         if (isMultiCell)
             return getInstance(this.keys, this.values, false);
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java
index 67699ac3da..c831e0d5e5 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -18,7 +18,12 @@
 package org.apache.cassandra.db.marshal;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.cassandra.cql3.Json;
@@ -114,7 +119,7 @@ public class SetType<T> extends CollectionType<Set<T>>
     }
 
     @Override
-    public AbstractType<?> freeze()
+    public SetType<T> freeze()
     {
         if (isMultiCell)
             return getInstance(this.elements, false);
diff --git a/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java b/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java
index d454c5e188..5f46e45c0f 100644
--- a/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java
+++ b/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java
@@ -40,7 +40,12 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.service.paxos.Ballot;
 import org.apache.cassandra.utils.TimeUUID;
 
-import static org.apache.cassandra.db.ClusteringPrefix.Kind.*;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_START_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_END_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_START_BOUND;
 
 /**
  * ValueAccessor allows serializers and other code dealing with raw bytes to operate on different backing types
@@ -130,6 +135,8 @@ public interface ValueAccessor<V>
      */
     default boolean isEmpty(V value)
     {
+        // Deliberately no null handling or logging here: a null value is passed
+        // straight to size(value), which decides what happens.
         return size(value) == 0;
     }
 
diff --git a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
index 36e346c3e7..a9cc57e75d 100644
--- a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
@@ -25,10 +25,10 @@ import java.util.List;
 import com.google.common.collect.Range;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.db.marshal.ValueAccessor;
 import org.apache.cassandra.transport.ProtocolVersion;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public abstract class CollectionSerializer<T> extends TypeSerializer<T>
diff --git a/src/java/org/apache/cassandra/service/CASRequest.java b/src/java/org/apache/cassandra/service/CASRequest.java
index 19966c883c..b61f7ab79c 100644
--- a/src/java/org/apache/cassandra/service/CASRequest.java
+++ b/src/java/org/apache/cassandra/service/CASRequest.java
@@ -17,10 +17,13 @@
  */
 package org.apache.cassandra.service;
 
+import accord.primitives.Txn;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.accord.txn.TxnData;
 import org.apache.cassandra.service.paxos.Ballot;
 
 /**
@@ -44,4 +47,8 @@ public interface CASRequest
      * are passed as argument.
      */
     public PartitionUpdate makeUpdates(FilteredPartition current, ClientState clientState, Ballot ballot) throws InvalidRequestException;
+
+    public Txn toAccordTxn(ClientState clientState, int nowInSecs);
+
+    RowIterator toCasResult(TxnData data);
 }
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0e873506d4..f417b0344b 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -46,6 +46,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.primitives.Txn;
 import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask;
@@ -122,6 +123,8 @@ import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.service.accord.txn.TxnData;
+import org.apache.cassandra.service.accord.txn.TxnQuery;
+import org.apache.cassandra.service.accord.txn.TxnRead;
 import org.apache.cassandra.service.paxos.Ballot;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.service.paxos.ContentionStrategy;
@@ -145,12 +148,10 @@ import org.apache.cassandra.utils.TimeUUID;
 import org.apache.cassandra.utils.concurrent.CountDownLatch;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
+import static com.google.common.collect.Iterables.concat;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-
-import static com.google.common.collect.Iterables.concat;
-import static org.apache.commons.lang3.StringUtils.join;
-
+import static org.apache.cassandra.config.Config.LegacyPaxosStrategy.accord;
 import static org.apache.cassandra.db.ConsistencyLevel.SERIAL;
 import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casReadMetrics;
 import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casWriteMetrics;
@@ -178,6 +179,7 @@ import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
 import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch;
+import static org.apache.commons.lang3.StringUtils.join;
 
 public class StorageProxy implements StorageProxyMBean
 {
@@ -318,9 +320,17 @@ public class StorageProxy implements StorageProxyMBean
                                                             key.toString(), keyspaceName, cfName));
         }
 
-        return Paxos.useV2()
-                ? Paxos.cas(key, request, consistencyForPaxos, consistencyForCommit, clientState)
-                : legacyCas(keyspaceName, cfName, key, request, consistencyForPaxos, consistencyForCommit, clientState, nowInSeconds, queryStartNanoTime);
+        if (DatabaseDescriptor.getLegacyPaxosStrategy() == accord)
+        {
+            TxnData data = AccordService.instance().coordinate(request.toAccordTxn(clientState, nowInSeconds));
+            return request.toCasResult(data);
+        }
+        else
+        {
+            return Paxos.useV2()
+                   ? Paxos.cas(key, request, consistencyForPaxos, consistencyForCommit, clientState)
+                   : legacyCas(keyspaceName, cfName, key, request, consistencyForPaxos, consistencyForCommit, clientState, nowInSeconds, queryStartNanoTime);
+        }
     }
 
     public static RowIterator legacyCas(String keyspaceName,
@@ -1847,7 +1857,7 @@ public class StorageProxy implements StorageProxyMBean
         }
 
         return consistencyLevel.isSerialConsistency()
-             ? readWithPaxos(group, consistencyLevel, queryStartNanoTime)
+             ? readWithConsensus(group, consistencyLevel, queryStartNanoTime)
              : readRegular(group, consistencyLevel, queryStartNanoTime);
     }
 
@@ -1861,12 +1871,29 @@ public class StorageProxy implements StorageProxyMBean
         return !StorageService.instance.isBootstrapMode();
     }
 
-    private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
+    private static PartitionIterator readWithConsensus(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException
     {
-        return Paxos.useV2()
-                ? Paxos.read(group, consistencyLevel)
-                : legacyReadWithPaxos(group, consistencyLevel, queryStartNanoTime);
+        if (DatabaseDescriptor.getLegacyPaxosStrategy() == accord)
+        {
+            return readWithAccord(group);
+        }
+        else
+        {
+            return Paxos.useV2()
+                   ? Paxos.read(group, consistencyLevel)
+                   : legacyReadWithPaxos(group, consistencyLevel, queryStartNanoTime);
+        }
+    }
+
+    private static PartitionIterator readWithAccord(SinglePartitionReadCommand.Group group)
+    {
+        // Only a single-partition group is accepted; it is coordinated as a transaction with no update.
+        if (group.queries.size() > 1)
+            throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time");
+        TxnRead txnRead = TxnRead.createRead(group.queries.get(0));
+        TxnData result = AccordService.instance().coordinate(new Txn.InMemory(txnRead.keys(), txnRead, TxnQuery.ALL, null));
+        return PartitionIterators.singletonIterator(result.get(TxnRead.DUMMY).rowIterator());
+    }
 
     private static PartitionIterator legacyReadWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
index 5036e2b57c..b8da523ebf 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
@@ -43,7 +43,7 @@ import static org.apache.cassandra.utils.CollectionSerializers.serializeCollecti
 import static org.apache.cassandra.utils.CollectionSerializers.serializedCollectionSize;
 import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
 import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
 public class AccordPartialCommand extends CommandsForKey.TxnIdWithExecuteAt
 {
@@ -180,7 +180,7 @@ public class AccordPartialCommand extends CommandsForKey.TxnIdWithExecuteAt
         {
             int size = Math.toIntExact(AccordSerializerVersion.serializer.serializedSize(version));
             size += CommandSerializers.txnId.serializedSize();
-            size += serializedSizeNullable(command.executeAt(), version.msgVersion, CommandSerializers.timestamp);
+            size += serializedNullableSize(command.executeAt(), version.msgVersion, CommandSerializers.timestamp);
             size += CommandSerializers.status.serializedSize(command.status(), version.msgVersion);
             size += CommandSerializers.kind.serializedSize(command.kind(), version.msgVersion);
             size += serializedCollectionSize(command.deps, version.msgVersion, CommandSerializers.txnId);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordSerializers.java b/src/java/org/apache/cassandra/service/accord/AccordSerializers.java
index a0faaef0ff..a41759a911 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSerializers.java
@@ -20,18 +20,22 @@ package org.apache.cassandra.service.accord;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 
 import org.apache.cassandra.cql3.Lists;
 import org.apache.cassandra.cql3.Maps;
 import org.apache.cassandra.cql3.Sets;
 import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.db.ArrayClustering;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringPrefix;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.ValueAccessor;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.DeserializationHelper;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -47,6 +51,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.apache.cassandra.db.TypeSizes.sizeof;
 import static com.google.common.primitives.Ints.checkedCast;
 import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
 import static org.apache.cassandra.db.marshal.CollectionType.Kind.LIST;
@@ -71,6 +76,14 @@ public class AccordSerializers
         }
     }
 
+    public static <T> ByteBuffer[] serialize(List<T> items, IVersionedSerializer<T> serializer)
+    {
+        ByteBuffer[] result = new ByteBuffer[items.size()];
+        for (int i = 0, mi = items.size(); i < mi; i++)
+            result[i] = serialize(items.get(i), serializer);
+        return result;
+    }
+
     public static <T> T deserialize(ByteBuffer bytes, IVersionedSerializer<T> serializer)
     {
         try (DataInputBuffer in = new DataInputBuffer(bytes, true))
@@ -157,7 +170,7 @@ public class AccordSerializers
             String table = in.readUTF();
             ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
             TableMetadata metadata = Schema.instance.getTableMetadata(keyspace, table);
-            
+
             // TODO: Can the metadata be null if the table has been dropped?
             return metadata.getColumn(name);
         }
@@ -166,8 +179,8 @@ public class AccordSerializers
         public long serializedSize(ColumnMetadata column, int version)
         {
             long size = 0;
-            size += TypeSizes.sizeof(column.ksName);
-            size += TypeSizes.sizeof(column.cfName);
+            size += sizeof(column.ksName);
+            size += sizeof(column.cfName);
             size += ByteBufferUtil.serializedSizeWithShortLength(column.name.bytes);
             return size;
         }
@@ -193,4 +206,73 @@ public class AccordSerializers
             return TableId.serializedSize();
         }
     };
-}
+
+    public static final IVersionedSerializer<Clustering<?>> clusteringPrefixSerializer = new IVersionedSerializer<Clustering<?>>()
+    {
+        @Override
+        public void serialize(Clustering<?> clustering, DataOutputPlus out, int version) throws IOException
+        {
+            doSerialize(clustering, out);
+        }
+
+        public <V> void doSerialize(Clustering<V> clustering, DataOutputPlus out) throws IOException
+        {
+            if (clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING)
+            {
+                out.writeBoolean(true);
+            }
+            else
+            {
+                out.writeBoolean(false);
+                out.writeUnsignedVInt(clustering.size());
+                ValueAccessor<V> accessor = clustering.accessor();
+                for (int i = 0; i < clustering.size(); i++)
+                {
+                    accessor.writeWithVIntLength(clustering.get(i), out);
+                }
+            }
+        }
+
+        @Override
+        public Clustering<?> deserialize(DataInputPlus in, int version) throws IOException
+        {
+            Clustering<?> clustering;
+            if (in.readBoolean())
+            {
+                clustering = Clustering.STATIC_CLUSTERING;
+            }
+            else
+            {
+                int numComponents = in.readUnsignedVIntChecked();
+                byte[][] components = new byte[numComponents][];
+                for (int ci = 0; ci < numComponents; ci++)
+                {
+                    int componentLength = in.readUnsignedVIntChecked();
+                    components[ci] = new byte[componentLength];
+                    in.readFully(components[ci]);
+                }
+                clustering = new ArrayClustering(components);
+            }
+            return clustering;
+        }
+
+        @Override
+        public long serializedSize(Clustering<?> clustering, int version)
+        {
+            return computeSerializedSize(clustering);
+        }
+
+        private <V> long computeSerializedSize(Clustering<V> clustering)
+        {
+            if (clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING)
+                return sizeof(true); // doSerialize writes only the flag byte for a static clustering
+            int size = sizeof(true) + sizeofUnsignedVInt(clustering.size());
+            for (int i = 0; i < clustering.size(); i++)
+            {
+                int valueSize = clustering.accessor().size(clustering.get(i));
+                size += sizeofUnsignedVInt(valueSize) + valueSize;
+            }
+            return size;
+        }
+    };
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
index e793b14eec..5568b9b01e 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 
 import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
 import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
 public class BeginInvalidationSerializers
 {
@@ -92,12 +92,12 @@ public class BeginInvalidationSerializers
         @Override
         public long serializedSize(InvalidateReply reply, int version)
         {
-            return serializedSizeNullable(reply.supersededBy, version, CommandSerializers.ballot)
+            return serializedNullableSize(reply.supersededBy, version, CommandSerializers.ballot)
                     + CommandSerializers.ballot.serializedSize(reply.accepted, version)
                     + CommandSerializers.status.serializedSize(reply.status, version)
                     + TypeSizes.BOOL_SIZE
-                    + serializedSizeNullable(reply.route, version, KeySerializers.route)
-                    + serializedSizeNullable(reply.homeKey, version, KeySerializers.routingKey);
+                    + serializedNullableSize(reply.route, version, KeySerializers.route)
+                    + serializedNullableSize(reply.homeKey, version, KeySerializers.routingKey);
         }
     };
 }
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
index 0741c1e662..ce8ef96b48 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
@@ -47,7 +47,7 @@ import org.apache.cassandra.service.accord.txn.TxnData;
 import static accord.messages.CheckStatus.SerializationSupport.createOk;
 import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
 import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
 public class CheckStatusSerializers
 {
@@ -166,20 +166,20 @@ public class CheckStatusSerializers
             size += CommandSerializers.saveStatus.serializedSize(ok.saveStatus, version);
             size += CommandSerializers.ballot.serializedSize(ok.promised, version);
             size += CommandSerializers.ballot.serializedSize(ok.accepted, version);
-            size += serializedSizeNullable(ok.executeAt, version, CommandSerializers.timestamp);
+            size += serializedNullableSize(ok.executeAt, version, CommandSerializers.timestamp);
             size += TypeSizes.BOOL_SIZE;
             size += CommandSerializers.durability.serializedSize(ok.durability, version);
-            size += serializedSizeNullable(ok.homeKey, version, KeySerializers.routingKey);
-            size += serializedSizeNullable(ok.route, version, KeySerializers.route);
+            size += serializedNullableSize(ok.homeKey, version, KeySerializers.routingKey);
+            size += serializedNullableSize(ok.route, version, KeySerializers.route);
 
             if (!(reply instanceof CheckStatusOkFull))
                 return size;
 
             CheckStatusOkFull okFull = (CheckStatusOkFull) ok;
-            size += serializedSizeNullable(okFull.partialTxn, version, CommandSerializers.partialTxn);
-            size += serializedSizeNullable(okFull.committedDeps, version, DepsSerializer.partialDeps);
-            size += serializedSizeNullable(okFull.writes, version, CommandSerializers.writes);
-            size += serializedSizeNullable((TxnData) okFull.result, version, TxnData.serializer);
+            size += serializedNullableSize(okFull.partialTxn, version, CommandSerializers.partialTxn);
+            size += serializedNullableSize(okFull.committedDeps, version, DepsSerializer.partialDeps);
+            size += serializedNullableSize(okFull.writes, version, CommandSerializers.writes);
+            size += serializedNullableSize((TxnData) okFull.result, version, TxnData.serializer);
             return size;
         }
     };
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
index b49b898804..ed251016dd 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 
 import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
 import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
 public class CommitSerializers
 {
@@ -64,10 +64,10 @@ public class CommitSerializers
         public long serializedBodySize(Commit msg, int version)
         {
             return CommandSerializers.timestamp.serializedSize(msg.executeAt, version)
-                   + serializedSizeNullable(msg.partialTxn, version, CommandSerializers.partialTxn)
+                   + serializedNullableSize(msg.partialTxn, version, CommandSerializers.partialTxn)
                    + DepsSerializer.partialDeps.serializedSize(msg.partialDeps, version)
-                   + serializedSizeNullable(msg.route, version, KeySerializers.fullRoute)
-                   + serializedSizeNullable(msg.read, version, ReadDataSerializers.request);
+                   + serializedNullableSize(msg.route, version, KeySerializers.fullRoute)
+                   + serializedNullableSize(msg.read, version, ReadDataSerializers.request);
         }
     };
 
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
index 251b281621..71d3d69ba1 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
@@ -18,10 +18,6 @@
 
 package org.apache.cassandra.service.accord.serializers;
 
-import java.io.IOException;
-
-import javax.annotation.Nullable;
-
 import accord.messages.PreAccept;
 import accord.messages.PreAccept.PreAcceptOk;
 import accord.messages.PreAccept.PreAcceptReply;
@@ -35,9 +31,10 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.accord.serializers.TxnRequestSerializer.WithUnsyncedSerializer;
 
-import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+import static org.apache.cassandra.utils.NullableSerializer.*;
 
 public class PreacceptSerializers
 {
@@ -67,7 +64,7 @@ public class PreacceptSerializers
         public long serializedBodySize(PreAccept msg, int version)
         {
             return CommandSerializers.partialTxn.serializedSize(msg.partialTxn, version)
-                   + serializedSizeNullable(msg.route, version, KeySerializers.fullRoute)
+                   + serializedNullableSize(msg.route, version, KeySerializers.fullRoute)
                    + TypeSizes.sizeofUnsignedVInt(msg.maxEpoch - msg.minEpoch);
         }
     };
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
index e9907911a9..fbd00f637e 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.service.accord.serializers;
 
 import java.io.IOException;
-
 import javax.annotation.Nullable;
 
 import accord.api.Result;
@@ -45,7 +44,7 @@ import org.apache.cassandra.service.accord.txn.TxnData;
 
 import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
 import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
 public class RecoverySerializers
 {
@@ -73,7 +72,7 @@ public class RecoverySerializers
         {
             return CommandSerializers.partialTxn.serializedSize(recover.partialTxn, version)
                    + CommandSerializers.ballot.serializedSize(recover.ballot, version)
-                   + serializedSizeNullable(recover.route, version, KeySerializers.fullRoute);
+                   + serializedNullableSize(recover.route, version, KeySerializers.fullRoute);
         }
     };
 
@@ -149,13 +148,13 @@ public class RecoverySerializers
             long size = CommandSerializers.txnId.serializedSize(recoverOk.txnId, version);
             size += CommandSerializers.status.serializedSize(recoverOk.status, version);
             size += CommandSerializers.ballot.serializedSize(recoverOk.accepted, version);
-            size += serializedSizeNullable(recoverOk.executeAt, version, CommandSerializers.timestamp);
+            size += serializedNullableSize(recoverOk.executeAt, version, CommandSerializers.timestamp);
             size += DepsSerializer.partialDeps.serializedSize(recoverOk.deps, version);
             size += DepsSerializer.deps.serializedSize(recoverOk.earlierCommittedWitness, version);
             size += DepsSerializer.deps.serializedSize(recoverOk.earlierAcceptedNoWitness, version);
             size += TypeSizes.sizeof(recoverOk.rejectsFastPath);
-            size += serializedSizeNullable(recoverOk.writes, version, CommandSerializers.writes);
-            size += serializedSizeNullable((TxnData) recoverOk.result, version, TxnData.serializer);
+            size += serializedNullableSize(recoverOk.writes, version, CommandSerializers.writes);
+            size += serializedNullableSize((TxnData) recoverOk.result, version, TxnData.serializer);
             return size;
         }
 
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java b/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
index 120a04fbbe..244efe167e 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
@@ -20,10 +20,13 @@ package org.apache.cassandra.service.accord.txn;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
@@ -32,6 +35,8 @@ import com.google.common.collect.Iterables;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.cql3.Term;
 import org.apache.cassandra.cql3.conditions.ColumnCondition;
+import org.apache.cassandra.cql3.conditions.ColumnCondition.Bound;
+import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
@@ -47,11 +52,17 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.cassandra.service.accord.AccordSerializers.clusteringPrefixSerializer;
 import static org.apache.cassandra.service.accord.AccordSerializers.deserializeCqlCollectionAsTerm;
+import static org.apache.cassandra.service.accord.txn.TxnRead.DUMMY;
 import static org.apache.cassandra.utils.CollectionSerializers.deserializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.serializeCollection;
 import static org.apache.cassandra.utils.CollectionSerializers.serializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.serializedCollectionSize;
 import static org.apache.cassandra.utils.CollectionSerializers.serializedListSize;
 
+
 public abstract class TxnCondition
 {
     private interface ConditionSerializer<T extends TxnCondition>
@@ -73,9 +84,12 @@ public abstract class TxnCondition
         GREATER_THAN(">", Operator.GT),
         GREATER_THAN_OR_EQUAL(">=", Operator.GTE),
         LESS_THAN("<", Operator.LT),
-        LESS_THAN_OR_EQUAL("<=", Operator.LTE);
+        LESS_THAN_OR_EQUAL("<=", Operator.LTE),
+        COLUMN_CONDITIONS("COLUMN_CONDITIONS", null);
 
+        @Nonnull
         private final String symbol;
+        @Nullable
         private final Operator operator;
 
         Kind(String symbol, Operator operator)
@@ -104,6 +118,8 @@ public abstract class TxnCondition
                     return BooleanGroup.serializer;
                 case NONE:
                     return None.serializer;
+                case COLUMN_CONDITIONS:
+                    return ColumnConditionsAdapter.serializer;
                 default:
                     throw new IllegalArgumentException();
             }
@@ -300,6 +316,62 @@ public abstract class TxnCondition
         };
     }
 
+    /** Adapts CQL column-condition {@link Bound}s to a {@link TxnCondition}; all of them must apply to the row at {@code clustering}. */
+    public static class ColumnConditionsAdapter extends TxnCondition {
+        @Nonnull
+        public final Collection<Bound> bounds;
+
+        @Nonnull
+        public final Clustering<?> clustering;
+
+        public ColumnConditionsAdapter(Clustering<?> clustering, Collection<Bound> bounds)
+        {
+            super(Kind.COLUMN_CONDITIONS);
+            this.bounds = checkNotNull(bounds);
+            this.clustering = checkNotNull(clustering);
+        }
+
+        @Override
+        public boolean applies(@Nonnull TxnData data)
+        {
+            checkNotNull(data);
+            FilteredPartition partition = data.get(DUMMY);
+            // No partition stored under DUMMY is evaluated like a missing row (null) instead of throwing an NPE
+            Row row = partition != null ? partition.getRow(clustering) : null;
+            for (Bound bound : bounds)
+            {
+                if (!bound.appliesTo(row))
+                    return false;
+            }
+            return true;
+        }
+
+        private static final ConditionSerializer<ColumnConditionsAdapter> serializer = new ConditionSerializer<ColumnConditionsAdapter>()
+        {
+            @Override
+            public void serialize(ColumnConditionsAdapter condition, DataOutputPlus out, int version) throws IOException
+            {
+                clusteringPrefixSerializer.serialize(condition.clustering, out, version);
+                serializeCollection(condition.bounds, out, version, Bound.serializer);
+            }
+
+            @Override
+            public ColumnConditionsAdapter deserialize(DataInputPlus in, int version, Kind ignored) throws IOException
+            {
+                Clustering<?> clustering = clusteringPrefixSerializer.deserialize(in, version);
+                List<Bound> bounds = deserializeList(in, version, Bound.serializer);
+                return new ColumnConditionsAdapter(clustering, bounds);
+            }
+
+            @Override
+            public long serializedSize(ColumnConditionsAdapter condition, int version)
+            {
+                return clusteringPrefixSerializer.serializedSize(condition.clustering, version)
+                    + serializedCollectionSize(condition.bounds, version, Bound.serializer);
+            }
+        };
+    }
+
     public static class Value extends TxnCondition
     {
         private static final Set<Kind> KINDS = ImmutableSet.of(Kind.EQUAL, Kind.NOT_EQUAL,
@@ -503,7 +575,6 @@ public abstract class TxnCondition
 
     public static final IVersionedSerializer<TxnCondition> serializer = new IVersionedSerializer<TxnCondition>()
     {
-        @SuppressWarnings("unchecked")
         @Override
         public void serialize(TxnCondition condition, DataOutputPlus out, int version) throws IOException
         {
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
index 50bdfbbada..fee1361a85 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
@@ -75,6 +75,7 @@ public class TxnData implements Data, Result, Iterable<FilteredPartition>
         return data.entrySet();
     }
 
+    // This is inadequate
     @Override
     public Data merge(Data data)
     {
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
index e4526beabd..57b33eed61 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
@@ -24,7 +24,10 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
+import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -37,6 +40,13 @@ import org.apache.cassandra.utils.ObjectSizes;
 import static com.google.common.primitives.Ints.checkedCast;
 import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+import static org.apache.cassandra.service.accord.AccordSerializers.clusteringPrefixSerializer;
+import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
+
 public class TxnDataName implements Comparable<TxnDataName>
 {
     private static final TxnDataName RETURNING = new TxnDataName(Kind.RETURNING);
@@ -71,13 +81,27 @@ public class TxnDataName implements Comparable<TxnDataName>
         }
     }
 
+    @Nonnull
     private final Kind kind;
+
+    @Nonnull
     private final String[] parts;
 
-    public TxnDataName(Kind kind, String... parts)
+    @Nullable
+    private final Clustering<?> clustering;
+
+    public TxnDataName(@Nonnull Kind kind, @Nonnull String... parts)
+    {
+        this(kind, null, parts);
+    }
+
+    public TxnDataName(@Nonnull Kind kind, @Nullable Clustering<?> clustering, @Nonnull String... parts)
     {
+        checkNotNull(kind);
+        checkNotNull(parts);
         this.kind = kind;
         this.parts = parts;
+        this.clustering = clustering;
     }
 
     public static TxnDataName user(String name)
@@ -204,17 +228,19 @@ public class TxnDataName implements Comparable<TxnDataName>
             out.writeUnsignedVInt(t.parts.length);
             for (String part : t.parts)
                 out.writeUTF(part);
+            serializeNullable(t.clustering, out, version, clusteringPrefixSerializer);
         }
 
         @Override
         public TxnDataName deserialize(DataInputPlus in, int version) throws IOException
         {
             Kind kind = Kind.from(in.readByte());
-            int length = checkedCast(in.readUnsignedVInt());
+            int length = in.readUnsignedVIntChecked();
             String[] parts = new String[length];
             for (int i = 0; i < length; i++)
                 parts[i] = in.readUTF();
-            return new TxnDataName(kind, parts);
+            Clustering<?> clustering = deserializeNullable(in, version, clusteringPrefixSerializer);
+            return new TxnDataName(kind, clustering, parts);
         }
 
         @Override
@@ -223,6 +249,7 @@ public class TxnDataName implements Comparable<TxnDataName>
             int size = Byte.BYTES + sizeofUnsignedVInt(t.parts.length);
             for (String part : t.parts)
                 size += TypeSizes.sizeof(part);
+            size += serializedNullableSize(t.clustering, version, clusteringPrefixSerializer);
             return size;
         }
     };
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index 667df37d8c..93100eba3e 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -100,7 +100,7 @@ public class TxnNamedRead extends AbstractSerialized<SinglePartitionReadCommand>
         return "TxnNamedRead{name='" + name + '\'' + ", key=" + key + ", update=" + get() + '}';
     }
 
-    public TxnDataName name()
+    public TxnDataName txnDataName()
     {
         return name;
     }
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java b/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
index d00da39735..3ceb6e79af 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.service.accord.txn;
 
 import java.io.IOException;
-
 import javax.annotation.Nullable;
 
 import com.google.common.base.Preconditions;
@@ -40,6 +39,12 @@ public abstract class TxnQuery implements Query
 {
     public static final TxnQuery ALL = new TxnQuery()
     {
+        @Override
+        protected byte type()
+        {
+            return 1;
+        }
+
         @Override
         public Result compute(TxnId txnId, Data data, @Nullable Read read, @Nullable Update update)
         {
@@ -49,6 +54,12 @@ public abstract class TxnQuery implements Query
 
     public static final TxnQuery NONE = new TxnQuery()
     {
+        @Override
+        protected byte type()
+        {
+            return 2;
+        }
+
         @Override
         public Result compute(TxnId txnId, Data data, @Nullable Read read, @Nullable Update update)
         {
@@ -56,10 +67,34 @@ public abstract class TxnQuery implements Query
         }
     };
 
+    public static final TxnQuery CONDITION = new TxnQuery()
+    {
+        @Override
+        protected byte type()
+        {
+            return 3; // byte the serializer writes for this query; deserialize maps 3 back to CONDITION
+        }
+
+        @Override
+        public Result compute(TxnId txnId, Data data, @Nullable Read read, @Nullable Update update)
+        {
+            TxnUpdate txnUpdate = (TxnUpdate)update; // NOTE(review): update is @Nullable yet dereferenced below; confirm callers always supply one
+            boolean conditionCheck = txnUpdate.checkCondition(data);
+            // If the condition applied, an empty result indicates success
+            if (conditionCheck)
+                return new TxnData();
+            else
+                // If it failed to apply, the partition contents (if present) are returned, which indicates failure
+                return (TxnData)data;
+        }
+    };
+
     private static final long SIZE = ObjectSizes.measure(ALL);
 
     private TxnQuery() {}
 
+    abstract protected byte type();
+
     public long estimatedSizeOnHeap()
     {
         return SIZE;
@@ -70,8 +105,8 @@ public abstract class TxnQuery implements Query
         @Override
         public void serialize(TxnQuery query, DataOutputPlus out, int version) throws IOException
         {
-            Preconditions.checkArgument(query == null || query == ALL || query == NONE);
-            out.writeByte(query == null ? 0 : query == ALL ? 1 : 2);
+            Preconditions.checkArgument(query == null || query == ALL || query == NONE || query == CONDITION);
+            out.writeByte(query == null ? 0 : query.type());
         }
 
         @Override
@@ -83,13 +118,14 @@ public abstract class TxnQuery implements Query
                 case 0: return null;
                 case 1: return ALL;
                 case 2: return NONE;
+                case 3: return CONDITION;
             }
         }
 
         @Override
         public long serializedSize(TxnQuery query, int version)
         {
-            Preconditions.checkArgument(query == null || query == ALL || query == NONE);
+            Preconditions.checkArgument(query == null || query == ALL || query == NONE || query == CONDITION);
             return TypeSizes.sizeof((byte)2);
         }
     };
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
index 2aa91be7e6..5811c8787d 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
@@ -24,6 +24,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.function.BiConsumer;
 
+import com.google.common.collect.ImmutableList;
+
 import accord.api.Data;
 import accord.api.DataStore;
 import accord.api.Key;
@@ -33,6 +35,7 @@ import accord.primitives.Keys;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -49,6 +52,9 @@ import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
 
 public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read
 {
+    // A CAS or a SERIAL/LOCAL_SERIAL read involves at most one partition, so a single fixed name is used for it
+    public static final String DUMMY_NAME = "";
+    public static final TxnDataName DUMMY = TxnDataName.user(DUMMY_NAME);
     private static final long EMPTY_SIZE = ObjectSizes.measure(new TxnRead(new TxnNamedRead[0], null));
 
     private final Keys txnKeys;
@@ -65,6 +71,12 @@ public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read
         this.txnKeys = txnKeys;
     }
 
+    public static TxnRead createRead(SinglePartitionReadCommand readCommand)
+    {
+        TxnNamedRead read = new TxnNamedRead(DUMMY, readCommand);
+        return new TxnRead(ImmutableList.of(read), Keys.of(read.key()));
+    }
+
     public long estimatedSizeOnHeap()
     {
         long size = EMPTY_SIZE;
@@ -76,7 +88,7 @@ public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read
     @Override
     int compareNonKeyFields(TxnNamedRead left, TxnNamedRead right)
     {
-        return left.name().compareTo(right.name());
+        return left.txnDataName().compareTo(right.txnDataName());
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
index 7b26b2acdd..723dbb30df 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
@@ -47,14 +47,13 @@ import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
 
-import static java.lang.Math.toIntExact;
 import static org.apache.cassandra.service.accord.AccordSerializers.serialize;
 import static org.apache.cassandra.utils.ArraySerializers.deserializeArray;
 import static org.apache.cassandra.utils.ArraySerializers.serializeArray;
+import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
 import static org.apache.cassandra.utils.ByteBufferUtil.readWithVIntLength;
 import static org.apache.cassandra.utils.ByteBufferUtil.serializedSizeWithVIntLength;
 import static org.apache.cassandra.utils.ByteBufferUtil.writeWithVIntLength;
-import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
 
 public class TxnUpdate implements Update
 {
@@ -64,6 +63,9 @@ public class TxnUpdate implements Update
     private final ByteBuffer[] fragments;
     private final ByteBuffer condition;
 
+    // Memoize computation of condition
+    private Boolean conditionResult;
+
     public TxnUpdate(List<TxnWrite.Fragment> fragments, TxnCondition condition)
     {
         // TODO: Figure out a way to shove keys into TxnCondition, and have it implement slice/merge.
@@ -169,9 +171,7 @@ public class TxnUpdate implements Update
     @Override
     public Write apply(Data data)
     {
-        TxnCondition condition = AccordSerializers.deserialize(this.condition, TxnCondition.serializer);
-
-        if (!condition.applies((TxnData) data))
+        if (!checkCondition(data))
             return TxnWrite.EMPTY;
 
         List<TxnWrite.Fragment> fragments = deserialize(this.fragments, TxnWrite.Fragment.serializer);
@@ -185,6 +185,7 @@ public class TxnUpdate implements Update
         return new TxnWrite(updates);
     }
 
+    // Should we serialize the conditionResult?
     public static final IVersionedSerializer<TxnUpdate> serializer = new IVersionedSerializer<TxnUpdate>()
     {
         @Override
@@ -210,6 +211,7 @@ public class TxnUpdate implements Update
             long size = KeySerializers.keys.serializedSize(update.keys, version);
             size += serializedSizeWithVIntLength(update.condition);
             size += serializedArraySize(update.fragments, version, ByteBufferUtil.vintSerializer);
+            assert(ByteBufferUtil.serialized(this, update, version).remaining() == size);
             return size;
         }
     };
@@ -288,4 +290,15 @@ public class TxnUpdate implements Update
             result.addAll(deserialize(bytes, serializer));
         return result;
     }
+
+    // Evaluates the condition against data once and memoizes the outcome (name TBD: maybeCheckCondition? checkConditionMemoized?)
+    public boolean checkCondition(Data data)
+    {
+        // The memoized result is returned without verifying data matches what it was computed from; assert that?
+        if (conditionResult != null)
+            return conditionResult;
+        TxnCondition condition = AccordSerializers.deserialize(this.condition, TxnCondition.serializer);
+        conditionResult = condition.applies((TxnData) data);
+        return conditionResult;
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java b/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java
index d25ca2fdd5..dcbd8be587 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.RequestCallback;
 import org.apache.cassandra.net.RequestCallbackWithFailure;
 import org.apache.cassandra.service.paxos.Commit.Agreed;
 import org.apache.cassandra.service.paxos.Commit.Committed;
@@ -47,7 +46,7 @@ import static org.apache.cassandra.service.paxos.PaxosRequestCallback.shouldExec
 import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
 import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
 import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
 /**
  * Nodes that have promised in response to our prepare, may be missing the latestCommit, meaning we cannot be sure the
@@ -238,7 +237,7 @@ public class PaxosPrepareRefresh implements RequestCallbackWithFailure<PaxosPrep
 
         public long serializedSize(Response response, int version)
         {
-            return serializedSizeNullable(response.isSupersededBy, version, Ballot.Serializer.instance);
+            return serializedNullableSize(response.isSupersededBy, version, Ballot.Serializer.instance);
         }
     }
 
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java b/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
index ab757cbdc5..94fa9dc3a4 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
@@ -20,7 +20,12 @@ package org.apache.cassandra.service.paxos;
 
 
 import java.io.IOException;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
@@ -63,20 +68,30 @@ import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MonotonicClock;
 
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 import static org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_REPAIR_RETRY_TIMEOUT_IN_MS;
 import static org.apache.cassandra.exceptions.RequestFailureReason.UNKNOWN;
 import static org.apache.cassandra.net.Verb.PAXOS2_REPAIR_REQ;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.cassandra.service.paxos.Commit.*;
+import static org.apache.cassandra.service.paxos.Commit.Accepted;
+import static org.apache.cassandra.service.paxos.Commit.Committed;
+import static org.apache.cassandra.service.paxos.Commit.Proposal;
+import static org.apache.cassandra.service.paxos.Commit.isAfter;
+import static org.apache.cassandra.service.paxos.Commit.latest;
+import static org.apache.cassandra.service.paxos.Commit.timestampsClash;
 import static org.apache.cassandra.service.paxos.ContentionStrategy.Type.REPAIR;
 import static org.apache.cassandra.service.paxos.ContentionStrategy.waitUntilForContention;
-import static org.apache.cassandra.service.paxos.Paxos.*;
-import static org.apache.cassandra.service.paxos.PaxosPrepare.*;
+import static org.apache.cassandra.service.paxos.Paxos.Participants;
+import static org.apache.cassandra.service.paxos.Paxos.isInRangeAndShouldProcess;
+import static org.apache.cassandra.service.paxos.Paxos.staleBallotNewerThan;
+import static org.apache.cassandra.service.paxos.PaxosPrepare.FoundIncompleteAccepted;
+import static org.apache.cassandra.service.paxos.PaxosPrepare.FoundIncompleteCommitted;
+import static org.apache.cassandra.service.paxos.PaxosPrepare.Status;
+import static org.apache.cassandra.service.paxos.PaxosPrepare.prepareWithBallot;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
 import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
 
 /**
  * Facility to finish any in-progress paxos transaction, and ensure that a quorum of nodes agree on the most recent operation.
@@ -629,7 +644,7 @@ public class PaxosRepair extends AbstractPaxosRepair
         public long serializedSize(Response response, int version)
         {
             return Ballot.sizeInBytes()
-                   + serializedSizeNullable(response.acceptedButNotCommitted, version, Accepted.serializer)
+                   + serializedNullableSize(response.acceptedButNotCommitted, version, Accepted.serializer)
                    + Committed.serializer.serializedSize(response.committed, version);
         }
     }
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index e802cd070f..0afb7cae01 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -723,6 +723,7 @@ public class PaxosState implements PaxosOperationLock
         {
             synchronized (unsafeState.key)
             {
+                // Unused return value?
                 unsafeState.maybeLoad();
                 assert unsafeState.current != null;
 
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 062d65f6e1..6a05b0ef7f 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -23,37 +23,28 @@ package org.apache.cassandra.utils;
  * afterward, and ensure the tests still pass.
  */
 
-import java.io.*;
+import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileUtils;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
 import java.util.stream.Collectors;
 
-import net.nicoulaj.compilecommand.annotations.Inline;
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.marshal.BooleanType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.DateType;
-import org.apache.cassandra.db.marshal.ListType;
-import org.apache.cassandra.db.marshal.MapType;
-import org.apache.cassandra.db.marshal.SetType;
-import org.apache.cassandra.db.marshal.TimestampType;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.compress.BufferType;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileUtils;
+import static com.google.common.primitives.Ints.checkedCast;
 
 /**
  * Utility methods to make ByteBuffers less painful
@@ -359,6 +350,17 @@ public class ByteBufferUtil
         out.writeUnsignedVInt(bytes.remaining());
         out.write(bytes);
     }
+    public static void writeWithVIntLengthAndNull(ByteBuffer bytes, DataOutputPlus out) throws IOException
+    {
+        if (bytes == null)
+        {
+            out.writeVInt(-1); // a length of -1 marks a null buffer
+            return;
+        }
+        // Non-null: write the remaining byte count with writeVInt, then the bytes themselves.
+        out.writeVInt(bytes.remaining());
+        out.write(bytes);
+    }
 
     public static void writeWithShortLength(ByteBuffer buffer, DataOutputPlus out) throws IOException
     {
@@ -389,6 +391,17 @@ public class ByteBufferUtil
         return ByteBufferUtil.read(in, length);
     }
 
+    public static ByteBuffer readWithVIntLengthAndNull(DataInputPlus in) throws IOException
+    {
+        int length = checkedCast(in.readVInt()); // checkedCast throws if the value does not fit in an int
+        if (length < -1)
+            throw new IOException("Corrupt (negative) value length encountered");
+        if (length == -1) // -1 is the length writeWithVIntLengthAndNull writes for a null buffer
+            return null;
+
+        return ByteBufferUtil.read(in, length);
+    }
+    
     public static int serializedSizeWithLength(ByteBuffer buffer)
     {
         int size = buffer.remaining();
@@ -401,6 +414,15 @@ public class ByteBufferUtil
         return TypeSizes.sizeofUnsignedVInt(size) + size;
     }
 
+    public static int serializedSizeWithVIntLengthAndNull(ByteBuffer buffer)
+    {
+        if (buffer == null)
+            return TypeSizes.sizeofVInt(-1);
+        // writeWithVIntLengthAndNull emits the length with writeVInt, so size it with sizeofVInt, not sizeofUnsignedVInt.
+        int size = buffer.remaining();
+        return TypeSizes.sizeofVInt(size) + size;
+    }
+
     public static long estimatedSizeOnHeap(ByteBuffer buffer)
     {
         return EMPTY_SIZE_ON_HEAP + buffer.remaining();
@@ -927,6 +949,19 @@ public class ByteBufferUtil
         return true;
     }
 
+    public static <T> ByteBuffer serialized(IVersionedSerializer<T> serializer, T value, int version)
+    {
+        try (DataOutputBuffer dob = new DataOutputBuffer())
+        {
+            serializer.serialize(value, dob, version); // write the value into the in-memory buffer
+            return dob.buffer();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e); // rethrow the checked IOException as unchecked, keeping the cause
+        }
+    }
+
     public static final IVersionedSerializer<ByteBuffer> vintSerializer = new IVersionedSerializer<ByteBuffer>()
     {
         @Override
@@ -947,4 +982,6 @@ public class ByteBufferUtil
             return serializedSizeWithVIntLength(bytes);
         }
     };
+
+    public static final IVersionedSerializer<ByteBuffer> vintNullableSerializer = NullableSerializer.wrap(vintSerializer);
 }
diff --git a/src/java/org/apache/cassandra/utils/NullableSerializer.java b/src/java/org/apache/cassandra/utils/NullableSerializer.java
index a286fe5765..7d834be995 100644
--- a/src/java/org/apache/cassandra/utils/NullableSerializer.java
+++ b/src/java/org/apache/cassandra/utils/NullableSerializer.java
@@ -39,7 +39,7 @@ public class NullableSerializer
         return in.readBoolean() ? serializer.deserialize(in, version) : null;
     }
 
-    public static <T> long serializedSizeNullable(T value, int version, IVersionedSerializer<T> serializer)
+    public static <T> long serializedNullableSize(T value, int version, IVersionedSerializer<T> serializer)
     {
         return value != null
                 ? TypeSizes.sizeof(true) + serializer.serializedSize(value, version)
@@ -61,7 +61,7 @@ public class NullableSerializer
 
             public long serializedSize(T t, int version)
             {
-                return serializedSizeNullable(t, version, wrap);
+                return serializedNullableSize(t, version, wrap);
             }
         };
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java b/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java
index b49572dc4b..e6d7ff8496 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java
@@ -119,5 +119,4 @@ public class ByteBuddyExamples extends TestBaseImpl
             return r.call();
         }
     }
-
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
index 1c981693e0..383800aa4c 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
@@ -18,38 +18,33 @@
 
 package org.apache.cassandra.distributed.test.accord;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.functions.types.utils.Bytes;
-import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.db.marshal.ListType;
-import org.apache.cassandra.db.marshal.MapType;
-import org.apache.cassandra.db.marshal.SetType;
-import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.service.accord.AccordService;
 import org.assertj.core.api.Assertions;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.api.SimpleQueryResult;
-import org.apache.cassandra.service.accord.AccordService;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
+import static org.apache.cassandra.distributed.util.QueryResultUtil.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
-import static org.apache.cassandra.distributed.util.QueryResultUtil.assertThat;
-
 public class AccordCQLTest extends AccordTestBase
 {
     private static final Logger logger = LoggerFactory.getLogger(AccordCQLTest.class);
@@ -197,7 +192,7 @@ public class AccordCQLTest extends AccordTestBase
              cluster ->
              {
                  cluster.coordinator(1).execute("INSERT INTO " + currentTable + " (k, c, v) VALUES (0, 0, " + lhs + ");", ConsistencyLevel.ALL);
-             
+
                  String query = "BEGIN TRANSACTION\n" +
                                 "  LET row1 = (SELECT v FROM " + currentTable + " WHERE k = ? LIMIT 1);\n" +
                                 "  SELECT row1.v;\n" +
@@ -206,7 +201,7 @@ public class AccordCQLTest extends AccordTestBase
                                 "  END IF\n" +
                                 "COMMIT TRANSACTION";
                  assertRowEqualsWithPreemptedRetry(cluster, new Object[] { lhs }, query, 0, rhs, 1, 0, 1);
-             
+
                  String check = "BEGIN TRANSACTION\n" +
                                 "  SELECT * FROM " + currentTable + " WHERE k = ? AND c = ?;\n" +
                                 "COMMIT TRANSACTION";
@@ -1990,4 +1985,30 @@ public class AccordCQLTest extends AccordTestBase
              }
         );
     }
+
+    @Test
+    public void testCASAndSerialRead() throws Exception
+    {
+        test("CREATE TABLE " + currentTable + " (id int, c int, v int, s int static, PRIMARY KEY ((id), c));",
+            cluster -> {
+                ICoordinator coordinator = cluster.coordinator(1);
+                int startingAccordCoordinateCount = getAccordCoordinateCount();
+                coordinator.execute("INSERT INTO " + currentTable + " (id, c, v, s) VALUES (1, 2, 3, 5);", ConsistencyLevel.ALL);
+                assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 3, 5);
+                assertRowEqualsWithPreemptedRetry(cluster, new Object[]{true}, "UPDATE " + currentTable + " SET v = 4 WHERE id = 1 AND c = 2 IF v = 3");
+                assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 4, 5);
+                assertRowEqualsWithPreemptedRetry(cluster, new Object[]{ false, 4 }, "UPDATE " + currentTable + " SET v = 4 WHERE id = 1 AND c = 2 IF v = 3");
+                assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 4, 5);
+
+                // Test working with a static column
+                assertRowEqualsWithPreemptedRetry(cluster, new Object[]{ false, 5 }, "UPDATE " + currentTable + " SET v = 5 WHERE id = 1 AND c = 2 IF s = 4");
+                assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 4, 5);
+                assertRowEqualsWithPreemptedRetry(cluster, new Object[]{true}, "UPDATE " + currentTable + " SET v = 5 WHERE id = 1 AND c = 2 IF s = 5");
+                assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 5, 5);
+                assertRowEqualsWithPreemptedRetry(cluster, new Object[]{true}, "UPDATE " + currentTable + " SET s = 6 WHERE id = 1 IF s = 5");
+                assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 5, 6);
+                // Make sure all the consensus using queries actually were run on Accord
+                assertEquals( 11, getAccordCoordinateCount() - startingAccordCoordinateCount);
+        });
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index 60b583b40a..d1d16b66fa 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -18,23 +18,13 @@
 
 package org.apache.cassandra.distributed.test.accord;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import accord.coordinate.Preempted;
+import accord.primitives.Txn;
 import net.bytebuddy.ByteBuddy;
 import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.implementation.bind.annotation.This;
-import org.assertj.core.api.Assertions;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-
-import accord.coordinate.Preempted;
 import org.apache.cassandra.cql3.statements.ModificationStatement;
 import org.apache.cassandra.cql3.statements.TransactionStatement;
 import org.apache.cassandra.distributed.Cluster;
@@ -43,10 +33,22 @@ import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.txn.TxnData;
 import org.apache.cassandra.utils.AssertionUtils;
 import org.apache.cassandra.utils.FailingConsumer;
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
 import static org.junit.Assert.assertArrayEquals;
 
 public abstract class AccordTestBase extends TestBaseImpl
@@ -76,10 +78,10 @@ public abstract class AccordTestBase extends TestBaseImpl
         currentTable = KEYSPACE + ".tbl" + COUNTER.getAndIncrement();
     }
 
-    protected static void assertRow(Cluster cluster, String query, int k, int c, int v)
+    protected static void assertRowSerial(Cluster cluster, String query, int k, int c, int v, int s)
     {
-        Object[][] result = cluster.coordinator(1).execute(query, ConsistencyLevel.QUORUM);
-        assertArrayEquals(new Object[]{new Object[] {k, c, v}}, result);
+        Object[][] result = cluster.coordinator(1).execute(query, ConsistencyLevel.SERIAL);
+        assertArrayEquals(new Object[]{new Object[] {k, c, v, s}}, result);
     }
 
     protected void test(String tableDDL, FailingConsumer<Cluster> fn) throws Exception
@@ -105,14 +107,20 @@ public abstract class AccordTestBase extends TestBaseImpl
         test("CREATE TABLE " + currentTable + " (k int, c int, v int, primary key (k, c))", fn);
     }
 
+    protected int getAccordCoordinateCount()
+    {
+        return sharedCluster.get(1).callOnInstance(() -> BBAccordCoordinateCountHelper.count.get());
+    }
+
     private static Cluster createCluster() throws IOException
     {
         // need to up the timeout else tests get flaky
         // disable vnode for now, but should enable before trunk
         return init(Cluster.build(2)
                            .withoutVNodes()
-                           .withConfig(c -> c.with(Feature.NETWORK).set("write_request_timeout", "10s").set("transaction_timeout", "15s"))
+                           .withConfig(c -> c.with(Feature.NETWORK).set("write_request_timeout", "10s").set("transaction_timeout", "15s").set("legacy_paxos_strategy", "accord"))
                            .withInstanceInitializer(EnforceUpdateDoesNotPerformRead::install)
+                           .withInstanceInitializer(BBAccordCoordinateCountHelper::install)
                            .start());
     }
 
@@ -164,6 +172,27 @@ public abstract class AccordTestBase extends TestBaseImpl
             return map;
         }
     }
-    
+
+    public static class BBAccordCoordinateCountHelper
+    {
+        static AtomicInteger count = new AtomicInteger();
+        static void install(ClassLoader cl, int nodeNumber)
+        {
+            if (nodeNumber != 1)
+                return;
+            new ByteBuddy().rebase(AccordService.class)
+                           .method(named("coordinate").and(takesArguments(1)))
+                           .intercept(MethodDelegation.to(BBAccordCoordinateCountHelper.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        public static TxnData coordinate(Txn txn, @SuperCall Callable<TxnData> actual) throws Exception
+        {
+            count.incrementAndGet();
+            return actual.call();
+        }
+    }
+
     protected abstract Logger logger();
 }
diff --git a/test/unit/org/apache/cassandra/cql3/conditions/ColumnConditionTest.java b/test/unit/org/apache/cassandra/cql3/conditions/ColumnConditionTest.java
index dd40cf81db..0dc6383637 100644
--- a/test/unit/org/apache/cassandra/cql3/conditions/ColumnConditionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/conditions/ColumnConditionTest.java
@@ -17,162 +17,421 @@
  */
 package org.apache.cassandra.cql3.conditions;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.Constants;
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.Terms;
+import org.apache.cassandra.cql3.UserTypes;
 import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.db.marshal.SetType;
-import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.serializers.ListSerializer;
+import org.apache.cassandra.serializers.MapSerializer;
+import org.apache.cassandra.serializers.SetSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.TimeUUID;
 
+import static org.apache.cassandra.cql3.Operator.CONTAINS;
+import static org.apache.cassandra.cql3.Operator.CONTAINS_KEY;
+import static org.apache.cassandra.cql3.Operator.EQ;
+import static org.apache.cassandra.cql3.Operator.GT;
+import static org.apache.cassandra.cql3.Operator.GTE;
+import static org.apache.cassandra.cql3.Operator.LT;
+import static org.apache.cassandra.cql3.Operator.LTE;
+import static org.apache.cassandra.cql3.Operator.NEQ;
+import static org.apache.cassandra.transport.ProtocolVersion.CURRENT;
+import static org.apache.cassandra.utils.ByteBufferUtil.EMPTY_BYTE_BUFFER;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import static org.apache.cassandra.cql3.Operator.*;
-import static org.apache.cassandra.utils.ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
 
-public class ColumnConditionTest
+public class ColumnConditionTest extends CQLTester
 {
     public static final ByteBuffer ZERO = Int32Type.instance.fromString("0");
     public static final ByteBuffer ONE = Int32Type.instance.fromString("1");
     public static final ByteBuffer TWO = Int32Type.instance.fromString("2");
 
-    private static Row newRow(ColumnMetadata definition, ByteBuffer value)
+    private static final ListType<Integer> listType = ListType.getInstance(Int32Type.instance, true);
+    private static final MapType<Integer, Integer> mapType = MapType.getInstance(Int32Type.instance, Int32Type.instance, true);
+    private static final SetType<Integer> setType = SetType.getInstance(Int32Type.instance, true);
+
+    private Row newRow(ColumnMetadata definition, ByteBuffer value)
     {
-        BufferCell cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, value, null);
+        BufferCell cell;
+        if (definition.type.isUDT() )
+        {
+            if (definition.type.isMultiCell()) {
+                CellPath cellPath = udtType.cellPathForField(udtType.fieldName(0));
+                cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, value, cellPath);
+            }
+            else
+            {
+                ByteBuffer udtValue = UserType.buildValue(value, TWO);
+                cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, udtValue, null);
+            }
+        }
+        else
+        {
+            cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, value, null);
+        }
         return BTreeRow.singleCellRow(Clustering.EMPTY, cell);
     }
 
     private static Row newRow(ColumnMetadata definition, List<ByteBuffer> values)
     {
-        Row.Builder builder = BTreeRow.sortedBuilder();
-        builder.newRow(Clustering.EMPTY);
-        long now = System.currentTimeMillis();
-        if (values != null)
+        AbstractType<?> type = definition.type;
+        if (type.isFrozenCollection())
+        {
+            ByteBuffer cellValue = ListSerializer.pack(values, values.size(), CURRENT);
+            Cell cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, cellValue, null);
+            return BTreeRow.singleCellRow(Clustering.EMPTY, cell);
+        }
+        else
         {
-            for (int i = 0, m = values.size(); i < m; i++)
+            Row.Builder builder = BTreeRow.sortedBuilder();
+            builder.newRow(Clustering.EMPTY);
+            long now = System.currentTimeMillis();
+            if (values != null)
             {
-                TimeUUID uuid = TimeUUID.Generator.atUnixMillis(now, i);
-                ByteBuffer key = uuid.toBytes();
-                ByteBuffer value = values.get(i);
-                BufferCell cell = new BufferCell(definition,
-                                                 0L,
-                                                 Cell.NO_TTL,
-                                                 Cell.NO_DELETION_TIME,
-                                                 value,
-                                                 CellPath.create(key));
-                builder.addCell(cell);
+                for (int i = 0, m = values.size(); i < m; i++)
+                {
+                    BufferCell cell;
+                    if (type.isUDT())
+                    {
+                        cell = new BufferCell(definition,
+                                              0L,
+                                              Cell.NO_TTL,
+                                              Cell.NO_DELETION_TIME,
+                                              values.get(i),
+                                              ((UserType)type).cellPathForField(((UserType) type).fieldName(i)));
+                    }
+                    else
+                    {
+                        TimeUUID uuid = TimeUUID.Generator.atUnixMillis(now, i);
+                        ByteBuffer key = uuid.toBytes();
+                        ByteBuffer value = values.get(i);
+                        cell = new BufferCell(definition,
+                                              0L,
+                                              Cell.NO_TTL,
+                                              Cell.NO_DELETION_TIME,
+                                              value,
+                                              CellPath.create(key));
+                    }
+                    builder.addCell(cell);
+                }
             }
+            return builder.build();
         }
-        return builder.build();
     }
 
     private static Row newRow(ColumnMetadata definition, SortedSet<ByteBuffer> values)
     {
-        Row.Builder builder = BTreeRow.sortedBuilder();
-        builder.newRow(Clustering.EMPTY);
-        if (values != null)
+        if (definition.type.isFrozenCollection())
         {
-            for (ByteBuffer value : values)
+            ByteBuffer cellValue = SetSerializer.pack(values, values.size(), CURRENT);
+            Cell cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, cellValue, null);
+            return BTreeRow.singleCellRow(Clustering.EMPTY, cell);
+        }
+        else
+        {
+            Row.Builder builder = BTreeRow.sortedBuilder();
+            builder.newRow(Clustering.EMPTY);
+            if (values != null)
             {
-                BufferCell cell = new BufferCell(definition,
-                                                 0L,
-                                                 Cell.NO_TTL,
-                                                 Cell.NO_DELETION_TIME,
-                                                 ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                                                 CellPath.create(value));
-                builder.addCell(cell);
+                for (ByteBuffer value : values)
+                {
+                    BufferCell cell = new BufferCell(definition,
+                                                     0L,
+                                                     Cell.NO_TTL,
+                                                     Cell.NO_DELETION_TIME,
+                                                     ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                     CellPath.create(value));
+                    builder.addCell(cell);
+                }
             }
+            return builder.build();
         }
-        return builder.build();
     }
 
-    private static Row newRow(ColumnMetadata definition, Map<ByteBuffer, ByteBuffer> values)
+    private static Row newRow(ColumnMetadata definition, SortedMap<ByteBuffer, ByteBuffer> values)
     {
-        Row.Builder builder = BTreeRow.sortedBuilder();
-        builder.newRow(Clustering.EMPTY);
-        if (values != null)
+        if (definition.type.isFrozenCollection())
+        {
+            List<ByteBuffer> packableValues = values.entrySet().stream().flatMap(entry -> Stream.of(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+            ByteBuffer cellValue = MapSerializer.pack(packableValues, values.size(), CURRENT);
+            Cell cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, cellValue, null);
+            return BTreeRow.singleCellRow(Clustering.EMPTY, cell);
+        }
+        else
         {
-            for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet())
+            Row.Builder builder = BTreeRow.sortedBuilder();
+            builder.newRow(Clustering.EMPTY);
+            if (values != null)
             {
-                BufferCell cell = new BufferCell(definition,
-                                                 0L,
-                                                 Cell.NO_TTL,
-                                                 Cell.NO_DELETION_TIME,
-                                                 entry.getValue(),
-                                                 CellPath.create(entry.getKey()));
-                builder.addCell(cell);
+                for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet())
+                {
+                    BufferCell cell = new BufferCell(definition,
+                                                     0L,
+                                                     Cell.NO_TTL,
+                                                     Cell.NO_DELETION_TIME,
+                                                     entry.getValue(),
+                                                     CellPath.create(entry.getKey()));
+                    builder.addCell(cell);
+                }
             }
+            return builder.build();
+        }
+    }
+
+    private static boolean testRoundtripped(ColumnCondition.Bound bound, Row row)
+    {
+        DataOutputBuffer dab = new DataOutputBuffer();
+        IVersionedSerializer<ColumnCondition.Bound> serializer = ColumnCondition.Bound.serializer;
+        int version = MessagingService.current_version;
+        try
+        {
+            serializer.serialize(bound, dab, version);
+            assertEquals(serializer.serializedSize(bound, version), dab.position());
+            ColumnCondition.Bound deserializedBound = serializer.deserialize(new DataInputBuffer(dab.buffer(), false), version);
+            boolean originalResult = bound.appliesTo(row);
+            assertEquals(originalResult, deserializedBound.appliesTo(row));
+            return originalResult;
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
         }
-        return builder.build();
     }
 
-    private static boolean conditionApplies(ByteBuffer rowValue, Operator op, ByteBuffer conditionValue)
+    private boolean conditionApplies(ByteBuffer rowValue, Operator op, ByteBuffer conditionValue)
     {
-        ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", Int32Type.instance);
+        AbstractType<?> columnType = Int32Type.instance;
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, false), "c", columnType);
         ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Constants.Value(conditionValue)));
         ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
-        return bound.appliesTo(newRow(definition, rowValue));
+        boolean regularColumnResult = testRoundtripped(bound, newRow(definition, rowValue));
+        // Every simple bound test is also a valid test of the UDT access path
+        boolean conditionUDTApplies = conditionUDTApplies(rowValue, op, conditionValue);
+        assertEquals(regularColumnResult, conditionUDTApplies);
+        return regularColumnResult;
     }
 
-    private static boolean conditionApplies(List<ByteBuffer> rowValue, Operator op, List<ByteBuffer> conditionValue)
+    private boolean conditionApplies(List<ByteBuffer> rowValue, Operator op, List<ByteBuffer> conditionValue)
     {
-        ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", ListType.getInstance(Int32Type.instance, true));
+        boolean nonFrozenResult = conditionApplies(rowValue, op, conditionValue, listType);
+        boolean frozenResult = conditionApplies(rowValue, op, conditionValue, listType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return nonFrozenResult;
+    }
+
+    private boolean conditionApplies(List<ByteBuffer> rowValue, Operator op, List<ByteBuffer> conditionValue, ListType<Integer> columnType)
+    {
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
         ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Lists.Value(conditionValue)));
         ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
-        return bound.appliesTo(newRow(definition, rowValue));
+        return testRoundtripped(bound, newRow(definition, rowValue));
+    }
+
+    private boolean conditionContainsApplies(List<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
+    {
+        boolean nonFrozenResult = conditionContainsApplies(rowValue, op, conditionValue, listType);
+        boolean frozenResult = conditionContainsApplies(rowValue, op, conditionValue, listType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return frozenResult;
     }
 
-    private static boolean conditionContainsApplies(List<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
+    private boolean conditionContainsApplies(List<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue, ListType<Integer> columnType)
     {
-        ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", ListType.getInstance(Int32Type.instance, true));
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
         ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Constants.Value(conditionValue)));
         ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
-        return bound.appliesTo(newRow(definition, rowValue));
+        return testRoundtripped(bound, newRow(definition, rowValue));
     }
 
-    private static boolean conditionContainsApplies(Map<ByteBuffer, ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
+    private boolean conditionContainsApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
     {
-        ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", MapType.getInstance(Int32Type.instance, Int32Type.instance, true));
+        boolean nonFrozenResult = conditionContainsApplies(rowValue, op, conditionValue, mapType);
+        boolean frozenResult = conditionContainsApplies(rowValue, op, conditionValue, mapType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return frozenResult;
+    }
+
+    private boolean conditionContainsApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue, MapType<Integer, Integer> columnType)
+    {
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
         ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Constants.Value(conditionValue)));
         ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
-        return bound.appliesTo(newRow(definition, rowValue));
+        return testRoundtripped(bound, newRow(definition, rowValue));
+    }
+
+    private boolean conditionApplies(SortedSet<ByteBuffer> rowValue, Operator op, SortedSet<ByteBuffer> conditionValue)
+    {
+        boolean nonFrozenResult = conditionApplies(rowValue, op, conditionValue, setType);
+        boolean frozenResult = conditionApplies(rowValue, op, conditionValue, setType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return nonFrozenResult;
     }
 
-    private static boolean conditionApplies(SortedSet<ByteBuffer> rowValue, Operator op, SortedSet<ByteBuffer> conditionValue)
+    private boolean conditionApplies(SortedSet<ByteBuffer> rowValue, Operator op, SortedSet<ByteBuffer> conditionValue, SetType<Integer> columnType)
     {
-        ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", SetType.getInstance(Int32Type.instance, true));
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
         ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Sets.Value(conditionValue)));
         ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
-        return bound.appliesTo(newRow(definition, rowValue));
+        return testRoundtripped(bound, newRow(definition, rowValue));
     }
 
-    private static boolean conditionContainsApplies(SortedSet<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
+    private boolean conditionContainsApplies(SortedSet<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
     {
-        ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", SetType.getInstance(Int32Type.instance, true));
+        boolean nonFrozenResult = conditionContainsApplies(rowValue, op, conditionValue, setType);
+        boolean frozenResult = conditionContainsApplies(rowValue, op, conditionValue, setType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return frozenResult;
+    }
+
+    private boolean conditionContainsApplies(SortedSet<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue, SetType<Integer> columnType)
+    {
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
         ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Constants.Value(conditionValue)));
         ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
-        return bound.appliesTo(newRow(definition, rowValue));
+        return testRoundtripped(bound, newRow(definition, rowValue));
+    }
+
+    private boolean conditionApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, Operator op, SortedMap<ByteBuffer, ByteBuffer> conditionValue)
+    {
+        boolean nonFrozenResult = conditionApplies(rowValue, op, conditionValue, mapType);
+        boolean frozenResult = conditionApplies(rowValue, op, conditionValue, mapType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return nonFrozenResult;
     }
 
-    private static boolean conditionApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, Operator op, SortedMap<ByteBuffer, ByteBuffer> conditionValue)
+    private boolean conditionApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, Operator op, SortedMap<ByteBuffer, ByteBuffer> conditionValue, MapType<Integer, Integer> columnType)
     {
-        ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", MapType.getInstance(Int32Type.instance, Int32Type.instance, true));
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
         ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Maps.Value(conditionValue)));
         ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
-        return bound.appliesTo(newRow(definition, rowValue));
+        return testRoundtripped(bound, newRow(definition, rowValue));
+    }
+
+    private boolean conditionElementApplies(List<ByteBuffer> values, ByteBuffer elementIndex, Operator op, ByteBuffer elementValue)
+    {
+        boolean nonFrozenResult = conditionElementApplies(values, elementIndex, op, elementValue, listType);
+        boolean frozenResult = conditionElementApplies(values, elementIndex, op, elementValue, listType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return nonFrozenResult;
+    }
+
+    private boolean conditionElementApplies(List<ByteBuffer> values, ByteBuffer elementIndex, Operator op, ByteBuffer elementValue, ListType<Integer> columnType)
+    {
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
+        ColumnCondition condition = ColumnCondition.condition(definition, new Constants.Value(elementIndex), op, Terms.of(new Constants.Value(elementValue)));
+        ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
+        return testRoundtripped(bound, newRow(definition, values));
+    }
+
+    private boolean conditionElementApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, ByteBuffer elementKey, Operator op, ByteBuffer elementValue)
+    {
+        boolean nonFrozenResult = conditionElementApplies(rowValue, elementKey, op, elementValue, mapType);
+        boolean frozenResult = conditionElementApplies(rowValue, elementKey, op, elementValue, mapType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return nonFrozenResult;
+    }
+
+    private boolean conditionElementApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, ByteBuffer elementKey, Operator op, ByteBuffer elementValue, MapType<Integer, Integer> columnType)
+    {
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
+        ColumnCondition condition = ColumnCondition.condition(definition, new Constants.Value(elementKey), op, Terms.of(new Constants.Value(elementValue)));
+        ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
+        return testRoundtripped(bound, newRow(definition, rowValue));
+    }
+
+    private boolean conditionUDTApplies(ByteBuffer fieldValue, Operator op, ByteBuffer elementValue)
+    {
+        boolean nonFrozenResult = conditionUDTApplies(fieldValue, op, elementValue, udtType);
+        boolean frozenResult = conditionUDTApplies(fieldValue, op, elementValue, udtType.freeze());
+        assertEquals(nonFrozenResult, frozenResult);
+        return nonFrozenResult;
+    }
+
+    private boolean conditionUDTApplies(ByteBuffer fieldValue, Operator op, ByteBuffer elementValue, UserType type)
+    {
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(type, !type.isMultiCell()), "c", type);
+        ColumnCondition condition = ColumnCondition.condition(definition, type.fieldName(0), op, Terms.of(new Constants.Value(elementValue)));
+        ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
+        return testRoundtripped(bound, newRow(definition, fieldValue));
+    }
+
+    private boolean conditionUDTApplies(List<ByteBuffer> rowValue, Operator op, List<ByteBuffer> conditionValue)
+    {
+        ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(udtType, false), "c", udtType);
+        Term term = conditionValue == null ? Constants.NULL_VALUE : new UserTypes.Value(udtType, conditionValue.toArray(new ByteBuffer[0]));
+        ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(term));
+        ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
+        return testRoundtripped(bound, newRow(definition, rowValue));
+    }
+
+    public void beforeTest() throws Throwable
+    {
+        super.beforeTest();
+        String typeName = createType("CREATE TYPE %s (a int, b int);");
+        udtType = Schema.instance.getKeyspaceMetadata(KEYSPACE).types.get(ByteBufferUtil.bytes(typeName)).get();
+    }
+    @Override
+    public void afterTest() throws Throwable
+    {
+        super.afterTest();
+        typeToTable.clear();
+        udtType = null;
+    }
+
+    // Is this useful enough to have in CQL tester?
+    private UserType udtType;
+    private final Map<Pair<AbstractType<?>, Boolean>, String> typeToTable = new HashMap<>();
+
+    private String maybeCreateTable(AbstractType<?> columnType, boolean frozen)
+    {
+        // One table per (type, frozen) key, reused across calls; column "c" is declared with the type's own CQL string, unmodified.
+        String columnTypeCQL = columnType.asCQL3Type().toString();
+        return typeToTable.computeIfAbsent(Pair.create(columnType, frozen), type -> createTable("CREATE TABLE %s (id int primary key, c " + columnTypeCQL + ")"));
     }
 
     @FunctionalInterface
@@ -194,7 +453,7 @@ public class ColumnConditionTest
     }
 
     @Test
-    public void testSimpleBoundIsSatisfiedByValue() throws InvalidRequestException
+    public void testSimpleAndUDTBoundIsSatisfiedByValue() throws InvalidRequestException
     {
         // EQ
         assertTrue(conditionApplies(ONE, EQ, ONE));
@@ -359,6 +618,18 @@ public class ColumnConditionTest
         assertFalse(conditionContainsApplies(list(ZERO, ONE, TWO), CONTAINS, ByteBufferUtil.EMPTY_BYTE_BUFFER));
         assertFalse(conditionContainsApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), CONTAINS, ONE));
         assertTrue(conditionContainsApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), CONTAINS, ByteBufferUtil.EMPTY_BYTE_BUFFER));
+
+        //ELEMENT
+        assertTrue(conditionElementApplies(list(ZERO, ONE, TWO), ZERO, EQ, ZERO));
+        assertTrue(conditionElementApplies(list(ZERO, ONE, TWO), ONE, EQ, ONE));
+        assertTrue(conditionElementApplies(list(ZERO, ONE, TWO), TWO, EQ, TWO));
+
+        assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), ZERO, EQ, ONE));
+        assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), ZERO, EQ, TWO));
+        assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), ONE, EQ, ZERO));
+        assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), ONE, EQ, TWO));
+        assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), TWO, EQ, ZERO));
+        assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), TWO, EQ, ONE));
     }
 
     private static SortedSet<ByteBuffer> set(ByteBuffer... values)
@@ -611,5 +882,96 @@ public class ColumnConditionTest
         assertTrue(conditionContainsApplies(map(ONE, ByteBufferUtil.EMPTY_BYTE_BUFFER), CONTAINS_KEY, ONE));
         assertFalse(conditionContainsApplies(map(ONE, ByteBufferUtil.EMPTY_BYTE_BUFFER), CONTAINS_KEY, ByteBufferUtil.EMPTY_BYTE_BUFFER));
 
+        // Element access
+        assertTrue(conditionElementApplies(map(ZERO, ONE), ZERO, EQ, ONE));
+        assertFalse(conditionElementApplies(map(ZERO, ONE), ZERO, EQ, TWO));
+        assertFalse(conditionElementApplies(map(ZERO, ONE), ONE, EQ, TWO));
+        assertFalse(conditionElementApplies(map(), ZERO, EQ, TWO));
+    }
+
+    @Test
+    public void testMultiCellUDTBound() throws InvalidRequestException
+    {
+        // EQ
+        assertTrue(conditionUDTApplies(list(ONE), EQ, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ONE), EQ, list(ZERO)));
+        assertFalse(conditionUDTApplies(list(ZERO), EQ, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ONE, ONE), EQ, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ONE), EQ, list(ONE, ONE)));
+        assertFalse(conditionUDTApplies(list(ONE), EQ, list()));
+
+        assertFalse(conditionUDTApplies(list(ONE), EQ, (List<ByteBuffer>)null));
+        assertFalse(conditionUDTApplies(null, EQ, list(ONE)));
+        assertTrue(conditionUDTApplies((List<ByteBuffer>) null, EQ, (List<ByteBuffer>) null));
+
+        assertFalse(conditionUDTApplies(list(ONE), EQ, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+        assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), EQ, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), EQ, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+
+        // NEQ
+        assertFalse(conditionUDTApplies(list(ONE), NEQ, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), NEQ, list(ZERO)));
+        assertTrue(conditionUDTApplies(list(ZERO), NEQ, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE, ONE), NEQ, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), NEQ, list(ONE, ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), NEQ, list()));
+        assertTrue(conditionUDTApplies(list(), NEQ, list(ONE)));
+
+        assertTrue(conditionUDTApplies(list(ONE), NEQ, (List<ByteBuffer>)null));
+        assertTrue(conditionUDTApplies(null, NEQ, list(ONE)));
+        assertFalse(conditionUDTApplies((List<ByteBuffer>) null, NEQ, (List<ByteBuffer>) null));
+
+        assertTrue(conditionUDTApplies(list(ONE), NEQ, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+        assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), NEQ, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), NEQ, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+
+        // LT
+        assertFalse(conditionUDTApplies(list(ONE), LT, list(ONE)));
+        assertFalse(conditionUDTApplies(list(), LT, list()));
+        assertFalse(conditionUDTApplies(list(ONE), LT, list(ZERO)));
+        assertTrue(conditionUDTApplies(list(ZERO), LT, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ONE, ONE), LT, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), LT, list(ONE, ONE)));
+        assertFalse(conditionUDTApplies(list(ONE), LT, list()));
+
+        assertFalse(conditionUDTApplies(list(ONE), LT, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+        assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), LT, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), LT, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+
+        // LTE
+        assertTrue(conditionUDTApplies(list(ONE), LTE, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ONE), LTE, list(ZERO)));
+        assertTrue(conditionUDTApplies(list(ZERO), LTE, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ONE, ONE), LTE, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), LTE, list(ONE, ONE)));
+        assertFalse(conditionUDTApplies(list(ONE), LTE, list()));
+
+        assertFalse(conditionUDTApplies(list(ONE), LTE, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+        assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), LTE, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), LTE, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+
+        // GT
+        assertFalse(conditionUDTApplies(list(ONE), GT, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), GT, list(ZERO)));
+        assertFalse(conditionUDTApplies(list(ZERO), GT, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE, ONE), GT, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ONE), GT, list(ONE, ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), GT, list()));
+
+        assertTrue(conditionUDTApplies(list(ONE), GT, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+        assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), GT, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), GT, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+
+        // GTE
+        assertTrue(conditionUDTApplies(list(ONE), GTE, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), GTE, list(ZERO)));
+        assertFalse(conditionUDTApplies(list(ZERO), GTE, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ONE, ONE), GTE, list(ONE)));
+        assertFalse(conditionUDTApplies(list(ONE), GTE, list(ONE, ONE)));
+        assertTrue(conditionUDTApplies(list(ONE), GTE, list()));
+
+        assertTrue(conditionUDTApplies(list(ONE), GTE, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+        assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), GTE, list(ONE)));
+        assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), GTE, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
     }
 }
diff --git a/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java b/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java
index 8edb06afd2..17088c8ad2 100644
--- a/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java
+++ b/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java
@@ -18,19 +18,11 @@
 
 package org.apache.cassandra.service.accord.txn;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-
 import accord.api.Key;
 import accord.primitives.Keys;
 import accord.primitives.Txn;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -48,6 +40,9 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 
+import java.nio.ByteBuffer;
+import java.util.*;
+
 public class TxnBuilder
 {
     private final List<TxnNamedRead> reads = new ArrayList<>();
@@ -107,7 +102,7 @@ public class TxnBuilder
     private TxnReference reference(TxnDataName name, String column)
     {
         // do any reads match the name?
-        Optional<TxnNamedRead> match = reads.stream().filter(n -> n.name().equals(name)).findFirst();
+        Optional<TxnNamedRead> match = reads.stream().filter(n -> n.txnDataName().equals(name)).findFirst();
         if (!match.isPresent())
             throw new IllegalArgumentException("Attempted to create a reference for " + name + " but no read exists with that name");
         TxnNamedRead read = match.get();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 04/05: removing TxnBuilder-based tests from TransactionStatementTest

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a commit to branch cas-accord-v2
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit d4a8b5b39fd3f655eb2f6a6f0638f849cadd5494
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Tue Dec 6 17:10:04 2022 -0600

    removing TxnBuilder-based tests from TransactionStatementTest
---
 src/java/org/apache/cassandra/cql3/Constants.java  |   2 +
 .../cql3/transactions/ReferenceOperation.java      |   1 +
 .../cql3/statements/TransactionStatementTest.java  | 295 ---------------------
 .../cassandra/service/accord/txn/TxnBuilder.java   |   2 +-
 4 files changed, 4 insertions(+), 296 deletions(-)

diff --git a/src/java/org/apache/cassandra/cql3/Constants.java b/src/java/org/apache/cassandra/cql3/Constants.java
index 0badfd967e..5bdbe823f4 100644
--- a/src/java/org/apache/cassandra/cql3/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/Constants.java
@@ -490,6 +490,8 @@ public abstract class Constants
             {
                 @SuppressWarnings("unchecked") NumberType<Number> type = (NumberType<Number>) column.type;
                 ByteBuffer increment = t.bindAndGet(params.options);
+                if (increment == null)
+                    throw new InvalidRequestException("Invalid null value for number increment");
                 ByteBuffer current = getCurrentCellBuffer(partitionKey, params);
                 if (current == null)
                     return;
diff --git a/src/java/org/apache/cassandra/cql3/transactions/ReferenceOperation.java b/src/java/org/apache/cassandra/cql3/transactions/ReferenceOperation.java
index 5f25ada796..c944fdd132 100644
--- a/src/java/org/apache/cassandra/cql3/transactions/ReferenceOperation.java
+++ b/src/java/org/apache/cassandra/cql3/transactions/ReferenceOperation.java
@@ -70,6 +70,7 @@ public class ReferenceOperation
         ColumnMetadata receiver = operation.column;
         ReferenceValue value = new ReferenceValue.Constant(operation.term());
 
+        // TODO: Receiver might need to be modified here, just as we do in Raw#prepare()
         Term key = extractKeyOrIndex(operation);
         FieldIdentifier field = extractField(operation);
         return new ReferenceOperation(receiver, kind, key, field, value);
diff --git a/test/unit/org/apache/cassandra/cql3/statements/TransactionStatementTest.java b/test/unit/org/apache/cassandra/cql3/statements/TransactionStatementTest.java
index 492e3e67a2..d83cb0ae44 100644
--- a/test/unit/org/apache/cassandra/cql3/statements/TransactionStatementTest.java
+++ b/test/unit/org/apache/cassandra/cql3/statements/TransactionStatementTest.java
@@ -18,47 +18,17 @@
 
 package org.apache.cassandra.cql3.statements;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-
-import org.apache.cassandra.service.accord.txn.*;
 import org.assertj.core.api.Assertions;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import accord.primitives.Txn;
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.VariableSpecifications;
-import org.apache.cassandra.db.BufferClustering;
-import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.Columns;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.LivenessInfo;
-import org.apache.cassandra.db.RegularAndStaticColumns;
-import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.db.marshal.ListType;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.BTreeRow;
-import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.accord.txn.TxnReferenceValue.Substitution;
-
-import static org.junit.Assert.assertEquals;
 
 import static org.apache.cassandra.cql3.statements.TransactionStatement.DUPLICATE_TUPLE_NAME_MESSAGE;
 import static org.apache.cassandra.cql3.statements.TransactionStatement.EMPTY_TRANSACTION_MESSAGE;
@@ -67,13 +37,11 @@ import static org.apache.cassandra.cql3.statements.TransactionStatement.INCOMPLE
 import static org.apache.cassandra.cql3.statements.TransactionStatement.NO_CONDITIONS_IN_UPDATES_MESSAGE;
 import static org.apache.cassandra.cql3.statements.TransactionStatement.NO_TIMESTAMPS_IN_UPDATES_MESSAGE;
 import static org.apache.cassandra.cql3.statements.TransactionStatement.SELECT_REFS_NEED_COLUMN_MESSAGE;
-import static org.apache.cassandra.service.accord.txn.TxnDataName.user;
 import static org.apache.cassandra.cql3.statements.UpdateStatement.UPDATING_PRIMARY_KEY_MESSAGE;
 import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
 import static org.apache.cassandra.cql3.transactions.RowDataReference.CANNOT_FIND_TUPLE_MESSAGE;
 import static org.apache.cassandra.cql3.transactions.RowDataReference.COLUMN_NOT_IN_TUPLE_MESSAGE;
 import static org.apache.cassandra.schema.TableMetadata.UNDEFINED_COLUMN_NAME_MESSAGE;
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 public class TransactionStatementTest
 {
@@ -82,11 +50,6 @@ public class TransactionStatementTest
     private static final TableId TABLE3_ID = TableId.fromString("00000000-0000-0000-0000-000000000003");
     private static final TableId TABLE4_ID = TableId.fromString("00000000-0000-0000-0000-000000000004");
 
-    private static TableMetadata TABLE1;
-    private static TableMetadata TABLE2;
-    private static TableMetadata TABLE3;
-    private static TableMetadata TABLE4;
-
     @BeforeClass
     public static void beforeClass() throws Exception
     {
@@ -96,11 +59,6 @@ public class TransactionStatementTest
                                     parse("CREATE TABLE tbl2 (k int, c int, v int, primary key (k, c))", "ks").id(TABLE2_ID),
                                     parse("CREATE TABLE tbl3 (k int PRIMARY KEY, \"with spaces\" int, \"with\"\"quote\" int, \"MiXeD_CaSe\" int)", "ks").id(TABLE3_ID),
                                     parse("CREATE TABLE tbl4 (k int PRIMARY KEY, int_list list<int>)", "ks").id(TABLE4_ID));
-
-        TABLE1 = Schema.instance.getTableMetadata("ks", "tbl1");
-        TABLE2 = Schema.instance.getTableMetadata("ks", "tbl2");
-        TABLE3 = Schema.instance.getTableMetadata("ks", "tbl3");
-        TABLE4 = Schema.instance.getTableMetadata("ks", "tbl4");
     }
 
     @Test
@@ -359,257 +317,4 @@ public class TransactionStatementTest
                 .isInstanceOf(InvalidRequestException.class)
                 .hasMessageContaining(String.format(COLUMN_NOT_IN_TUPLE_MESSAGE, "q", "row1"));
     }
-
-    @Test
-    public void simpleQueryTest()
-    {
-        String query = "BEGIN TRANSACTION\n" +
-                       "  LET row1 = (SELECT * FROM ks.tbl1 WHERE k=1 AND c=?);\n" +
-                       "  LET row2 = (SELECT * FROM ks.tbl2 WHERE k=2 AND c=2);\n" +
-                       "  SELECT v FROM ks.tbl1 WHERE k=2 AND c=?;\n" +
-                       "  IF row1 IS NOT NULL AND row1.v = 3 AND row2.v=? THEN\n" +
-                       "    UPDATE ks.tbl1 SET v=? WHERE k=1 AND c=2;\n" +
-                       "  END IF\n" +
-                       "COMMIT TRANSACTION";
-
-        Txn expected = TxnBuilder.builder()
-                                 .withRead("row1", "SELECT * FROM ks.tbl1 WHERE k=1 AND c=2")
-                                 .withRead("row2", "SELECT * FROM ks.tbl2 WHERE k=2 AND c=2")
-                                 .withRead(TxnDataName.returning(), "SELECT v FROM ks.tbl1 WHERE k=2 AND c=2")
-                                 .withWrite("UPDATE ks.tbl1 SET v=1 WHERE k=1 AND c=2")
-                                 .withIsNotNullCondition(user("row1"), null)
-                                 .withEqualsCondition("row1", "v", bytes(3))
-                                 .withEqualsCondition("row2", "v", bytes(4))
-                                 .build();
-
-        TransactionStatement.Parsed parsed = (TransactionStatement.Parsed) QueryProcessor.parseStatement(query);
-        TransactionStatement statement = (TransactionStatement) parsed.prepare(ClientState.forInternalCalls());
-        List<ByteBuffer> values = ImmutableList.of(bytes(2), bytes(2), bytes(4), bytes(1));
-        Txn actual = statement.createTxn(ClientState.forInternalCalls(), QueryOptions.forInternalCalls(values));
-
-        assertEquals(expected, actual);
-    }
-
-    @Test
-    public void txnWithReturningStatement()
-    {
-        String query = "BEGIN TRANSACTION\n" +
-                       "  LET row1 = (SELECT * FROM ks.tbl1 WHERE k=1 AND c=2);\n" +
-                       "  LET row2 = (SELECT * FROM ks.tbl2 WHERE k=2 AND c=2);\n" +
-                       "  SELECT row1.v, row2.v;\n" +
-                       "  IF row1 IS NOT NULL AND row1.v = 3 AND row2.v=4 THEN\n" +
-                       "    UPDATE ks.tbl1 SET v=1 WHERE k=1 AND c=2;\n" +
-                       "  END IF\n" +
-                       "COMMIT TRANSACTION";
-
-        Txn expected = TxnBuilder.builder()
-                                 .withRead("row1", "SELECT * FROM ks.tbl1 WHERE k=1 AND c=2")
-                                 .withRead("row2", "SELECT * FROM ks.tbl2 WHERE k=2 AND c=2")
-                                 .withWrite("UPDATE ks.tbl1 SET v=1 WHERE k=1 AND c=2")
-                                 .withIsNotNullCondition(user("row1"), null)
-                                 .withEqualsCondition("row1", "v", bytes(3))
-                                 .withEqualsCondition("row2", "v", bytes(4))
-                                 .build();
-
-        TransactionStatement.Parsed parsed = (TransactionStatement.Parsed) QueryProcessor.parseStatement(query);
-        TransactionStatement statement = (TransactionStatement) parsed.prepare(ClientState.forInternalCalls());
-        Txn actual = statement.createTxn(ClientState.forInternalCalls(), QueryOptions.DEFAULT);
-
-        assertEquals(expected, actual);
-        assertEquals(2, statement.getReturningReferences().size());
-    }
-
-    @Test
-    public void testQuotedColumnNames()
-    {
-        String query = "BEGIN TRANSACTION\n" +
-                       "  LET row1 = (SELECT * FROM ks.tbl3 WHERE k=1);\n" +
-                       "  SELECT \"row1\".\"with spaces\", row1.\"with\"\"quote\", row1.\"MiXeD_CaSe\";\n" +
-                       "  IF row1.\"with spaces\" IS NULL THEN\n" +
-                       "    INSERT INTO ks.tbl3 (k, \"with spaces\") VALUES (1, 2);\n" +
-                       "  END IF\n" +
-                       "COMMIT TRANSACTION";
-
-        Txn expected = TxnBuilder.builder()
-                                 .withRead("row1", "SELECT * FROM ks.tbl3 WHERE k=1")
-                                 .withWrite("INSERT INTO ks.tbl3 (k, \"with spaces\") VALUES (1, 2)")
-                                 .withIsNullCondition(user("row1"), "with spaces")
-                                 .build();
-
-        TransactionStatement.Parsed parsed = (TransactionStatement.Parsed) QueryProcessor.parseStatement(query);
-        TransactionStatement statement = (TransactionStatement) parsed.prepare(ClientState.forInternalCalls());
-        Txn actual = statement.createTxn(ClientState.forInternalCalls(), QueryOptions.DEFAULT);
-
-        assertEquals(expected, actual);
-        assertEquals(3, statement.getReturningReferences().size());
-        assertEquals("with spaces", statement.getReturningReferences().get(0).column().name.toString());
-        assertEquals("with\"quote", statement.getReturningReferences().get(1).column().name.toString());
-        assertEquals("MiXeD_CaSe", statement.getReturningReferences().get(2).column().name.toString());
-    }
-
-    @Test
-    public void updateVariableSubstitutionTest()
-    {
-        String query = "BEGIN TRANSACTION\n" +
-                       "  LET row1 = (SELECT * FROM ks.tbl1 WHERE k=1 AND c=2);\n" +
-                       "  LET row2 = (SELECT * FROM ks.tbl2 WHERE k=2 AND c=2);\n" +
-                       "  SELECT v FROM ks.tbl1 WHERE k=1 AND c=2;\n" +
-                       "  IF row1.v = 3 AND row2.v=4 THEN\n" +
-                       "    UPDATE ks.tbl1 SET v = row2.v WHERE k=1 AND c=2;\n" +
-                       "  END IF\n" +
-                       "COMMIT TRANSACTION";
-
-        List<TxnReferenceOperation> regularOps = new ArrayList<>();
-        regularOps.add(new TxnReferenceOperation(TxnReferenceOperation.Kind.ConstantSetter, column(TABLE1, "v"), null, null, new Substitution(reference(user("row2"), TABLE2, "v"))));
-        TxnReferenceOperations referenceOps = new TxnReferenceOperations(TABLE1, Clustering.make(bytes(2)), regularOps, Collections.emptyList());
-        Txn expected = TxnBuilder.builder()
-                                 .withRead("row1", "SELECT * FROM ks.tbl1 WHERE k=1 AND c=2")
-                                 .withRead("row2", "SELECT * FROM ks.tbl2 WHERE k=2 AND c=2")
-                                 .withRead(TxnDataName.returning(), "SELECT v FROM ks.tbl1 WHERE k=1 AND c=2")
-                                 .withWrite(emptyUpdate(TABLE1, 1, 2, false), referenceOps)
-                                 .withEqualsCondition("row1", "v", bytes(3))
-                                 .withEqualsCondition("row2", "v", bytes(4))
-                                 .build();
-
-        TransactionStatement.Parsed parsed = (TransactionStatement.Parsed) QueryProcessor.parseStatement(query);
-        TransactionStatement statement = (TransactionStatement) parsed.prepare(ClientState.forInternalCalls());
-        Txn actual = statement.createTxn(ClientState.forInternalCalls(), QueryOptions.DEFAULT);
-        assertEquals(expected, actual);
-    }
-
-    @Test
-    public void insertVariableSubstitutionTest()
-    {
-        String query = "BEGIN TRANSACTION\n" +
-                       "  LET row1 = (SELECT * FROM ks.tbl1 WHERE k=1 AND c=2);\n" +
-                       "  LET row2 = (SELECT * FROM ks.tbl2 WHERE k=2 AND c=2);\n" +
-                       "  SELECT v FROM ks.tbl1 WHERE k=1 AND c=2;\n" +
-                       "  IF row1.v = 3 AND row2.v=4 THEN\n" +
-                       "    INSERT INTO ks.tbl1 (k, c, v) VALUES (1, 2, row2.v);\n" +
-                       "  END IF\n" +
-                       "COMMIT TRANSACTION";
-
-        List<TxnReferenceOperation> regularOps = new ArrayList<>();
-        regularOps.add(new TxnReferenceOperation(TxnReferenceOperation.Kind.ConstantSetter, column(TABLE1, "v"), null, null, new Substitution(reference(user("row2"), TABLE2, "v"))));
-        TxnReferenceOperations referenceOps = new TxnReferenceOperations(TABLE1, Clustering.make(bytes(2)), regularOps, Collections.emptyList());
-        Txn expected = TxnBuilder.builder()
-                                 .withRead("row1", "SELECT * FROM ks.tbl1 WHERE k=1 AND c=2")
-                                 .withRead("row2", "SELECT * FROM ks.tbl2 WHERE k=2 AND c=2")
-                                 .withRead(TxnDataName.returning(), "SELECT v FROM ks.tbl1 WHERE k=1 AND c=2")
-                                 .withWrite(emptyUpdate(TABLE1, 1, 2, true), referenceOps)
-                                 .withEqualsCondition("row1", "v", bytes(3))
-                                 .withEqualsCondition("row2", "v", bytes(4))
-                                 .build();
-
-        TransactionStatement.Parsed parsed = (TransactionStatement.Parsed) QueryProcessor.parseStatement(query);
-        TransactionStatement statement = (TransactionStatement) parsed.prepare(ClientState.forInternalCalls());
-        Txn actual = statement.createTxn(ClientState.forInternalCalls(), QueryOptions.DEFAULT);
-        assertEquals(expected, actual);
-    }
-
-    @Test
-    public void testListCondition()
-    {
-        String update = "BEGIN TRANSACTION\n" +
-                        "  LET row1 = (SELECT * FROM ks.tbl4 WHERE k = ?);\n" +
-                        "  SELECT row1.int_list;\n" +
-                        "  IF row1.int_list = ? THEN\n" +
-                        "    UPDATE ks.tbl4 SET int_list = ? WHERE k = ?;\n" +
-                        "  END IF\n" +
-                        "COMMIT TRANSACTION";
-
-        ListType<Integer> listType = ListType.getInstance(Int32Type.instance, true);
-        List<Integer> initialList = Arrays.asList(1, 2);
-        ByteBuffer initialListBytes = listType.getSerializer().serialize(initialList);
-
-        List<Integer> updatedList = Arrays.asList(1, 2, 3);
-        ByteBuffer updatedListBytes = listType.getSerializer().serialize(updatedList);
-
-        Txn expected = TxnBuilder.builder()
-                                 .withRead("row1", "SELECT * FROM ks.tbl4 WHERE k = 0")
-                                 .withWrite("UPDATE ks.tbl4 SET int_list = ? WHERE k = 0",
-                                            TxnReferenceOperations.empty(),
-                                            new VariableSpecifications(Collections.singletonList(null)),
-                                            updatedListBytes)
-                                 .withEqualsCondition("row1", "int_list", initialListBytes)
-                                 .build();
-
-        TransactionStatement.Parsed parsed = (TransactionStatement.Parsed) QueryProcessor.parseStatement(update);
-        TransactionStatement statement = (TransactionStatement) parsed.prepare(ClientState.forInternalCalls());
-
-        List<ByteBuffer> values = ImmutableList.of(bytes(0), initialListBytes, updatedListBytes, bytes(0));
-        Txn actual = statement.createTxn(ClientState.forInternalCalls(), QueryOptions.forInternalCalls(values));
-
-        // TODO: Find a better way to test, given list paths are randomly generated and therefore not comparable.
-        assertEquals(expected.toString().length(), actual.toString().length());
-
-        assertEquals(1, statement.getReturningReferences().size());
-        assertEquals("int_list", statement.getReturningReferences().get(0).column().name.toString());
-    }
-
-    @Test
-    public void testListSubstitution()
-    {
-        String update = "BEGIN TRANSACTION\n" +
-                        "  LET row1 = (SELECT * FROM ks.tbl4 WHERE k = ?);\n" +
-                        "  SELECT row1.int_list;\n" +
-                        "  IF row1.int_list = [1, 2] THEN\n" +
-                        "    UPDATE ks.tbl4 SET int_list = row1.int_list WHERE k = ?;\n" +
-                        "  END IF\n" +
-                        "COMMIT TRANSACTION";
-
-        ListType<Integer> listType = ListType.getInstance(Int32Type.instance, true);
-        List<Integer> initialList = Arrays.asList(1, 2);
-        ByteBuffer initialListBytes = listType.getSerializer().serialize(initialList);
-
-        List<TxnReferenceOperation> partitionOps = new ArrayList<>();
-        partitionOps.add(new TxnReferenceOperation(TxnReferenceOperation.Kind.ListSetter, column(TABLE4, "int_list"), null, null, new Substitution(reference(user("row1"), TABLE4, "int_list"))));
-        TxnReferenceOperations referenceOps = new TxnReferenceOperations(TABLE4, Clustering.EMPTY, partitionOps, Collections.emptyList());
-
-        Txn expected = TxnBuilder.builder()
-                                 .withRead("row1", "SELECT * FROM ks.tbl4 WHERE k = 0")
-                                 .withWrite(emptyUpdate(TABLE4, 1, Clustering.EMPTY, false), referenceOps)
-                                 .withEqualsCondition("row1", "int_list", initialListBytes)
-                                 .build();
-
-        TransactionStatement.Parsed parsed = (TransactionStatement.Parsed) QueryProcessor.parseStatement(update);
-        TransactionStatement statement = (TransactionStatement) parsed.prepare(ClientState.forInternalCalls());
-
-        List<ByteBuffer> values = ImmutableList.of(bytes(0), bytes(1));
-        Txn actual = statement.createTxn(ClientState.forInternalCalls(), QueryOptions.forInternalCalls(values));
-
-        assertEquals(expected, actual);
-        assertEquals(1, statement.getReturningReferences().size());
-        assertEquals("int_list", statement.getReturningReferences().get(0).column().name.toString());
-    }
-
-    private static PartitionUpdate emptyUpdate(TableMetadata metadata, int k, int c, boolean forInsert)
-    {
-        return emptyUpdate(metadata, k, new BufferClustering(bytes(c)), forInsert);
-    }
-
-    private static PartitionUpdate emptyUpdate(TableMetadata metadata, int k, Clustering<?> c, boolean forInsert)
-    {
-        DecoratedKey dk = metadata.partitioner.decorateKey(bytes(k));
-        RegularAndStaticColumns columns = new RegularAndStaticColumns(Columns.from(metadata.regularColumns()), Columns.NONE);
-        PartitionUpdate.Builder builder = new PartitionUpdate.Builder(metadata, dk, columns, 1);
-
-        Row.Builder row = BTreeRow.unsortedBuilder();
-        row.newRow(c);
-        if (forInsert)
-            row.addPrimaryKeyLivenessInfo(LivenessInfo.create(0, 0));
-        builder.add(row.build());
-
-        return builder.build();
-    }
-
-    private static ColumnMetadata column(TableMetadata metadata, String name)
-    {
-        return metadata.getColumn(new ColumnIdentifier(name, true));
-    }
-
-    private static TxnReference reference(TxnDataName name, TableMetadata metadata, String column)
-    {
-        return new TxnReference(name, column(metadata, column), null);
-    }
 }
diff --git a/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java b/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java
index fe335a03cf..8edb06afd2 100644
--- a/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java
+++ b/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java
@@ -69,7 +69,7 @@ public class TxnBuilder
         return withRead(name, query, VariableSpecifications.empty());
     }
 
-    public TxnBuilder withRead(TxnDataName name, String query, VariableSpecifications bindVariables, Object... values)
+    private TxnBuilder withRead(TxnDataName name, String query, VariableSpecifications bindVariables, Object... values)
     {
         SelectStatement.RawStatement parsed = (SelectStatement.RawStatement) QueryProcessor.parseStatement(query);
         // the parser will only let us define a ref name if we're parsing a transaction, which we're not


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 02/05: fixes from David's feedback branch

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a commit to branch cas-accord-v2
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit bc96d9565c22f94751b7c0c2f93c53c5b816c9f4
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Tue Dec 6 12:20:09 2022 -0600

    fixes from David's feedback branch
---
 .../cql3/statements/TransactionStatement.java      |  2 ++
 .../cassandra/serializers/ListSerializer.java      | 12 ++++-----
 .../cql3/statements/TransactionStatementTest.java  | 22 ++++++++--------
 .../cassandra/service/accord/txn/TxnBuilder.java   | 30 ++++++++++++----------
 4 files changed, 36 insertions(+), 30 deletions(-)

diff --git a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index 19ead226ba..c78f116a53 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -353,6 +353,8 @@ public class TransactionStatement implements CQLStatement
         public void setKeyspace(ClientState state)
         {
             assignments.forEach(select -> select.setKeyspace(state));
+            if (select != null)
+                select.setKeyspace(state);
             updates.forEach(update -> update.setKeyspace(state));
         }
 
diff --git a/src/java/org/apache/cassandra/serializers/ListSerializer.java b/src/java/org/apache/cassandra/serializers/ListSerializer.java
index c024cb6f35..c8b859e655 100644
--- a/src/java/org/apache/cassandra/serializers/ListSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/ListSerializer.java
@@ -230,20 +230,20 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>>
     {
         try
         {
-            int n = readCollectionSize(collection, ByteBufferAccessor.instance, ProtocolVersion.V3);
-            int offset = sizeOfCollectionSize(n, ProtocolVersion.V3);
+            ProtocolVersion version = ProtocolVersion.V3;
+            int n = readCollectionSize(collection, ByteBufferAccessor.instance, version);
+            int offset = sizeOfCollectionSize(n, version);
             int idx = ByteBufferUtil.toInt(index);
 
             Preconditions.checkElementIndex(idx, n);
 
             for (int i = 0; i <= idx; i++)
             {
-                ByteBuffer value = readValue(collection, ByteBufferAccessor.instance, offset, ProtocolVersion.V3);
-                offset += sizeOfValue(value, ByteBufferAccessor.instance, ProtocolVersion.V3);
                 if (i == idx)
-                    return value;
+                    return readValue(collection, ByteBufferAccessor.instance, offset, version);
+                offset += skipValue(collection, ByteBufferAccessor.instance, offset, version);
             }
-            return null;
+            throw new AssertionError("Asked to read index " + idx + " but never read the index");
         }
         catch (BufferUnderflowException | IndexOutOfBoundsException e)
         {
diff --git a/test/unit/org/apache/cassandra/cql3/statements/TransactionStatementTest.java b/test/unit/org/apache/cassandra/cql3/statements/TransactionStatementTest.java
index 45e11a8fcf..492e3e67a2 100644
--- a/test/unit/org/apache/cassandra/cql3/statements/TransactionStatementTest.java
+++ b/test/unit/org/apache/cassandra/cql3/statements/TransactionStatementTest.java
@@ -378,8 +378,8 @@ public class TransactionStatementTest
                                  .withRead(TxnDataName.returning(), "SELECT v FROM ks.tbl1 WHERE k=2 AND c=2")
                                  .withWrite("UPDATE ks.tbl1 SET v=1 WHERE k=1 AND c=2")
                                  .withIsNotNullCondition(user("row1"), null)
-                                 .withEqualsCondition("row1", "ks.tbl1.v", bytes(3))
-                                 .withEqualsCondition("row2", "ks.tbl2.v", bytes(4))
+                                 .withEqualsCondition("row1", "v", bytes(3))
+                                 .withEqualsCondition("row2", "v", bytes(4))
                                  .build();
 
         TransactionStatement.Parsed parsed = (TransactionStatement.Parsed) QueryProcessor.parseStatement(query);
@@ -407,8 +407,8 @@ public class TransactionStatementTest
                                  .withRead("row2", "SELECT * FROM ks.tbl2 WHERE k=2 AND c=2")
                                  .withWrite("UPDATE ks.tbl1 SET v=1 WHERE k=1 AND c=2")
                                  .withIsNotNullCondition(user("row1"), null)
-                                 .withEqualsCondition("row1", "ks.tbl1.v", bytes(3))
-                                 .withEqualsCondition("row2", "ks.tbl2.v", bytes(4))
+                                 .withEqualsCondition("row1", "v", bytes(3))
+                                 .withEqualsCondition("row2", "v", bytes(4))
                                  .build();
 
         TransactionStatement.Parsed parsed = (TransactionStatement.Parsed) QueryProcessor.parseStatement(query);
@@ -433,7 +433,7 @@ public class TransactionStatementTest
         Txn expected = TxnBuilder.builder()
                                  .withRead("row1", "SELECT * FROM ks.tbl3 WHERE k=1")
                                  .withWrite("INSERT INTO ks.tbl3 (k, \"with spaces\") VALUES (1, 2)")
-                                 .withIsNullCondition(user("row1"), "ks.tbl3.with spaces")
+                                 .withIsNullCondition(user("row1"), "with spaces")
                                  .build();
 
         TransactionStatement.Parsed parsed = (TransactionStatement.Parsed) QueryProcessor.parseStatement(query);
@@ -467,8 +467,8 @@ public class TransactionStatementTest
                                  .withRead("row2", "SELECT * FROM ks.tbl2 WHERE k=2 AND c=2")
                                  .withRead(TxnDataName.returning(), "SELECT v FROM ks.tbl1 WHERE k=1 AND c=2")
                                  .withWrite(emptyUpdate(TABLE1, 1, 2, false), referenceOps)
-                                 .withEqualsCondition("row1", "ks.tbl1.v", bytes(3))
-                                 .withEqualsCondition("row2", "ks.tbl2.v", bytes(4))
+                                 .withEqualsCondition("row1", "v", bytes(3))
+                                 .withEqualsCondition("row2", "v", bytes(4))
                                  .build();
 
         TransactionStatement.Parsed parsed = (TransactionStatement.Parsed) QueryProcessor.parseStatement(query);
@@ -497,8 +497,8 @@ public class TransactionStatementTest
                                  .withRead("row2", "SELECT * FROM ks.tbl2 WHERE k=2 AND c=2")
                                  .withRead(TxnDataName.returning(), "SELECT v FROM ks.tbl1 WHERE k=1 AND c=2")
                                  .withWrite(emptyUpdate(TABLE1, 1, 2, true), referenceOps)
-                                 .withEqualsCondition("row1", "ks.tbl1.v", bytes(3))
-                                 .withEqualsCondition("row2", "ks.tbl2.v", bytes(4))
+                                 .withEqualsCondition("row1", "v", bytes(3))
+                                 .withEqualsCondition("row2", "v", bytes(4))
                                  .build();
 
         TransactionStatement.Parsed parsed = (TransactionStatement.Parsed) QueryProcessor.parseStatement(query);
@@ -531,7 +531,7 @@ public class TransactionStatementTest
                                             TxnReferenceOperations.empty(),
                                             new VariableSpecifications(Collections.singletonList(null)),
                                             updatedListBytes)
-                                 .withEqualsCondition("row1", "ks.tbl4.int_list", initialListBytes)
+                                 .withEqualsCondition("row1", "int_list", initialListBytes)
                                  .build();
 
         TransactionStatement.Parsed parsed = (TransactionStatement.Parsed) QueryProcessor.parseStatement(update);
@@ -569,7 +569,7 @@ public class TransactionStatementTest
         Txn expected = TxnBuilder.builder()
                                  .withRead("row1", "SELECT * FROM ks.tbl4 WHERE k = 0")
                                  .withWrite(emptyUpdate(TABLE4, 1, Clustering.EMPTY, false), referenceOps)
-                                 .withEqualsCondition("row1", "ks.tbl4.int_list", initialListBytes)
+                                 .withEqualsCondition("row1", "int_list", initialListBytes)
                                  .build();
 
         TransactionStatement.Parsed parsed = (TransactionStatement.Parsed) QueryProcessor.parseStatement(update);
diff --git a/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java b/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java
index 055e649dcf..fe335a03cf 100644
--- a/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java
+++ b/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java
@@ -21,14 +21,15 @@ package org.apache.cassandra.service.accord.txn;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
-import accord.primitives.Keys;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 
 import accord.api.Key;
+import accord.primitives.Keys;
 import accord.primitives.Txn;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.QueryOptions;
@@ -42,6 +43,7 @@ import org.apache.cassandra.db.SinglePartitionReadQuery;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.accord.api.PartitionKey;
@@ -102,18 +104,20 @@ public class TxnBuilder
         return withWrite(query, TxnReferenceOperations.empty(), VariableSpecifications.empty());
     }
 
-    static TxnReference reference(TxnDataName name, String column)
-    {
-        ColumnMetadata metadata = null;
-        if (column != null)
-        {
-            String[] parts = column.split("\\.");
-            Preconditions.checkArgument(parts.length == 3);
-            TableMetadata table = Schema.instance.getTableMetadata(parts[0], parts[1]);
-            Preconditions.checkArgument(table != null);
-            metadata = table.getColumn(new ColumnIdentifier(parts[2], true));
-            Preconditions.checkArgument(metadata != null);
-        }
+    private TxnReference reference(TxnDataName name, String column)
+    {
+        // do any reads match the name?
+        Optional<TxnNamedRead> match = reads.stream().filter(n -> n.name().equals(name)).findFirst();
+        if (!match.isPresent())
+            throw new IllegalArgumentException("Attempted to create a reference for " + name + " but no read exists with that name");
+        TxnNamedRead read = match.get();
+        TableId tableID = read.key().tableId();
+        TableMetadata table = Schema.instance.getTableMetadata(tableID);
+        if (table == null)
+                throw new IllegalStateException("No table exists w/ ID " + tableID);
+        ColumnMetadata metadata = column == null
+                                  ? null
+                                  : table.getColumn(new ColumnIdentifier(column, true)); //TODO support a.b.c for UDT
         return new TxnReference(name, metadata);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org