You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by aw...@apache.org on 2022/12/09 20:01:43 UTC
[cassandra] 05/05: Support CAS and serial read on Accord
This is an automated email from the ASF dual-hosted git repository.
aweisberg pushed a commit to branch cas-accord-v2
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 9b7cb56c4e368c08e5927c239724d6236d1699c3
Author: Ariel Weisberg <aw...@apple.com>
AuthorDate: Fri Dec 2 14:05:06 2022 -0500
Support CAS and serial read on Accord
patch by Ariel Weisberg; Reviewed by ? and ? for CASSANDRA-18100
---
src/java/org/apache/cassandra/config/Config.java | 31 +-
.../cassandra/config/DatabaseDescriptor.java | 11 +-
src/java/org/apache/cassandra/cql3/Lists.java | 24 +-
src/java/org/apache/cassandra/cql3/Operator.java | 38 ++
.../cassandra/cql3/conditions/ColumnCondition.java | 379 ++++++++++++++-
.../cassandra/cql3/statements/CQL3CasRequest.java | 134 +++++-
.../org/apache/cassandra/db/ClusteringPrefix.java | 2 +
.../org/apache/cassandra/db/marshal/ListType.java | 4 +-
.../org/apache/cassandra/db/marshal/MapType.java | 11 +-
.../org/apache/cassandra/db/marshal/SetType.java | 9 +-
.../apache/cassandra/db/marshal/ValueAccessor.java | 9 +-
.../serializers/CollectionSerializer.java | 2 +-
.../org/apache/cassandra/service/CASRequest.java | 7 +
.../org/apache/cassandra/service/StorageProxy.java | 51 ++-
.../service/accord/AccordPartialCommand.java | 4 +-
.../service/accord/AccordSerializers.java | 92 +++-
.../serializers/BeginInvalidationSerializers.java | 8 +-
.../accord/serializers/CheckStatusSerializers.java | 16 +-
.../accord/serializers/CommitSerializers.java | 8 +-
.../accord/serializers/PreacceptSerializers.java | 13 +-
.../accord/serializers/RecoverySerializers.java | 11 +-
.../cassandra/service/accord/txn/TxnCondition.java | 75 ++-
.../cassandra/service/accord/txn/TxnData.java | 1 +
.../cassandra/service/accord/txn/TxnDataName.java | 33 +-
.../cassandra/service/accord/txn/TxnNamedRead.java | 2 +-
.../cassandra/service/accord/txn/TxnQuery.java | 44 +-
.../cassandra/service/accord/txn/TxnRead.java | 14 +-
.../cassandra/service/accord/txn/TxnUpdate.java | 23 +-
.../service/paxos/PaxosPrepareRefresh.java | 5 +-
.../cassandra/service/paxos/PaxosRepair.java | 29 +-
.../apache/cassandra/service/paxos/PaxosState.java | 1 +
.../org/apache/cassandra/utils/ByteBufferUtil.java | 85 +++-
.../apache/cassandra/utils/NullableSerializer.java | 4 +-
.../distributed/test/ByteBuddyExamples.java | 1 -
.../distributed/test/accord/AccordCQLTest.java | 61 ++-
.../distributed/test/accord/AccordTestBase.java | 63 ++-
.../cql3/conditions/ColumnConditionTest.java | 506 ++++++++++++++++++---
.../cassandra/service/accord/txn/TxnBuilder.java | 17 +-
38 files changed, 1550 insertions(+), 278 deletions(-)
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index db1ed12c8c..eed967d68f 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -21,19 +21,17 @@ import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Supplier;
-
import javax.annotation.Nullable;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1054,6 +1052,8 @@ public class Config
*/
public volatile int paxos_repair_parallelism = -1;
+ public volatile LegacyPaxosStrategy legacy_paxos_strategy = LegacyPaxosStrategy.migration;
+
public volatile int max_top_size_partition_count = 10;
public volatile int max_top_tombstone_partition_count = 10;
public volatile DataStorageSpec.LongBytesBound min_tracked_partition_size = new DataStorageSpec.LongBytesBound("1MiB");
@@ -1149,6 +1149,31 @@ public class Config
exception
}
+ /*
+ * How to pick a consensus protocol for CAS
+ * and serial read operations. Transaction statements
+ * will always run on Accord. Legacy in this context includes PaxosV2.
+ */
+ public enum LegacyPaxosStrategy
+ {
+ /*
+ * Default setting
+ *
+ * Allow both Accord and PaxosV1/V2 to run on the same cluster
+ * Some keys and ranges might be running on Accord if they
+ * have been migrated and the rest will run on Paxos until
+ * they are migrated.
+ */
+ migration,
+
+ /*
+ * Everything will be run on Accord. Useful for new deployments
+ * that don't want to accidentally start using legacy Paxos
+ * requiring migration to Accord.
+ */
+ accord
+ }
+
private static final Set<String> SENSITIVE_KEYS = new HashSet<String>() {{
add("client_encryption_options");
add("server_encryption_options");
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 2955635694..7015382aca 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -43,7 +43,6 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
-
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
@@ -2838,6 +2837,16 @@ public class DatabaseDescriptor
return conf.paxos_topology_repair_strict_each_quorum;
}
+ public static Config.LegacyPaxosStrategy getLegacyPaxosStrategy()
+ {
+ return conf.legacy_paxos_strategy;
+ }
+
+ public static void setLegacyPaxosStrategy(Config.LegacyPaxosStrategy strategy)
+ {
+ conf.legacy_paxos_strategy = strategy;
+ }
+
public static void setNativeTransportMaxRequestDataInFlightPerIpInBytes(long maxRequestDataInFlightInBytes)
{
if (maxRequestDataInFlightInBytes == -1)
diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java
index f24e94aac3..2ad2790006 100644
--- a/src/java/org/apache/cassandra/cql3/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/Lists.java
@@ -17,10 +17,6 @@
*/
package org.apache.cassandra.cql3;
-import static org.apache.cassandra.cql3.Constants.UNSET_VALUE;
-import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
-import static org.apache.cassandra.utils.TimeUUID.Generator.atUnixMillisAsBytes;
-
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -30,22 +26,30 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
-import org.apache.cassandra.db.guardrails.Guardrails;
-import org.apache.cassandra.db.marshal.ByteBufferAccessor;
-import org.apache.cassandra.schema.ColumnMetadata;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.cql3.functions.Function;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.apache.cassandra.cql3.Constants.UNSET_VALUE;
+import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
+import static org.apache.cassandra.utils.TimeUUID.Generator.atUnixMillisAsBytes;
+
/**
* Static helper methods and classes for lists.
*/
@@ -210,8 +214,10 @@ public abstract class Lists
List<?> l = type.getSerializer().deserializeForNativeProtocol(value, ByteBufferAccessor.instance, version);
List<ByteBuffer> elements = new ArrayList<>(l.size());
for (Object element : l)
+ {
// elements can be null in lists that represent a set of IN values
elements.add(element == null ? null : type.getElementsType().decompose(element));
+ }
return new Value(elements);
}
catch (MarshalException e)
diff --git a/src/java/org/apache/cassandra/cql3/Operator.java b/src/java/org/apache/cassandra/cql3/Operator.java
index bcb5f63be6..8985a33f71 100644
--- a/src/java/org/apache/cassandra/cql3/Operator.java
+++ b/src/java/org/apache/cassandra/cql3/Operator.java
@@ -26,9 +26,14 @@ import java.util.Map;
import java.util.Set;
import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.serializers.ListSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
+import static com.google.common.primitives.Ints.checkedCast;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+
public enum Operator
{
EQ(0)
@@ -285,6 +290,17 @@ public enum Operator
output.writeInt(b);
}
+ /**
+ * Writes the binary value of this <code>Operator</code> to the specified output as an unsigned vint.
+ *
+ * @param output the output to write to
+ * @throws IOException if an I/O problem occurs while writing to the specified output
+ */
+ public void writeToUnsignedVInt(DataOutputPlus output) throws IOException
+ {
+ output.writeUnsignedVInt(b);
+ }
+
public int getValue()
{
return b;
@@ -307,6 +323,23 @@ public enum Operator
throw new IOException(String.format("Cannot resolve Relation.Type from binary representation: %s", b));
}
+ /**
+ * Deserializes an <code>Operator</code> instance from the specified input.
+ *
+ * @param input the input to read from
+ * @return the <code>Operator</code> instance deserialized
+ * @throws IOException if a problem occurs while deserializing the <code>Operator</code> instance.
+ */
+ public static Operator readFromUnsignedVInt(DataInputPlus input) throws IOException
+ {
+ int b = checkedCast(input.readUnsignedVInt());
+ for (Operator operator : values())
+ if (operator.b == b)
+ return operator;
+
+ throw new IOException(String.format("Cannot resolve Relation.Type from binary representation: %s", b));
+ }
+
/**
* Whether 2 values satisfy this operator (given the type they should be compared with).
*/
@@ -358,4 +391,9 @@ public enum Operator
{
return this == CONTAINS_KEY;
}
+
+ public long sizeAsUnsignedVInt()
+ {
+ return sizeofUnsignedVInt(b);
+ }
}
diff --git a/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java b/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
index 9d9506ff53..ca45c2adf4 100644
--- a/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
+++ b/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
@@ -17,24 +17,73 @@
*/
package org.apache.cassandra.cql3.conditions;
+import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
-
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import com.google.common.base.Suppliers;
import com.google.common.collect.Iterators;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.AbstractMarker;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.Constants;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
import org.apache.cassandra.cql3.Term.Terminal;
+import org.apache.cassandra.cql3.Terms;
+import org.apache.cassandra.cql3.UserTypes;
+import org.apache.cassandra.cql3.VariableSpecifications;
import org.apache.cassandra.cql3.functions.Function;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.ComplexColumnData;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
-import static org.apache.cassandra.cql3.statements.RequestValidations.*;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.primitives.Ints.checkedCast;
+import static java.util.stream.Collectors.toList;
+import static org.apache.cassandra.cql3.Constants.UNSET_VALUE;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+import static org.apache.cassandra.service.accord.AccordSerializers.columnMetadataSerializer;
+import static org.apache.cassandra.service.accord.AccordSerializers.deserializeCqlCollectionAsTerm;
+import static org.apache.cassandra.utils.ByteBufferUtil.UNSET_BYTE_BUFFER;
+import static org.apache.cassandra.utils.ByteBufferUtil.vintNullableSerializer;
+import static org.apache.cassandra.utils.ByteBufferUtil.vintSerializer;
+import static org.apache.cassandra.utils.CollectionSerializers.deserializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.serializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.serializedListSize;
/**
* A CQL3 condition on the value of a column or collection element. For example, "UPDATE .. IF a = 0".
@@ -73,7 +122,7 @@ public abstract class ColumnCondition
terms.collectMarkerSpecification(boundNames);
}
- public abstract ColumnCondition.Bound bind(QueryOptions options);
+ public abstract Bound bind(QueryOptions options);
protected final List<ByteBuffer> bindAndGetTerms(QueryOptions options)
{
@@ -107,7 +156,7 @@ public abstract class ColumnCondition
{
T value = values.get(i);
// The value can be ByteBuffer or Constants.Value so we need to check the 2 type of UNSET
- if (value != ByteBufferUtil.UNSET_BYTE_BUFFER && value != Constants.UNSET_VALUE)
+ if (value != UNSET_BYTE_BUFFER && value != UNSET_VALUE)
filtered.add(value);
}
return filtered;
@@ -210,13 +259,51 @@ public abstract class ColumnCondition
return new UDTFieldCondition(column, udtField, op, terms);
}
+ /*
+ * Don't change, reorder or remove entries: their ordinals are serialized, so doing so will break
+ * cross-version compatibility, as the ordinals are sent in messages and stored by Accord for in-flight transactions
+ */
+ enum BoundKind
+ {
+ ELEMENT_ACCESS,
+ MULTI_CELL_COLLECTION,
+ MULTI_CELL_UDT,
+ SIMPLE,
+ UDT_FIELD_ACCESS;
+
+ @SuppressWarnings("rawtypes")
+ BoundSerializer serializer()
+ {
+ switch(this)
+ {
+ case ELEMENT_ACCESS:
+ return ElementAccessBound.serializer;
+ case MULTI_CELL_COLLECTION:
+ return MultiCellCollectionBound.serializer;
+ case MULTI_CELL_UDT:
+ return MultiCellUdtBound.serializer;
+ case SIMPLE:
+ return SimpleBound.serializer;
+ case UDT_FIELD_ACCESS:
+ return UDTFieldAccessBound.serializer;
+ default:
+ throw new AssertionError("Shouldn't have an enum with no serializer");
+ }
+ }
+ }
+
public static abstract class Bound
{
+ @Nonnull
public final ColumnMetadata column;
+
+ @Nonnull
public final Operator comparisonOperator;
protected Bound(ColumnMetadata column, Operator operator)
{
+ checkNotNull(column);
+ checkNotNull(operator);
this.column = column;
// If the operator is an IN we want to compare the value using an EQ.
this.comparisonOperator = operator.isIN() ? Operator.EQ : operator;
@@ -235,7 +322,7 @@ public abstract class ColumnCondition
/** Returns true if the operator is satisfied (i.e. "otherValue operator value == true"), false otherwise. */
public static boolean compareWithOperator(Operator operator, AbstractType<?> type, ByteBuffer value, ByteBuffer otherValue)
{
- if (value == ByteBufferUtil.UNSET_BYTE_BUFFER)
+ if (value == UNSET_BYTE_BUFFER)
throw invalidRequest("Invalid 'unset' value in condition");
if (value == null)
@@ -257,6 +344,55 @@ public abstract class ColumnCondition
}
return operator.isSatisfiedBy(type, otherValue, value);
}
+
+ void checkForUnsetValues(List<ByteBuffer> values)
+ {
+ for (ByteBuffer buffer : values)
+ if (buffer == UNSET_BYTE_BUFFER)
+ throw invalidRequest("Invalid 'unset' value in condition");
+ }
+ protected abstract BoundKind kind();
+
+ public static final IVersionedSerializer<Bound> serializer = new IVersionedSerializer<Bound>()
+ {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void serialize(Bound bound, DataOutputPlus out, int version) throws IOException
+ {
+ columnMetadataSerializer.serialize(bound.column, out, version);
+ bound.comparisonOperator.writeToUnsignedVInt(out);
+ BoundKind kind = bound.kind();
+ out.writeUnsignedVInt(kind.ordinal());
+ kind.serializer().serialize(bound, out, version);
+ }
+
+ @Override
+ public Bound deserialize(DataInputPlus in, int version) throws IOException
+ {
+ ColumnMetadata column = columnMetadataSerializer.deserialize(in, version);
+ Operator comparisonOperator = Operator.readFromUnsignedVInt(in);
+ BoundKind kind = BoundKind.values()[checkedCast(in.readUnsignedVInt())];
+ return kind.serializer().deserialize(in, version, column, comparisonOperator);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public long serializedSize(Bound bound, int version)
+ {
+ BoundKind kind = bound.kind();
+ return columnMetadataSerializer.serializedSize(bound.column, version)
+ + bound.comparisonOperator.sizeAsUnsignedVInt()
+ + sizeofUnsignedVInt(kind.ordinal())
+ + kind.serializer().serializedSize(bound, version);
+ }
+ };
+ }
+
+ private interface BoundSerializer<T extends Bound>
+ {
+ void serialize(T bound, DataOutputPlus out, int version) throws IOException;
+ Bound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException;
+ long serializedSize(T condition, int version);
}
protected static final Cell<?> getCell(Row row, ColumnMetadata column)
@@ -317,6 +453,7 @@ public abstract class ColumnCondition
private SimpleBound(ColumnMetadata column, Operator operator, List<ByteBuffer> values)
{
super(column, operator);
+ checkForUnsetValues(values);
this.values = values;
}
@@ -341,6 +478,34 @@ public abstract class ColumnCondition
}
return false;
}
+
+ @Override
+ protected BoundKind kind()
+ {
+ return BoundKind.SIMPLE;
+ }
+
+ private static final BoundSerializer<SimpleBound> serializer = new BoundSerializer<SimpleBound>()
+ {
+ @Override
+ public void serialize(SimpleBound bound, DataOutputPlus out, int version) throws IOException
+ {
+ serializeList(bound.values, out, version, vintNullableSerializer);
+ }
+
+ @Override
+ public SimpleBound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException
+ {
+ List<ByteBuffer> values = deserializeList(in, version, vintNullableSerializer);
+ return new SimpleBound(column, operator, values);
+ }
+
+ @Override
+ public long serializedSize(SimpleBound bound, int version)
+ {
+ return serializedListSize(bound.values, version, vintNullableSerializer);
+ }
+ };
}
/**
@@ -351,11 +516,13 @@ public abstract class ColumnCondition
/**
* The collection element
*/
+ @Nonnull
private final ByteBuffer collectionElement;
/**
* The conditions values.
*/
+ @Nonnull
private final List<ByteBuffer> values;
public ElementAccessBound(ColumnMetadata column,
@@ -364,6 +531,11 @@ public abstract class ColumnCondition
List<ByteBuffer> values)
{
super(column, operator);
+ checkForUnsetValues(values);
+
+ boolean isMap = column.type instanceof MapType;
+ if (collectionElement == null)
+ throw invalidRequest("Invalid null value for %s element access", isMap ? "map" : "list");
this.collectionElement = collectionElement;
this.values = values;
@@ -374,9 +546,6 @@ public abstract class ColumnCondition
{
boolean isMap = column.type instanceof MapType;
- if (collectionElement == null)
- throw invalidRequest("Invalid null value for %s element access", isMap ? "map" : "list");
-
if (isMap)
{
MapType<?, ?> mapType = (MapType<?, ?>) column.type;
@@ -445,6 +614,36 @@ public abstract class ColumnCondition
checkFalse(idx < 0, "Invalid negative list index %d", idx);
return idx;
}
+
+ @Override
+ protected BoundKind kind()
+ {
+ return BoundKind.ELEMENT_ACCESS;
+ }
+
+ private static final BoundSerializer<ElementAccessBound> serializer = new BoundSerializer<ElementAccessBound>()
+ {
+ @Override
+ public void serialize(ElementAccessBound bound, DataOutputPlus out, int version) throws IOException
+ {
+ vintSerializer.serialize(bound.collectionElement, out, version);
+ serializeList(bound.values, out, version, vintNullableSerializer);
+ }
+
+ @Override
+ public ElementAccessBound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException
+ {
+ ByteBuffer collectionElement = vintSerializer.deserialize(in, version);
+ List<ByteBuffer> values = deserializeList(in, version, vintNullableSerializer);
+ return new ElementAccessBound(column, collectionElement, operator, values);
+ }
+
+ @Override
+ public long serializedSize(ElementAccessBound bound , int version)
+ {
+ return vintSerializer.serializedSize(bound.collectionElement, version) + serializedListSize(bound.values, version, vintNullableSerializer);
+ }
+ };
}
/**
@@ -452,26 +651,44 @@ public abstract class ColumnCondition
*/
public static final class MultiCellCollectionBound extends Bound
{
- private final List<Term.Terminal> values;
+ @Nonnull
+ private final List<Terminal> values;
- public MultiCellCollectionBound(ColumnMetadata column, Operator operator, List<Term.Terminal> values)
+ @Nullable
+ private Supplier<List<ByteBuffer>> serializedValues;
+
+ public MultiCellCollectionBound(ColumnMetadata column, Operator operator, List<Terminal> values)
{
super(column, operator);
+ checkNotNull(values);
assert column.type.isMultiCell();
this.values = values;
}
+ public Supplier<List<ByteBuffer>> serializedValues()
+ {
+ if (serializedValues != null)
+ return serializedValues;
+
+ serializedValues = Suppliers.memoize(() ->
+ values.stream()
+ .map(v -> v == null ? null : v.get(ProtocolVersion.CURRENT))
+ .collect(toList()));
+
+ return serializedValues;
+ }
+
public boolean appliesTo(Row row)
{
return appliesTo(column, comparisonOperator, values, row);
}
- public static boolean appliesTo(ColumnMetadata column, Operator operator, List<Term.Terminal> values, Row row)
+ public static boolean appliesTo(ColumnMetadata column, Operator operator, List<Terminal> values, Row row)
{
CollectionType<?> type = (CollectionType<?>) column.type;
// copy iterator contents so that we can properly reuse them for each comparison with an IN value
- for (Term.Terminal value : values)
+ for (Terminal value : values)
{
Iterator<Cell<?>> iter = getCells(row, column);
if (value == null)
@@ -495,7 +712,7 @@ public abstract class ColumnCondition
return false;
}
- private static boolean valueAppliesTo(CollectionType<?> type, Iterator<Cell<?>> iter, Term.Terminal value, Operator operator)
+ private static boolean valueAppliesTo(CollectionType<?> type, Iterator<Cell<?>> iter, Terminal value, Operator operator)
{
if (value == null)
return !iter.hasNext();
@@ -527,7 +744,10 @@ public abstract class ColumnCondition
// for lists we use the cell value; for sets we use the cell name
ByteBuffer cellValue = isSet ? iter.next().path().get(0) : iter.next().buffer();
- int comparison = type.compare(cellValue, conditionIter.next());
+ ByteBuffer conditionValue = conditionIter.next();
+ if (conditionValue == null)
+ conditionValue = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ int comparison = type.compare(cellValue, conditionValue);
if (comparison != 0)
return evaluateComparisonWithOperator(comparison, operator);
}
@@ -546,6 +766,7 @@ public abstract class ColumnCondition
private static boolean setAppliesTo(SetType<?> type, Iterator<Cell<?>> iter, Set<ByteBuffer> elements, Operator operator)
{
+ // This is redundant since it is already a sorted set?
ArrayList<ByteBuffer> sortedElements = new ArrayList<>(elements);
Collections.sort(sortedElements, type.getElementsType());
return setOrListAppliesTo(type.getElementsType(), iter, sortedElements.iterator(), operator, true);
@@ -562,13 +783,17 @@ public abstract class ColumnCondition
Map.Entry<ByteBuffer, ByteBuffer> conditionEntry = conditionIter.next();
Cell<?> c = iter.next();
+ ByteBuffer conditionEntryKey = conditionEntry.getKey();
+
// compare the keys
- int comparison = type.getKeysType().compare(c.path().get(0), conditionEntry.getKey());
+ int comparison = type.getKeysType().compare(c.path().get(0), conditionEntryKey);
if (comparison != 0)
return evaluateComparisonWithOperator(comparison, operator);
+ ByteBuffer conditionEntryValue = conditionEntry.getValue();
+
// compare the values
- comparison = type.getValuesType().compare(c.buffer(), conditionEntry.getValue());
+ comparison = type.getValuesType().compare(c.buffer(), conditionEntryValue);
if (comparison != 0)
return evaluateComparisonWithOperator(comparison, operator);
}
@@ -579,6 +804,43 @@ public abstract class ColumnCondition
// they're equal
return operator == Operator.EQ || operator == Operator.LTE || operator == Operator.GTE;
}
+
+ @Override
+ protected BoundKind kind()
+ {
+ return BoundKind.MULTI_CELL_COLLECTION;
+ }
+
+ private static final BoundSerializer<MultiCellCollectionBound> serializer = new BoundSerializer<MultiCellCollectionBound>()
+ {
+ @Override
+ public void serialize(MultiCellCollectionBound bound, DataOutputPlus out, int version) throws IOException
+ {
+ serializeList(bound.serializedValues().get(), out, version, vintNullableSerializer);
+ }
+
+ @Override
+ public MultiCellCollectionBound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException
+ {
+ List<ByteBuffer> values = deserializeList(in, version, vintNullableSerializer);
+ List<Terminal> terminals;
+ if (operator.isContains() || operator.isContainsKey())
+ {
+ terminals = values.stream().map(Constants.Value::new).collect(toList());
+ }
+ else
+ {
+ terminals = values.stream().map(b -> deserializeCqlCollectionAsTerm(b, column.type)).collect(toList());
+ }
+ return new MultiCellCollectionBound(column, operator, terminals);
+ }
+
+ @Override
+ public long serializedSize(MultiCellCollectionBound bound, int version)
+ {
+ return serializedListSize(bound.serializedValues().get(), version, vintNullableSerializer);
+ }
+ };
}
private static boolean containsAppliesTo(CollectionType<?> type, Iterator<Cell<?>> iter, ByteBuffer value, Operator operator)
@@ -622,16 +884,21 @@ public abstract class ColumnCondition
/**
* The UDT field.
*/
+ @Nonnull
private final FieldIdentifier field;
/**
* The conditions values.
*/
+ @Nonnull
private final List<ByteBuffer> values;
private UDTFieldAccessBound(ColumnMetadata column, FieldIdentifier field, Operator operator, List<ByteBuffer> values)
{
super(column, operator);
+ checkNotNull(field);
+ checkNotNull(values);
+ checkForUnsetValues(values);
assert column.type.isUDT() && field != null;
this.field = field;
this.values = values;
@@ -647,6 +914,7 @@ public abstract class ColumnCondition
{
UserType userType = (UserType) column.type;
+
if (column.type.isMultiCell())
{
Cell<?> cell = getCell(row, column, userType.cellPathForField(field));
@@ -677,6 +945,37 @@ public abstract class ColumnCondition
{
return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
+
+ @Override
+ protected BoundKind kind()
+ {
+ return BoundKind.UDT_FIELD_ACCESS;
+ }
+
+ private static final BoundSerializer<UDTFieldAccessBound> serializer = new BoundSerializer<UDTFieldAccessBound>()
+ {
+ @Override
+ public void serialize(UDTFieldAccessBound bound, DataOutputPlus out, int version) throws IOException
+ {
+ vintNullableSerializer.serialize(bound.field.bytes, out, version);
+ serializeList(bound.values, out, version, vintNullableSerializer);
+ }
+
+ @Override
+ public UDTFieldAccessBound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException
+ {
+ FieldIdentifier field = new FieldIdentifier(vintNullableSerializer.deserialize(in, version));
+ List<ByteBuffer> values = deserializeList(in, version, vintNullableSerializer);
+ return new UDTFieldAccessBound(column, field, operator, values);
+ }
+
+ @Override
+ public long serializedSize(UDTFieldAccessBound bound, int version)
+ {
+
+ return vintNullableSerializer.serializedSize(bound.field.bytes, version) + serializedListSize(bound.values, version, vintNullableSerializer);
+ }
+ };
}
/**
@@ -687,16 +986,21 @@ public abstract class ColumnCondition
/**
* The conditions values.
*/
+ @Nonnull
private final List<ByteBuffer> values;
/**
* The protocol version
*/
+ @Nonnull
private final ProtocolVersion protocolVersion;
public MultiCellUdtBound(ColumnMetadata column, Operator op, List<ByteBuffer> values, ProtocolVersion protocolVersion)
{
super(column, op);
+ checkNotNull(values);
+ checkNotNull(protocolVersion);
+ checkForUnsetValues(values);
assert column.type.isMultiCell();
this.values = values;
this.protocolVersion = protocolVersion;
@@ -730,6 +1034,37 @@ public abstract class ColumnCondition
{
return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
+
+ @Override
+ public BoundKind kind()
+ {
+ return BoundKind.MULTI_CELL_UDT;
+ }
+
+ private static final BoundSerializer<MultiCellUdtBound> serializer = new BoundSerializer<MultiCellUdtBound>()
+ {
+ @Override
+ public void serialize(MultiCellUdtBound bound, DataOutputPlus out, int version) throws IOException
+ {
+ serializeList(bound.values, out, version, vintNullableSerializer);
+ out.writeUnsignedVInt(bound.protocolVersion.asInt());
+ }
+
+ @Override
+ public MultiCellUdtBound deserialize(DataInputPlus in, int version, ColumnMetadata column, Operator operator) throws IOException
+ {
+ List<ByteBuffer> values = deserializeList(in, version, vintNullableSerializer);
+ int protocolVersion = in.readUnsignedVIntChecked();
+ // TODO(review): confirm that ProtocolVersion.decode(int, true) correctly restores the version written via asInt()
+ return new MultiCellUdtBound(column, operator, values, ProtocolVersion.decode(protocolVersion, true));
+ }
+
+ @Override
+ public long serializedSize(MultiCellUdtBound bound, int version)
+ {
+ return serializedListSize(bound.values, version, vintNullableSerializer) + sizeofUnsignedVInt(bound.protocolVersion.asInt());
+ }
+ };
}
public static class Raw
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 02931005a7..fe68304598 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -18,30 +18,61 @@
package org.apache.cassandra.cql3.statements;
import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.cassandra.db.marshal.TimeUUIDType;
-import org.apache.cassandra.index.IndexRegistry;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.cql3.*;
+import accord.api.Update;
+import accord.primitives.Txn;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.UpdateParameters;
import org.apache.cassandra.cql3.conditions.ColumnCondition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.IndexRegistry;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.CASRequest;
import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.accord.txn.TxnCondition;
+import org.apache.cassandra.service.accord.txn.TxnData;
+import org.apache.cassandra.service.accord.txn.TxnDataName;
+import org.apache.cassandra.service.accord.txn.TxnQuery;
+import org.apache.cassandra.service.accord.txn.TxnRead;
+import org.apache.cassandra.service.accord.txn.TxnReference;
+import org.apache.cassandra.service.accord.txn.TxnUpdate;
+import org.apache.cassandra.service.accord.txn.TxnWrite;
import org.apache.cassandra.service.paxos.Ballot;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.TimeUUID;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.cassandra.service.accord.txn.TxnDataName.Kind.USER;
/**
* Processed CAS conditions and update on potentially multiple rows of the same partition.
@@ -343,6 +374,8 @@ public class CQL3CasRequest implements CASRequest
}
public abstract boolean appliesTo(FilteredPartition current) throws InvalidRequestException;
+
+ /** Returns this row condition expressed as a {@link TxnCondition}. */
+ public abstract TxnCondition asTxnCondition();
}
private static class NotExistCondition extends RowCondition
@@ -356,6 +389,14 @@ public class CQL3CasRequest implements CASRequest
{
return current.getRow(clustering) == null;
}
+
+ @Override
+ public TxnCondition asTxnCondition()
+ {
+     // Counterpart of appliesTo: an Exists condition of kind IS_NULL over the reference named by this clustering
+     TxnReference rowReference = new TxnReference(new TxnDataName(USER, clustering, TxnRead.DUMMY_NAME), null);
+     return new TxnCondition.Exists(rowReference, TxnCondition.Kind.IS_NULL);
+ }
}
private static class ExistCondition extends RowCondition
@@ -369,6 +410,14 @@ public class CQL3CasRequest implements CASRequest
{
return current.getRow(clustering) != null;
}
+
+ @Override
+ public TxnCondition asTxnCondition()
+ {
+     // Counterpart of appliesTo: an Exists condition of kind IS_NOT_NULL over the reference named by this clustering
+     TxnReference rowReference = new TxnReference(new TxnDataName(USER, clustering, TxnRead.DUMMY_NAME), null);
+     return new TxnCondition.Exists(rowReference, TxnCondition.Kind.IS_NOT_NULL);
+ }
}
private static class ColumnsConditions extends RowCondition
@@ -399,6 +448,12 @@ public class CQL3CasRequest implements CASRequest
}
return true;
}
+
+ /** Wraps this row's clustering and all of its column conditions in a {@code TxnCondition.ColumnConditionsAdapter}. */
+ @Override
+ public TxnCondition asTxnCondition()
+ {
+ return new TxnCondition.ColumnConditionsAdapter(clustering, conditions.values());
+ }
}
@Override
@@ -406,4 +461,59 @@ public class CQL3CasRequest implements CASRequest
{
return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
+
+ /**
+  * Builds the Accord transaction for this CAS request: the partition read, the
+  * CONDITION query and the conditional update.
+  */
+ @Override
+ public Txn toAccordTxn(ClientState clientState, int nowInSecs)
+ {
+     SinglePartitionReadCommand partitionRead = readCommand(nowInSecs);
+     Update conditionalUpdate = createUpdate(clientState);
+     // A CAS request supports a single key, and its writes cannot depend on
+     // the data that is read (only the conditions can), so the keys of the
+     // read are the only keys the transaction needs.
+     TxnRead txnRead = TxnRead.createRead(partitionRead);
+     return new Txn.InMemory(txnRead.keys(), txnRead, TxnQuery.CONDITION, conditionalUpdate);
+ }
+
+ /** Combines the write fragments of every row update with the request's condition into a {@link TxnUpdate}. */
+ private Update createUpdate(ClientState clientState)
+ {
+     List<TxnWrite.Fragment> writeFragments = createWriteFragments(clientState);
+     TxnCondition condition = createCondition();
+     return new TxnUpdate(writeFragments, condition);
+ }
+
+ /**
+  * Builds the Accord condition for this request from the static condition (when
+  * present) and every per-row condition.
+  *
+  * @return the lone condition when exactly one was collected, otherwise an AND group over all of them
+  * @throws IllegalStateException if no condition was collected at all
+  */
+ private TxnCondition createCondition()
+ {
+     List<TxnCondition> txnConditions = new ArrayList<>();
+     if (staticConditions != null)
+         txnConditions.add(staticConditions.asTxnCondition());
+     for (RowCondition rowCondition : conditions.values())
+         txnConditions.add(rowCondition.asTxnCondition());
+     // CAS forbids empty conditions
+     checkState(!txnConditions.isEmpty());
+     // Decide on the number of collected conditions rather than on conditions.size():
+     // the static condition is collected as well, so a static condition plus one row
+     // condition must produce an AND group instead of dropping the row condition.
+     return txnConditions.size() == 1 ? txnConditions.get(0) : new TxnCondition.BooleanGroup(TxnCondition.Kind.AND, txnConditions);
+ }
+
+ /** Creates one write fragment per row update, numbering the fragments from zero in iteration order. */
+ private List<TxnWrite.Fragment> createWriteFragments(ClientState state)
+ {
+     List<TxnWrite.Fragment> result = new ArrayList<>();
+     for (RowUpdate rowUpdate : updates)
+     {
+         // Exactly one fragment is appended per update, so the list size is the next fragment index
+         int position = result.size();
+         result.add(rowUpdate.stmt.getTxnWriteFragment(position, state, rowUpdate.options));
+     }
+     return result;
+ }
+
+ /** Returns the rows of the partition stored under {@code TxnRead.DUMMY}, or null when the data holds no such partition. */
+ @Override
+ public RowIterator toCasResult(TxnData txnData)
+ {
+     FilteredPartition current = txnData.get(TxnRead.DUMMY);
+     return current == null ? null : current.rowIterator();
+ }
}
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index c7a2782ece..7c6fdd04bd 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -38,6 +38,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.bytecomparable.ByteComparable.Version;
import org.apache.cassandra.utils.bytecomparable.ByteSource;
+import static com.google.common.base.Preconditions.checkNotNull;
+
/**
* A clustering prefix is the unit of what a {@link ClusteringComparator} can compare.
* <p>
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 99ae73cc8f..0f39180448 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -32,8 +32,8 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.serializers.ListSerializer;
import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.bytecomparable.ByteComparable.Version;
import org.apache.cassandra.utils.bytecomparable.ByteSource;
@@ -123,7 +123,7 @@ public class ListType<T> extends CollectionType<List<T>>
}
@Override
- public AbstractType<?> freeze()
+ public ListType<T> freeze()
{
if (isMultiCell)
return getInstance(this.elements, false);
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java
index be74ff1626..89b6b3423e 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -18,7 +18,12 @@
package org.apache.cassandra.db.marshal;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.cassandra.cql3.Json;
@@ -31,11 +36,11 @@ import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.serializers.MapSerializer;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.transport.ProtocolVersion;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.cassandra.utils.bytecomparable.ByteComparable.Version;
import org.apache.cassandra.utils.bytecomparable.ByteSource;
import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
-import org.apache.cassandra.utils.Pair;
public class MapType<K, V> extends CollectionType<Map<K, V>>
{
@@ -141,7 +146,7 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
}
@Override
- public AbstractType<?> freeze()
+ public MapType<K, V> freeze()
{
if (isMultiCell)
return getInstance(this.keys, this.values, false);
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java
index 67699ac3da..c831e0d5e5 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -18,7 +18,12 @@
package org.apache.cassandra.db.marshal;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.cassandra.cql3.Json;
@@ -114,7 +119,7 @@ public class SetType<T> extends CollectionType<Set<T>>
}
@Override
- public AbstractType<?> freeze()
+ public SetType<T> freeze()
{
if (isMultiCell)
return getInstance(this.elements, false);
diff --git a/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java b/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java
index d454c5e188..5f46e45c0f 100644
--- a/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java
+++ b/src/java/org/apache/cassandra/db/marshal/ValueAccessor.java
@@ -40,7 +40,12 @@ import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.service.paxos.Ballot;
import org.apache.cassandra.utils.TimeUUID;
-import static org.apache.cassandra.db.ClusteringPrefix.Kind.*;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_END_INCL_START_BOUNDARY;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.EXCL_START_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_END_BOUND;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_END_EXCL_START_BOUNDARY;
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.INCL_START_BOUND;
/**
* ValueAccessor allows serializers and other code dealing with raw bytes to operate on different backing types
@@ -130,6 +135,8 @@ public interface ValueAccessor<V>
*/
default boolean isEmpty(V value)
{
+ // A null value is not special-cased here: it is handed to size(value) as-is
return size(value) == 0;
}
diff --git a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
index 36e346c3e7..a9cc57e75d 100644
--- a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java
@@ -25,10 +25,10 @@ import java.util.List;
import com.google.common.collect.Range;
import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.db.marshal.ValueAccessor;
import org.apache.cassandra.transport.ProtocolVersion;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.utils.ByteBufferUtil;
public abstract class CollectionSerializer<T> extends TypeSerializer<T>
diff --git a/src/java/org/apache/cassandra/service/CASRequest.java b/src/java/org/apache/cassandra/service/CASRequest.java
index 19966c883c..b61f7ab79c 100644
--- a/src/java/org/apache/cassandra/service/CASRequest.java
+++ b/src/java/org/apache/cassandra/service/CASRequest.java
@@ -17,10 +17,13 @@
*/
package org.apache.cassandra.service;
+import accord.primitives.Txn;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.accord.txn.TxnData;
import org.apache.cassandra.service.paxos.Ballot;
/**
@@ -44,4 +47,8 @@ public interface CASRequest
* are passed as argument.
*/
public PartitionUpdate makeUpdates(FilteredPartition current, ClientState clientState, Ballot ballot) throws InvalidRequestException;
+
+ /** Builds the Accord transaction for this request from the client state and the current time in seconds. */
+ public Txn toAccordTxn(ClientState clientState, int nowInSecs);
+
+ /** Converts the data produced by coordinating the Accord transaction into the row iterator returned for the CAS operation. */
+ RowIterator toCasResult(TxnData data);
}
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0e873506d4..f417b0344b 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -46,6 +46,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.primitives.Txn;
import org.apache.cassandra.batchlog.Batch;
import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask;
@@ -122,6 +123,8 @@ import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.txn.TxnData;
+import org.apache.cassandra.service.accord.txn.TxnQuery;
+import org.apache.cassandra.service.accord.txn.TxnRead;
import org.apache.cassandra.service.paxos.Ballot;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.service.paxos.ContentionStrategy;
@@ -145,12 +148,10 @@ import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
+import static com.google.common.collect.Iterables.concat;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
-
-import static com.google.common.collect.Iterables.concat;
-import static org.apache.commons.lang3.StringUtils.join;
-
+import static org.apache.cassandra.config.Config.LegacyPaxosStrategy.accord;
import static org.apache.cassandra.db.ConsistencyLevel.SERIAL;
import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casReadMetrics;
import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casWriteMetrics;
@@ -178,6 +179,7 @@ import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch;
+import static org.apache.commons.lang3.StringUtils.join;
public class StorageProxy implements StorageProxyMBean
{
@@ -318,9 +320,17 @@ public class StorageProxy implements StorageProxyMBean
key.toString(), keyspaceName, cfName));
}
- return Paxos.useV2()
- ? Paxos.cas(key, request, consistencyForPaxos, consistencyForCommit, clientState)
- : legacyCas(keyspaceName, cfName, key, request, consistencyForPaxos, consistencyForCommit, clientState, nowInSeconds, queryStartNanoTime);
+ if (DatabaseDescriptor.getLegacyPaxosStrategy() == accord)
+ {
+ TxnData data = AccordService.instance().coordinate(request.toAccordTxn(clientState, nowInSeconds));
+ return request.toCasResult(data);
+ }
+ else
+ {
+ return Paxos.useV2()
+ ? Paxos.cas(key, request, consistencyForPaxos, consistencyForCommit, clientState)
+ : legacyCas(keyspaceName, cfName, key, request, consistencyForPaxos, consistencyForCommit, clientState, nowInSeconds, queryStartNanoTime);
+ }
}
public static RowIterator legacyCas(String keyspaceName,
@@ -1847,7 +1857,7 @@ public class StorageProxy implements StorageProxyMBean
}
return consistencyLevel.isSerialConsistency()
- ? readWithPaxos(group, consistencyLevel, queryStartNanoTime)
+ ? readWithConsensus(group, consistencyLevel, queryStartNanoTime)
: readRegular(group, consistencyLevel, queryStartNanoTime);
}
@@ -1861,12 +1871,29 @@ public class StorageProxy implements StorageProxyMBean
return !StorageService.instance.isBootstrapMode();
}
- private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
+ private static PartitionIterator readWithConsensus(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException
{
- return Paxos.useV2()
- ? Paxos.read(group, consistencyLevel)
- : legacyReadWithPaxos(group, consistencyLevel, queryStartNanoTime);
+ if (DatabaseDescriptor.getLegacyPaxosStrategy() == accord)
+ {
+ return readWithAccord(group);
+ }
+ else
+ {
+ return Paxos.useV2()
+ ? Paxos.read(group, consistencyLevel)
+ : legacyReadWithPaxos(group, consistencyLevel, queryStartNanoTime);
+ }
+ }
+
+ /**
+  * Performs a serial read of a single partition by coordinating a read-only Accord transaction.
+  *
+  * @throws InvalidRequestException if the group contains more than one query
+  */
+ private static PartitionIterator readWithAccord(SinglePartitionReadCommand.Group group)
+ {
+     if (group.queries.size() > 1)
+         throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time");
+     SinglePartitionReadCommand command = group.queries.get(0);
+     TxnRead txnRead = TxnRead.createRead(command);
+     // Read-only transaction: every row is queried (TxnQuery.ALL) and no update is attached
+     Txn readTxn = new Txn.InMemory(txnRead.keys(), txnRead, TxnQuery.ALL, null);
+     TxnData result = AccordService.instance().coordinate(readTxn);
+     // NOTE(review): CQL3CasRequest.toCasResult null-checks the same lookup; confirm it cannot return null here
+     return PartitionIterators.singletonIterator(result.get(TxnRead.DUMMY).rowIterator());
+ }
private static PartitionIterator legacyReadWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
index 5036e2b57c..b8da523ebf 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
@@ -43,7 +43,7 @@ import static org.apache.cassandra.utils.CollectionSerializers.serializeCollecti
import static org.apache.cassandra.utils.CollectionSerializers.serializedCollectionSize;
import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
public class AccordPartialCommand extends CommandsForKey.TxnIdWithExecuteAt
{
@@ -180,7 +180,7 @@ public class AccordPartialCommand extends CommandsForKey.TxnIdWithExecuteAt
{
int size = Math.toIntExact(AccordSerializerVersion.serializer.serializedSize(version));
size += CommandSerializers.txnId.serializedSize();
- size += serializedSizeNullable(command.executeAt(), version.msgVersion, CommandSerializers.timestamp);
+ size += serializedNullableSize(command.executeAt(), version.msgVersion, CommandSerializers.timestamp);
size += CommandSerializers.status.serializedSize(command.status(), version.msgVersion);
size += CommandSerializers.kind.serializedSize(command.kind(), version.msgVersion);
size += serializedCollectionSize(command.deps, version.msgVersion, CommandSerializers.txnId);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordSerializers.java b/src/java/org/apache/cassandra/service/accord/AccordSerializers.java
index a0faaef0ff..a41759a911 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSerializers.java
@@ -20,18 +20,22 @@ package org.apache.cassandra.service.accord;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
import org.apache.cassandra.cql3.Lists;
import org.apache.cassandra.cql3.Maps;
import org.apache.cassandra.cql3.Sets;
import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.db.ArrayClustering;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringPrefix;
import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.ValueAccessor;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.DeserializationHelper;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -47,6 +51,7 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.apache.cassandra.db.TypeSizes.sizeof;
import static com.google.common.primitives.Ints.checkedCast;
import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
import static org.apache.cassandra.db.marshal.CollectionType.Kind.LIST;
@@ -71,6 +76,14 @@ public class AccordSerializers
}
}
+ /** Serializes every item with the given serializer, returning one buffer per item in list order. */
+ public static <T> ByteBuffer[] serialize(List<T> items, IVersionedSerializer<T> serializer)
+ {
+     int count = items.size();
+     ByteBuffer[] buffers = new ByteBuffer[count];
+     for (int index = 0; index < count; index++)
+     {
+         T item = items.get(index);
+         buffers[index] = serialize(item, serializer);
+     }
+     return buffers;
+ }
+
public static <T> T deserialize(ByteBuffer bytes, IVersionedSerializer<T> serializer)
{
try (DataInputBuffer in = new DataInputBuffer(bytes, true))
@@ -157,7 +170,7 @@ public class AccordSerializers
String table = in.readUTF();
ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
TableMetadata metadata = Schema.instance.getTableMetadata(keyspace, table);
-
+
// TODO: Can the metadata be null if the table has been dropped?
return metadata.getColumn(name);
}
@@ -166,8 +179,8 @@ public class AccordSerializers
public long serializedSize(ColumnMetadata column, int version)
{
long size = 0;
- size += TypeSizes.sizeof(column.ksName);
- size += TypeSizes.sizeof(column.cfName);
+ size += sizeof(column.ksName);
+ size += sizeof(column.cfName);
size += ByteBufferUtil.serializedSizeWithShortLength(column.name.bytes);
return size;
}
@@ -193,4 +206,73 @@ public class AccordSerializers
return TableId.serializedSize();
}
};
-}
+
+ public static final IVersionedSerializer<Clustering<?>> clusteringPrefixSerializer = new IVersionedSerializer<Clustering<?>>()
+ {
+ // Delegates to the generic doSerialize helper so the clustering's value type can be captured; version is unused.
+ @Override
+ public void serialize(Clustering<?> clustering, DataOutputPlus out, int version) throws IOException
+ {
+ doSerialize(clustering, out);
+ }
+
+ /**
+  * Writes a boolean flag (true for the static clustering) and, for any other
+  * clustering, the component count followed by each vint-length-prefixed component.
+  */
+ public <V> void doSerialize(Clustering<V> clustering, DataOutputPlus out) throws IOException
+ {
+     boolean isStatic = clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING;
+     out.writeBoolean(isStatic);
+     if (isStatic)
+         return; // nothing but the flag is written for the static clustering
+
+     out.writeUnsignedVInt(clustering.size());
+     ValueAccessor<V> accessor = clustering.accessor();
+     for (int component = 0; component < clustering.size(); component++)
+         accessor.writeWithVIntLength(clustering.get(component), out);
+ }
+
+ /**
+  * Reads a clustering: a leading true flag yields {@code Clustering.STATIC_CLUSTERING},
+  * otherwise the component count and each length-prefixed component are read into an
+  * {@code ArrayClustering}.
+  */
+ @Override
+ public Clustering<?> deserialize(DataInputPlus in, int version) throws IOException
+ {
+     if (in.readBoolean())
+         return Clustering.STATIC_CLUSTERING;
+
+     int count = in.readUnsignedVIntChecked();
+     byte[][] values = new byte[count][];
+     for (int idx = 0; idx < count; idx++)
+     {
+         byte[] value = new byte[in.readUnsignedVIntChecked()];
+         in.readFully(value);
+         values[idx] = value;
+     }
+     return new ArrayClustering(values);
+ }
+
+ // Delegates to the generic computeSerializedSize helper; version does not affect the result.
+ @Override
+ public long serializedSize(Clustering<?> clustering, int version)
+ {
+ return computeSerializedSize(clustering);
+ }
+
+ /**
+  * Computes the number of bytes doSerialize writes for the given clustering.
+  *
+  * @param clustering the clustering to measure
+  * @return the size of the boolean flag alone for the static clustering; otherwise the flag,
+  *         the component count and every component with its vint length prefix
+  */
+ private <V> long computeSerializedSize(Clustering<V> clustering)
+ {
+     long size = sizeof(true);
+     // doSerialize writes nothing after the flag for the static clustering, so no
+     // component count may be counted for it
+     if (clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING)
+         return size;
+
+     size += sizeofUnsignedVInt(clustering.size());
+     ValueAccessor<V> accessor = clustering.accessor();
+     for (int i = 0; i < clustering.size(); i++)
+     {
+         int valueSize = accessor.size(clustering.get(i));
+         // each component is written as a vint length followed by its bytes
+         size += valueSize + sizeofUnsignedVInt(valueSize);
+     }
+     return size;
+ }
+ };
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
index e793b14eec..5568b9b01e 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
public class BeginInvalidationSerializers
{
@@ -92,12 +92,12 @@ public class BeginInvalidationSerializers
@Override
public long serializedSize(InvalidateReply reply, int version)
{
- return serializedSizeNullable(reply.supersededBy, version, CommandSerializers.ballot)
+ return serializedNullableSize(reply.supersededBy, version, CommandSerializers.ballot)
+ CommandSerializers.ballot.serializedSize(reply.accepted, version)
+ CommandSerializers.status.serializedSize(reply.status, version)
+ TypeSizes.BOOL_SIZE
- + serializedSizeNullable(reply.route, version, KeySerializers.route)
- + serializedSizeNullable(reply.homeKey, version, KeySerializers.routingKey);
+ + serializedNullableSize(reply.route, version, KeySerializers.route)
+ + serializedNullableSize(reply.homeKey, version, KeySerializers.routingKey);
}
};
}
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
index 0741c1e662..ce8ef96b48 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
@@ -47,7 +47,7 @@ import org.apache.cassandra.service.accord.txn.TxnData;
import static accord.messages.CheckStatus.SerializationSupport.createOk;
import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
public class CheckStatusSerializers
{
@@ -166,20 +166,20 @@ public class CheckStatusSerializers
size += CommandSerializers.saveStatus.serializedSize(ok.saveStatus, version);
size += CommandSerializers.ballot.serializedSize(ok.promised, version);
size += CommandSerializers.ballot.serializedSize(ok.accepted, version);
- size += serializedSizeNullable(ok.executeAt, version, CommandSerializers.timestamp);
+ size += serializedNullableSize(ok.executeAt, version, CommandSerializers.timestamp);
size += TypeSizes.BOOL_SIZE;
size += CommandSerializers.durability.serializedSize(ok.durability, version);
- size += serializedSizeNullable(ok.homeKey, version, KeySerializers.routingKey);
- size += serializedSizeNullable(ok.route, version, KeySerializers.route);
+ size += serializedNullableSize(ok.homeKey, version, KeySerializers.routingKey);
+ size += serializedNullableSize(ok.route, version, KeySerializers.route);
if (!(reply instanceof CheckStatusOkFull))
return size;
CheckStatusOkFull okFull = (CheckStatusOkFull) ok;
- size += serializedSizeNullable(okFull.partialTxn, version, CommandSerializers.partialTxn);
- size += serializedSizeNullable(okFull.committedDeps, version, DepsSerializer.partialDeps);
- size += serializedSizeNullable(okFull.writes, version, CommandSerializers.writes);
- size += serializedSizeNullable((TxnData) okFull.result, version, TxnData.serializer);
+ size += serializedNullableSize(okFull.partialTxn, version, CommandSerializers.partialTxn);
+ size += serializedNullableSize(okFull.committedDeps, version, DepsSerializer.partialDeps);
+ size += serializedNullableSize(okFull.writes, version, CommandSerializers.writes);
+ size += serializedNullableSize((TxnData) okFull.result, version, TxnData.serializer);
return size;
}
};
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
index b49b898804..ed251016dd 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
public class CommitSerializers
{
@@ -64,10 +64,10 @@ public class CommitSerializers
public long serializedBodySize(Commit msg, int version)
{
return CommandSerializers.timestamp.serializedSize(msg.executeAt, version)
- + serializedSizeNullable(msg.partialTxn, version, CommandSerializers.partialTxn)
+ + serializedNullableSize(msg.partialTxn, version, CommandSerializers.partialTxn)
+ DepsSerializer.partialDeps.serializedSize(msg.partialDeps, version)
- + serializedSizeNullable(msg.route, version, KeySerializers.fullRoute)
- + serializedSizeNullable(msg.read, version, ReadDataSerializers.request);
+ + serializedNullableSize(msg.route, version, KeySerializers.fullRoute)
+ + serializedNullableSize(msg.read, version, ReadDataSerializers.request);
}
};
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
index 251b281621..71d3d69ba1 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
@@ -18,10 +18,6 @@
package org.apache.cassandra.service.accord.serializers;
-import java.io.IOException;
-
-import javax.annotation.Nullable;
-
import accord.messages.PreAccept;
import accord.messages.PreAccept.PreAcceptOk;
import accord.messages.PreAccept.PreAcceptReply;
@@ -35,9 +31,10 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.service.accord.serializers.TxnRequestSerializer.WithUnsyncedSerializer;
-import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+import static org.apache.cassandra.utils.NullableSerializer.*;
public class PreacceptSerializers
{
@@ -67,7 +64,7 @@ public class PreacceptSerializers
public long serializedBodySize(PreAccept msg, int version)
{
return CommandSerializers.partialTxn.serializedSize(msg.partialTxn, version)
- + serializedSizeNullable(msg.route, version, KeySerializers.fullRoute)
+ + serializedNullableSize(msg.route, version, KeySerializers.fullRoute)
+ TypeSizes.sizeofUnsignedVInt(msg.maxEpoch - msg.minEpoch);
}
};
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
index e9907911a9..fbd00f637e 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.service.accord.serializers;
import java.io.IOException;
-
import javax.annotation.Nullable;
import accord.api.Result;
@@ -45,7 +44,7 @@ import org.apache.cassandra.service.accord.txn.TxnData;
import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
public class RecoverySerializers
{
@@ -73,7 +72,7 @@ public class RecoverySerializers
{
return CommandSerializers.partialTxn.serializedSize(recover.partialTxn, version)
+ CommandSerializers.ballot.serializedSize(recover.ballot, version)
- + serializedSizeNullable(recover.route, version, KeySerializers.fullRoute);
+ + serializedNullableSize(recover.route, version, KeySerializers.fullRoute);
}
};
@@ -149,13 +148,13 @@ public class RecoverySerializers
long size = CommandSerializers.txnId.serializedSize(recoverOk.txnId, version);
size += CommandSerializers.status.serializedSize(recoverOk.status, version);
size += CommandSerializers.ballot.serializedSize(recoverOk.accepted, version);
- size += serializedSizeNullable(recoverOk.executeAt, version, CommandSerializers.timestamp);
+ size += serializedNullableSize(recoverOk.executeAt, version, CommandSerializers.timestamp);
size += DepsSerializer.partialDeps.serializedSize(recoverOk.deps, version);
size += DepsSerializer.deps.serializedSize(recoverOk.earlierCommittedWitness, version);
size += DepsSerializer.deps.serializedSize(recoverOk.earlierAcceptedNoWitness, version);
size += TypeSizes.sizeof(recoverOk.rejectsFastPath);
- size += serializedSizeNullable(recoverOk.writes, version, CommandSerializers.writes);
- size += serializedSizeNullable((TxnData) recoverOk.result, version, TxnData.serializer);
+ size += serializedNullableSize(recoverOk.writes, version, CommandSerializers.writes);
+ size += serializedNullableSize((TxnData) recoverOk.result, version, TxnData.serializer);
return size;
}
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java b/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
index 120a04fbbe..244efe167e 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
@@ -20,10 +20,13 @@ package org.apache.cassandra.service.accord.txn;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
@@ -32,6 +35,8 @@ import com.google.common.collect.Iterables;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.cql3.Term;
import org.apache.cassandra.cql3.conditions.ColumnCondition;
+import org.apache.cassandra.cql3.conditions.ColumnCondition.Bound;
+import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
@@ -47,11 +52,17 @@ import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.cassandra.service.accord.AccordSerializers.clusteringPrefixSerializer;
import static org.apache.cassandra.service.accord.AccordSerializers.deserializeCqlCollectionAsTerm;
+import static org.apache.cassandra.service.accord.txn.TxnRead.DUMMY;
import static org.apache.cassandra.utils.CollectionSerializers.deserializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.serializeCollection;
import static org.apache.cassandra.utils.CollectionSerializers.serializeList;
+import static org.apache.cassandra.utils.CollectionSerializers.serializedCollectionSize;
import static org.apache.cassandra.utils.CollectionSerializers.serializedListSize;
+
public abstract class TxnCondition
{
private interface ConditionSerializer<T extends TxnCondition>
@@ -73,9 +84,12 @@ public abstract class TxnCondition
GREATER_THAN(">", Operator.GT),
GREATER_THAN_OR_EQUAL(">=", Operator.GTE),
LESS_THAN("<", Operator.LT),
- LESS_THAN_OR_EQUAL("<=", Operator.LTE);
+ LESS_THAN_OR_EQUAL("<=", Operator.LTE),
+ COLUMN_CONDITIONS("COLUMN_CONDITIONS", null);
+ @Nonnull
private final String symbol;
+ @Nullable
private final Operator operator;
Kind(String symbol, Operator operator)
@@ -104,6 +118,8 @@ public abstract class TxnCondition
return BooleanGroup.serializer;
case NONE:
return None.serializer;
+ case COLUMN_CONDITIONS:
+ return ColumnConditionsAdapter.serializer;
default:
throw new IllegalArgumentException();
}
@@ -300,6 +316,62 @@ public abstract class TxnCondition
};
}
+ public static class ColumnConditionsAdapter extends TxnCondition {
+ @Nonnull
+ public final Collection<Bound> bounds;
+
+ @Nonnull
+ public final Clustering<?> clustering;
+
+ public ColumnConditionsAdapter(Clustering<?> clustering, Collection<Bound> bounds)
+ {
+ super(Kind.COLUMN_CONDITIONS);
+ checkNotNull(bounds);
+ checkNotNull(clustering);
+ this.bounds = bounds;
+ this.clustering = clustering;
+ }
+
+ @Override
+ public boolean applies(@Nonnull TxnData data)
+ {
+ checkNotNull(data);
+ FilteredPartition partition = data.get(DUMMY);
+ Row row = partition != null ? partition.getRow(clustering) : null; // the read may have found no partition; test the bounds against a missing row
+ for (Bound bound : bounds)
+ {
+ if (!bound.appliesTo(row))
+ return false;
+ }
+ return true;
+ }
+
+ private static final ConditionSerializer<ColumnConditionsAdapter> serializer = new ConditionSerializer<ColumnConditionsAdapter>()
+ {
+ @Override
+ public void serialize(ColumnConditionsAdapter condition, DataOutputPlus out, int version) throws IOException
+ {
+ clusteringPrefixSerializer.serialize(condition.clustering, out, version);
+ serializeCollection(condition.bounds, out, version, Bound.serializer);
+ }
+
+ @Override
+ public ColumnConditionsAdapter deserialize(DataInputPlus in, int version, Kind ignored) throws IOException
+ {
+ Clustering<?> clustering = clusteringPrefixSerializer.deserialize(in, version);
+ List<Bound> bounds = deserializeList(in, version, Bound.serializer);
+ return new ColumnConditionsAdapter(clustering, bounds);
+ }
+
+ @Override
+ public long serializedSize(ColumnConditionsAdapter condition, int version)
+ {
+ return clusteringPrefixSerializer.serializedSize(condition.clustering, version)
+ + serializedCollectionSize(condition.bounds, version, Bound.serializer);
+ }
+ };
+ }
+
public static class Value extends TxnCondition
{
private static final Set<Kind> KINDS = ImmutableSet.of(Kind.EQUAL, Kind.NOT_EQUAL,
@@ -503,7 +575,6 @@ public abstract class TxnCondition
public static final IVersionedSerializer<TxnCondition> serializer = new IVersionedSerializer<TxnCondition>()
{
- @SuppressWarnings("unchecked")
@Override
public void serialize(TxnCondition condition, DataOutputPlus out, int version) throws IOException
{
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
index 50bdfbbada..fee1361a85 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
@@ -75,6 +75,7 @@ public class TxnData implements Data, Result, Iterable<FilteredPartition>
return data.entrySet();
}
+ // This is inadequate
@Override
public Data merge(Data data)
{
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
index e4526beabd..57b33eed61 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnDataName.java
@@ -24,7 +24,10 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -37,6 +40,13 @@ import org.apache.cassandra.utils.ObjectSizes;
import static com.google.common.primitives.Ints.checkedCast;
import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+import static org.apache.cassandra.service.accord.AccordSerializers.clusteringPrefixSerializer;
+import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
+
public class TxnDataName implements Comparable<TxnDataName>
{
private static final TxnDataName RETURNING = new TxnDataName(Kind.RETURNING);
@@ -71,13 +81,27 @@ public class TxnDataName implements Comparable<TxnDataName>
}
}
+ @Nonnull
private final Kind kind;
+
+ @Nonnull
private final String[] parts;
- public TxnDataName(Kind kind, String... parts)
+ @Nullable
+ private final Clustering<?> clustering;
+
+ public TxnDataName(@Nonnull Kind kind, @Nonnull String... parts)
+ {
+ this(kind, null, parts);
+ }
+
+ public TxnDataName(@Nonnull Kind kind, @Nullable Clustering<?> clustering, @Nonnull String... parts)
{
+ checkNotNull(kind);
+ checkNotNull(parts);
this.kind = kind;
this.parts = parts;
+ this.clustering = clustering;
}
public static TxnDataName user(String name)
@@ -204,17 +228,19 @@ public class TxnDataName implements Comparable<TxnDataName>
out.writeUnsignedVInt(t.parts.length);
for (String part : t.parts)
out.writeUTF(part);
+ serializeNullable(t.clustering, out, version, clusteringPrefixSerializer);
}
@Override
public TxnDataName deserialize(DataInputPlus in, int version) throws IOException
{
Kind kind = Kind.from(in.readByte());
- int length = checkedCast(in.readUnsignedVInt());
+ int length = in.readUnsignedVIntChecked();
String[] parts = new String[length];
for (int i = 0; i < length; i++)
parts[i] = in.readUTF();
- return new TxnDataName(kind, parts);
+ Clustering<?> clustering = deserializeNullable(in, version, clusteringPrefixSerializer);
+ return new TxnDataName(kind, clustering, parts);
}
@Override
@@ -223,6 +249,7 @@ public class TxnDataName implements Comparable<TxnDataName>
int size = Byte.BYTES + sizeofUnsignedVInt(t.parts.length);
for (String part : t.parts)
size += TypeSizes.sizeof(part);
+ size += serializedNullableSize(t.clustering, version, clusteringPrefixSerializer);
return size;
}
};
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index 667df37d8c..93100eba3e 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -100,7 +100,7 @@ public class TxnNamedRead extends AbstractSerialized<SinglePartitionReadCommand>
return "TxnNamedRead{name='" + name + '\'' + ", key=" + key + ", update=" + get() + '}';
}
- public TxnDataName name()
+ public TxnDataName txnDataName()
{
return name;
}
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java b/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
index d00da39735..3ceb6e79af 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.service.accord.txn;
import java.io.IOException;
-
import javax.annotation.Nullable;
import com.google.common.base.Preconditions;
@@ -40,6 +39,12 @@ public abstract class TxnQuery implements Query
{
public static final TxnQuery ALL = new TxnQuery()
{
+ @Override
+ protected byte type()
+ {
+ return 1;
+ }
+
@Override
public Result compute(TxnId txnId, Data data, @Nullable Read read, @Nullable Update update)
{
@@ -49,6 +54,12 @@ public abstract class TxnQuery implements Query
public static final TxnQuery NONE = new TxnQuery()
{
+ @Override
+ protected byte type()
+ {
+ return 2;
+ }
+
@Override
public Result compute(TxnId txnId, Data data, @Nullable Read read, @Nullable Update update)
{
@@ -56,10 +67,34 @@ public abstract class TxnQuery implements Query
}
};
+ public static final TxnQuery CONDITION = new TxnQuery()
+ {
+ @Override
+ protected byte type()
+ {
+ return 3;
+ }
+
+ @Override
+ public Result compute(TxnId txnId, Data data, @Nullable Read read, @Nullable Update update)
+ {
+ TxnUpdate txnUpdate = (TxnUpdate)update;
+ boolean conditionCheck = txnUpdate.checkCondition(data);
+ // If the condition applied, an empty result indicates success
+ if (conditionCheck)
+ return new TxnData();
+ else
+ // If it failed to apply, the partition contents (if present) are returned, which indicates failure
+ return (TxnData)data;
+ }
+ };
+
private static final long SIZE = ObjectSizes.measure(ALL);
private TxnQuery() {}
+ abstract protected byte type();
+
public long estimatedSizeOnHeap()
{
return SIZE;
@@ -70,8 +105,8 @@ public abstract class TxnQuery implements Query
@Override
public void serialize(TxnQuery query, DataOutputPlus out, int version) throws IOException
{
- Preconditions.checkArgument(query == null || query == ALL || query == NONE);
- out.writeByte(query == null ? 0 : query == ALL ? 1 : 2);
+ Preconditions.checkArgument(query == null || query == ALL || query == NONE || query == CONDITION);
+ out.writeByte(query == null ? 0 : query.type());
}
@Override
@@ -83,13 +118,14 @@ public abstract class TxnQuery implements Query
case 0: return null;
case 1: return ALL;
case 2: return NONE;
+ case 3: return CONDITION;
}
}
@Override
public long serializedSize(TxnQuery query, int version)
{
- Preconditions.checkArgument(query == null || query == ALL || query == NONE);
+ Preconditions.checkArgument(query == null || query == ALL || query == NONE || query == CONDITION);
return TypeSizes.sizeof((byte)2);
}
};
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
index 2aa91be7e6..5811c8787d 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
@@ -24,6 +24,8 @@ import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
+import com.google.common.collect.ImmutableList;
+
import accord.api.Data;
import accord.api.DataStore;
import accord.api.Key;
@@ -33,6 +35,7 @@ import accord.primitives.Keys;
import accord.primitives.Ranges;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -49,6 +52,9 @@ import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read
{
+ // There is only potentially one partition in a CAS and SERIAL/LOCAL_SERIAL read
+ public static final String DUMMY_NAME = "";
+ public static final TxnDataName DUMMY = TxnDataName.user(DUMMY_NAME);
private static final long EMPTY_SIZE = ObjectSizes.measure(new TxnRead(new TxnNamedRead[0], null));
private final Keys txnKeys;
@@ -65,6 +71,12 @@ public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read
this.txnKeys = txnKeys;
}
+ public static TxnRead createRead(SinglePartitionReadCommand readCommand)
+ {
+ TxnNamedRead read = new TxnNamedRead(DUMMY, readCommand);
+ return new TxnRead(ImmutableList.of(read), Keys.of(read.key()));
+ }
+
public long estimatedSizeOnHeap()
{
long size = EMPTY_SIZE;
@@ -76,7 +88,7 @@ public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read
@Override
int compareNonKeyFields(TxnNamedRead left, TxnNamedRead right)
{
- return left.name().compareTo(right.name());
+ return left.txnDataName().compareTo(right.txnDataName());
}
@Override
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
index 7b26b2acdd..723dbb30df 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
@@ -47,14 +47,13 @@ import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ObjectSizes;
-import static java.lang.Math.toIntExact;
import static org.apache.cassandra.service.accord.AccordSerializers.serialize;
import static org.apache.cassandra.utils.ArraySerializers.deserializeArray;
import static org.apache.cassandra.utils.ArraySerializers.serializeArray;
+import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
import static org.apache.cassandra.utils.ByteBufferUtil.readWithVIntLength;
import static org.apache.cassandra.utils.ByteBufferUtil.serializedSizeWithVIntLength;
import static org.apache.cassandra.utils.ByteBufferUtil.writeWithVIntLength;
-import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
public class TxnUpdate implements Update
{
@@ -64,6 +63,9 @@ public class TxnUpdate implements Update
private final ByteBuffer[] fragments;
private final ByteBuffer condition;
+ // Memoize computation of condition
+ private Boolean conditionResult;
+
public TxnUpdate(List<TxnWrite.Fragment> fragments, TxnCondition condition)
{
// TODO: Figure out a way to shove keys into TxnCondition, and have it implement slice/merge.
@@ -169,9 +171,7 @@ public class TxnUpdate implements Update
@Override
public Write apply(Data data)
{
- TxnCondition condition = AccordSerializers.deserialize(this.condition, TxnCondition.serializer);
-
- if (!condition.applies((TxnData) data))
+ if (!checkCondition(data))
return TxnWrite.EMPTY;
List<TxnWrite.Fragment> fragments = deserialize(this.fragments, TxnWrite.Fragment.serializer);
@@ -185,6 +185,7 @@ public class TxnUpdate implements Update
return new TxnWrite(updates);
}
+ // Should we serialize the conditionResult?
public static final IVersionedSerializer<TxnUpdate> serializer = new IVersionedSerializer<TxnUpdate>()
{
@Override
@@ -210,6 +211,7 @@ public class TxnUpdate implements Update
long size = KeySerializers.keys.serializedSize(update.keys, version);
size += serializedSizeWithVIntLength(update.condition);
size += serializedArraySize(update.fragments, version, ByteBufferUtil.vintSerializer);
+ assert(ByteBufferUtil.serialized(this, update, version).remaining() == size);
return size;
}
};
@@ -288,4 +290,15 @@ public class TxnUpdate implements Update
result.addAll(deserialize(bytes, serializer));
return result;
}
+
+ // maybeCheckCondition? checkConditionMemoized?
+ public boolean checkCondition(Data data)
+ {
+ // Assert data that was memoized is same as data that is provided?
+ if (conditionResult != null)
+ return conditionResult;
+ TxnCondition condition = AccordSerializers.deserialize(this.condition, TxnCondition.serializer);
+ conditionResult = condition.applies((TxnData) data);
+ return conditionResult;
+ }
}
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java b/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java
index d25ca2fdd5..dcbd8be587 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosPrepareRefresh.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.RequestCallbackWithFailure;
import org.apache.cassandra.service.paxos.Commit.Agreed;
import org.apache.cassandra.service.paxos.Commit.Committed;
@@ -47,7 +46,7 @@ import static org.apache.cassandra.service.paxos.PaxosRequestCallback.shouldExec
import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
/**
* Nodes that have promised in response to our prepare, may be missing the latestCommit, meaning we cannot be sure the
@@ -238,7 +237,7 @@ public class PaxosPrepareRefresh implements RequestCallbackWithFailure<PaxosPrep
public long serializedSize(Response response, int version)
{
- return serializedSizeNullable(response.isSupersededBy, version, Ballot.Serializer.instance);
+ return serializedNullableSize(response.isSupersededBy, version, Ballot.Serializer.instance);
}
}
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java b/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
index ab757cbdc5..94fa9dc3a4 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosRepair.java
@@ -20,7 +20,12 @@ package org.apache.cassandra.service.paxos;
import java.io.IOException;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
@@ -63,20 +68,30 @@ import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MonotonicClock;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_REPAIR_RETRY_TIMEOUT_IN_MS;
import static org.apache.cassandra.exceptions.RequestFailureReason.UNKNOWN;
import static org.apache.cassandra.net.Verb.PAXOS2_REPAIR_REQ;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.cassandra.service.paxos.Commit.*;
+import static org.apache.cassandra.service.paxos.Commit.Accepted;
+import static org.apache.cassandra.service.paxos.Commit.Committed;
+import static org.apache.cassandra.service.paxos.Commit.Proposal;
+import static org.apache.cassandra.service.paxos.Commit.isAfter;
+import static org.apache.cassandra.service.paxos.Commit.latest;
+import static org.apache.cassandra.service.paxos.Commit.timestampsClash;
import static org.apache.cassandra.service.paxos.ContentionStrategy.Type.REPAIR;
import static org.apache.cassandra.service.paxos.ContentionStrategy.waitUntilForContention;
-import static org.apache.cassandra.service.paxos.Paxos.*;
-import static org.apache.cassandra.service.paxos.PaxosPrepare.*;
+import static org.apache.cassandra.service.paxos.Paxos.Participants;
+import static org.apache.cassandra.service.paxos.Paxos.isInRangeAndShouldProcess;
+import static org.apache.cassandra.service.paxos.Paxos.staleBallotNewerThan;
+import static org.apache.cassandra.service.paxos.PaxosPrepare.FoundIncompleteAccepted;
+import static org.apache.cassandra.service.paxos.PaxosPrepare.FoundIncompleteCommitted;
+import static org.apache.cassandra.service.paxos.PaxosPrepare.Status;
+import static org.apache.cassandra.service.paxos.PaxosPrepare.prepareWithBallot;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
-import static org.apache.cassandra.utils.NullableSerializer.serializedSizeNullable;
+import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
/**
* Facility to finish any in-progress paxos transaction, and ensure that a quorum of nodes agree on the most recent operation.
@@ -629,7 +644,7 @@ public class PaxosRepair extends AbstractPaxosRepair
public long serializedSize(Response response, int version)
{
return Ballot.sizeInBytes()
- + serializedSizeNullable(response.acceptedButNotCommitted, version, Accepted.serializer)
+ + serializedNullableSize(response.acceptedButNotCommitted, version, Accepted.serializer)
+ Committed.serializer.serializedSize(response.committed, version);
}
}
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index e802cd070f..0afb7cae01 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -723,6 +723,7 @@ public class PaxosState implements PaxosOperationLock
{
synchronized (unsafeState.key)
{
+ // Unused return value?
unsafeState.maybeLoad();
assert unsafeState.current != null;
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 062d65f6e1..6a05b0ef7f 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -23,37 +23,28 @@ package org.apache.cassandra.utils;
* afterward, and ensure the tests still pass.
*/
-import java.io.*;
+import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileUtils;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import java.util.stream.Collectors;
-import net.nicoulaj.compilecommand.annotations.Inline;
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.marshal.BooleanType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.DateType;
-import org.apache.cassandra.db.marshal.ListType;
-import org.apache.cassandra.db.marshal.MapType;
-import org.apache.cassandra.db.marshal.SetType;
-import org.apache.cassandra.db.marshal.TimestampType;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.compress.BufferType;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileUtils;
+import static com.google.common.primitives.Ints.checkedCast;
/**
* Utility methods to make ByteBuffers less painful
@@ -359,6 +350,17 @@ public class ByteBufferUtil
out.writeUnsignedVInt(bytes.remaining());
out.write(bytes);
}
+ public static void writeWithVIntLengthAndNull(ByteBuffer bytes, DataOutputPlus out) throws IOException
+ {
+ if (bytes == null)
+ {
+ out.writeVInt(-1);
+ return;
+ }
+
+ out.writeVInt(bytes.remaining());
+ out.write(bytes);
+ }
public static void writeWithShortLength(ByteBuffer buffer, DataOutputPlus out) throws IOException
{
@@ -389,6 +391,17 @@ public class ByteBufferUtil
return ByteBufferUtil.read(in, length);
}
+ public static ByteBuffer readWithVIntLengthAndNull(DataInputPlus in) throws IOException
+ {
+ int length = checkedCast(in.readVInt());
+ if (length < -1)
+ throw new IOException("Corrupt (negative) value length encountered");
+ if (length == -1)
+ return null;
+
+ return ByteBufferUtil.read(in, length);
+ }
+
public static int serializedSizeWithLength(ByteBuffer buffer)
{
int size = buffer.remaining();
@@ -401,6 +414,15 @@ public class ByteBufferUtil
return TypeSizes.sizeofUnsignedVInt(size) + size;
}
+ public static int serializedSizeWithVIntLengthAndNull(ByteBuffer buffer)
+ {
+ if (buffer == null)
+ return TypeSizes.sizeofVInt(-1);
+
+ int size = buffer.remaining();
+ return TypeSizes.sizeofVInt(size) + size; // length is written as a signed vint by writeWithVIntLengthAndNull, so size it the same way
+ }
+
public static long estimatedSizeOnHeap(ByteBuffer buffer)
{
return EMPTY_SIZE_ON_HEAP + buffer.remaining();
@@ -927,6 +949,19 @@ public class ByteBufferUtil
return true;
}
+ public static <T> ByteBuffer serialized(IVersionedSerializer<T> serializer, T value, int version)
+ {
+ try (DataOutputBuffer dob = new DataOutputBuffer())
+ {
+ serializer.serialize(value, dob, version);
+ return dob.buffer();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
public static final IVersionedSerializer<ByteBuffer> vintSerializer = new IVersionedSerializer<ByteBuffer>()
{
@Override
@@ -947,4 +982,6 @@ public class ByteBufferUtil
return serializedSizeWithVIntLength(bytes);
}
};
+
+ public static final IVersionedSerializer<ByteBuffer> vintNullableSerializer = NullableSerializer.wrap(vintSerializer);
}
diff --git a/src/java/org/apache/cassandra/utils/NullableSerializer.java b/src/java/org/apache/cassandra/utils/NullableSerializer.java
index a286fe5765..7d834be995 100644
--- a/src/java/org/apache/cassandra/utils/NullableSerializer.java
+++ b/src/java/org/apache/cassandra/utils/NullableSerializer.java
@@ -39,7 +39,7 @@ public class NullableSerializer
return in.readBoolean() ? serializer.deserialize(in, version) : null;
}
- public static <T> long serializedSizeNullable(T value, int version, IVersionedSerializer<T> serializer)
+ public static <T> long serializedNullableSize(T value, int version, IVersionedSerializer<T> serializer)
{
return value != null
? TypeSizes.sizeof(true) + serializer.serializedSize(value, version)
@@ -61,7 +61,7 @@ public class NullableSerializer
public long serializedSize(T t, int version)
{
- return serializedSizeNullable(t, version, wrap);
+ return serializedNullableSize(t, version, wrap);
}
};
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java b/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java
index b49572dc4b..e6d7ff8496 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ByteBuddyExamples.java
@@ -119,5 +119,4 @@ public class ByteBuddyExamples extends TestBaseImpl
return r.call();
}
}
-
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
index 1c981693e0..383800aa4c 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
@@ -18,38 +18,33 @@
package org.apache.cassandra.distributed.test.accord;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.functions.types.utils.Bytes;
-import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.db.marshal.ListType;
-import org.apache.cassandra.db.marshal.MapType;
-import org.apache.cassandra.db.marshal.SetType;
-import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.service.accord.AccordService;
import org.assertj.core.api.Assertions;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.api.SimpleQueryResult;
-import org.apache.cassandra.service.accord.AccordService;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import static org.apache.cassandra.distributed.util.QueryResultUtil.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.apache.cassandra.distributed.util.QueryResultUtil.assertThat;
-
public class AccordCQLTest extends AccordTestBase
{
private static final Logger logger = LoggerFactory.getLogger(AccordCQLTest.class);
@@ -197,7 +192,7 @@ public class AccordCQLTest extends AccordTestBase
cluster ->
{
cluster.coordinator(1).execute("INSERT INTO " + currentTable + " (k, c, v) VALUES (0, 0, " + lhs + ");", ConsistencyLevel.ALL);
-
+
String query = "BEGIN TRANSACTION\n" +
" LET row1 = (SELECT v FROM " + currentTable + " WHERE k = ? LIMIT 1);\n" +
" SELECT row1.v;\n" +
@@ -206,7 +201,7 @@ public class AccordCQLTest extends AccordTestBase
" END IF\n" +
"COMMIT TRANSACTION";
assertRowEqualsWithPreemptedRetry(cluster, new Object[] { lhs }, query, 0, rhs, 1, 0, 1);
-
+
String check = "BEGIN TRANSACTION\n" +
" SELECT * FROM " + currentTable + " WHERE k = ? AND c = ?;\n" +
"COMMIT TRANSACTION";
@@ -1990,4 +1985,30 @@ public class AccordCQLTest extends AccordTestBase
}
);
}
+
+ /**
+ * Exercises conditional updates (CAS) and SERIAL reads against regular and static columns, and
+ * checks via the coordinate-call counter that each of those operations went through Accord.
+ */
+ @Test
+ public void testCASAndSerialRead() throws Exception
+ {
+ test("CREATE TABLE " + currentTable + " (id int, c int, v int, s int static, PRIMARY KEY ((id), c));",
+ cluster -> {
+ ICoordinator coordinator = cluster.coordinator(1);
+ // Snapshot the counter first so only the operations issued by this test are counted
+ int startingAccordCoordinateCount = getAccordCoordinateCount();
+ coordinator.execute("INSERT INTO " + currentTable + " (id, c, v, s) VALUES (1, 2, 3, 5);", ConsistencyLevel.ALL);
+ assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 3, 5);
+ // Condition holds (v = 3): the update applies
+ assertRowEqualsWithPreemptedRetry(cluster, new Object[]{true}, "UPDATE " + currentTable + " SET v = 4 WHERE id = 1 AND c = 2 IF v = 3");
+ assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 4, 5);
+ // Condition no longer holds (v is now 4): the update is rejected and the current value is returned
+ assertRowEqualsWithPreemptedRetry(cluster, new Object[]{ false, 4 }, "UPDATE " + currentTable + " SET v = 4 WHERE id = 1 AND c = 2 IF v = 3");
+ assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 4, 5);
+
+ // Test working with a static column
+ assertRowEqualsWithPreemptedRetry(cluster, new Object[]{ false, 5 }, "UPDATE " + currentTable + " SET v = 5 WHERE id = 1 AND c = 2 IF s = 4");
+ assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 4, 5);
+ assertRowEqualsWithPreemptedRetry(cluster, new Object[]{true}, "UPDATE " + currentTable + " SET v = 5 WHERE id = 1 AND c = 2 IF s = 5");
+ assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 5, 5);
+ assertRowEqualsWithPreemptedRetry(cluster, new Object[]{true}, "UPDATE " + currentTable + " SET s = 6 WHERE id = 1 IF s = 5");
+ assertRowSerial(cluster, "SELECT id, c, v, s FROM " + currentTable + " WHERE id = 1 AND c = 2", 1, 2, 5, 6);
+ // Make sure every consensus query above actually ran on Accord:
+ // 6 SERIAL reads + 5 conditional updates = 11 coordinate calls.
+ // NOTE(review): assertRowEqualsWithPreemptedRetry presumably re-issues the statement when preempted,
+ // which would add extra coordinate calls - confirm this exact-count assertion cannot flake.
+ assertEquals( 11, getAccordCoordinateCount() - startingAccordCoordinateCount);
+ });
+ }
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index 60b583b40a..d1d16b66fa 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -18,23 +18,13 @@
package org.apache.cassandra.distributed.test.accord;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import accord.coordinate.Preempted;
+import accord.primitives.Txn;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import net.bytebuddy.implementation.bind.annotation.This;
-import org.assertj.core.api.Assertions;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-
-import accord.coordinate.Preempted;
import org.apache.cassandra.cql3.statements.ModificationStatement;
import org.apache.cassandra.cql3.statements.TransactionStatement;
import org.apache.cassandra.distributed.Cluster;
@@ -43,10 +33,22 @@ import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.txn.TxnData;
import org.apache.cassandra.utils.AssertionUtils;
import org.apache.cassandra.utils.FailingConsumer;
+import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static org.junit.Assert.assertArrayEquals;
public abstract class AccordTestBase extends TestBaseImpl
@@ -76,10 +78,10 @@ public abstract class AccordTestBase extends TestBaseImpl
currentTable = KEYSPACE + ".tbl" + COUNTER.getAndIncrement();
}
- protected static void assertRow(Cluster cluster, String query, int k, int c, int v)
+ protected static void assertRowSerial(Cluster cluster, String query, int k, int c, int v, int s)
{
- Object[][] result = cluster.coordinator(1).execute(query, ConsistencyLevel.QUORUM);
- assertArrayEquals(new Object[]{new Object[] {k, c, v}}, result);
+ Object[][] result = cluster.coordinator(1).execute(query, ConsistencyLevel.SERIAL);
+ assertArrayEquals(new Object[]{new Object[] {k, c, v, s}}, result);
}
protected void test(String tableDDL, FailingConsumer<Cluster> fn) throws Exception
@@ -105,14 +107,20 @@ public abstract class AccordTestBase extends TestBaseImpl
test("CREATE TABLE " + currentTable + " (k int, c int, v int, primary key (k, c))", fn);
}
+ /**
+ * Returns the current value of {@code BBAccordCoordinateCountHelper.count} as seen on instance 1
+ * of the shared cluster. Callers take the difference of two readings to count calls in between.
+ */
+ protected int getAccordCoordinateCount()
+ {
+ // The counter is read inside the instance via callOnInstance rather than from the test's own class
+ return sharedCluster.get(1).callOnInstance(() -> BBAccordCoordinateCountHelper.count.get());
+ }
+
private static Cluster createCluster() throws IOException
{
// need to up the timeout else tests get flaky
// disable vnode for now, but should enable before trunk
return init(Cluster.build(2)
.withoutVNodes()
- .withConfig(c -> c.with(Feature.NETWORK).set("write_request_timeout", "10s").set("transaction_timeout", "15s"))
+ .withConfig(c -> c.with(Feature.NETWORK).set("write_request_timeout", "10s").set("transaction_timeout", "15s").set("legacy_paxos_strategy", "accord"))
.withInstanceInitializer(EnforceUpdateDoesNotPerformRead::install)
+ .withInstanceInitializer(BBAccordCoordinateCountHelper::install)
.start());
}
@@ -164,6 +172,27 @@ public abstract class AccordTestBase extends TestBaseImpl
return map;
}
}
-
+
+ /**
+ * ByteBuddy helper that counts calls to the single-argument {@code coordinate} method of
+ * {@link AccordService}. The interception is installed on node 1 only.
+ */
+ public static class BBAccordCoordinateCountHelper
+ {
+ // Number of intercepted coordinate calls; incremented before each call is delegated
+ static AtomicInteger count = new AtomicInteger();
+ /**
+ * Instance initializer hook: rebases {@link AccordService} in the given class loader so that
+ * {@code coordinate} methods taking exactly one argument are routed through this class.
+ * Does nothing for any node other than node 1.
+ */
+ static void install(ClassLoader cl, int nodeNumber)
+ {
+ if (nodeNumber != 1)
+ return;
+ new ByteBuddy().rebase(AccordService.class)
+ .method(named("coordinate").and(takesArguments(1)))
+ .intercept(MethodDelegation.to(BBAccordCoordinateCountHelper.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ /**
+ * Interceptor target: bumps the counter, then invokes the original method through {@code actual}
+ * and returns its result unchanged.
+ */
+ public static TxnData coordinate(Txn txn, @SuperCall Callable<TxnData> actual) throws Exception
+ {
+ count.incrementAndGet();
+ return actual.call();
+ }
+ }
+
protected abstract Logger logger();
}
diff --git a/test/unit/org/apache/cassandra/cql3/conditions/ColumnConditionTest.java b/test/unit/org/apache/cassandra/cql3/conditions/ColumnConditionTest.java
index dd40cf81db..0dc6383637 100644
--- a/test/unit/org/apache/cassandra/cql3/conditions/ColumnConditionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/conditions/ColumnConditionTest.java
@@ -17,162 +17,421 @@
*/
package org.apache.cassandra.cql3.conditions;
+import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.junit.Assert;
import org.junit.Test;
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.Constants;
+import org.apache.cassandra.cql3.Lists;
+import org.apache.cassandra.cql3.Maps;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.Sets;
+import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.cql3.Terms;
+import org.apache.cassandra.cql3.UserTypes;
import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.marshal.SetType;
-import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.serializers.ListSerializer;
+import org.apache.cassandra.serializers.MapSerializer;
+import org.apache.cassandra.serializers.SetSerializer;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.TimeUUID;
+import static org.apache.cassandra.cql3.Operator.CONTAINS;
+import static org.apache.cassandra.cql3.Operator.CONTAINS_KEY;
+import static org.apache.cassandra.cql3.Operator.EQ;
+import static org.apache.cassandra.cql3.Operator.GT;
+import static org.apache.cassandra.cql3.Operator.GTE;
+import static org.apache.cassandra.cql3.Operator.LT;
+import static org.apache.cassandra.cql3.Operator.LTE;
+import static org.apache.cassandra.cql3.Operator.NEQ;
+import static org.apache.cassandra.transport.ProtocolVersion.CURRENT;
+import static org.apache.cassandra.utils.ByteBufferUtil.EMPTY_BYTE_BUFFER;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.apache.cassandra.cql3.Operator.*;
-import static org.apache.cassandra.utils.ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
-public class ColumnConditionTest
+public class ColumnConditionTest extends CQLTester
{
public static final ByteBuffer ZERO = Int32Type.instance.fromString("0");
public static final ByteBuffer ONE = Int32Type.instance.fromString("1");
public static final ByteBuffer TWO = Int32Type.instance.fromString("2");
- private static Row newRow(ColumnMetadata definition, ByteBuffer value)
+ private static final ListType<Integer> listType = ListType.getInstance(Int32Type.instance, true);
+ private static final MapType<Integer, Integer> mapType = MapType.getInstance(Int32Type.instance, Int32Type.instance, true);
+ private static final SetType<Integer> setType = SetType.getInstance(Int32Type.instance, true);
+
+ private Row newRow(ColumnMetadata definition, ByteBuffer value)
{
- BufferCell cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, value, null);
+ BufferCell cell;
+ if (definition.type.isUDT() )
+ {
+ if (definition.type.isMultiCell()) {
+ CellPath cellPath = udtType.cellPathForField(udtType.fieldName(0));
+ cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, value, cellPath);
+ }
+ else
+ {
+ ByteBuffer udtValue = UserType.buildValue(value, TWO);
+ cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, udtValue, null);
+ }
+ }
+ else
+ {
+ cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, value, null);
+ }
return BTreeRow.singleCellRow(Clustering.EMPTY, cell);
}
private static Row newRow(ColumnMetadata definition, List<ByteBuffer> values)
{
- Row.Builder builder = BTreeRow.sortedBuilder();
- builder.newRow(Clustering.EMPTY);
- long now = System.currentTimeMillis();
- if (values != null)
+ AbstractType<?> type = definition.type;
+ if (type.isFrozenCollection())
+ {
+ ByteBuffer cellValue = ListSerializer.pack(values, values.size(), CURRENT);
+ Cell cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, cellValue, null);
+ return BTreeRow.singleCellRow(Clustering.EMPTY, cell);
+ }
+ else
{
- for (int i = 0, m = values.size(); i < m; i++)
+ Row.Builder builder = BTreeRow.sortedBuilder();
+ builder.newRow(Clustering.EMPTY);
+ long now = System.currentTimeMillis();
+ if (values != null)
{
- TimeUUID uuid = TimeUUID.Generator.atUnixMillis(now, i);
- ByteBuffer key = uuid.toBytes();
- ByteBuffer value = values.get(i);
- BufferCell cell = new BufferCell(definition,
- 0L,
- Cell.NO_TTL,
- Cell.NO_DELETION_TIME,
- value,
- CellPath.create(key));
- builder.addCell(cell);
+ for (int i = 0, m = values.size(); i < m; i++)
+ {
+ BufferCell cell;
+ if (type.isUDT())
+ {
+ cell = new BufferCell(definition,
+ 0L,
+ Cell.NO_TTL,
+ Cell.NO_DELETION_TIME,
+ values.get(i),
+ ((UserType)type).cellPathForField(((UserType) type).fieldName(i)));
+ }
+ else
+ {
+ TimeUUID uuid = TimeUUID.Generator.atUnixMillis(now, i);
+ ByteBuffer key = uuid.toBytes();
+ ByteBuffer value = values.get(i);
+ cell = new BufferCell(definition,
+ 0L,
+ Cell.NO_TTL,
+ Cell.NO_DELETION_TIME,
+ value,
+ CellPath.create(key));
+ }
+ builder.addCell(cell);
+ }
}
+ return builder.build();
}
- return builder.build();
}
private static Row newRow(ColumnMetadata definition, SortedSet<ByteBuffer> values)
{
- Row.Builder builder = BTreeRow.sortedBuilder();
- builder.newRow(Clustering.EMPTY);
- if (values != null)
+ if (definition.type.isFrozenCollection())
{
- for (ByteBuffer value : values)
+ ByteBuffer cellValue = SetSerializer.pack(values, values.size(), CURRENT);
+ Cell cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, cellValue, null);
+ return BTreeRow.singleCellRow(Clustering.EMPTY, cell);
+ }
+ else
+ {
+ Row.Builder builder = BTreeRow.sortedBuilder();
+ builder.newRow(Clustering.EMPTY);
+ if (values != null)
{
- BufferCell cell = new BufferCell(definition,
- 0L,
- Cell.NO_TTL,
- Cell.NO_DELETION_TIME,
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- CellPath.create(value));
- builder.addCell(cell);
+ for (ByteBuffer value : values)
+ {
+ BufferCell cell = new BufferCell(definition,
+ 0L,
+ Cell.NO_TTL,
+ Cell.NO_DELETION_TIME,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ CellPath.create(value));
+ builder.addCell(cell);
+ }
}
+ return builder.build();
}
- return builder.build();
}
- private static Row newRow(ColumnMetadata definition, Map<ByteBuffer, ByteBuffer> values)
+ private static Row newRow(ColumnMetadata definition, SortedMap<ByteBuffer, ByteBuffer> values)
{
- Row.Builder builder = BTreeRow.sortedBuilder();
- builder.newRow(Clustering.EMPTY);
- if (values != null)
+ if (definition.type.isFrozenCollection())
+ {
+ List<ByteBuffer> packableValues = values.entrySet().stream().flatMap(entry -> Stream.of(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+ ByteBuffer cellValue = MapSerializer.pack(packableValues, values.size(), CURRENT);
+ Cell cell = new BufferCell(definition, 0L, Cell.NO_TTL, Cell.NO_DELETION_TIME, cellValue, null);
+ return BTreeRow.singleCellRow(Clustering.EMPTY, cell);
+ }
+ else
{
- for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet())
+ Row.Builder builder = BTreeRow.sortedBuilder();
+ builder.newRow(Clustering.EMPTY);
+ if (values != null)
{
- BufferCell cell = new BufferCell(definition,
- 0L,
- Cell.NO_TTL,
- Cell.NO_DELETION_TIME,
- entry.getValue(),
- CellPath.create(entry.getKey()));
- builder.addCell(cell);
+ for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet())
+ {
+ BufferCell cell = new BufferCell(definition,
+ 0L,
+ Cell.NO_TTL,
+ Cell.NO_DELETION_TIME,
+ entry.getValue(),
+ CellPath.create(entry.getKey()));
+ builder.addCell(cell);
+ }
}
+ return builder.build();
+ }
+ }
+
+ private static boolean testRoundtripped(ColumnCondition.Bound bound, Row row)
+ {
+ DataOutputBuffer dab = new DataOutputBuffer();
+ IVersionedSerializer<ColumnCondition.Bound> serializer = ColumnCondition.Bound.serializer;
+ int version = MessagingService.current_version;
+ try
+ {
+ serializer.serialize(bound, dab, version);
+ assertEquals(serializer.serializedSize(bound, version), dab.position());
+ ColumnCondition.Bound deserializedBound = serializer.deserialize(new DataInputBuffer(dab.buffer(), false), version);
+ boolean originalResult = bound.appliesTo(row);
+ assertEquals(originalResult, deserializedBound.appliesTo(row));
+ return originalResult;
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
}
- return builder.build();
}
- private static boolean conditionApplies(ByteBuffer rowValue, Operator op, ByteBuffer conditionValue)
+ private boolean conditionApplies(ByteBuffer rowValue, Operator op, ByteBuffer conditionValue)
{
- ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", Int32Type.instance);
+ AbstractType<?> columnType = Int32Type.instance;
+ ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, false), "c", columnType);
ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Constants.Value(conditionValue)));
ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
- return bound.appliesTo(newRow(definition, rowValue));
+ boolean regularColumnResult = testRoundtripped(bound, newRow(definition, rowValue));
+ // Every simple bound test is also a valid test of the UDT access path
+ boolean conditionUDTApplies = conditionUDTApplies(rowValue, op, conditionValue);
+ assertEquals(regularColumnResult, conditionUDTApplies);
+ return regularColumnResult;
}
- private static boolean conditionApplies(List<ByteBuffer> rowValue, Operator op, List<ByteBuffer> conditionValue)
+ private boolean conditionApplies(List<ByteBuffer> rowValue, Operator op, List<ByteBuffer> conditionValue)
{
- ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", ListType.getInstance(Int32Type.instance, true));
+ boolean nonFrozenResult = conditionApplies(rowValue, op, conditionValue, listType);
+ boolean frozenResult = conditionApplies(rowValue, op, conditionValue, listType.freeze());
+ assertEquals(nonFrozenResult, frozenResult);
+ return nonFrozenResult;
+ }
+
+ private boolean conditionApplies(List<ByteBuffer> rowValue, Operator op, List<ByteBuffer> conditionValue, ListType<Integer> columnType)
+ {
+ ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Lists.Value(conditionValue)));
ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
- return bound.appliesTo(newRow(definition, rowValue));
+ return testRoundtripped(bound, newRow(definition, rowValue));
+ }
+
+ private boolean conditionContainsApplies(List<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
+ {
+ boolean nonFrozenResult = conditionContainsApplies(rowValue, op, conditionValue, listType);
+ boolean frozenResult = conditionContainsApplies(rowValue, op, conditionValue, listType.freeze());
+ assertEquals(nonFrozenResult, frozenResult);
+ return frozenResult;
}
- private static boolean conditionContainsApplies(List<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
+ private boolean conditionContainsApplies(List<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue, ListType<Integer> columnType)
{
- ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", ListType.getInstance(Int32Type.instance, true));
+ ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Constants.Value(conditionValue)));
ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
- return bound.appliesTo(newRow(definition, rowValue));
+ return testRoundtripped(bound, newRow(definition, rowValue));
}
- private static boolean conditionContainsApplies(Map<ByteBuffer, ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
+ private boolean conditionContainsApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
{
- ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", MapType.getInstance(Int32Type.instance, Int32Type.instance, true));
+ boolean nonFrozenResult = conditionContainsApplies(rowValue, op, conditionValue, mapType);
+ boolean frozenResult = conditionContainsApplies(rowValue, op, conditionValue, mapType.freeze());
+ assertEquals(nonFrozenResult, frozenResult);
+ return frozenResult;
+ }
+
+ private boolean conditionContainsApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue, MapType<Integer, Integer> columnType)
+ {
+ ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Constants.Value(conditionValue)));
ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
- return bound.appliesTo(newRow(definition, rowValue));
+ return testRoundtripped(bound, newRow(definition, rowValue));
+ }
+
+ private boolean conditionApplies(SortedSet<ByteBuffer> rowValue, Operator op, SortedSet<ByteBuffer> conditionValue)
+ {
+ boolean nonFrozenResult = conditionApplies(rowValue, op, conditionValue, setType);
+ boolean frozenResult = conditionApplies(rowValue, op, conditionValue, setType.freeze());
+ assertEquals(nonFrozenResult, frozenResult);
+ return nonFrozenResult;
}
- private static boolean conditionApplies(SortedSet<ByteBuffer> rowValue, Operator op, SortedSet<ByteBuffer> conditionValue)
+ private boolean conditionApplies(SortedSet<ByteBuffer> rowValue, Operator op, SortedSet<ByteBuffer> conditionValue, SetType<Integer> columnType)
{
- ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", SetType.getInstance(Int32Type.instance, true));
+ ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Sets.Value(conditionValue)));
ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
- return bound.appliesTo(newRow(definition, rowValue));
+ return testRoundtripped(bound, newRow(definition, rowValue));
}
- private static boolean conditionContainsApplies(SortedSet<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
+ private boolean conditionContainsApplies(SortedSet<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue)
{
- ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", SetType.getInstance(Int32Type.instance, true));
+ boolean nonFrozenResult = conditionContainsApplies(rowValue, op, conditionValue, setType);
+ boolean frozenResult = conditionContainsApplies(rowValue, op, conditionValue, setType.freeze());
+ assertEquals(nonFrozenResult, frozenResult);
+ return frozenResult;
+ }
+
+ private boolean conditionContainsApplies(SortedSet<ByteBuffer> rowValue, Operator op, ByteBuffer conditionValue, SetType<Integer> columnType)
+ {
+ ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Constants.Value(conditionValue)));
ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
- return bound.appliesTo(newRow(definition, rowValue));
+ return testRoundtripped(bound, newRow(definition, rowValue));
+ }
+
+ private boolean conditionApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, Operator op, SortedMap<ByteBuffer, ByteBuffer> conditionValue)
+ {
+ boolean nonFrozenResult = conditionApplies(rowValue, op, conditionValue, mapType);
+ boolean frozenResult = conditionApplies(rowValue, op, conditionValue, mapType.freeze());
+ assertEquals(nonFrozenResult, frozenResult);
+ return nonFrozenResult;
}
- private static boolean conditionApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, Operator op, SortedMap<ByteBuffer, ByteBuffer> conditionValue)
+ private boolean conditionApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, Operator op, SortedMap<ByteBuffer, ByteBuffer> conditionValue, MapType<Integer, Integer> columnType)
{
- ColumnMetadata definition = ColumnMetadata.regularColumn("ks", "cf", "c", MapType.getInstance(Int32Type.instance, Int32Type.instance, true));
+ ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(new Maps.Value(conditionValue)));
ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
- return bound.appliesTo(newRow(definition, rowValue));
+ return testRoundtripped(bound, newRow(definition, rowValue));
+ }
+
+ private boolean conditionElementApplies(List<ByteBuffer> values, ByteBuffer elementIndex, Operator op, ByteBuffer elementValue)
+ {
+ boolean nonFrozenResult = conditionElementApplies(values, elementIndex, op, elementValue, listType);
+ boolean frozenResult = conditionElementApplies(values, elementIndex, op, elementValue, listType.freeze());
+ assertEquals(nonFrozenResult, frozenResult);
+ return nonFrozenResult;
+ }
+
+ private boolean conditionElementApplies(List<ByteBuffer> values, ByteBuffer elementIndex, Operator op, ByteBuffer elementValue, ListType<Integer> columnType)
+ {
+ ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
+ ColumnCondition condition = ColumnCondition.condition(definition, new Constants.Value(elementIndex), op, Terms.of(new Constants.Value(elementValue)));
+ ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
+ return testRoundtripped(bound, newRow(definition, values));
+ }
+
+ private boolean conditionElementApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, ByteBuffer elementKey, Operator op, ByteBuffer elementValue)
+ {
+ boolean nonFrozenResult = conditionElementApplies(rowValue, elementKey, op, elementValue, mapType);
+ boolean frozenResult = conditionElementApplies(rowValue, elementKey, op, elementValue, mapType.freeze());
+ assertEquals(nonFrozenResult, frozenResult);
+ return nonFrozenResult;
+ }
+
+ private boolean conditionElementApplies(SortedMap<ByteBuffer, ByteBuffer> rowValue, ByteBuffer elementKey, Operator op, ByteBuffer elementValue, MapType<Integer, Integer> columnType)
+ {
+ ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(columnType, columnType.isFrozenCollection()), "c", columnType);
+ ColumnCondition condition = ColumnCondition.condition(definition, new Constants.Value(elementKey), op, Terms.of(new Constants.Value(elementValue)));
+ ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
+ return testRoundtripped(bound, newRow(definition, rowValue));
+ }
+
+ private boolean conditionUDTApplies(ByteBuffer fieldValue, Operator op, ByteBuffer elementValue)
+ {
+ boolean nonFrozenResult = conditionUDTApplies(fieldValue, op, elementValue, udtType);
+ boolean frozenResult = conditionUDTApplies(fieldValue, op, elementValue, udtType.freeze());
+ assertEquals(nonFrozenResult, frozenResult);
+ return nonFrozenResult;
+ }
+
+ private boolean conditionUDTApplies(ByteBuffer fieldValue, Operator op, ByteBuffer elementValue, UserType type)
+ {
+ ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(type, !type.isMultiCell()), "c", type);
+ ColumnCondition condition = ColumnCondition.condition(definition, type.fieldName(0), op, Terms.of(new Constants.Value(elementValue)));
+ ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
+ return testRoundtripped(bound, newRow(definition, fieldValue));
+ }
+
+ private boolean conditionUDTApplies(List<ByteBuffer> rowValue, Operator op, List<ByteBuffer> conditionValue)
+ {
+ ColumnMetadata definition = ColumnMetadata.regularColumn(KEYSPACE, maybeCreateTable(udtType, false), "c", udtType);
+ Term term = conditionValue == null ? Constants.NULL_VALUE : new UserTypes.Value(udtType, conditionValue.toArray(new ByteBuffer[0]));
+ ColumnCondition condition = ColumnCondition.condition(definition, op, Terms.of(term));
+ ColumnCondition.Bound bound = condition.bind(QueryOptions.DEFAULT);
+ return testRoundtripped(bound, newRow(definition, rowValue));
+ }
+
+ /**
+ * Runs the parent setup, then creates a two-field UDT {@code (a int, b int)} and looks up its
+ * {@link UserType} from the schema so the UDT condition helpers can use it.
+ */
+ @Override
+ public void beforeTest() throws Throwable
+ {
+ super.beforeTest();
+ String typeName = createType("CREATE TYPE %s (a int, b int);");
+ udtType = Schema.instance.getKeyspaceMetadata(KEYSPACE).types.get(ByteBufferUtil.bytes(typeName)).get();
+ }
+ @Override
+ public void afterTest() throws Throwable
+ {
+ super.afterTest();
+ typeToTable.clear();
+ udtType = null;
+ }
+
+ // Is this useful enough to have in CQL tester?
+ private UserType udtType;
+ private final Map<Pair<AbstractType<?>, Boolean>, String> typeToTable = new HashMap<>();
+
+ /**
+ * Returns the name of a table whose column {@code c} has the given type, creating the table on
+ * first use and reusing it for later calls with the same {@code (columnType, frozen)} pair.
+ *
+ * @param columnType the type of column {@code c}
+ * @param frozen whether the caller treats the column as frozen; only used as part of the cache key
+ * @return the name returned by {@code createTable} for the (possibly cached) table
+ */
+ private String maybeCreateTable(AbstractType<?> columnType, boolean frozen)
+ {
+ // The previous code called "frozen<%s>".format(columnTypeCQL). String.format is static, so that
+ // discarded the "frozen<%s>" template and used columnTypeCQL itself as the format string, which
+ // always evaluated to columnTypeCQL. Use the CQL type text directly to keep that behaviour.
+ // NOTE(review): this assumes asCQL3Type() already renders frozen<...> for frozen types - confirm.
+ String columnTypeCQL = columnType.asCQL3Type().toString();
+ return typeToTable.computeIfAbsent(Pair.create(columnType, frozen), key -> createTable("CREATE TABLE %s (id int primary key, c " + columnTypeCQL + ")"));
+ }
@FunctionalInterface
@@ -194,7 +453,7 @@ public class ColumnConditionTest
}
@Test
- public void testSimpleBoundIsSatisfiedByValue() throws InvalidRequestException
+ public void testSimpleAndUDTBoundIsSatisfiedByValue() throws InvalidRequestException
{
// EQ
assertTrue(conditionApplies(ONE, EQ, ONE));
@@ -359,6 +618,18 @@ public class ColumnConditionTest
assertFalse(conditionContainsApplies(list(ZERO, ONE, TWO), CONTAINS, ByteBufferUtil.EMPTY_BYTE_BUFFER));
assertFalse(conditionContainsApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), CONTAINS, ONE));
assertTrue(conditionContainsApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), CONTAINS, ByteBufferUtil.EMPTY_BYTE_BUFFER));
+
+ //ELEMENT
+ assertTrue(conditionElementApplies(list(ZERO, ONE, TWO), ZERO, EQ, ZERO));
+ assertTrue(conditionElementApplies(list(ZERO, ONE, TWO), ONE, EQ, ONE));
+ assertTrue(conditionElementApplies(list(ZERO, ONE, TWO), TWO, EQ, TWO));
+
+ assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), ZERO, EQ, ONE));
+ assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), ZERO, EQ, TWO));
+ assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), ONE, EQ, ZERO));
+ assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), ONE, EQ, TWO));
+ assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), TWO, EQ, ZERO));
+ assertFalse(conditionElementApplies(list(ZERO, ONE, TWO), TWO, EQ, ONE));
}
private static SortedSet<ByteBuffer> set(ByteBuffer... values)
@@ -611,5 +882,96 @@ public class ColumnConditionTest
assertTrue(conditionContainsApplies(map(ONE, ByteBufferUtil.EMPTY_BYTE_BUFFER), CONTAINS_KEY, ONE));
assertFalse(conditionContainsApplies(map(ONE, ByteBufferUtil.EMPTY_BYTE_BUFFER), CONTAINS_KEY, ByteBufferUtil.EMPTY_BYTE_BUFFER));
+ // Element access
+ assertTrue(conditionElementApplies(map(ZERO, ONE), ZERO, EQ, ONE));
+ assertFalse(conditionElementApplies(map(ZERO, ONE), ZERO, EQ, TWO));
+ assertFalse(conditionElementApplies(map(ZERO, ONE), ONE, EQ, TWO));
+ assertFalse(conditionElementApplies(map(), ZERO, EQ, TWO));
+ }
+
+ @Test
+ public void testMultiCellUDTBound() throws InvalidRequestException
+ {
+ // EQ
+ assertTrue(conditionUDTApplies(list(ONE), EQ, list(ONE)));
+ assertFalse(conditionUDTApplies(list(ONE), EQ, list(ZERO)));
+ assertFalse(conditionUDTApplies(list(ZERO), EQ, list(ONE)));
+ assertFalse(conditionUDTApplies(list(ONE, ONE), EQ, list(ONE)));
+ assertFalse(conditionUDTApplies(list(ONE), EQ, list(ONE, ONE)));
+ assertFalse(conditionUDTApplies(list(ONE), EQ, list()));
+
+ assertFalse(conditionUDTApplies(list(ONE), EQ, (List<ByteBuffer>)null));
+ assertFalse(conditionUDTApplies(null, EQ, list(ONE)));
+ assertTrue(conditionUDTApplies((List<ByteBuffer>) null, EQ, (List<ByteBuffer>) null));
+
+ assertFalse(conditionUDTApplies(list(ONE), EQ, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+ assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), EQ, list(ONE)));
+ assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), EQ, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+
+ // NEQ
+ assertFalse(conditionUDTApplies(list(ONE), NEQ, list(ONE)));
+ assertTrue(conditionUDTApplies(list(ONE), NEQ, list(ZERO)));
+ assertTrue(conditionUDTApplies(list(ZERO), NEQ, list(ONE)));
+ assertTrue(conditionUDTApplies(list(ONE, ONE), NEQ, list(ONE)));
+ assertTrue(conditionUDTApplies(list(ONE), NEQ, list(ONE, ONE)));
+ assertTrue(conditionUDTApplies(list(ONE), NEQ, list()));
+ assertTrue(conditionUDTApplies(list(), NEQ, list(ONE)));
+
+ assertTrue(conditionUDTApplies(list(ONE), NEQ, (List<ByteBuffer>)null));
+ assertTrue(conditionUDTApplies(null, NEQ, list(ONE)));
+ assertFalse(conditionUDTApplies((List<ByteBuffer>) null, NEQ, (List<ByteBuffer>) null));
+
+ assertTrue(conditionUDTApplies(list(ONE), NEQ, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+ assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), NEQ, list(ONE)));
+ assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), NEQ, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+
+ // LT
+ assertFalse(conditionUDTApplies(list(ONE), LT, list(ONE)));
+ assertFalse(conditionUDTApplies(list(), LT, list()));
+ assertFalse(conditionUDTApplies(list(ONE), LT, list(ZERO)));
+ assertTrue(conditionUDTApplies(list(ZERO), LT, list(ONE)));
+ assertFalse(conditionUDTApplies(list(ONE, ONE), LT, list(ONE)));
+ assertTrue(conditionUDTApplies(list(ONE), LT, list(ONE, ONE)));
+ assertFalse(conditionUDTApplies(list(ONE), LT, list()));
+
+ assertFalse(conditionUDTApplies(list(ONE), LT, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+ assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), LT, list(ONE)));
+ assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), LT, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+
+ // LTE
+ assertTrue(conditionUDTApplies(list(ONE), LTE, list(ONE)));
+ assertFalse(conditionUDTApplies(list(ONE), LTE, list(ZERO)));
+ assertTrue(conditionUDTApplies(list(ZERO), LTE, list(ONE)));
+ assertFalse(conditionUDTApplies(list(ONE, ONE), LTE, list(ONE)));
+ assertTrue(conditionUDTApplies(list(ONE), LTE, list(ONE, ONE)));
+ assertFalse(conditionUDTApplies(list(ONE), LTE, list()));
+
+ assertFalse(conditionUDTApplies(list(ONE), LTE, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+ assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), LTE, list(ONE)));
+ assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), LTE, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+
+ // GT
+ assertFalse(conditionUDTApplies(list(ONE), GT, list(ONE)));
+ assertTrue(conditionUDTApplies(list(ONE), GT, list(ZERO)));
+ assertFalse(conditionUDTApplies(list(ZERO), GT, list(ONE)));
+ assertTrue(conditionUDTApplies(list(ONE, ONE), GT, list(ONE)));
+ assertFalse(conditionUDTApplies(list(ONE), GT, list(ONE, ONE)));
+ assertTrue(conditionUDTApplies(list(ONE), GT, list()));
+
+ assertTrue(conditionUDTApplies(list(ONE), GT, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+ assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), GT, list(ONE)));
+ assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), GT, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+
+ // GTE
+ assertTrue(conditionUDTApplies(list(ONE), GTE, list(ONE)));
+ assertTrue(conditionUDTApplies(list(ONE), GTE, list(ZERO)));
+ assertFalse(conditionUDTApplies(list(ZERO), GTE, list(ONE)));
+ assertTrue(conditionUDTApplies(list(ONE, ONE), GTE, list(ONE)));
+ assertFalse(conditionUDTApplies(list(ONE), GTE, list(ONE, ONE)));
+ assertTrue(conditionUDTApplies(list(ONE), GTE, list()));
+
+ assertTrue(conditionUDTApplies(list(ONE), GTE, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
+ assertFalse(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), GTE, list(ONE)));
+ assertTrue(conditionUDTApplies(list(ByteBufferUtil.EMPTY_BYTE_BUFFER), GTE, list(ByteBufferUtil.EMPTY_BYTE_BUFFER)));
}
}
diff --git a/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java b/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java
index 8edb06afd2..17088c8ad2 100644
--- a/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java
+++ b/test/unit/org/apache/cassandra/service/accord/txn/TxnBuilder.java
@@ -18,19 +18,11 @@
package org.apache.cassandra.service.accord.txn;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-
import accord.api.Key;
import accord.primitives.Keys;
import accord.primitives.Txn;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
@@ -48,6 +40,9 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.accord.api.PartitionKey;
+import java.nio.ByteBuffer;
+import java.util.*;
+
public class TxnBuilder
{
private final List<TxnNamedRead> reads = new ArrayList<>();
@@ -107,7 +102,7 @@ public class TxnBuilder
private TxnReference reference(TxnDataName name, String column)
{
// do any reads match the name?
- Optional<TxnNamedRead> match = reads.stream().filter(n -> n.name().equals(name)).findFirst();
+ Optional<TxnNamedRead> match = reads.stream().filter(n -> n.txnDataName().equals(name)).findFirst();
if (!match.isPresent())
throw new IllegalArgumentException("Attempted to create a reference for " + name + " but no read exists with that name");
TxnNamedRead read = match.get();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org