You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2021/11/22 11:51:29 UTC
[cassandra] branch trunk updated: [CEP-10] Phase 4: Support CAS Add Operations
This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 951d72c [CEP-10] Phase 4: Support CAS Add Operations
951d72c is described below
commit 951d72cd929d1f6c9329becbdd7604a9e709587b
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Wed Apr 14 22:54:53 2021 +0100
[CEP-10] Phase 4: Support CAS Add Operations
Co-authored-by: Benedict Elliott Smith <be...@apache.org>
Co-authored-by: Sam Tunnicliffe <sa...@apache.org>
Co-authored-by: Caleb Rackliffe <ca...@gmail.com>
---
src/java/org/apache/cassandra/cql3/Constants.java | 58 +++++++++++++--
src/java/org/apache/cassandra/cql3/Operation.java | 30 +++++---
.../cql3/statements/ModificationStatement.java | 7 +-
.../cassandra/cql3/statements/UpdateStatement.java | 6 +-
.../org/apache/cassandra/db/marshal/AsciiType.java | 2 +-
.../apache/cassandra/db/marshal/StringType.java | 27 +++++++
.../org/apache/cassandra/db/marshal/UTF8Type.java | 2 +-
.../cassandra/distributed/test/CASAddTest.java | 83 ++++++++++++++++++++++
8 files changed, 192 insertions(+), 23 deletions(-)
diff --git a/src/java/org/apache/cassandra/cql3/Constants.java b/src/java/org/apache/cassandra/cql3/Constants.java
index 6dce3a3..3457e33 100644
--- a/src/java/org/apache/cassandra/cql3/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/Constants.java
@@ -23,6 +23,9 @@ import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.*;
@@ -30,6 +33,7 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FastByteOperations;
/**
* Static helper methods and classes for constants.
@@ -446,16 +450,56 @@ public abstract class Constants
super(column, t);
}
+ public boolean requiresRead()
+ {
+ return !(column.type instanceof CounterColumnType);
+ }
+
public void execute(DecoratedKey partitionKey, UpdateParameters params) throws InvalidRequestException
{
- ByteBuffer bytes = t.bindAndGet(params.options);
- if (bytes == null)
- throw new InvalidRequestException("Invalid null value for counter increment");
- if (bytes == ByteBufferUtil.UNSET_BYTE_BUFFER)
- return;
+ if (column.type instanceof CounterColumnType)
+ {
+ ByteBuffer bytes = t.bindAndGet(params.options);
+ if (bytes == null)
+ throw new InvalidRequestException("Invalid null value for counter increment");
+ if (bytes == ByteBufferUtil.UNSET_BYTE_BUFFER)
+ return;
+
+ long increment = ByteBufferUtil.toLong(bytes);
+ params.addCounter(column, increment);
+ }
+ else if (column.type instanceof NumberType<?>)
+ {
+ @SuppressWarnings("unchecked") NumberType<Number> type = (NumberType<Number>) column.type;
+ ByteBuffer increment = t.bindAndGet(params.options);
+ ByteBuffer current = getCurrentCellBuffer(partitionKey, params);
+ ByteBuffer newValue = type.add(type, current, type, increment);
+ params.addCell(column, newValue);
+ }
+ else if (column.type instanceof StringType)
+ {
+ ByteBuffer append = t.bindAndGet(params.options);
+ ByteBuffer current = getCurrentCellBuffer(partitionKey, params);
+ ByteBuffer newValue;
+ if (current == null)
+ {
+ newValue = append;
+ }
+ else
+ {
+ newValue = ByteBuffer.allocate(current.remaining() + append.remaining());
+ FastByteOperations.copy(current, current.position(), newValue, newValue.position(), current.remaining());
+ FastByteOperations.copy(append, append.position(), newValue, newValue.position() + current.remaining(), append.remaining());
+ }
+ params.addCell(column, newValue);
+ }
+ }
- long increment = ByteBufferUtil.toLong(bytes);
- params.addCounter(column, increment);
+ private ByteBuffer getCurrentCellBuffer(DecoratedKey key, UpdateParameters params)
+ {
+ Row currentRow = params.getPrefetchedRow(key, column.isStatic() ? Clustering.STATIC_CLUSTERING : params.currentClustering());
+ Cell<?> currentCell = currentRow == null ? null : currentRow.getCell(column);
+ return currentCell == null ? null : currentCell.buffer();
}
}
diff --git a/src/java/org/apache/cassandra/cql3/Operation.java b/src/java/org/apache/cassandra/cql3/Operation.java
index d52d10e..51f5d13 100644
--- a/src/java/org/apache/cassandra/cql3/Operation.java
+++ b/src/java/org/apache/cassandra/cql3/Operation.java
@@ -97,7 +97,7 @@ public abstract class Operation
* This can be one of:
* - Setting a value: c = v
* - Setting an element of a collection: c[x] = v
- * - An addition/subtraction to a variable: c = c +/- v (where v can be a collection literal)
+ * - An addition/subtraction to a variable: c = c +/- v (where v can be a collection literal, scalar, or string)
* - An prepend operation: c = v + c
*/
public interface RawUpdate
@@ -112,9 +112,11 @@ public abstract class Operation
*
* @param metadata
* @param receiver the column this operation applies to.
+ * @param canReadExistingState whether the update depends on existing state
+ *
* @return the prepared update operation.
*/
- public Operation prepare(TableMetadata metadata, ColumnMetadata receiver) throws InvalidRequestException;
+ public Operation prepare(TableMetadata metadata, ColumnMetadata receiver, boolean canReadExistingState) throws InvalidRequestException;
/**
* @return whether this operation can be applied alongside the {@code
@@ -161,7 +163,7 @@ public abstract class Operation
this.value = value;
}
- public Operation prepare(TableMetadata metadata, ColumnMetadata receiver) throws InvalidRequestException
+ public Operation prepare(TableMetadata metadata, ColumnMetadata receiver, boolean canReadExistingState) throws InvalidRequestException
{
Term v = value.prepare(metadata.keyspace, receiver);
@@ -213,7 +215,7 @@ public abstract class Operation
this.value = value;
}
- public Operation prepare(TableMetadata metadata, ColumnMetadata receiver) throws InvalidRequestException
+ public Operation prepare(TableMetadata metadata, ColumnMetadata receiver, boolean canReadExistingState) throws InvalidRequestException
{
if (!(receiver.type instanceof CollectionType))
throw new InvalidRequestException(String.format("Invalid operation (%s) for non collection column %s", toString(receiver), receiver.name));
@@ -260,7 +262,7 @@ public abstract class Operation
this.value = value;
}
- public Operation prepare(TableMetadata metadata, ColumnMetadata receiver) throws InvalidRequestException
+ public Operation prepare(TableMetadata metadata, ColumnMetadata receiver, boolean canReadExistingState) throws InvalidRequestException
{
if (!receiver.type.isUDT())
throw new InvalidRequestException(String.format("Invalid operation (%s) for non-UDT column %s", toString(receiver), receiver.name));
@@ -298,15 +300,23 @@ public abstract class Operation
this.value = value;
}
- public Operation prepare(TableMetadata metadata, ColumnMetadata receiver) throws InvalidRequestException
+ public Operation prepare(TableMetadata metadata, ColumnMetadata receiver, boolean canReadExistingState) throws InvalidRequestException
{
if (!(receiver.type instanceof CollectionType))
{
if (receiver.type instanceof TupleType)
throw new InvalidRequestException(String.format("Invalid operation (%s) for tuple column %s", toString(receiver), receiver.name));
- if (!(receiver.type instanceof CounterColumnType))
- throw new InvalidRequestException(String.format("Invalid operation (%s) for non counter column %s", toString(receiver), receiver.name));
+ if (canReadExistingState)
+ {
+ if (!(receiver.type instanceof NumberType<?>) && !(receiver.type instanceof StringType))
+ throw new InvalidRequestException(String.format("Invalid operation (%s) for non-numeric and non-text type %s", toString(receiver), receiver.name));
+ }
+ else
+ {
+ if (!(receiver.type instanceof CounterColumnType))
+ throw new InvalidRequestException(String.format("Invalid operation (%s) for non counter column %s", toString(receiver), receiver.name));
+ }
return new Constants.Adder(receiver, value.prepare(metadata.keyspace, receiver));
}
else if (!(receiver.type.isMultiCell()))
@@ -354,7 +364,7 @@ public abstract class Operation
this.value = value;
}
- public Operation prepare(TableMetadata metadata, ColumnMetadata receiver) throws InvalidRequestException
+ public Operation prepare(TableMetadata metadata, ColumnMetadata receiver, boolean canReadExistingState) throws InvalidRequestException
{
if (!(receiver.type instanceof CollectionType))
{
@@ -411,7 +421,7 @@ public abstract class Operation
this.value = value;
}
- public Operation prepare(TableMetadata metadata, ColumnMetadata receiver) throws InvalidRequestException
+ public Operation prepare(TableMetadata metadata, ColumnMetadata receiver, boolean canReadExistingState) throws InvalidRequestException
{
Term v = value.prepare(metadata.keyspace, receiver);
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 9378d0a..21ddf52 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -367,7 +367,12 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa
public boolean requiresRead()
{
- // Lists SET operation incurs a read.
+ // A subset of operations require a read before write:
+ // * Setting list element by index
+ // * Deleting list element by index
+ // * Deleting list element by value
+ // * Performing addition on a StringType (i.e. concatenation, only supported for CAS operations)
+ // * Performing addition on a NumberType, again only supported for CAS operations.
return !requiresRead.isEmpty();
}
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index f67db14..d8b4685 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -168,7 +168,7 @@ public class UpdateStatement extends ModificationStatement
}
else
{
- Operation operation = new Operation.SetValue(value).prepare(metadata, def);
+ Operation operation = new Operation.SetValue(value).prepare(metadata, def, !conditions.isEmpty());
operation.collectMarkerSpecification(bindVariables);
operations.add(operation);
}
@@ -236,7 +236,7 @@ public class UpdateStatement extends ModificationStatement
}
else
{
- Operation operation = new Operation.SetValue(raw).prepare(metadata, def);
+ Operation operation = new Operation.SetValue(raw).prepare(metadata, def, !conditions.isEmpty());
operation.collectMarkerSpecification(bindVariables);
operations.add(operation);
}
@@ -304,7 +304,7 @@ public class UpdateStatement extends ModificationStatement
checkFalse(def.isPrimaryKeyColumn(), "PRIMARY KEY part %s found in SET part", def.name);
- Operation operation = entry.right.prepare(metadata, def);
+ Operation operation = entry.right.prepare(metadata, def, !conditions.isEmpty());
operation.collectMarkerSpecification(bindVariables);
operations.add(operation);
}
diff --git a/src/java/org/apache/cassandra/db/marshal/AsciiType.java b/src/java/org/apache/cassandra/db/marshal/AsciiType.java
index 05077ee..2d78c1a 100644
--- a/src/java/org/apache/cassandra/db/marshal/AsciiType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AsciiType.java
@@ -35,7 +35,7 @@ import org.apache.cassandra.serializers.AsciiSerializer;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
-public class AsciiType extends AbstractType<String>
+public class AsciiType extends StringType
{
public static final AsciiType instance = new AsciiType();
diff --git a/src/java/org/apache/cassandra/db/marshal/StringType.java b/src/java/org/apache/cassandra/db/marshal/StringType.java
new file mode 100644
index 0000000..f9ce444
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/marshal/StringType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.marshal;
+
+public abstract class StringType extends AbstractType<String>
+{
+ protected StringType(ComparisonType comparisonType)
+ {
+ super(comparisonType);
+ }
+}
diff --git a/src/java/org/apache/cassandra/db/marshal/UTF8Type.java b/src/java/org/apache/cassandra/db/marshal/UTF8Type.java
index db62b57..e256070 100644
--- a/src/java/org/apache/cassandra/db/marshal/UTF8Type.java
+++ b/src/java/org/apache/cassandra/db/marshal/UTF8Type.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.serializers.UTF8Serializer;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
-public class UTF8Type extends AbstractType<String>
+public class UTF8Type extends StringType
{
public static final UTF8Type instance = new UTF8Type();
diff --git a/test/distributed/org/apache/cassandra/distributed/test/CASAddTest.java b/test/distributed/org/apache/cassandra/distributed/test/CASAddTest.java
new file mode 100644
index 0000000..02f9fd8
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASAddTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CASAddTest extends TestBaseImpl
+{
+ private static final Logger logger = LoggerFactory.getLogger(CASAddTest.class);
+
+ /**
+ * The {@code cas_contention_timeout_in_ms} used during the tests
+ */
+ private static final long CONTENTION_TIMEOUT = 1000L;
+
+ /**
+ * The {@code write_request_timeout_in_ms} used during the tests
+ */
+ private static final long REQUEST_TIMEOUT = 1000L;
+
+ @Test
+ public void testAddition() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.create(3)))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v int)");
+
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, v) VALUES (1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), row(1, 1));
+
+ cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = v + 1 WHERE pk = 1 IF v = 2", ConsistencyLevel.QUORUM);
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), row(1, 1));
+
+ cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = v + 1 WHERE pk = 1 IF v = 1", ConsistencyLevel.QUORUM);
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), row(1, 2));
+ }
+ }
+
+ @Test
+ public void testConcat() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.create(3)))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v text)");
+
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, v) VALUES (1, 'foo') IF NOT EXISTS", ConsistencyLevel.QUORUM);
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), row(1, "foo"));
+
+ cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = v + 'bar' WHERE pk = 1 IF v = 'foobar'", ConsistencyLevel.QUORUM);
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), row(1, "foo"));
+
+ cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = v + 'bar' WHERE pk = 1 IF v = 'foo'", ConsistencyLevel.QUORUM);
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), row(1, "foobar"));
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org