You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2021/11/22 11:51:29 UTC

[cassandra] branch trunk updated: [CEP-10] Phase 4: Support CAS Add Operations

This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 951d72c  [CEP-10] Phase 4: Support CAS Add Operations
951d72c is described below

commit 951d72cd929d1f6c9329becbdd7604a9e709587b
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Wed Apr 14 22:54:53 2021 +0100

    [CEP-10] Phase 4: Support CAS Add Operations
    
    Co-authored-by: Benedict Elliott Smith <be...@apache.org>
    Co-authored-by: Sam Tunnicliffe <sa...@apache.org>
    Co-authored-by: Caleb Rackliffe <ca...@gmail.com>
---
 src/java/org/apache/cassandra/cql3/Constants.java  | 58 +++++++++++++--
 src/java/org/apache/cassandra/cql3/Operation.java  | 30 +++++---
 .../cql3/statements/ModificationStatement.java     |  7 +-
 .../cassandra/cql3/statements/UpdateStatement.java |  6 +-
 .../org/apache/cassandra/db/marshal/AsciiType.java |  2 +-
 .../apache/cassandra/db/marshal/StringType.java    | 27 +++++++
 .../org/apache/cassandra/db/marshal/UTF8Type.java  |  2 +-
 .../cassandra/distributed/test/CASAddTest.java     | 83 ++++++++++++++++++++++
 8 files changed, 192 insertions(+), 23 deletions(-)

diff --git a/src/java/org/apache/cassandra/cql3/Constants.java b/src/java/org/apache/cassandra/cql3/Constants.java
index 6dce3a3..3457e33 100644
--- a/src/java/org/apache/cassandra/cql3/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/Constants.java
@@ -23,6 +23,9 @@ import java.nio.ByteBuffer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.*;
@@ -30,6 +33,7 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FastByteOperations;
 
 /**
  * Static helper methods and classes for constants.
@@ -446,16 +450,56 @@ public abstract class Constants
             super(column, t);
         }
 
+        public boolean requiresRead()
+        {
+            return !(column.type instanceof CounterColumnType);
+        }
+
         public void execute(DecoratedKey partitionKey, UpdateParameters params) throws InvalidRequestException
         {
-            ByteBuffer bytes = t.bindAndGet(params.options);
-            if (bytes == null)
-                throw new InvalidRequestException("Invalid null value for counter increment");
-            if (bytes == ByteBufferUtil.UNSET_BYTE_BUFFER)
-                return;
+            if (column.type instanceof CounterColumnType)
+            {
+                ByteBuffer bytes = t.bindAndGet(params.options);
+                if (bytes == null)
+                    throw new InvalidRequestException("Invalid null value for counter increment");
+                if (bytes == ByteBufferUtil.UNSET_BYTE_BUFFER)
+                    return;
+
+                long increment = ByteBufferUtil.toLong(bytes);
+                params.addCounter(column, increment);
+            }
+            else if (column.type instanceof NumberType<?>)
+            {
+                @SuppressWarnings("unchecked") NumberType<Number> type = (NumberType<Number>) column.type;
+                ByteBuffer increment = t.bindAndGet(params.options);
+                ByteBuffer current = getCurrentCellBuffer(partitionKey, params);
+                ByteBuffer newValue = type.add(type, current, type, increment);
+                params.addCell(column, newValue);
+            }
+            else if (column.type instanceof StringType)
+            {
+                ByteBuffer append = t.bindAndGet(params.options);
+                ByteBuffer current = getCurrentCellBuffer(partitionKey, params);
+                ByteBuffer newValue;
+                if (current == null)
+                {
+                    newValue = append;
+                }
+                else
+                {
+                    newValue = ByteBuffer.allocate(current.remaining() + append.remaining());
+                    FastByteOperations.copy(current, current.position(), newValue, newValue.position(), current.remaining());
+                    FastByteOperations.copy(append, append.position(), newValue, newValue.position() + current.remaining(), append.remaining());
+                }
+                params.addCell(column, newValue);
+            }
+        }
 
-            long increment = ByteBufferUtil.toLong(bytes);
-            params.addCounter(column, increment);
+        private ByteBuffer getCurrentCellBuffer(DecoratedKey key, UpdateParameters params)
+        {
+            Row currentRow = params.getPrefetchedRow(key, column.isStatic() ? Clustering.STATIC_CLUSTERING : params.currentClustering());
+            Cell<?> currentCell = currentRow == null ? null : currentRow.getCell(column);
+            return currentCell == null ? null : currentCell.buffer();
         }
     }
 
diff --git a/src/java/org/apache/cassandra/cql3/Operation.java b/src/java/org/apache/cassandra/cql3/Operation.java
index d52d10e..51f5d13 100644
--- a/src/java/org/apache/cassandra/cql3/Operation.java
+++ b/src/java/org/apache/cassandra/cql3/Operation.java
@@ -97,7 +97,7 @@ public abstract class Operation
      * This can be one of:
      *   - Setting a value: c = v
      *   - Setting an element of a collection: c[x] = v
-     *   - An addition/subtraction to a variable: c = c +/- v (where v can be a collection literal)
+     *   - An addition/subtraction to a variable: c = c +/- v (where v can be a collection literal, scalar, or string)
      *   - An prepend operation: c = v + c
      */
     public interface RawUpdate
@@ -112,9 +112,11 @@ public abstract class Operation
          *
          * @param metadata
          * @param receiver the column this operation applies to.
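+         * @param canReadExistingState whether the operation is allowed to read the existing value before writing
+         *                 (callers pass true when the statement has conditions); permits numeric/string addition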
          * @return the prepared update operation.
          */
-        public Operation prepare(TableMetadata metadata, ColumnMetadata receiver) throws InvalidRequestException;
+        public Operation prepare(TableMetadata metadata, ColumnMetadata receiver, boolean canReadExistingState) throws InvalidRequestException;
 
         /**
          * @return whether this operation can be applied alongside the {@code
@@ -161,7 +163,7 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(TableMetadata metadata, ColumnMetadata receiver) throws InvalidRequestException
+        public Operation prepare(TableMetadata metadata, ColumnMetadata receiver, boolean canReadExistingState) throws InvalidRequestException
         {
             Term v = value.prepare(metadata.keyspace, receiver);
 
@@ -213,7 +215,7 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(TableMetadata metadata, ColumnMetadata receiver) throws InvalidRequestException
+        public Operation prepare(TableMetadata metadata, ColumnMetadata receiver, boolean canReadExistingState) throws InvalidRequestException
         {
             if (!(receiver.type instanceof CollectionType))
                 throw new InvalidRequestException(String.format("Invalid operation (%s) for non collection column %s", toString(receiver), receiver.name));
@@ -260,7 +262,7 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(TableMetadata metadata, ColumnMetadata receiver) throws InvalidRequestException
+        public Operation prepare(TableMetadata metadata, ColumnMetadata receiver, boolean canReadExistingState) throws InvalidRequestException
         {
             if (!receiver.type.isUDT())
                 throw new InvalidRequestException(String.format("Invalid operation (%s) for non-UDT column %s", toString(receiver), receiver.name));
@@ -298,15 +300,23 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(TableMetadata metadata, ColumnMetadata receiver) throws InvalidRequestException
+        public Operation prepare(TableMetadata metadata, ColumnMetadata receiver, boolean canReadExistingState) throws InvalidRequestException
         {
             if (!(receiver.type instanceof CollectionType))
             {
                 if (receiver.type instanceof TupleType)
                     throw new InvalidRequestException(String.format("Invalid operation (%s) for tuple column %s", toString(receiver), receiver.name));
 
-                if (!(receiver.type instanceof CounterColumnType))
-                    throw new InvalidRequestException(String.format("Invalid operation (%s) for non counter column %s", toString(receiver), receiver.name));
+                if (canReadExistingState)
+                {
+                    if (!(receiver.type instanceof NumberType<?>) && !(receiver.type instanceof StringType))
+                        throw new InvalidRequestException(String.format("Invalid operation (%s) for non-numeric and non-text type %s", toString(receiver), receiver.name));
+                }
+                else
+                {
+                    if (!(receiver.type instanceof CounterColumnType))
+                        throw new InvalidRequestException(String.format("Invalid operation (%s) for non counter column %s", toString(receiver), receiver.name));
+                }
                 return new Constants.Adder(receiver, value.prepare(metadata.keyspace, receiver));
             }
             else if (!(receiver.type.isMultiCell()))
@@ -354,7 +364,7 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(TableMetadata metadata, ColumnMetadata receiver) throws InvalidRequestException
+        public Operation prepare(TableMetadata metadata, ColumnMetadata receiver, boolean canReadExistingState) throws InvalidRequestException
         {
             if (!(receiver.type instanceof CollectionType))
             {
@@ -411,7 +421,7 @@ public abstract class Operation
             this.value = value;
         }
 
-        public Operation prepare(TableMetadata metadata, ColumnMetadata receiver) throws InvalidRequestException
+        public Operation prepare(TableMetadata metadata, ColumnMetadata receiver, boolean canReadExistingState) throws InvalidRequestException
         {
             Term v = value.prepare(metadata.keyspace, receiver);
 
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 9378d0a..21ddf52 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -367,7 +367,12 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa
 
     public boolean requiresRead()
     {
-        // Lists SET operation incurs a read.
+        // A subset of operations require a read before write:
+        // * Setting list element by index
+        // * Deleting list element by index
+        // * Deleting list element by value
+        // * Performing addition on a StringType (i.e. concatenation, only supported for CAS operations)
+        // * Performing addition on a NumberType, again only supported for CAS operations.
         return !requiresRead.isEmpty();
     }
 
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index f67db14..d8b4685 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -168,7 +168,7 @@ public class UpdateStatement extends ModificationStatement
                 }
                 else
                 {
-                    Operation operation = new Operation.SetValue(value).prepare(metadata, def);
+                    Operation operation = new Operation.SetValue(value).prepare(metadata, def, !conditions.isEmpty());
                     operation.collectMarkerSpecification(bindVariables);
                     operations.add(operation);
                 }
@@ -236,7 +236,7 @@ public class UpdateStatement extends ModificationStatement
                 }
                 else
                 {
-                    Operation operation = new Operation.SetValue(raw).prepare(metadata, def);
+                    Operation operation = new Operation.SetValue(raw).prepare(metadata, def, !conditions.isEmpty());
                     operation.collectMarkerSpecification(bindVariables);
                     operations.add(operation);
                 }
@@ -304,7 +304,7 @@ public class UpdateStatement extends ModificationStatement
 
                 checkFalse(def.isPrimaryKeyColumn(), "PRIMARY KEY part %s found in SET part", def.name);
 
-                Operation operation = entry.right.prepare(metadata, def);
+                Operation operation = entry.right.prepare(metadata, def, !conditions.isEmpty());
                 operation.collectMarkerSpecification(bindVariables);
                 operations.add(operation);
             }
diff --git a/src/java/org/apache/cassandra/db/marshal/AsciiType.java b/src/java/org/apache/cassandra/db/marshal/AsciiType.java
index 05077ee..2d78c1a 100644
--- a/src/java/org/apache/cassandra/db/marshal/AsciiType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AsciiType.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.serializers.AsciiSerializer;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class AsciiType extends AbstractType<String>
+public class AsciiType extends StringType
 {
     public static final AsciiType instance = new AsciiType();
 
diff --git a/src/java/org/apache/cassandra/db/marshal/StringType.java b/src/java/org/apache/cassandra/db/marshal/StringType.java
new file mode 100644
index 0000000..f9ce444
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/marshal/StringType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.marshal;
+
+public abstract class StringType extends AbstractType<String>
+{
+    protected StringType(ComparisonType comparisonType)
+    {
+        super(comparisonType);
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/marshal/UTF8Type.java b/src/java/org/apache/cassandra/db/marshal/UTF8Type.java
index db62b57..e256070 100644
--- a/src/java/org/apache/cassandra/db/marshal/UTF8Type.java
+++ b/src/java/org/apache/cassandra/db/marshal/UTF8Type.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.serializers.UTF8Serializer;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class UTF8Type extends AbstractType<String>
+public class UTF8Type extends StringType
 {
     public static final UTF8Type instance = new UTF8Type();
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/CASAddTest.java b/test/distributed/org/apache/cassandra/distributed/test/CASAddTest.java
new file mode 100644
index 0000000..02f9fd8
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASAddTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CASAddTest extends TestBaseImpl
+{
+    private static final Logger logger = LoggerFactory.getLogger(CASAddTest.class);
+
+    /**
+     * Intended {@code cas_contention_timeout_in_ms} for the tests (not currently referenced by any test in this class)
+     */
+    private static final long CONTENTION_TIMEOUT = 1000L;
+
+    /**
+     * Intended {@code write_request_timeout_in_ms} for the tests (not currently referenced by any test in this class)
+     */
+    private static final long REQUEST_TIMEOUT = 1000L;
+
+    @Test
+    public void testAddition() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(3)))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v int)");
+
+            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, v) VALUES (1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), row(1, 1));
+            
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = v + 1 WHERE pk = 1 IF v = 2", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), row(1, 1));
+
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = v + 1 WHERE pk = 1 IF v = 1", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), row(1, 2));
+        }
+    }
+
+    @Test
+    public void testConcat() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(3)))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v text)");
+
+            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, v) VALUES (1, 'foo') IF NOT EXISTS", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), row(1, "foo"));
+
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = v + 'bar' WHERE pk = 1 IF v = 'foobar'", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), row(1, "foo"));
+
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = v + 'bar' WHERE pk = 1 IF v = 'foo'", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), row(1, "foobar"));
+        }
+    }
+
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org