You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2022/12/02 22:49:33 UTC
[cassandra] branch cep-15-accord updated: CEP-15: Routables - Integrate accord-core changes for CASSANDRA-18087
This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new aef9979502 CEP-15: Routables - Integrate accord-core changes for CASSANDRA-18087
aef9979502 is described below
commit aef9979502347ad0784749dafe2a8b9279e6a0fc
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Sat Oct 22 00:05:54 2022 +0100
CEP-15: Routables
- Integrate accord-core changes for CASSANDRA-18087
patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-18087
---
.build/include-accord.sh | 4 +-
.../cassandra/db/SinglePartitionReadCommand.java | 2 -
.../cassandra/service/accord/AccordCommand.java | 25 +-
.../service/accord/AccordCommandStore.java | 149 ++++++++--
.../service/accord/AccordCommandsForKey.java | 2 +-
.../cassandra/service/accord/AccordKeyspace.java | 13 +-
.../service/accord/AccordObjectSizes.java | 98 +++++--
.../cassandra/service/accord/AccordTxnBuilder.java | 16 +-
.../cassandra/service/accord/ListenerProxy.java | 18 +-
.../cassandra/service/accord/TokenRange.java | 5 +-
.../cassandra/service/accord/api/AccordAgent.java | 9 +-
.../cassandra/service/accord/api/AccordKey.java | 217 ---------------
.../service/accord/api/AccordRoutableKey.java | 80 ++++++
.../service/accord/api/AccordRoutingKey.java | 129 ++-------
.../cassandra/service/accord/api/PartitionKey.java | 177 ++++++++++++
.../service/accord/async/AsyncContext.java | 2 +-
.../service/accord/async/AsyncLoader.java | 2 +-
.../service/accord/async/AsyncOperation.java | 16 +-
.../service/accord/async/AsyncWriter.java | 10 +-
.../service/accord/db/AbstractKeyIndexed.java | 14 +-
.../cassandra/service/accord/db/AccordData.java | 6 +-
.../cassandra/service/accord/db/AccordRead.java | 10 +-
.../cassandra/service/accord/db/AccordUpdate.java | 9 +-
.../cassandra/service/accord/db/AccordWrite.java | 5 +-
.../accord/serializers/AcceptSerializers.java | 8 +-
.../serializers/BeginInvalidationSerializers.java | 15 +-
.../accord/serializers/CheckStatusSerializers.java | 19 +-
.../accord/serializers/CommandSerializers.java | 12 +-
.../accord/serializers/CommitSerializers.java | 19 +-
.../service/accord/serializers/DepsSerializer.java | 4 +-
.../accord/serializers/GetDepsSerializers.java | 10 +-
.../service/accord/serializers/KeySerializers.java | 307 ++++++++++++++++-----
.../accord/serializers/PreacceptSerializers.java | 12 +-
.../accord/serializers/ReadDataSerializers.java | 7 +-
.../accord/serializers/RecoverySerializers.java | 10 +-
.../accord/serializers/TxnRequestSerializer.java | 8 +-
.../accord/serializers/WaitOnCommitSerializer.java | 9 +-
.../test/accord/AccordIntegrationTest.java | 6 +-
.../service/accord/AccordCommandStoreTest.java | 8 +-
.../service/accord/AccordCommandTest.java | 30 +-
.../cassandra/service/accord/AccordTestUtils.java | 45 ++-
.../service/accord/AccordTopologyTest.java | 6 +-
.../service/accord/api/AccordKeyTest.java | 1 -
.../service/accord/async/AsyncLoaderTest.java | 19 +-
.../service/accord/async/AsyncOperationTest.java | 13 +-
.../service/accord/async/AsyncWriterTest.java | 12 +-
.../accord/serializers/CommandSerializersTest.java | 10 +-
47 files changed, 946 insertions(+), 662 deletions(-)
diff --git a/.build/include-accord.sh b/.build/include-accord.sh
index ea1e0544df..0dc01bf2ba 100755
--- a/.build/include-accord.sh
+++ b/.build/include-accord.sh
@@ -24,8 +24,8 @@ set -o nounset
bin="$(cd "$(dirname "$0")" > /dev/null; pwd)"
-accord_repo='https://github.com/belliottsmith/cassandra-accord.git'
-accord_branch='bugfix-invalidation'
+accord_repo='https://github.com/apache/cassandra-accord.git'
+accord_branch='trunk'
accord_src="$bin/cassandra-accord"
checkout() {
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 96af39708a..b9e595dcfa 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -48,10 +48,8 @@ import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.IndexMetadata;
-import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.*;
-import org.apache.cassandra.service.accord.api.AccordKey;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.btree.BTreeSet;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommand.java b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
index 45885f657d..878e4209f9 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
@@ -1,3 +1,4 @@
+/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -32,7 +33,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.Data;
-import accord.api.Key;
import accord.api.Result;
import accord.api.RoutingKey;
import accord.local.Command;
@@ -45,18 +45,18 @@ import accord.local.SaveStatus;
import accord.local.Status;
import accord.local.Status.Durability;
import accord.local.Status.Known;
-import accord.primitives.AbstractRoute;
import accord.primitives.Ballot;
-import accord.primitives.KeyRanges;
-import accord.primitives.Keys;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.Route;
+import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Writes;
import accord.utils.DeterministicIdentitySet;
-import org.apache.cassandra.service.accord.api.AccordKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.service.accord.async.AsyncContext;
import org.apache.cassandra.service.accord.db.AccordData;
import org.apache.cassandra.service.accord.store.StoredNavigableMap;
@@ -114,7 +114,7 @@ public class AccordCommand extends Command implements AccordState<TxnId>
private final TxnId txnId;
private final int instanceCount = INSTANCE_COUNTER.getAndIncrement();
- public final StoredValue<AbstractRoute> route;
+ public final StoredValue<Route<?>> route;
public final StoredValue<RoutingKey> homeKey;
public final StoredValue<RoutingKey> progressKey;
public final StoredValue<PartialTxn> partialTxn;
@@ -475,13 +475,13 @@ public class AccordCommand extends Command implements AccordState<TxnId>
}
@Override
- public AbstractRoute route()
+ public Route<?> route()
{
return route.get();
}
@Override
- protected void setRoute(AbstractRoute newRoute)
+ protected void setRoute(Route<?> newRoute)
{
route.set(newRoute);
}
@@ -632,16 +632,17 @@ public class AccordCommand extends Command implements AccordState<TxnId>
private boolean canApplyWithCurrentScope(SafeCommandStore safeStore)
{
- KeyRanges ranges = safeStore.ranges().at(executeAt().epoch);
- Keys keys = partialTxn().keys();
+ Ranges ranges = safeStore.ranges().at(executeAt().epoch);
+ Seekables<?, ?> keys = partialTxn().keys();
for (int i=0,mi=keys.size(); i<mi; i++)
{
- Key key = keys.get(i);
- if (((AccordCommandStore)safeStore).isCommandsForKeyInContext((AccordKey.PartitionKey) key))
+ PartitionKey key = (PartitionKey) keys.get(i);
+ if (((AccordCommandStore)safeStore).isCommandsForKeyInContext(key))
continue;
if (!safeStore.commandStore().hashIntersects(key))
continue;
+
if (!ranges.contains(key))
continue;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index c7348a149f..068b08eff8 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -23,6 +23,7 @@ import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -40,9 +41,14 @@ import accord.local.NodeTimeService;
import accord.local.PreLoadContext;
import accord.local.SafeCommandStore;
import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.Routable;
+import accord.primitives.Routables;
+import accord.primitives.Seekables;
import accord.primitives.Timestamp;
+import accord.primitives.AbstractKeys;
import accord.primitives.TxnId;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.service.accord.async.AsyncContext;
import org.apache.cassandra.service.accord.async.AsyncOperation;
import org.apache.cassandra.utils.Clock;
@@ -254,6 +260,60 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
});
}
+ public <T> T mapReduce(Routables<?, ?> keysOrRanges, Ranges slice, Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)
+ {
+ switch (keysOrRanges.kindOfContents()) {
+ default:
+ throw new AssertionError();
+ case Key:
+ // TODO: efficiency
+ AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
+ return keys.stream()
+ .filter(slice::contains)
+ .filter(this::hashIntersects)
+ .map(this::commandsForKey)
+ .map(map)
+ .reduce(initialValue, reduce);
+ case Range:
+ // TODO:
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public void forEach(Routables<?, ?> keysOrRanges, Ranges slice, Consumer<CommandsForKey> forEach)
+ {
+ switch (keysOrRanges.kindOfContents()) {
+ default:
+ throw new AssertionError();
+ case Key:
+ AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
+ keys.forEach(slice, key -> {
+ if (hashIntersects(key))
+ forEach.accept(commandsForKey(key));
+ });
+ break;
+ case Range:
+ // TODO:
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public void forEach(Routable keyOrRange, Ranges slice, Consumer<CommandsForKey> forEach)
+ {
+ switch (keyOrRange.kind())
+ {
+ default: throw new AssertionError();
+ case Key:
+ Key key = (Key) keyOrRange;
+ if (slice.contains(key))
+ forEach.accept(commandsForKey(key));
+ break;
+ case Range:
+ // TODO:
+ throw new UnsupportedOperationException();
+ }
+ }
+
@Override
public CommandStore commandStore()
{
@@ -266,22 +326,6 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
return dataStore;
}
- public void processBlocking(Runnable runnable)
- {
- try
- {
- executor.submit(runnable).get();
- }
- catch (InterruptedException e)
- {
- throw new UncheckedInterruptedException(e);
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e.getCause());
- }
- }
-
@Override
public <T> Future<T> submit(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function)
{
@@ -315,7 +359,7 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
}
@Override
- public Timestamp preaccept(TxnId txnId, Keys keys)
+ public Timestamp preaccept(TxnId txnId, Seekables<?, ?> keys)
{
Timestamp max = maxConflict(keys);
long epoch = latestEpoch();
@@ -331,10 +375,11 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
return time;
}
- public Timestamp maxConflict(Keys keys)
+ public Timestamp maxConflict(Seekables<?, ?> keys)
{
+ // TODO: Seekables
// TODO: efficiency
- return keys.stream()
+ return ((Keys)keys).stream()
.map(this::maybeCommandsForKey)
.filter(Objects::nonNull)
.map(CommandsForKey::max)
@@ -350,6 +395,70 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
return operation;
}
+ public void executeBlocking(Runnable runnable)
+ {
+ try
+ {
+ executor.submit(runnable).get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new UncheckedInterruptedException(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public <T> T mapReduce(Routables<?, ?> keysOrRanges, Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue) {
+ switch (keysOrRanges.kindOfContents()) {
+ default:
+ throw new AssertionError();
+ case Key:
+ AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
+ return keys.stream()
+ .filter(this::hashIntersects)
+ .map(this::commandsForKey)
+ .map(map)
+ .reduce(initialValue, reduce);
+ case Range:
+ // TODO: implement
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public void forEach(Routables<?, ?> keysOrRanges, Consumer<CommandsForKey> forEach)
+ {
+ switch (keysOrRanges.kindOfContents()) {
+ default:
+ throw new AssertionError();
+ case Key:
+ AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
+ keys.forEach(key -> {
+ if (hashIntersects(key))
+ forEach.accept(commandsForKey(key));
+ });
+ break;
+ case Range:
+ // TODO: implement
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public void forEach(Routable keyOrRange, Consumer<CommandsForKey> forEach)
+ {
+ switch (keyOrRange.kind())
+ {
+ default: throw new AssertionError();
+ case Key:
+ forEach.accept(commandsForKey((Key) keyOrRange));
+ break;
+ case Range:
+ // TODO: implement
+ throw new UnsupportedOperationException();
+ }
+ }
@Override
public void shutdown()
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
index 49941dbfb3..8a1715384e 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
@@ -40,7 +40,7 @@ import accord.local.CommandsForKey;
import accord.local.Status;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.service.accord.store.StoredLong;
import org.apache.cassandra.service.accord.store.StoredNavigableMap;
import org.apache.cassandra.service.accord.store.StoredSet;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index 8677dd8fea..3313567ee2 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -41,10 +41,10 @@ import accord.local.CommandStore;
import accord.local.Node;
import accord.local.SaveStatus;
import accord.local.Status;
-import accord.primitives.AbstractRoute;
import accord.primitives.Ballot;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
+import accord.primitives.Route;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
@@ -97,8 +97,7 @@ import org.apache.cassandra.schema.Types;
import org.apache.cassandra.schema.Views;
import org.apache.cassandra.serializers.UUIDSerializer;
import org.apache.cassandra.service.accord.AccordCommandsForKey.SeriesKind;
-import org.apache.cassandra.service.accord.api.AccordKey;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.service.accord.api.AccordRoutingKey;
import org.apache.cassandra.service.accord.db.AccordData;
import org.apache.cassandra.service.accord.serializers.CommandSerializers;
@@ -161,7 +160,7 @@ public class AccordKeyspace
// TODO: naming is not very clearly distinct from the base serializers
private static class CommandsSerializers
{
- static final LocalVersionedSerializer<AbstractRoute> abstractRoute = localSerializer(KeySerializers.abstractRoute);
+ static final LocalVersionedSerializer<Route<?>> route = localSerializer(KeySerializers.route);
static final LocalVersionedSerializer<AccordRoutingKey> routingKey = localSerializer(AccordRoutingKey.serializer);
static final LocalVersionedSerializer<PartialTxn> partialTxn = localSerializer(CommandSerializers.partialTxn);
static final LocalVersionedSerializer<PartialDeps> partialDeps = localSerializer(DepsSerializer.partialDeps);
@@ -442,7 +441,7 @@ public class AccordKeyspace
builder.addCell(live(CommandsColumns.progress_key, timestampMicros, serializeOrNull((AccordRoutingKey) command.progressKey.get(), CommandsSerializers.routingKey)));
if (command.route.hasModifications())
- builder.addCell(live(CommandsColumns.route, timestampMicros, serializeOrNull(command.route.get(), CommandsSerializers.abstractRoute)));
+ builder.addCell(live(CommandsColumns.route, timestampMicros, serializeOrNull(command.route.get(), CommandsSerializers.route)));
if (command.durability.hasModifications())
builder.addCell(live(CommandsColumns.durability, timestampMicros, accessor.valueOf(command.durability.get().ordinal())));
@@ -450,7 +449,7 @@ public class AccordKeyspace
if (command.partialTxn.hasModifications())
builder.addCell(live(CommandsColumns.txn, timestampMicros, serializeOrNull(command.partialTxn.get(), CommandsSerializers.partialTxn)));
- if (command.kind.hasModifications())
+ if (command.kind.hasModifications() && command.kind.get() != null) // initialize sets hasModification(), and don't want to persist null
builder.addCell(live(CommandsColumns.kind, timestampMicros, accessor.valueOf(command.kind.get().ordinal())));
if (command.executeAt.hasModifications())
@@ -595,7 +594,7 @@ public class AccordKeyspace
command.status.load(SaveStatus.values()[row.getInt("status")]);
command.homeKey.load(deserializeOrNull(row.getBlob("home_key"), CommandsSerializers.routingKey));
command.progressKey.load(deserializeOrNull(row.getBlob("progress_key"), CommandsSerializers.routingKey));
- command.route.load(deserializeOrNull(row.getBlob("route"), CommandsSerializers.abstractRoute));
+ command.route.load(deserializeOrNull(row.getBlob("route"), CommandsSerializers.route));
// TODO: something less brittle than ordinal, more efficient than values()
command.durability.load(Status.Durability.values()[row.getInt("durability", 0)]);
command.partialTxn.load(deserializeOrNull(row.getBlob("txn"), CommandsSerializers.partialTxn));
diff --git a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index 85da64ae8c..1f5263ede6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -23,20 +23,25 @@ import java.util.Map;
import accord.api.Key;
import accord.api.RoutingKey;
import accord.local.Node;
-import accord.primitives.AbstractRoute;
+import accord.primitives.AbstractKeys;
+import accord.primitives.AbstractRanges;
import accord.primitives.Deps;
-import accord.primitives.KeyRange;
-import accord.primitives.KeyRanges;
+import accord.primitives.FullKeyRoute;
+import accord.primitives.FullRangeRoute;
import accord.primitives.Keys;
-import accord.primitives.PartialRoute;
+import accord.primitives.PartialKeyRoute;
+import accord.primitives.PartialRangeRoute;
import accord.primitives.PartialTxn;
-import accord.primitives.Route;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
import accord.primitives.RoutingKeys;
+import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
import accord.primitives.Writes;
import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.service.accord.api.AccordKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.service.accord.api.AccordRoutingKey;
import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
import org.apache.cassandra.service.accord.db.AccordQuery;
@@ -49,7 +54,7 @@ public class AccordObjectSizes
{
public static long key(Key key)
{
- return ((AccordKey.PartitionKey) key).estimatedSizeOnHeap();
+ return ((PartitionKey) key).estimatedSizeOnHeap();
}
public static long key(RoutingKey key)
@@ -58,13 +63,13 @@ public class AccordObjectSizes
}
private static final long EMPTY_KEY_RANGE_SIZE = ObjectSizes.measure(TokenRange.fullRange(TableId.generate()));
- public static long range(KeyRange range)
+ public static long range(Range range)
{
return EMPTY_KEY_RANGE_SIZE + key(range.start()) + key(range.end());
}
- private static final long EMPTY_KEY_RANGES_SIZE = ObjectSizes.measure(KeyRanges.of());
- public static long ranges(KeyRanges ranges)
+ private static final long EMPTY_KEY_RANGES_SIZE = ObjectSizes.measure(Ranges.of());
+ public static long ranges(Ranges ranges)
{
long size = EMPTY_KEY_RANGES_SIZE;
size += ObjectSizes.sizeOfReferenceArray(ranges.size());
@@ -84,7 +89,17 @@ public class AccordObjectSizes
return size;
}
- private static long routingKeysOnly(RoutingKeys keys)
+ public static long seekables(Seekables<?, ?> seekables)
+ {
+ switch (seekables.kindOfContents())
+ {
+ default: throw new AssertionError();
+ case Key: return keys((Keys) seekables);
+ case Range: return ranges((Ranges) seekables);
+ }
+ }
+
+ private static long routingKeysOnly(AbstractKeys<RoutingKey, ?> keys)
{
// TODO: many routing keys are fixed size, can compute by multiplication
long size = ObjectSizes.sizeOfReferenceArray(keys.size());
@@ -96,37 +111,70 @@ public class AccordObjectSizes
private static final long EMPTY_ROUTING_KEYS_SIZE = ObjectSizes.measure(RoutingKeys.of());
public static long routingKeys(RoutingKeys keys)
{
- return routingKeysOnly(keys) + EMPTY_ROUTING_KEYS_SIZE;
+ return EMPTY_ROUTING_KEYS_SIZE + routingKeysOnly(keys);
}
- private static final long EMPTY_ROUTE_SIZE = ObjectSizes.measure(new Route(new TokenKey(null, null), new RoutingKey[0]));
- public static long route(Route route)
+ private static final long EMPTY_FULL_KEY_ROUTE_SIZE = ObjectSizes.measure(new FullKeyRoute(new TokenKey(null, null), new RoutingKey[0]));
+ public static long fullKeyRoute(FullKeyRoute route)
{
- return EMPTY_ROUTE_SIZE
+ return EMPTY_FULL_KEY_ROUTE_SIZE
+ routingKeysOnly(route)
- + key(route.homeKey); // TODO: we will probably dedup homeKey, serializer dependent, but perhaps this is an acceptable error
+ + key(route.homeKey()); // TODO: we will probably dedup homeKey, serializer dependent, but perhaps this is an acceptable error
}
- private static final long EMPTY_PARTIAL_ROUTE_KEYS_SIZE = ObjectSizes.measure(new PartialRoute(KeyRanges.EMPTY, new TokenKey(null, null), new RoutingKey[0]));
- public static long route(PartialRoute route)
+ private static final long EMPTY_PARTIAL_KEY_ROUTE_KEYS_SIZE = ObjectSizes.measure(new PartialKeyRoute(Ranges.EMPTY, new TokenKey(null, null), new RoutingKey[0]));
+ public static long partialKeyRoute(PartialKeyRoute route)
{
- return EMPTY_PARTIAL_ROUTE_KEYS_SIZE
+ return EMPTY_PARTIAL_KEY_ROUTE_KEYS_SIZE
+ routingKeysOnly(route)
- + ranges(route.covering)
- + key(route.homeKey);
+ + ranges(route.covering())
+ + key(route.homeKey());
+ }
+
+ private static long rangesOnly(AbstractRanges<?> ranges)
+ {
+ long size = ObjectSizes.sizeOfReferenceArray(ranges.size());
+ for (int i=0, mi=ranges.size(); i<mi; i++)
+ size += range(ranges.get(i));
+ return size;
}
- public static long route(AbstractRoute route)
+ private static final long EMPTY_FULL_RANGE_ROUTE_SIZE = ObjectSizes.measure(new FullRangeRoute(new TokenKey(null, null), new Range[0]));
+ public static long fullRangeRoute(FullRangeRoute route)
{
- if (route instanceof Route) return route((Route) route);
- else return route((PartialRoute) route);
+ return EMPTY_FULL_RANGE_ROUTE_SIZE
+ + rangesOnly(route)
+ + key(route.homeKey()); // TODO: we will probably dedup homeKey, serializer dependent, but perhaps this is an acceptable error
+ }
+
+ private static final long EMPTY_PARTIAL_RANGE_ROUTE_KEYS_SIZE = ObjectSizes.measure(new PartialRangeRoute(Ranges.EMPTY, new TokenKey(null, null), new Range[0]));
+ public static long partialRangeRoute(PartialRangeRoute route)
+ {
+ return EMPTY_PARTIAL_RANGE_ROUTE_KEYS_SIZE
+ + rangesOnly(route)
+ + ranges(route.covering())
+ + key(route.homeKey());
+ }
+
+ public static long route(Unseekables<?, ?> unseekables)
+ {
+ switch (unseekables.kind())
+ {
+ default: throw new AssertionError();
+ case RoutingKeys: return routingKeys((RoutingKeys) unseekables);
+ case PartialKeyRoute: return partialKeyRoute((PartialKeyRoute) unseekables);
+ case FullKeyRoute: return fullKeyRoute((FullKeyRoute) unseekables);
+ case RoutingRanges: return ranges((Ranges) unseekables);
+ case PartialRangeRoute: return partialRangeRoute((PartialRangeRoute) unseekables);
+ case FullRangeRoute: return fullRangeRoute((FullRangeRoute) unseekables);
+ }
}
private static final long EMPTY_TXN = ObjectSizes.measure(new PartialTxn.InMemory(null, null, null, null, null, null));
public static long txn(PartialTxn txn)
{
long size = EMPTY_TXN;
- size += keys(txn.keys());
+ size += seekables(txn.keys());
size += ((AccordRead) txn.read()).estimatedSizeOnHeap();
if (txn.update() != null)
size += ((AccordUpdate) txn.update()).estimatedSizeOnHeap();
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTxnBuilder.java b/src/java/org/apache/cassandra/service/accord/AccordTxnBuilder.java
index f32940345e..680a8ba52b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTxnBuilder.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTxnBuilder.java
@@ -51,8 +51,8 @@ import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.accord.api.AccordKey;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.service.accord.api.AccordRoutableKey;
import org.apache.cassandra.service.accord.db.AccordQuery;
import org.apache.cassandra.service.accord.db.AccordRead;
import org.apache.cassandra.service.accord.db.AccordUpdate;
@@ -61,7 +61,7 @@ import org.apache.cassandra.service.accord.db.AccordUpdate.UpdatePredicate.Type;
public class AccordTxnBuilder
{
- private Set<AccordKey.PartitionKey> keys = new HashSet<>();
+ private Set<PartitionKey> keys = new HashSet<>();
private List<SinglePartitionReadCommand> reads = new ArrayList<>();
private AccordQuery query = AccordQuery.ALL;
private List<AccordUpdate.AbstractUpdate> updates = new ArrayList<>();
@@ -77,7 +77,7 @@ public class AccordTxnBuilder
SinglePartitionReadQuery.Group<SinglePartitionReadCommand> selectQuery = (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) readQuery;
for (SinglePartitionReadCommand command : selectQuery.queries)
{
- keys.add(AccordKey.of(command));
+ keys.add(PartitionKey.of(command));
reads.add(command);
}
return this;
@@ -95,7 +95,7 @@ public class AccordTxnBuilder
{
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
- keys.add(new AccordKey.PartitionKey(update.metadata().id, update.partitionKey()));
+ keys.add(new PartitionKey(update.metadata().id, update.partitionKey()));
updates.add(new AccordUpdate.SimpleUpdate(update));
}
}
@@ -108,7 +108,7 @@ public class AccordTxnBuilder
Preconditions.checkNotNull(metadata);
DecoratedKey partitionKey = metadata.partitioner.decorateKey(decompose(metadata.partitionKeyType, key));
- AccordKey.PartitionKey accordKey = new AccordKey.PartitionKey(metadata.id, partitionKey);
+ PartitionKey accordKey = new PartitionKey(metadata.id, partitionKey);
keys.add(accordKey);
updates.add(new AccordUpdate.AppendingUpdate(accordKey, appends));
@@ -129,7 +129,7 @@ public class AccordTxnBuilder
Preconditions.checkNotNull(metadata);
DecoratedKey partitionKey = metadata.partitioner.decorateKey(decompose(metadata.partitionKeyType, key));
- AccordKey.PartitionKey accordKey = new AccordKey.PartitionKey(metadata.id, partitionKey);
+ PartitionKey accordKey = new PartitionKey(metadata.id, partitionKey);
keys.add(accordKey);
updates.add(new AccordUpdate.IncrementingUpdate(accordKey, increments));
@@ -185,7 +185,7 @@ public class AccordTxnBuilder
public Txn build()
{
Key[] keyArray = keys.toArray(new Key[0]);
- Arrays.sort(keyArray, AccordRoutingKey::compareKeys);
+ Arrays.sort(keyArray, Key::compareTo);
predicates.sort(Comparator.comparing(UpdatePredicate::partitionKey));
if (updates.isEmpty())
{
diff --git a/src/java/org/apache/cassandra/service/accord/ListenerProxy.java b/src/java/org/apache/cassandra/service/accord/ListenerProxy.java
index e0397e2651..bd9061d350 100644
--- a/src/java/org/apache/cassandra/service/accord/ListenerProxy.java
+++ b/src/java/org/apache/cassandra/service/accord/ListenerProxy.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.service.accord;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.Objects;
import com.google.common.collect.ImmutableList;
@@ -31,10 +30,11 @@ import accord.local.Command;
import accord.local.CommandListener;
import accord.local.PreLoadContext;
import accord.local.SafeCommandStore;
+import accord.primitives.Keys;
import accord.primitives.TxnId;
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.db.marshal.ValueAccessor;
-import org.apache.cassandra.service.accord.api.AccordKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.service.accord.async.AsyncContext;
import org.apache.cassandra.service.accord.serializers.CommandSerializers;
import org.apache.cassandra.utils.ObjectSizes;
@@ -130,7 +130,7 @@ public abstract class ListenerProxy implements CommandListener, Comparable<Liste
AccordCommand command = (AccordCommand) c;
AccordCommandStore commandStore = (AccordCommandStore) safeStore;
AsyncContext context = commandStore.getContext();
- PreLoadContext loadCtx = PreLoadContext.contextFor(ImmutableList.of(command.txnId(), txnId), Collections.emptyList());
+ PreLoadContext loadCtx = PreLoadContext.contextFor(ImmutableList.of(command.txnId(), txnId), Keys.EMPTY);
if (context.containsScopedItems(loadCtx))
{
// TODO (soon): determine if this can break anything by not waiting for the current operation to denormalize it's data
@@ -163,9 +163,9 @@ public abstract class ListenerProxy implements CommandListener, Comparable<Liste
static class CommandsForKeyListenerProxy extends ListenerProxy
{
private static final long EMPTY_SIZE = ObjectSizes.measure(new CommandsForKeyListenerProxy(null));
- private final AccordKey.PartitionKey key;
+ private final PartitionKey key;
- public CommandsForKeyListenerProxy(AccordKey.PartitionKey key)
+ public CommandsForKeyListenerProxy(PartitionKey key)
{
this.key = key;
}
@@ -218,9 +218,9 @@ public abstract class ListenerProxy implements CommandListener, Comparable<Liste
@Override
public ByteBuffer identifier()
{
- ByteBuffer bytes = ByteBuffer.allocate((int) (1 + AccordKey.PartitionKey.serializer.serializedSize(key)));
+ ByteBuffer bytes = ByteBuffer.allocate((int) (1 + PartitionKey.serializer.serializedSize(key)));
ByteBufferAccessor.instance.putByte(bytes, 0, (byte) kind().ordinal());
- AccordKey.PartitionKey.serializer.serialize(key, bytes, ByteBufferAccessor.instance, 1);
+ PartitionKey.serializer.serialize(key, bytes, ByteBufferAccessor.instance, 1);
return bytes;
}
@@ -230,7 +230,7 @@ public abstract class ListenerProxy implements CommandListener, Comparable<Liste
AccordCommand command = (AccordCommand) c;
AccordCommandStore commandStore = (AccordCommandStore) safeStore;
AsyncContext context = commandStore.getContext();
- PreLoadContext loadCtx = PreLoadContext.contextFor(ImmutableList.of(command.txnId()), ImmutableList.of(key));
+ PreLoadContext loadCtx = PreLoadContext.contextFor(ImmutableList.of(command.txnId()), Keys.of(key));
if (context.containsScopedItems(loadCtx))
{
logger.trace("{}: synchronously updating listening cfk {}", c.txnId(), key);
@@ -265,7 +265,7 @@ public abstract class ListenerProxy implements CommandListener, Comparable<Liste
TxnId txnId = CommandSerializers.txnId.deserialize(src, accessor, offset);
return new CommandListenerProxy(txnId);
case COMMANDS_FOR_KEY:
- AccordKey.PartitionKey key = AccordKey.PartitionKey.serializer.deserialize(src, accessor, offset);
+ PartitionKey key = PartitionKey.serializer.deserialize(src, accessor, offset);
return new CommandsForKeyListenerProxy(key);
default:
throw new IOException("Unknown kind ordinal " + ordinal);
diff --git a/src/java/org/apache/cassandra/service/accord/TokenRange.java b/src/java/org/apache/cassandra/service/accord/TokenRange.java
index 020197d995..22683d4b8e 100644
--- a/src/java/org/apache/cassandra/service/accord/TokenRange.java
+++ b/src/java/org/apache/cassandra/service/accord/TokenRange.java
@@ -21,16 +21,15 @@ package org.apache.cassandra.service.accord;
import java.io.IOException;
import accord.api.RoutingKey;
-import accord.primitives.KeyRange;
+import accord.primitives.Range;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.service.accord.api.AccordKey;
import org.apache.cassandra.service.accord.api.AccordRoutingKey;
import org.apache.cassandra.service.accord.api.AccordRoutingKey.SentinelKey;
-public class TokenRange extends KeyRange.EndInclusive
+public class TokenRange extends Range.EndInclusive
{
public TokenRange(AccordRoutingKey start, AccordRoutingKey end)
{
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
index 6375bdc7b9..f8a8832a5d 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.service.accord.api;
+import java.util.concurrent.TimeUnit;
+
import accord.api.Agent;
import accord.api.Result;
import accord.local.Command;
@@ -25,6 +27,9 @@ import accord.local.Node;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static org.apache.cassandra.config.DatabaseDescriptor.getReadRpcTimeout;
+
public class AccordAgent implements Agent
{
@Override
@@ -54,7 +59,7 @@ public class AccordAgent implements Agent
@Override
public boolean isExpired(TxnId initiated, long now)
{
- // TODO: this
- return false;
+ // TODO: should distinguish between reads and writes; until then every transaction is expired once it is older than the read RPC timeout (assumes now and initiated.real are both in microseconds - confirm)
+ return now - initiated.real > getReadRpcTimeout(MICROSECONDS);
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordKey.java b/src/java/org/apache/cassandra/service/accord/api/AccordKey.java
deleted file mode 100644
index 05a47fb443..0000000000
--- a/src/java/org/apache/cassandra/service/accord/api/AccordKey.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service.accord.api;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import com.google.common.base.Preconditions;
-
-import accord.api.Key;
-import accord.api.RoutingKey;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.PartitionPosition;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.marshal.ByteBufferAccessor;
-import org.apache.cassandra.db.marshal.ValueAccessor;
-import org.apache.cassandra.db.partitions.Partition;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.ObjectSizes;
-
-public interface AccordKey extends Key
-{
- TableId tableId();
- PartitionPosition partitionKey();
-
- static AccordKey of(Key key)
- {
- return (AccordKey) key;
- }
-
- static PartitionKey of(Partition partition)
- {
- return new PartitionKey(partition.metadata().id, partition.partitionKey());
- }
-
- static PartitionKey of(SinglePartitionReadCommand command)
- {
- return new PartitionKey(command.metadata().id, command.partitionKey());
- }
-
- abstract class AbstractKey<T extends PartitionPosition> extends AccordRoutingKey.AbstractRoutingKey implements AccordKey
- {
- private final T key;
-
- public AbstractKey(TableId tableId, T key)
- {
- super(tableId);
- this.key = key;
- }
-
- @Override
- public Token token()
- {
- return partitionKey().getToken();
- }
-
- @Override
- public Kind kind()
- {
- return Kind.TOKEN;
- }
-
- @Override
- public T partitionKey()
- {
- return key;
- }
-
- @Override
- public RoutingKey toRoutingKey()
- {
- return this;
- }
- }
-
- class PartitionKey extends AbstractKey<DecoratedKey>
- {
- private static final long EMPTY_SIZE;
-
- static
- {
- DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER);
- EMPTY_SIZE = ObjectSizes.measureDeep(new PartitionKey(null, key));
- }
-
- public PartitionKey(TableId tableId, DecoratedKey key)
- {
- super(tableId, key);
- }
-
- @Override
- public String toString()
- {
- return "PartitionKey{" +
- "tableId=" + tableId() +
- ", key=" + partitionKey() +
- '}';
- }
-
- @Override
- public RoutingKey toRoutingKey()
- {
- return new TokenKey(tableId(), token());
- }
-
- public long estimatedSizeOnHeap()
- {
- return EMPTY_SIZE + ByteBufferAccessor.instance.size(partitionKey().getKey());
- }
-
- public static final Serializer serializer = new Serializer();
- public static class Serializer implements IVersionedSerializer<PartitionKey>
- {
- // TODO: add vint to value accessor and use vints
- private Serializer() {}
-
- @Override
- public void serialize(PartitionKey key, DataOutputPlus out, int version) throws IOException
- {
- key.tableId().serialize(out);
- ByteBufferUtil.writeWithShortLength(key.partitionKey().getKey(), out);
- }
-
- public <V> int serialize(PartitionKey key, V dst, ValueAccessor<V> accessor, int offset)
- {
- int position = offset;
- position += key.tableId().serialize(dst, accessor, position);
- ByteBuffer bytes = key.partitionKey().getKey();
- int numBytes = ByteBufferAccessor.instance.size(bytes);
- Preconditions.checkState(numBytes <= Short.MAX_VALUE);
- position += accessor.putShort(dst, position, (short) numBytes);
- position += accessor.copyByteBufferTo(bytes, 0, dst, position, numBytes);
- return position - offset;
-
- }
-
- @Override
- public PartitionKey deserialize(DataInputPlus in, int version) throws IOException
- {
- TableId tableId = TableId.deserialize(in);
- TableMetadata metadata = Schema.instance.getExistingTableMetadata(tableId);
- DecoratedKey key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
- return new PartitionKey(tableId, key);
- }
-
- public <V> PartitionKey deserialize(V src, ValueAccessor<V> accessor, int offset) throws IOException
- {
- TableId tableId = TableId.deserialize(src, accessor, offset);
- offset += TableId.serializedSize();
- TableMetadata metadata = Schema.instance.getTableMetadata(tableId);
- int numBytes = accessor.getShort(src, offset);
- offset += TypeSizes.SHORT_SIZE;
- ByteBuffer bytes = ByteBuffer.allocate(numBytes);
- accessor.copyTo(src, offset, bytes, ByteBufferAccessor.instance, 0, numBytes);
- DecoratedKey key = metadata.partitioner.decorateKey(bytes);
- return new PartitionKey(tableId, key);
- }
-
- @Override
- public long serializedSize(PartitionKey key, int version)
- {
- return serializedSize(key);
- }
-
- public long serializedSize(PartitionKey key)
- {
- return key.tableId().serializedSize() + ByteBufferUtil.serializedSizeWithShortLength(key.partitionKey().getKey());
- }
- }
- }
-
- IVersionedSerializer<AccordKey> serializer = new IVersionedSerializer<AccordKey>()
- {
- @Override
- public void serialize(AccordKey key, DataOutputPlus out, int version) throws IOException
- {
- PartitionKey.serializer.serialize((PartitionKey) key, out, version);
- }
-
- @Override
- public AccordKey deserialize(DataInputPlus in, int version) throws IOException
- {
- return PartitionKey.serializer.deserialize(in, version);
- }
-
- @Override
- public long serializedSize(AccordKey key, int version)
- {
- return PartitionKey.serializer.serializedSize((PartitionKey) key, version);
- }
- };
-}
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java b/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java
new file mode 100644
index 0000000000..f4066e9c1c
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.api;
+
+import java.util.Objects;
+
+import accord.primitives.RoutableKey;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.TableId;
+
+/**
+ * Base class for table-scoped Accord keys: holds the {@link TableId} and defines routing hash, ordering,
+ * equality and hashing purely in terms of the table id and the key's {@link Token} (or sentinel value).
+ */
+public abstract class AccordRoutableKey implements RoutableKey
+{
+ final TableId tableId;
+
+ protected AccordRoutableKey(TableId tableId)
+ {
+ this.tableId = tableId;
+ }
+
+ public final TableId tableId() { return tableId; }
+ /** @return the token this key routes by; also used to order non-sentinel keys within a table */
+ public abstract Token token();
+
+ /** @return the {@code tokenHash()} of {@link #token()} */
+ @Override
+ public final int routingHash()
+ {
+ return token().tokenHash();
+ }
+
+ /** Hashes the table id together with {@link #routingHash()}; not final, so subclasses may override. */
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(tableId, routingHash());
+ }
+
+ /**
+ * Compares against another routable key by delegating to {@link #compareTo(AccordRoutableKey)}.
+ *
+ * @throws ClassCastException if {@code that} is not an {@link AccordRoutableKey}
+ */
+ public final int compareTo(RoutableKey that)
+ {
+ return compareTo((AccordRoutableKey) that);
+ }
+
+ /**
+ * Orders keys by table id first. Within a table, if either side is a sentinel the keys are ordered by
+ * {@code SentinelKey.asInt()} with every non-sentinel key counting as 0; otherwise they are ordered by token.
+ */
+ public final int compareTo(AccordRoutableKey that)
+ {
+ int cmp = this.tableId().compareTo(that.tableId());
+ if (cmp != 0)
+ return cmp;
+
+ if (this instanceof AccordRoutingKey.SentinelKey || that instanceof AccordRoutingKey.SentinelKey)
+ {
+ int leftInt = this instanceof AccordRoutingKey.SentinelKey ? ((AccordRoutingKey.SentinelKey) this).asInt() : 0;
+ int rightInt = that instanceof AccordRoutingKey.SentinelKey ? ((AccordRoutingKey.SentinelKey) that).asInt() : 0;
+ return Integer.compare(leftInt, rightInt);
+ }
+
+ return this.token().compareTo(that.token());
+ }
+
+ /**
+ * Two keys are equal when they are of exactly the same class and {@link #compareTo(AccordRoutableKey)}
+ * returns 0; no other subclass state is inspected, so same-class keys sharing table id and token are equal.
+ */
+ @Override
+ public final boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ AccordRoutableKey that = (AccordRoutableKey) o;
+ return compareTo(that) == 0;
+ }
+}
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
index 543e497c07..ed9924395d 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
@@ -35,77 +35,38 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ObjectSizes;
-public interface AccordRoutingKey extends RoutingKey
+public abstract class AccordRoutingKey extends AccordRoutableKey implements RoutingKey
{
- enum Kind
+ enum RoutingKeyKind
{
TOKEN, SENTINEL;
}
- TableId tableId();
- Token token();
- Kind kind();
- long estimatedSizeOnHeap();
-
- static AccordRoutingKey of(Key key)
+ protected AccordRoutingKey(TableId tableId)
{
- return (AccordRoutingKey) key;
+ super(tableId);
}
- static int compare(AccordRoutingKey left, AccordRoutingKey right)
- {
- int cmp = left.tableId().compareTo(right.tableId());
- if (cmp != 0)
- return cmp;
-
- if (left instanceof SentinelKey || right instanceof SentinelKey)
- {
- int leftInt = left instanceof SentinelKey ? ((SentinelKey) left).asInt() : 0;
- int rightInt = right instanceof SentinelKey ? ((SentinelKey) right).asInt() : 0;
- return Integer.compare(leftInt, rightInt);
- }
-
- return left.token().compareTo(right.token());
- }
-
- static int compareKeys(Key left, Key right)
- {
- return compare((AccordRoutingKey) left, (AccordRoutingKey) right);
- }
-
- default int compareTo(AccordRoutingKey that)
- {
- return compare(this, that);
- }
+ public abstract RoutingKeyKind kindOfRoutingKey();
+ public abstract long estimatedSizeOnHeap();
- @Override
- default int routingHash()
+ public static AccordRoutingKey of(Key key)
{
- return token().tokenHash();
+ return (AccordRoutingKey) key;
}
- class SentinelKey implements AccordRoutingKey
+ public static class SentinelKey extends AccordRoutingKey
{
private static final long EMPTY_SIZE = ObjectSizes.measure(new SentinelKey(null, true));
- private final TableId tableId;
private final boolean isMin;
private SentinelKey(TableId tableId, boolean isMin)
{
- this.tableId = tableId;
+ super(tableId);
this.isMin = isMin;
}
- @Override
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- SentinelKey that = (SentinelKey) o;
- return isMin == that.isMin && tableId.equals(that.tableId);
- }
-
@Override
public int hashCode()
{
@@ -113,15 +74,9 @@ public interface AccordRoutingKey extends RoutingKey
}
@Override
- public int compareTo(RoutingKey that)
+ public RoutingKeyKind kindOfRoutingKey()
{
- return compare(this, (AccordRoutingKey) that);
- }
-
- @Override
- public Kind kind()
- {
- return Kind.SENTINEL;
+ return RoutingKeyKind.SENTINEL;
}
@Override
@@ -140,12 +95,6 @@ public interface AccordRoutingKey extends RoutingKey
return new SentinelKey(tableId, false);
}
- @Override
- public TableId tableId()
- {
- return tableId;
- }
-
@Override
public Token token()
{
@@ -191,44 +140,7 @@ public interface AccordRoutingKey extends RoutingKey
};
}
- abstract class AbstractRoutingKey implements AccordRoutingKey
- {
- private final TableId tableId;
-
- public AbstractRoutingKey(TableId tableId)
- {
- this.tableId = tableId;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- AbstractRoutingKey that = (AbstractRoutingKey) o;
- return tableId.equals(that.tableId) && token().equals(that.token());
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(tableId, token());
- }
-
- @Override
- public int compareTo(RoutingKey that)
- {
- return compare(this, (AccordRoutingKey) that);
- }
-
- @Override
- public TableId tableId()
- {
- return tableId;
- }
- }
-
- class TokenKey extends AbstractRoutingKey
+ public static class TokenKey extends AccordRoutingKey
{
private static final long EMPTY_SIZE;
@@ -252,9 +164,9 @@ public interface AccordRoutingKey extends RoutingKey
}
@Override
- public Kind kind()
+ public RoutingKeyKind kindOfRoutingKey()
{
- return Kind.TOKEN;
+ return RoutingKeyKind.TOKEN;
}
@Override
@@ -300,13 +212,14 @@ public interface AccordRoutingKey extends RoutingKey
}
}
- IVersionedSerializer<AccordRoutingKey> serializer = new IVersionedSerializer<AccordRoutingKey>()
+ public static final IVersionedSerializer<AccordRoutingKey> serializer = new IVersionedSerializer<AccordRoutingKey>()
{
+ final RoutingKeyKind[] kinds = RoutingKeyKind.values();
@Override
public void serialize(AccordRoutingKey key, DataOutputPlus out, int version) throws IOException
{
- out.write(key.kind().ordinal());
- switch (key.kind())
+ out.write(key.kindOfRoutingKey().ordinal());
+ switch (key.kindOfRoutingKey())
{
case TOKEN:
TokenKey.serializer.serialize((TokenKey) key, out, version);
@@ -322,7 +235,7 @@ public interface AccordRoutingKey extends RoutingKey
@Override
public AccordRoutingKey deserialize(DataInputPlus in, int version) throws IOException
{
- Kind kind = Kind.values()[in.readByte()];
+ RoutingKeyKind kind = kinds[in.readByte()];
switch (kind)
{
case TOKEN:
@@ -338,7 +251,7 @@ public interface AccordRoutingKey extends RoutingKey
public long serializedSize(AccordRoutingKey key, int version)
{
long size = TypeSizes.BYTE_SIZE; // kind ordinal
- switch (key.kind())
+ switch (key.kindOfRoutingKey())
{
case TOKEN:
size += TokenKey.serializer.serializedSize((TokenKey) key, version);
diff --git a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
new file mode 100644
index 0000000000..18296d4adc
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.api;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+
+import accord.api.Key;
+import accord.api.RoutingKey;
+import accord.primitives.Routable;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * An Accord {@link Key} identifying a single partition: a {@link TableId} plus the partition's {@link DecoratedKey}.
+ */
+public class PartitionKey extends AccordRoutableKey implements Key
+{
+ // deep size of an instance wrapping an empty partition key; estimatedSizeOnHeap() adds the actual key length
+ private static final long EMPTY_SIZE;
+
+ static
+ {
+ DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+ EMPTY_SIZE = ObjectSizes.measureDeep(new PartitionKey(null, key));
+ }
+
+ final DecoratedKey key;
+
+ public PartitionKey(TableId tableId, DecoratedKey key)
+ {
+ super(tableId);
+ this.key = key;
+ }
+
+ /** Casts {@code key} to a {@link PartitionKey}; any other implementation fails with a {@link ClassCastException}. */
+ public static PartitionKey of(Key key)
+ {
+ return (PartitionKey) key;
+ }
+
+ /** @return a key built from the table id and partition key of {@code partition} */
+ public static PartitionKey of(Partition partition)
+ {
+ return new PartitionKey(partition.metadata().id, partition.partitionKey());
+ }
+
+ /** @return a key built from the table id and partition key of {@code command} */
+ public static PartitionKey of(SinglePartitionReadCommand command)
+ {
+ return new PartitionKey(command.metadata().id, command.partitionKey());
+ }
+
+ @Override
+ public Token token()
+ {
+ return partitionKey().getToken();
+ }
+
+ public DecoratedKey partitionKey()
+ {
+ return key;
+ }
+
+ /** @return a new {@link TokenKey} carrying only this key's table id and token */
+ @Override
+ public RoutingKey toUnseekable()
+ {
+ return new TokenKey(tableId(), token());
+ }
+
+ /** @return {@code EMPTY_SIZE} plus the size of the partition key buffer */
+ public long estimatedSizeOnHeap()
+ {
+ return EMPTY_SIZE + ByteBufferAccessor.instance.size(partitionKey().getKey());
+ }
+
+ @Override
+ public String toString()
+ {
+ return "PartitionKey{" +
+ "tableId=" + tableId() +
+ ", key=" + partitionKey() +
+ '}';
+ }
+
+ // TODO: callers to this method are not correctly handling ranges
+ public static PartitionKey toPartitionKey(Routable routable)
+ {
+ return (PartitionKey) routable;
+ }
+
+ public static final Serializer serializer = new Serializer();
+ /**
+ * Serializes a key as its table id followed by the partition key bytes behind a short length prefix,
+ * either to a stream or directly into a {@link ValueAccessor}-backed value.
+ */
+ public static class Serializer implements IVersionedSerializer<PartitionKey>
+ {
+ // TODO: add vint to value accessor and use vints
+ private Serializer() {}
+
+ @Override
+ public void serialize(PartitionKey key, DataOutputPlus out, int version) throws IOException
+ {
+ key.tableId().serialize(out);
+ ByteBufferUtil.writeWithShortLength(key.partitionKey().getKey(), out);
+ }
+
+ /** Writes {@code key} into {@code dst} from {@code offset}; returns how far the write position advanced. */
+ public <V> int serialize(PartitionKey key, V dst, ValueAccessor<V> accessor, int offset)
+ {
+ int position = offset;
+ position += key.tableId().serialize(dst, accessor, position);
+ ByteBuffer bytes = key.partitionKey().getKey();
+ int numBytes = ByteBufferAccessor.instance.size(bytes);
+ // the length is written with putShort below, so it has to fit in a signed short
+ Preconditions.checkState(numBytes <= Short.MAX_VALUE);
+ position += accessor.putShort(dst, position, (short) numBytes);
+ position += accessor.copyByteBufferTo(bytes, 0, dst, position, numBytes);
+ return position - offset;
+
+ }
+
+ @Override
+ public PartitionKey deserialize(DataInputPlus in, int version) throws IOException
+ {
+ TableId tableId = TableId.deserialize(in);
+ TableMetadata metadata = Schema.instance.getExistingTableMetadata(tableId);
+ DecoratedKey key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
+ return new PartitionKey(tableId, key);
+ }
+
+ /**
+ * Reads a key from {@code src} starting at {@code offset}: table id, short length, then the key bytes.
+ *
+ * @throws IOException if the schema lookup rejects the table id (expected for unknown tables - confirm)
+ */
+ public <V> PartitionKey deserialize(V src, ValueAccessor<V> accessor, int offset) throws IOException
+ {
+ TableId tableId = TableId.deserialize(src, accessor, offset);
+ offset += TableId.serializedSize();
+ // same lookup as the stream-based deserialize, so an unknown table is rejected by the schema rather than surfacing as an NPE on metadata.partitioner
+ TableMetadata metadata = Schema.instance.getExistingTableMetadata(tableId);
+ int numBytes = accessor.getShort(src, offset);
+ offset += TypeSizes.SHORT_SIZE;
+ ByteBuffer bytes = ByteBuffer.allocate(numBytes);
+ accessor.copyTo(src, offset, bytes, ByteBufferAccessor.instance, 0, numBytes);
+ DecoratedKey key = metadata.partitioner.decorateKey(bytes);
+ return new PartitionKey(tableId, key);
+ }
+
+ @Override
+ public long serializedSize(PartitionKey key, int version)
+ {
+ return serializedSize(key);
+ }
+
+ public long serializedSize(PartitionKey key)
+ {
+ return key.tableId().serializedSize() + ByteBufferUtil.serializedSizeWithShortLength(key.partitionKey().getKey());
+ }
+ }
+}
diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncContext.java b/src/java/org/apache/cassandra/service/accord/async/AsyncContext.java
index 0b3c87536f..4a307333a4 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncContext.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncContext.java
@@ -37,7 +37,7 @@ import org.apache.cassandra.service.accord.AccordPartialCommand;
import org.apache.cassandra.service.accord.AccordState;
import org.apache.cassandra.service.accord.AccordStateCache;
import org.apache.cassandra.service.accord.AccordState.WriteOnly;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
public class AsyncContext
{
diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
index 74c25739e1..3bbc9c2828 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
@@ -38,7 +38,7 @@ import org.apache.cassandra.service.accord.AccordCommandsForKey;
import org.apache.cassandra.service.accord.AccordKeyspace;
import org.apache.cassandra.service.accord.AccordStateCache;
import org.apache.cassandra.service.accord.AccordState;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index 7f5bc1a1c5..d52fcb1439 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -27,13 +27,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import accord.api.Key;
import accord.local.CommandStore;
import accord.local.PreLoadContext;
import accord.local.SafeCommandStore;
+import accord.primitives.Seekables;
import accord.primitives.TxnId;
import org.apache.cassandra.service.accord.AccordCommandStore;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runnable, Function<SafeCommandStore, R>, BiConsumer<Object, Throwable>
@@ -208,9 +208,17 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna
}
}
- private static Iterable<PartitionKey> toPartitionKeys(Iterable<? extends Key> iterable)
+ private static Iterable<PartitionKey> toPartitionKeys(Seekables<?, ?> keys)
{
- return (Iterable<PartitionKey>) iterable;
+ switch (keys.kindOfContents())
+ {
+ default: throw new AssertionError();
+ case Key:
+ return (Iterable<PartitionKey>) keys;
+ case Range:
+ // TODO: implement; range contents cannot yet be viewed as partition keys, so they are rejected here
+ throw new UnsupportedOperationException();
+ }
}
static class ForFunction<R> extends AsyncOperation<R>
diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
index 64f6bdeab0..cd8aca5ab5 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
@@ -32,7 +32,8 @@ import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import accord.api.Key;
+import accord.primitives.Routable;
+import accord.primitives.Seekable;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import org.apache.cassandra.concurrent.Stage;
@@ -44,7 +45,7 @@ import org.apache.cassandra.service.accord.AccordKeyspace;
import org.apache.cassandra.service.accord.AccordPartialCommand;
import org.apache.cassandra.service.accord.AccordStateCache;
import org.apache.cassandra.service.accord.AccordState;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.service.accord.store.StoredSet;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
@@ -259,8 +260,11 @@ public class AsyncWriter
// might not reflect the update timestamp in the map? Probably best addressed following Blake's refactor.
if (command.known().isDefinitionKnown() && AccordPartialCommand.serializer.needsUpdate(command))
{
- for (Key key : command.partialTxn().keys())
+ for (Seekable key : command.partialTxn().keys())
{
+ // TODO: implement
+ if (key.kind() == Routable.Kind.Range)
+ throw new UnsupportedOperationException();
PartitionKey partitionKey = (PartitionKey) key;
AccordCommandsForKey cfk = cfkForDenormalization(partitionKey, context);
cfk.updateSummaries(command);
diff --git a/src/java/org/apache/cassandra/service/accord/db/AbstractKeyIndexed.java b/src/java/org/apache/cassandra/service/accord/db/AbstractKeyIndexed.java
index 9e1fb75a9e..ec4b260934 100644
--- a/src/java/org/apache/cassandra/service/accord/db/AbstractKeyIndexed.java
+++ b/src/java/org/apache/cassandra/service/accord/db/AbstractKeyIndexed.java
@@ -22,23 +22,15 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Objects;
-import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import com.google.common.base.Preconditions;
-
import accord.api.Key;
-import accord.primitives.KeyRange;
-import accord.primitives.KeyRanges;
import accord.primitives.Keys;
+import accord.primitives.Ranges;
import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -46,7 +38,7 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.accord.AccordObjectSizes;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
@@ -148,7 +140,7 @@ public abstract class AbstractKeyIndexed<T>
serialized[this.keys.indexOf(keyFunction.apply(items.get(i)))] = serialize(items.get(i));
}
- protected <V> V slice(KeyRanges ranges, BiFunction<Keys, ByteBuffer[], V> constructor)
+ protected <V> V slice(Ranges ranges, BiFunction<Keys, ByteBuffer[], V> constructor)
{
// TODO: Routables patch permits us to do this more efficiently
Keys keys = this.keys.slice(ranges);
diff --git a/src/java/org/apache/cassandra/service/accord/db/AccordData.java b/src/java/org/apache/cassandra/service/accord/db/AccordData.java
index b5feba8901..59fefe930b 100644
--- a/src/java/org/apache/cassandra/service/accord/db/AccordData.java
+++ b/src/java/org/apache/cassandra/service/accord/db/AccordData.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.TreeMap;
import com.google.common.base.Preconditions;
@@ -43,8 +42,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.accord.api.AccordKey;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.utils.ObjectSizes;
public class AccordData extends AbstractKeyIndexed<FilteredPartition> implements Data, Result, Iterable<FilteredPartition>
@@ -58,7 +56,7 @@ public class AccordData extends AbstractKeyIndexed<FilteredPartition> implements
public AccordData(FilteredPartition partition)
{
- this(Keys.of(AccordKey.of(partition)), new ByteBuffer[] { serialize(partition, partitionSerializer) });
+ this(Keys.of(PartitionKey.of(partition)), new ByteBuffer[] { serialize(partition, partitionSerializer) });
}
public AccordData(List<FilteredPartition> items)
diff --git a/src/java/org/apache/cassandra/service/accord/db/AccordRead.java b/src/java/org/apache/cassandra/service/accord/db/AccordRead.java
index a0a7532df3..4df86b07c9 100644
--- a/src/java/org/apache/cassandra/service/accord/db/AccordRead.java
+++ b/src/java/org/apache/cassandra/service/accord/db/AccordRead.java
@@ -30,10 +30,9 @@ import accord.api.Data;
import accord.api.DataStore;
import accord.api.Key;
import accord.api.Read;
-import accord.local.CommandStore;
import accord.local.SafeCommandStore;
-import accord.primitives.KeyRanges;
import accord.primitives.Keys;
+import accord.primitives.Ranges;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import org.apache.cassandra.concurrent.Stage;
@@ -48,8 +47,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.service.accord.AccordCommandsForKey;
-import org.apache.cassandra.service.accord.api.AccordKey;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
@@ -60,7 +58,7 @@ public class AccordRead extends AbstractKeyIndexed<SinglePartitionReadCommand> i
public AccordRead(List<SinglePartitionReadCommand> items)
{
- super(items, AccordKey::of);
+ super(items, PartitionKey::of);
}
public AccordRead(Keys keys, ByteBuffer[] serialized)
@@ -128,7 +126,7 @@ public class AccordRead extends AbstractKeyIndexed<SinglePartitionReadCommand> i
}
@Override
- public Read slice(KeyRanges ranges)
+ public Read slice(Ranges ranges)
{
return super.slice(ranges, AccordRead::new);
}
diff --git a/src/java/org/apache/cassandra/service/accord/db/AccordUpdate.java b/src/java/org/apache/cassandra/service/accord/db/AccordUpdate.java
index 6ea2c4de41..a25ce6c874 100644
--- a/src/java/org/apache/cassandra/service/accord/db/AccordUpdate.java
+++ b/src/java/org/apache/cassandra/service/accord/db/AccordUpdate.java
@@ -42,8 +42,8 @@ import accord.api.Data;
import accord.api.Key;
import accord.api.Update;
import accord.api.Write;
-import accord.primitives.KeyRanges;
import accord.primitives.Keys;
+import accord.primitives.Ranges;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.Columns;
@@ -71,8 +71,7 @@ import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.accord.api.AccordKey;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.service.accord.serializers.KeySerializers;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ObjectSizes;
@@ -761,7 +760,7 @@ public class AccordUpdate implements Update
for (AbstractUpdate updater : deserialize(updates, AbstractUpdate.serializer))
{
PartitionUpdate update = updater.apply(read.get(updater.partitionKey()));
- PartitionKey key = AccordKey.of(update);
+ PartitionKey key = PartitionKey.of(update);
if (updateMap.containsKey(key))
update = PartitionUpdate.merge(Lists.newArrayList(updateMap.get(key), update));
updateMap.put(key, update);
@@ -770,7 +769,7 @@ public class AccordUpdate implements Update
}
@Override
- public Update slice(KeyRanges ranges)
+ public Update slice(Ranges ranges)
{
Keys keys = this.keys.slice(ranges);
return new AccordUpdate(keys, select(this.keys, keys, updates), select(this.keys, keys, predicates));
diff --git a/src/java/org/apache/cassandra/service/accord/db/AccordWrite.java b/src/java/org/apache/cassandra/service/accord/db/AccordWrite.java
index 5584b7c3cc..ddaf89e813 100644
--- a/src/java/org/apache/cassandra/service/accord/db/AccordWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/db/AccordWrite.java
@@ -38,8 +38,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.service.accord.AccordCommandsForKey;
-import org.apache.cassandra.service.accord.api.AccordKey;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
@@ -52,7 +51,7 @@ public class AccordWrite extends AbstractKeyIndexed<PartitionUpdate> implements
public AccordWrite(List<PartitionUpdate> items)
{
- super(items, AccordKey::of);
+ super(items, PartitionKey::of);
}
public AccordWrite(Keys keys, ByteBuffer[] serialized)
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
index a635ccc671..91bf1fed10 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
@@ -43,18 +43,18 @@ public class AcceptSerializers
{
CommandSerializers.ballot.serialize(accept.ballot, out, version);
CommandSerializers.timestamp.serialize(accept.executeAt, out, version);
- KeySerializers.keys.serialize(accept.keys, out, version);
+ KeySerializers.seekables.serialize(accept.keys, out, version);
DepsSerializer.partialDeps.serialize(accept.partialDeps, out, version);
CommandSerializers.kind.serialize(accept.kind, out, version);
}
@Override
- public Accept deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey) throws IOException
+ public Accept deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey) throws IOException
{
return create(txnId, scope, waitForEpoch, minEpoch, doNotComputeProgressKey,
CommandSerializers.ballot.deserialize(in, version),
CommandSerializers.timestamp.deserialize(in, version),
- KeySerializers.keys.deserialize(in, version),
+ KeySerializers.seekables.deserialize(in, version),
DepsSerializer.partialDeps.deserialize(in, version),
CommandSerializers.kind.deserialize(in, version));
}
@@ -64,7 +64,7 @@ public class AcceptSerializers
{
return CommandSerializers.ballot.serializedSize(accept.ballot, version)
+ CommandSerializers.timestamp.serializedSize(accept.executeAt, version)
- + KeySerializers.keys.serializedSize(accept.keys, version)
+ + KeySerializers.seekables.serializedSize(accept.keys, version)
+ DepsSerializer.partialDeps.serializedSize(accept.partialDeps, version)
+ CommandSerializers.kind.serializedSize(accept.kind, version);
}
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
index 79d9b1b2b0..e14902e7a4 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
@@ -21,12 +21,11 @@ package org.apache.cassandra.service.accord.serializers;
import java.io.IOException;
import accord.api.RoutingKey;
-import accord.local.SaveStatus;
import accord.local.Status;
import accord.messages.BeginInvalidation;
import accord.messages.BeginInvalidation.InvalidateReply;
-import accord.primitives.AbstractRoute;
import accord.primitives.Ballot;
+import accord.primitives.Route;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -44,7 +43,7 @@ public class BeginInvalidationSerializers
public void serialize(BeginInvalidation begin, DataOutputPlus out, int version) throws IOException
{
CommandSerializers.txnId.serialize(begin.txnId, out, version);
- KeySerializers.routingKeys.serialize(begin.someKeys, out, version);
+ KeySerializers.unseekables.serialize(begin.someUnseekables, out, version);
CommandSerializers.ballot.serialize(begin.ballot, out, version);
}
@@ -52,7 +51,7 @@ public class BeginInvalidationSerializers
public BeginInvalidation deserialize(DataInputPlus in, int version) throws IOException
{
return new BeginInvalidation(CommandSerializers.txnId.deserialize(in, version),
- KeySerializers.routingKeys.deserialize(in, version),
+ KeySerializers.unseekables.deserialize(in, version),
CommandSerializers.ballot.deserialize(in, version));
}
@@ -60,7 +59,7 @@ public class BeginInvalidationSerializers
public long serializedSize(BeginInvalidation begin, int version)
{
return CommandSerializers.txnId.serializedSize(begin.txnId, version)
- + KeySerializers.routingKeys.serializedSize(begin.someKeys, version)
+ + KeySerializers.unseekables.serializedSize(begin.someUnseekables, version)
+ CommandSerializers.ballot.serializedSize(begin.ballot, version);
}
};
@@ -74,7 +73,7 @@ public class BeginInvalidationSerializers
CommandSerializers.ballot.serialize(reply.accepted, out, version);
CommandSerializers.status.serialize(reply.status, out, version);
out.writeBoolean(reply.acceptedFastPath);
- serializeNullable(KeySerializers.abstractRoute, reply.route, out, version);
+ serializeNullable(KeySerializers.route, reply.route, out, version);
serializeNullable(KeySerializers.routingKey, reply.homeKey, out, version);
}
@@ -85,7 +84,7 @@ public class BeginInvalidationSerializers
Ballot accepted = CommandSerializers.ballot.deserialize(in, version);
Status status = CommandSerializers.status.deserialize(in, version);
boolean acceptedFastPath = in.readBoolean();
- AbstractRoute route = deserializeNullable(KeySerializers.abstractRoute, in, version);
+ Route<?> route = deserializeNullable(KeySerializers.route, in, version);
RoutingKey homeKey = deserializeNullable(KeySerializers.routingKey, in, version);
return new InvalidateReply(supersededBy, accepted, status, acceptedFastPath, route, homeKey);
}
@@ -97,7 +96,7 @@ public class BeginInvalidationSerializers
+ CommandSerializers.ballot.serializedSize(reply.accepted, version)
+ CommandSerializers.status.serializedSize(reply.status, version)
+ TypeSizes.BOOL_SIZE
- + serializedSizeNullable(KeySerializers.abstractRoute, reply.route, version)
+ + serializedSizeNullable(KeySerializers.route, reply.route, version)
+ serializedSizeNullable(KeySerializers.routingKey, reply.homeKey, version);
}
};
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
index 7076dcd1cb..9e359f8e01 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
@@ -29,13 +29,14 @@ import accord.messages.CheckStatus.CheckStatusNack;
import accord.messages.CheckStatus.CheckStatusOk;
import accord.messages.CheckStatus.CheckStatusOkFull;
import accord.messages.CheckStatus.CheckStatusReply;
-import accord.primitives.AbstractRoute;
import accord.primitives.Ballot;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
-import accord.primitives.RoutingKeys;
+import accord.primitives.Routables;
+import accord.primitives.Route;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
import accord.primitives.Writes;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -58,7 +59,7 @@ public class CheckStatusSerializers
public void serialize(CheckStatus check, DataOutputPlus out, int version) throws IOException
{
CommandSerializers.txnId.serialize(check.txnId, out, version);
- KeySerializers.routingKeys.serialize(check.someKeys, out, version);
+ KeySerializers.unseekables.serialize(check.query, out, version);
out.writeUnsignedVInt(check.startEpoch);
out.writeUnsignedVInt(check.endEpoch - check.startEpoch);
out.writeByte(check.includeInfo.ordinal());
@@ -68,18 +69,18 @@ public class CheckStatusSerializers
public CheckStatus deserialize(DataInputPlus in, int version) throws IOException
{
TxnId txnId = CommandSerializers.txnId.deserialize(in, version);
- RoutingKeys someKeys = KeySerializers.routingKeys.deserialize(in, version);
+ Unseekables<?, ?> query = KeySerializers.unseekables.deserialize(in, version);
long startEpoch = in.readUnsignedVInt();
long endEpoch = in.readUnsignedVInt() + startEpoch;
CheckStatus.IncludeInfo info = infos[in.readByte()];
- return new CheckStatus(txnId, someKeys, startEpoch, endEpoch, info);
+ return new CheckStatus(txnId, query, startEpoch, endEpoch, info);
}
@Override
public long serializedSize(CheckStatus check, int version)
{
return CommandSerializers.txnId.serializedSize(check.txnId, version)
- + KeySerializers.routingKeys.serializedSize(check.someKeys, version)
+ + KeySerializers.unseekables.serializedSize(check.query, version)
+ TypeSizes.sizeofUnsignedVInt(check.startEpoch)
+ TypeSizes.sizeofUnsignedVInt(check.endEpoch - check.startEpoch)
+ TypeSizes.BYTE_SIZE;
@@ -109,7 +110,7 @@ public class CheckStatusSerializers
serializeNullable(ok.executeAt, out, version, CommandSerializers.timestamp);
out.writeBoolean(ok.isCoordinating);
CommandSerializers.durability.serialize(ok.durability, out, version);
- serializeNullable(ok.route, out, version, KeySerializers.abstractRoute);
+ serializeNullable(ok.route, out, version, KeySerializers.route);
serializeNullable(ok.homeKey, out, version, KeySerializers.routingKey);
if (!(reply instanceof CheckStatusOkFull))
@@ -139,7 +140,7 @@ public class CheckStatusSerializers
Timestamp executeAt = deserializeNullable(in, version, CommandSerializers.timestamp);
boolean isCoordinating = in.readBoolean();
Durability durability = CommandSerializers.durability.deserialize(in, version);
- AbstractRoute route = deserializeNullable(in, version, KeySerializers.abstractRoute);
+ Route<?> route = deserializeNullable(in, version, KeySerializers.route);
RoutingKey homeKey = deserializeNullable(in, version, KeySerializers.routingKey);
if (kind == OK)
@@ -169,7 +170,7 @@ public class CheckStatusSerializers
size += TypeSizes.BOOL_SIZE;
size += CommandSerializers.durability.serializedSize(ok.durability, version);
size += serializedSizeNullable(ok.homeKey, version, KeySerializers.routingKey);
- size += serializedSizeNullable(ok.route, version, KeySerializers.abstractRoute);
+ size += serializedSizeNullable(ok.route, version, KeySerializers.route);
if (!(reply instanceof CheckStatusOkFull))
return size;
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
index ef19dbe2d0..9c74cbb29e 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
@@ -27,9 +27,9 @@ import accord.local.SaveStatus;
import accord.local.Status;
import accord.local.Status.Durability;
import accord.primitives.Ballot;
-import accord.primitives.KeyRanges;
-import accord.primitives.Keys;
import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
@@ -131,7 +131,7 @@ public class CommandSerializers
{
CommandSerializers.kind.serialize(txn.kind(), out, version);
KeySerializers.ranges.serialize(txn.covering(), out, version);
- KeySerializers.keys.serialize(txn.keys(), out, version);
+ KeySerializers.seekables.serialize(txn.keys(), out, version);
AccordRead.serializer.serialize((AccordRead) txn.read(), out, version);
AccordQuery.serializer.serialize((AccordQuery) txn.query(), out, version);
out.writeBoolean(txn.update() != null);
@@ -143,8 +143,8 @@ public class CommandSerializers
public PartialTxn deserialize(DataInputPlus in, int version) throws IOException
{
Txn.Kind kind = CommandSerializers.kind.deserialize(in, version);
- KeyRanges covering = KeySerializers.ranges.deserialize(in, version);
- Keys keys = KeySerializers.keys.deserialize(in, version);
+ Ranges covering = KeySerializers.ranges.deserialize(in, version);
+ Seekables<?, ?> keys = KeySerializers.seekables.deserialize(in, version);
AccordRead read = AccordRead.serializer.deserialize(in, version);
AccordQuery query = AccordQuery.serializer.deserialize(in, version);
AccordUpdate update = in.readBoolean() ? AccordUpdate.serializer.deserialize(in, version) : null;
@@ -156,7 +156,7 @@ public class CommandSerializers
{
long size = CommandSerializers.kind.serializedSize(txn.kind(), version);
size += KeySerializers.ranges.serializedSize(txn.covering(), version);
- size += KeySerializers.keys.serializedSize(txn.keys(), version);
+ size += KeySerializers.seekables.serializedSize(txn.keys(), version);
size += AccordRead.serializer.serializedSize((AccordRead) txn.read(), version);
size += AccordQuery.serializer.serializedSize((AccordQuery) txn.query(), version);
size += TypeSizes.sizeof(txn.update() != null);
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
index 0093e8095e..c43489fb83 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
@@ -22,8 +22,9 @@ import java.io.IOException;
import accord.messages.Commit;
import accord.primitives.PartialRoute;
-import accord.primitives.RoutingKeys;
+import accord.primitives.Routables;
import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -43,18 +44,18 @@ public class CommitSerializers
CommandSerializers.timestamp.serialize(msg.executeAt, out, version);
serializeNullable(msg.partialTxn, out, version, CommandSerializers.partialTxn);
DepsSerializer.partialDeps.serialize(msg.partialDeps, out, version);
- serializeNullable(msg.route, out, version, KeySerializers.route);
+ serializeNullable(msg.route, out, version, KeySerializers.fullRoute);
serializeNullable(msg.read, out, version, ReadDataSerializers.request);
}
@Override
- public Commit deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute scope, long waitForEpoch) throws IOException
+ public Commit deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute<?> scope, long waitForEpoch) throws IOException
{
return Commit.SerializerSupport.create(txnId, scope, waitForEpoch,
CommandSerializers.timestamp.deserialize(in, version),
deserializeNullable(in, version, CommandSerializers.partialTxn),
DepsSerializer.partialDeps.deserialize(in, version),
- deserializeNullable(in, version, KeySerializers.route),
+ deserializeNullable(in, version, KeySerializers.fullRoute),
deserializeNullable(in, version, ReadDataSerializers.request)
);
}
@@ -65,7 +66,7 @@ public class CommitSerializers
return CommandSerializers.timestamp.serializedSize(msg.executeAt, version)
+ serializedSizeNullable(msg.partialTxn, version, CommandSerializers.partialTxn)
+ DepsSerializer.partialDeps.serializedSize(msg.partialDeps, version)
- + serializedSizeNullable(msg.route, version, KeySerializers.route)
+ + serializedSizeNullable(msg.route, version, KeySerializers.fullRoute)
+ serializedSizeNullable(msg.read, version, ReadDataSerializers.request);
}
};
@@ -76,7 +77,7 @@ public class CommitSerializers
public void serialize(Commit.Invalidate invalidate, DataOutputPlus out, int version) throws IOException
{
CommandSerializers.txnId.serialize(invalidate.txnId, out, version);
- KeySerializers.routingKeys.serialize(invalidate.scope, out, version);
+ KeySerializers.unseekables.serialize(invalidate.scope, out, version);
out.writeUnsignedVInt(invalidate.waitForEpoch);
out.writeUnsignedVInt(invalidate.invalidateUntilEpoch - invalidate.waitForEpoch);
}
@@ -85,17 +86,17 @@ public class CommitSerializers
public Commit.Invalidate deserialize(DataInputPlus in, int version) throws IOException
{
TxnId txnId = CommandSerializers.txnId.deserialize(in, version);
- RoutingKeys routingKeys = KeySerializers.routingKeys.deserialize(in, version);
+ Unseekables<?, ?> scope = KeySerializers.unseekables.deserialize(in, version);
long waitForEpoch = in.readUnsignedVInt();
long invalidateUntilEpoch = in.readUnsignedVInt() + waitForEpoch;
- return Commit.Invalidate.SerializerSupport.create(txnId, routingKeys, waitForEpoch, invalidateUntilEpoch);
+ return Commit.Invalidate.SerializerSupport.create(txnId, scope, waitForEpoch, invalidateUntilEpoch);
}
@Override
public long serializedSize(Commit.Invalidate invalidate, int version)
{
return CommandSerializers.txnId.serializedSize(invalidate.txnId, version)
- + KeySerializers.routingKeys.serializedSize(invalidate.scope, version)
+ + KeySerializers.unseekables.serializedSize(invalidate.scope, version)
+ TypeSizes.sizeofUnsignedVInt(invalidate.waitForEpoch)
+ TypeSizes.sizeofUnsignedVInt(invalidate.invalidateUntilEpoch - invalidate.waitForEpoch);
}
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
index 2917862c13..22b80554f1 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
@@ -21,9 +21,9 @@ package org.apache.cassandra.service.accord.serializers;
import java.io.IOException;
import accord.primitives.Deps;
-import accord.primitives.KeyRanges;
import accord.primitives.Keys;
import accord.primitives.PartialDeps;
+import accord.primitives.Ranges;
import accord.primitives.TxnId;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -49,7 +49,7 @@ public abstract class DepsSerializer<D extends Deps> implements IVersionedSerial
@Override
PartialDeps deserialize(Keys keys, TxnId[] txnIds, int[] keyToTxnIds, DataInputPlus in, int version) throws IOException
{
- KeyRanges covering = KeySerializers.ranges.deserialize(in, version);
+ Ranges covering = KeySerializers.ranges.deserialize(in, version);
return PartialDeps.SerializerSupport.create(covering, keys, txnIds, keyToTxnIds);
}
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/GetDepsSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/GetDepsSerializers.java
index eb8acf1a67..1de5b96a5b 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/GetDepsSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/GetDepsSerializers.java
@@ -22,8 +22,8 @@ import java.io.IOException;
import accord.messages.GetDeps;
import accord.messages.GetDeps.GetDepsOk;
-import accord.primitives.Keys;
import accord.primitives.PartialRoute;
+import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
@@ -38,15 +38,15 @@ public class GetDepsSerializers
@Override
public void serializeBody(GetDeps msg, DataOutputPlus out, int version) throws IOException
{
- KeySerializers.keys.serialize(msg.keys, out, version);
+ KeySerializers.seekables.serialize(msg.keys, out, version);
CommandSerializers.timestamp.serialize(msg.executeAt, out, version);
CommandSerializers.kind.serialize(msg.kind, out, version);
}
@Override
- public GetDeps deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey) throws IOException
+ public GetDeps deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey) throws IOException
{
- Keys keys = KeySerializers.keys.deserialize(in, version);
+ Seekables<?, ?> keys = KeySerializers.seekables.deserialize(in, version);
Timestamp executeAt = CommandSerializers.timestamp.deserialize(in, version);
Txn.Kind kind = CommandSerializers.kind.deserialize(in, version);
return GetDeps.SerializationSupport.create(txnId, scope, waitForEpoch, minEpoch, doNotComputeProgressKey, keys, executeAt, kind);
@@ -55,7 +55,7 @@ public class GetDepsSerializers
@Override
public long serializedBodySize(GetDeps msg, int version)
{
- return KeySerializers.keys.serializedSize(msg.keys, version)
+ return KeySerializers.seekables.serializedSize(msg.keys, version)
+ CommandSerializers.timestamp.serializedSize(msg.executeAt, version)
+ CommandSerializers.kind.serializedSize(msg.kind, version);
}
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
index 58f62254aa..646abd3b3d 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
@@ -19,60 +19,49 @@
package org.apache.cassandra.service.accord.serializers;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.function.IntFunction;
import accord.api.Key;
import accord.api.RoutingKey;
import accord.primitives.AbstractKeys;
-import accord.primitives.AbstractRoute;
-import accord.primitives.KeyRange;
-import accord.primitives.KeyRanges;
+import accord.primitives.AbstractRanges;
+import accord.primitives.FullKeyRoute;
+import accord.primitives.FullRangeRoute;
+import accord.primitives.FullRoute;
import accord.primitives.Keys;
+import accord.primitives.PartialKeyRoute;
+import accord.primitives.PartialRangeRoute;
import accord.primitives.PartialRoute;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.primitives.RoutableKey;
+import accord.primitives.Routables;
import accord.primitives.Route;
import accord.primitives.RoutingKeys;
+import accord.primitives.Seekables;
+import accord.primitives.Unseekables;
+import accord.primitives.Unseekables.UnseekablesKind;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.service.accord.TokenRange;
-import org.apache.cassandra.service.accord.api.AccordKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.service.accord.api.AccordRoutingKey;
public class KeySerializers
{
private KeySerializers() {}
- public static final IVersionedSerializer<Key> key = (IVersionedSerializer<Key>) (IVersionedSerializer<?>) AccordKey.serializer;
+ public static final IVersionedSerializer<Key> key = (IVersionedSerializer<Key>) (IVersionedSerializer<?>) PartitionKey.serializer;
public static final IVersionedSerializer<RoutingKey> routingKey = (IVersionedSerializer<RoutingKey>) (IVersionedSerializer<?>) AccordRoutingKey.serializer;
-
- public static final IVersionedSerializer<KeyRanges> ranges = new IVersionedSerializer<KeyRanges>()
+ public static final IVersionedSerializer<RoutingKeys> routingKeys = new AbstractKeysSerializer<RoutingKey, RoutingKeys>(routingKey, RoutingKey[]::new)
{
- @Override
- public void serialize(KeyRanges ranges, DataOutputPlus out, int version) throws IOException
- {
- out.writeUnsignedVInt(ranges.size());
- for (int i=0, mi=ranges.size(); i<mi; i++)
- TokenRange.serializer.serialize((TokenRange) ranges.get(i), out, version);
- }
-
- @Override
- public KeyRanges deserialize(DataInputPlus in, int version) throws IOException
- {
- KeyRange[] ranges = new KeyRange[(int)in.readUnsignedVInt()];
- for (int i=0; i<ranges.length; i++)
- ranges[i] = TokenRange.serializer.deserialize(in, version);
- return KeyRanges.ofSortedAndDeoverlapped(ranges);
- }
-
- @Override
- public long serializedSize(KeyRanges ranges, int version)
+ @Override RoutingKeys deserialize(DataInputPlus in, int version, RoutingKey[] keys)
{
- long size = TypeSizes.sizeofUnsignedVInt(ranges.size());
- for (int i=0, mi=ranges.size(); i<mi; i++)
- size += TokenRange.serializer.serializedSize((TokenRange) ranges.get(i), version);
- return size;
+ return RoutingKeys.SerializationSupport.create(keys);
}
};
@@ -83,24 +72,27 @@ public class KeySerializers
return Keys.SerializationSupport.create(keys);
}
};
- public static final IVersionedSerializer<RoutingKeys> routingKeys = new AbstractKeysSerializer<RoutingKey, RoutingKeys>(routingKey, RoutingKey[]::new)
+
+ public static final IVersionedSerializer<Ranges> ranges = new AbstractRangesSerializer<Ranges>()
{
- @Override RoutingKeys deserialize(DataInputPlus in, int version, RoutingKey[] keys)
+ @Override
+ public Ranges deserialize(DataInputPlus in, int version, Range[] ranges)
{
- return RoutingKeys.SerializationSupport.create(keys);
+ return Ranges.ofSortedAndDeoverlapped(ranges);
}
};
- public static final IVersionedSerializer<PartialRoute> partialRoute = new AbstractKeysSerializer<RoutingKey, PartialRoute>(routingKey, RoutingKey[]::new)
+
+ public static final IVersionedSerializer<PartialKeyRoute> partialKeyRoute = new AbstractKeysSerializer<RoutingKey, PartialKeyRoute>(routingKey, RoutingKey[]::new)
{
- @Override PartialRoute deserialize(DataInputPlus in, int version, RoutingKey[] keys) throws IOException
+ @Override PartialKeyRoute deserialize(DataInputPlus in, int version, RoutingKey[] keys) throws IOException
{
- KeyRanges covering = ranges.deserialize(in, version);
+ Ranges covering = ranges.deserialize(in, version);
RoutingKey homeKey = routingKey.deserialize(in, version);
- return PartialRoute.SerializationSupport.create(covering, homeKey, keys);
+ return PartialKeyRoute.SerializationSupport.create(covering, homeKey, keys);
}
@Override
- public void serialize(PartialRoute keys, DataOutputPlus out, int version) throws IOException
+ public void serialize(PartialKeyRoute keys, DataOutputPlus out, int version) throws IOException
{
super.serialize(keys, out, version);
ranges.serialize(keys.covering, out, version);
@@ -108,80 +100,236 @@ public class KeySerializers
}
@Override
- public long serializedSize(PartialRoute keys, int version)
+ public long serializedSize(PartialKeyRoute keys, int version)
{
return super.serializedSize(keys, version)
+ ranges.serializedSize(keys.covering, version)
+ routingKey.serializedSize(keys.homeKey, version);
}
};
- public static final IVersionedSerializer<Route> route = new AbstractKeysSerializer<RoutingKey, Route>(routingKey, RoutingKey[]::new)
+
+ public static final IVersionedSerializer<FullKeyRoute> fullKeyRoute = new AbstractKeysSerializer<RoutingKey, FullKeyRoute>(routingKey, RoutingKey[]::new) // serializer for FullKeyRoute: superclass payload followed by the home key
{
- @Override Route deserialize(DataInputPlus in, int version, RoutingKey[] keys) throws IOException
+ @Override FullKeyRoute deserialize(DataInputPlus in, int version, RoutingKey[] keys) throws IOException // keys are supplied by the caller; only the home key is read from the stream here
{
RoutingKey homeKey = routingKey.deserialize(in, version);
- return Route.SerializationSupport.create(homeKey, keys);
+ return FullKeyRoute.SerializationSupport.create(homeKey, keys);
}
@Override
- public void serialize(Route keys, DataOutputPlus out, int version) throws IOException
+ public void serialize(FullKeyRoute keys, DataOutputPlus out, int version) throws IOException // writes the superclass payload first, then keys.homeKey
{
super.serialize(keys, out, version);
routingKey.serialize(keys.homeKey, out, version);
}
@Override
- public long serializedSize(Route keys, int version)
+ public long serializedSize(FullKeyRoute keys, int version) // superclass size plus the serialized size of keys.homeKey
{
return super.serializedSize(keys, version)
+ routingKey.serializedSize(keys.homeKey, version);
}
};
- public static final IVersionedSerializer<AbstractRoute> abstractRoute = new IVersionedSerializer<AbstractRoute>()
+ public static final IVersionedSerializer<PartialRangeRoute> partialRangeRoute = new AbstractRangesSerializer<PartialRangeRoute>() // serializer for PartialRangeRoute: superclass payload, then the covering Ranges, then the home key
{
+ @Override PartialRangeRoute deserialize(DataInputPlus in, int version, Range[] rs) throws IOException // rs is supplied by the caller; this reads the two trailing fields
+ {
+ Ranges covering = ranges.deserialize(in, version); // read order must mirror the write order in serialize below
+ RoutingKey homeKey = routingKey.deserialize(in, version);
+ return PartialRangeRoute.SerializationSupport.create(covering, homeKey, rs);
+ }
+
+ @Override
+ public void serialize(PartialRangeRoute rs, DataOutputPlus out, int version) throws IOException // superclass payload first, then rs.covering, then rs.homeKey
+ {
+ super.serialize(rs, out, version);
+ ranges.serialize(rs.covering, out, version);
+ routingKey.serialize(rs.homeKey, out, version);
+ }
+
@Override
- public void serialize(AbstractRoute t, DataOutputPlus out, int version) throws IOException
+ public long serializedSize(PartialRangeRoute rs, int version) // sum of the three parts written by serialize
+ {
+ return super.serializedSize(rs, version)
+ + ranges.serializedSize(rs.covering, version)
+ + routingKey.serializedSize(rs.homeKey, version);
+ }
+ };
+
+ public static final IVersionedSerializer<FullRangeRoute> fullRangeRoute = new AbstractRangesSerializer<FullRangeRoute>() // serializer for FullRangeRoute: superclass payload followed by the home key
+ {
+ @Override FullRangeRoute deserialize(DataInputPlus in, int version, Range[] rs) throws IOException // rs is supplied by the caller; only the trailing home key is read here
{
- if (t instanceof Route)
+ RoutingKey homeKey = routingKey.deserialize(in, version); // the value read is a RoutingKey (the home key), named accordingly
+ return FullRangeRoute.SerializationSupport.create(homeKey, rs);
+ }
+
+ @Override
+ public void serialize(FullRangeRoute rs, DataOutputPlus out, int version) throws IOException // parameter named rs so it no longer obscures the Ranges type
+ {
+ super.serialize(rs, out, version);
+ routingKey.serialize(rs.homeKey, out, version);
+ }
+
+ @Override
+ public long serializedSize(FullRangeRoute rs, int version) // parameter named rs so it no longer shadows the static ranges serializer
+ {
+ return super.serializedSize(rs, version)
+ + routingKey.serializedSize(rs.homeKey, version); // same field access as serialize above
+ }
+ };
+
+ public static final IVersionedSerializer<Route<?>> route = new AbstractRoutablesSerializer<>( // permits the four route kinds listed below
+ EnumSet.of(UnseekablesKind.PartialKeyRoute, UnseekablesKind.FullKeyRoute, UnseekablesKind.PartialRangeRoute, UnseekablesKind.FullRangeRoute)
+ );
+
+ public static final IVersionedSerializer<PartialRoute<?>> partialRoute = new AbstractRoutablesSerializer<>( // permits only the two partial route kinds
+ EnumSet.of(UnseekablesKind.PartialKeyRoute, UnseekablesKind.PartialRangeRoute)
+ );
+
+ public static final IVersionedSerializer<FullRoute<?>> fullRoute = new AbstractRoutablesSerializer<>( // permits only the two full route kinds
+ EnumSet.of(UnseekablesKind.FullKeyRoute, UnseekablesKind.FullRangeRoute)
+ );
+
+ public static final IVersionedSerializer<Unseekables<?, ?>> unseekables = new AbstractRoutablesSerializer<>( // permits every UnseekablesKind
+ EnumSet.allOf(UnseekablesKind.class)
+ );
+
+ static class AbstractRoutablesSerializer<RS extends Unseekables<?, ?>> implements IVersionedSerializer<RS> // tagged serializer: one discriminator byte (1-6) followed by the kind-specific payload
+ {
+ final EnumSet<UnseekablesKind> permitted; // kinds this instance accepts on serialize and deserialize
+ protected AbstractRoutablesSerializer(EnumSet<UnseekablesKind> permitted)
+ {
+ this.permitted = permitted;
+ }
+
+ @Override
+ public void serialize(RS t, DataOutputPlus out, int version) throws IOException // throws IllegalArgumentException if t.kind() is not permitted
+ {
+ UnseekablesKind kind = t.kind();
+ if (!permitted.contains(kind))
+ throw new IllegalArgumentException("Cannot serialize kind " + kind + "; permitted kinds are " + permitted);
+
+ switch (kind)
{
- out.writeByte(1);
- route.serialize((Route)t, out, version);
+ default: throw new AssertionError();
+ case RoutingKeys:
+ out.writeByte(1); // tag bytes must stay in sync with the switch in deserialize
+ routingKeys.serialize((RoutingKeys)t, out, version);
+ break;
+ case PartialKeyRoute:
+ out.writeByte(2);
+ partialKeyRoute.serialize((PartialKeyRoute)t, out, version);
+ break;
+ case FullKeyRoute:
+ out.writeByte(3);
+ fullKeyRoute.serialize((FullKeyRoute)t, out, version);
+ break;
+ case RoutingRanges:
+ out.writeByte(4);
+ ranges.serialize((Ranges)t, out, version);
+ break;
+ case PartialRangeRoute:
+ out.writeByte(5);
+ partialRangeRoute.serialize((PartialRangeRoute)t, out, version);
+ break;
+ case FullRangeRoute:
+ out.writeByte(6);
+ fullRangeRoute.serialize((FullRangeRoute)t, out, version);
+ break;
}
- else
+ }
+
+ @Override
+ public RS deserialize(DataInputPlus in, int version) throws IOException // IOException on an unknown tag; IllegalStateException on a known but non-permitted kind
+ {
+ byte b = in.readByte();
+ UnseekablesKind kind;
+ RS result;
+ switch (b)
+ {
+ default: throw new IOException("Corrupted input: expected byte 1, 2, 3, 4, 5 or 6; received " + b);
+ case 1: kind = UnseekablesKind.RoutingKeys; result = (RS)routingKeys.deserialize(in, version); break;
+ case 2: kind = UnseekablesKind.PartialKeyRoute; result = (RS)partialKeyRoute.deserialize(in, version); break;
+ case 3: kind = UnseekablesKind.FullKeyRoute; result = (RS)fullKeyRoute.deserialize(in, version); break;
+ case 4: kind = UnseekablesKind.RoutingRanges; result = (RS)ranges.deserialize(in, version); break;
+ case 5: kind = UnseekablesKind.PartialRangeRoute; result = (RS)partialRangeRoute.deserialize(in, version); break;
+ case 6: kind = UnseekablesKind.FullRangeRoute; result = (RS)fullRangeRoute.deserialize(in, version); break;
+ }
+ if (!permitted.contains(kind)) // checked after the payload has been consumed, so the stream position stays consistent
+ throw new IllegalStateException("Deserialized kind " + kind + " is not permitted; permitted kinds are " + permitted);
+ return result;
+ }
+
+ @Override
+ public long serializedSize(RS t, int version) // one tag byte plus the kind-specific payload size; does not consult permitted
+ {
+ switch (t.kind())
+ {
+ default: throw new AssertionError();
+ case RoutingKeys:
+ return 1 + routingKeys.serializedSize((RoutingKeys)t, version);
+ case PartialKeyRoute:
+ return 1 + partialKeyRoute.serializedSize((PartialKeyRoute)t, version);
+ case FullKeyRoute:
+ return 1 + fullKeyRoute.serializedSize((FullKeyRoute)t, version);
+ case RoutingRanges:
+ return 1 + ranges.serializedSize((Ranges)t, version);
+ case PartialRangeRoute:
+ return 1 + partialRangeRoute.serializedSize((PartialRangeRoute)t, version);
+ case FullRangeRoute:
+ return 1 + fullRangeRoute.serializedSize((FullRangeRoute)t, version);
+ }
+ }
+ }
+
+ public static final IVersionedSerializer<Seekables<?, ?>> seekables = new IVersionedSerializer<Seekables<?, ?>>() // tagged serializer: one discriminator byte (1 = Keys, 2 = Ranges) followed by the payload
+ {
+ @Override
+ public void serialize(Seekables<?, ?> t, DataOutputPlus out, int version) throws IOException // dispatches on t.kindOfContents()
+ {
+ switch (t.kindOfContents())
{
- out.writeByte(2);
- partialRoute.serialize((PartialRoute)t, out, version);
+ default: throw new AssertionError();
+ case Key:
+ out.writeByte(1); // tag must match case 1 in deserialize
+ keys.serialize((Keys)t, out, version);
+ break;
+ case Range:
+ out.writeByte(2); // tag must match case 2 in deserialize
+ ranges.serialize((Ranges)t, out, version);
+ break;
}
}
@Override
- public AbstractRoute deserialize(DataInputPlus in, int version) throws IOException
+ public Seekables<?, ?> deserialize(DataInputPlus in, int version) throws IOException // reads the tag byte, then delegates to keys or ranges
{
byte b = in.readByte();
switch (b)
{
default: throw new IOException("Corrupted input: expected byte 1 or 2, received " + b);
- case 1: return route.deserialize(in, version);
- case 2: return partialRoute.deserialize(in, version);
+ case 1: return keys.deserialize(in, version);
+ case 2: return ranges.deserialize(in, version);
}
}
@Override
- public long serializedSize(AbstractRoute t, int version)
+ public long serializedSize(Seekables<?, ?> t, int version) // one tag byte plus the payload size
{
- if (t instanceof Route)
- {
- return 1 + route.serializedSize((Route)t, version);
- }
- else
+ switch (t.kindOfContents())
{
- return 1 + partialRoute.serializedSize((PartialRoute)t, version);
+ default: throw new AssertionError();
+ case Key:
+ return 1 + keys.serializedSize((Keys)t, version);
+ case Range:
+ return 1 + ranges.serializedSize((Ranges)t, version);
}
}
};
- public static abstract class AbstractKeysSerializer<K extends RoutingKey, KS extends AbstractKeys<K, ?>> implements IVersionedSerializer<KS>
+ public static abstract class AbstractKeysSerializer<K extends RoutableKey, KS extends AbstractKeys<K, ?>> implements IVersionedSerializer<KS>
{
final IVersionedSerializer<K> keySerializer;
final IntFunction<K[]> allocate;
@@ -219,5 +367,36 @@ public class KeySerializers
size += keySerializer.serializedSize(keys.get(i), version);
return size;
}
- };
+ }
+
+ public static abstract class AbstractRangesSerializer<RS extends AbstractRanges<?>> implements IVersionedSerializer<RS> // base serializer: unsigned-vint element count, then each range via TokenRange.serializer
+ {
+ @Override
+ public void serialize(RS ranges, DataOutputPlus out, int version) throws IOException
+ {
+ out.writeUnsignedVInt(ranges.size()); // element count comes first so deserialize can size its array
+ for (int i=0, mi=ranges.size(); i<mi; i++)
+ TokenRange.serializer.serialize((TokenRange) ranges.get(i), out, version); // the cast requires every element to be a TokenRange
+ }
+
+ @Override
+ public RS deserialize(DataInputPlus in, int version) throws IOException // reads the ranges, then hands them to the subclass hook
+ {
+ Range[] ranges = new Range[(int)in.readUnsignedVInt()]; // NOTE(review): narrowing cast with no bounds check on the count read from the stream
+ for (int i=0; i<ranges.length; i++)
+ ranges[i] = TokenRange.serializer.deserialize(in, version);
+ return deserialize(in, version, ranges);
+ }
+
+ abstract RS deserialize(DataInputPlus in, int version, Range[] ranges) throws IOException; // subclass hook: read any trailing fields and build the result
+
+ @Override
+ public long serializedSize(RS ranges, int version) // size of the vint count plus the size of every element
+ {
+ long size = TypeSizes.sizeofUnsignedVInt(ranges.size());
+ for (int i=0, mi=ranges.size(); i<mi; i++)
+ size += TokenRange.serializer.serializedSize((TokenRange) ranges.get(i), version);
+ return size;
+ }
+ }
}
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
index 43dd3f8c90..d4748d7a7f 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
@@ -25,9 +25,9 @@ import javax.annotation.Nullable;
import accord.messages.PreAccept;
import accord.messages.PreAccept.PreAcceptOk;
import accord.messages.PreAccept.PreAcceptReply;
+import accord.primitives.FullRoute;
import accord.primitives.PartialRoute;
import accord.primitives.PartialTxn;
-import accord.primitives.Route;
import accord.primitives.TxnId;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -49,25 +49,25 @@ public class PreacceptSerializers
public void serializeBody(PreAccept msg, DataOutputPlus out, int version) throws IOException
{
CommandSerializers.partialTxn.serialize(msg.partialTxn, out, version);
- serializeNullable(msg.route, out, version, KeySerializers.route);
+ serializeNullable(msg.route, out, version, KeySerializers.fullRoute);
out.writeUnsignedVInt(msg.maxEpoch - msg.minEpoch);
}
@Override
- public PreAccept deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey) throws IOException
+ public PreAccept deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey) throws IOException
{
PartialTxn partialTxn = CommandSerializers.partialTxn.deserialize(in, version);
- @Nullable Route route = deserializeNullable(in, version, KeySerializers.route);
+ @Nullable FullRoute<?> fullRoute = deserializeNullable(in, version, KeySerializers.fullRoute);
long maxEpoch = in.readUnsignedVInt() + minEpoch;
return PreAccept.SerializerSupport.create(txnId, scope, waitForEpoch, minEpoch, doNotComputeProgressKey,
- maxEpoch, partialTxn, route);
+ maxEpoch, partialTxn, fullRoute);
}
@Override
public long serializedBodySize(PreAccept msg, int version)
{
return CommandSerializers.partialTxn.serializedSize(msg.partialTxn, version)
- + serializedSizeNullable(msg.route, version, KeySerializers.route)
+ + serializedSizeNullable(msg.route, version, KeySerializers.fullRoute)
+ TypeSizes.sizeofUnsignedVInt(msg.maxEpoch - msg.minEpoch);
}
};
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
index 85e633e47e..1e43221cef 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
@@ -25,6 +25,7 @@ import accord.messages.ReadData.ReadNack;
import accord.messages.ReadData.ReadOk;
import accord.messages.ReadData.ReadReply;
import accord.primitives.Keys;
+import accord.primitives.Seekables;
import accord.primitives.TxnId;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -40,7 +41,7 @@ public class ReadDataSerializers
public void serialize(ReadData read, DataOutputPlus out, int version) throws IOException
{
CommandSerializers.txnId.serialize(read.txnId, out, version);
- KeySerializers.keys.serialize(read.readScope, out, version);
+ KeySerializers.seekables.serialize(read.readScope, out, version);
out.writeUnsignedVInt(read.waitForEpoch());
out.writeUnsignedVInt(read.executeAtEpoch - read.waitForEpoch());
}
@@ -49,7 +50,7 @@ public class ReadDataSerializers
public ReadData deserialize(DataInputPlus in, int version) throws IOException
{
TxnId txnId = CommandSerializers.txnId.deserialize(in, version);
- Keys readScope = KeySerializers.keys.deserialize(in, version);
+ Seekables<?, ?> readScope = KeySerializers.seekables.deserialize(in, version);
long waitForEpoch = in.readUnsignedVInt();
long executeAtEpoch = in.readUnsignedVInt() + waitForEpoch;
return ReadData.SerializerSupport.create(txnId, readScope, executeAtEpoch, waitForEpoch);
@@ -59,7 +60,7 @@ public class ReadDataSerializers
public long serializedSize(ReadData read, int version)
{
return CommandSerializers.txnId.serializedSize(read.txnId, version)
- + KeySerializers.keys.serializedSize(read.readScope, version)
+ + KeySerializers.seekables.serializedSize(read.readScope, version)
+ TypeSizes.sizeofUnsignedVInt(read.waitForEpoch())
+ TypeSizes.sizeofUnsignedVInt(read.executeAtEpoch - read.waitForEpoch());
}
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
index 3acd76e36e..8c75fc584d 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
@@ -30,10 +30,10 @@ import accord.messages.BeginRecovery.RecoverOk;
import accord.messages.BeginRecovery.RecoverReply;
import accord.primitives.Ballot;
import accord.primitives.Deps;
+import accord.primitives.FullRoute;
import accord.primitives.PartialDeps;
import accord.primitives.PartialRoute;
import accord.primitives.PartialTxn;
-import accord.primitives.Route;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.primitives.Writes;
@@ -56,15 +56,15 @@ public class RecoverySerializers
{
CommandSerializers.partialTxn.serialize(recover.partialTxn, out, version);
CommandSerializers.ballot.serialize(recover.ballot, out, version);
- serializeNullable(KeySerializers.route, recover.route, out, version);
+ serializeNullable(KeySerializers.fullRoute, recover.route, out, version);
}
@Override
- public BeginRecovery deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute scope, long waitForEpoch) throws IOException
+ public BeginRecovery deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute<?> scope, long waitForEpoch) throws IOException
{
PartialTxn partialTxn = CommandSerializers.partialTxn.deserialize(in, version);
Ballot ballot = CommandSerializers.ballot.deserialize(in, version);
- @Nullable Route route = deserializeNullable(KeySerializers.route, in, version);
+ @Nullable FullRoute<?> route = deserializeNullable(KeySerializers.fullRoute, in, version);
return BeginRecovery.SerializationSupport.create(txnId, scope, waitForEpoch, partialTxn, ballot, route);
}
@@ -73,7 +73,7 @@ public class RecoverySerializers
{
return CommandSerializers.partialTxn.serializedSize(recover.partialTxn, version)
+ CommandSerializers.ballot.serializedSize(recover.ballot, version)
- + serializedSizeNullable(KeySerializers.route, recover.route, version);
+ + serializedSizeNullable(KeySerializers.fullRoute, recover.route, version);
}
};
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
index 0d133f78af..1db73e38ab 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
@@ -46,13 +46,13 @@ public abstract class TxnRequestSerializer<T extends TxnRequest<?>> implements I
serializeBody(msg, out, version);
}
- public abstract T deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute scope, long waitForEpoch) throws IOException;
+ public abstract T deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute<?> scope, long waitForEpoch) throws IOException;
@Override
public final T deserialize(DataInputPlus in, int version) throws IOException
{
TxnId txnId = CommandSerializers.txnId.deserialize(in, version);
- PartialRoute scope = KeySerializers.partialRoute.deserialize(in, version);
+ PartialRoute<?> scope = KeySerializers.partialRoute.deserialize(in, version);
// TODO: there should be a base epoch
long waitForEpoch = in.readUnsignedVInt();
return deserializeBody(in, version, txnId, scope, waitForEpoch);
@@ -83,10 +83,10 @@ public abstract class TxnRequestSerializer<T extends TxnRequest<?>> implements I
out.writeBoolean(msg.doNotComputeProgressKey);
}
- public abstract T deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey) throws IOException;
+ public abstract T deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey) throws IOException;
@Override
- public final T deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute scope, long waitForEpoch) throws IOException
+ public final T deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute<?> scope, long waitForEpoch) throws IOException
{
long minEpoch = in.readUnsignedVInt();
boolean doNotComputeProgressKey = in.readBoolean();
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/WaitOnCommitSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/WaitOnCommitSerializer.java
index 2203914a01..742da8d462 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/WaitOnCommitSerializer.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/WaitOnCommitSerializer.java
@@ -22,8 +22,9 @@ import java.io.IOException;
import accord.messages.WaitOnCommit;
import accord.messages.WaitOnCommit.WaitOnCommitOk;
-import accord.primitives.RoutingKeys;
+import accord.primitives.Routables;
import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -36,14 +37,14 @@ public class WaitOnCommitSerializer
public void serialize(WaitOnCommit wait, DataOutputPlus out, int version) throws IOException
{
CommandSerializers.txnId.serialize(wait.txnId, out, version);
- KeySerializers.routingKeys.serialize(wait.scope, out, version);
+ KeySerializers.unseekables.serialize(wait.scope, out, version);
}
@Override
public WaitOnCommit deserialize(DataInputPlus in, int version) throws IOException
{
TxnId txnId = CommandSerializers.txnId.deserialize(in, version);
- RoutingKeys scope = KeySerializers.routingKeys.deserialize(in, version);
+ Unseekables<?, ?> scope = KeySerializers.unseekables.deserialize(in, version);
return WaitOnCommit.SerializerSupport.create(txnId, scope);
}
@@ -51,7 +52,7 @@ public class WaitOnCommitSerializer
public long serializedSize(WaitOnCommit wait, int version)
{
return CommandSerializers.txnId.serializedSize(wait.txnId, version)
- + KeySerializers.routingKeys.serializedSize(wait.scope, version);
+ + KeySerializers.unseekables.serializedSize(wait.scope, version);
}
};
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
index 3fb6584d84..d1027c080e 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
@@ -45,8 +45,10 @@ import accord.coordinate.Preempted;
import accord.local.Status;
import accord.messages.Commit;
import accord.primitives.Keys;
+import accord.primitives.Routables;
import accord.primitives.Txn;
import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
import accord.topology.Topologies;
import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.Clustering;
@@ -230,8 +232,8 @@ public class AccordIntegrationTest extends TestBaseImpl
.withWrite("INSERT INTO " + keyspace + ".tbl (k, c, v) VALUES (?, 0, ?)", key, i++)
.withCondition(keyspace, "tbl", key, 0, NOT_EXISTS);
}
- Keys keySet = txn.build().keys();
- Topologies topology = AccordService.instance().node.topology().withUnsyncedEpochs(keySet, 1);
+ Unseekables<?, ?> routables = txn.build().keys().toUnseekables();
+ Topologies topology = AccordService.instance().node.topology().withUnsyncedEpochs(routables, 1);
// currently we don't detect out-of-bounds read/write, so need this logic to validate we reach different
// shards
Assertions.assertThat(topology.totalShards()).isEqualTo(2);
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
index 53b762de2f..2cd933c7b4 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
@@ -43,7 +43,7 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import static accord.local.Status.Durability.Durable;
import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
@@ -78,7 +78,7 @@ public class AccordCommandStoreTest
{
AtomicLong clock = new AtomicLong(0);
PartialTxn depTxn = createPartialTxn(0);
- Key key = depTxn.keys().get(0);
+ Key key = (Key)depTxn.keys().get(0);
AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
PartialDeps.OrderedBuilder builder = PartialDeps.orderedBuilder(depTxn.covering(), false);
@@ -92,8 +92,8 @@ public class AccordCommandStoreTest
TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
AccordCommand command = new AccordCommand(txnId).initialize();
command.setPartialTxn(createPartialTxn(0));
- command.homeKey(key.toRoutingKey());
- command.progressKey(key.toRoutingKey());
+ command.homeKey(key.toUnseekable());
+ command.progressKey(key.toUnseekable());
command.setDurability(Durable);
command.setPromised(ballot(1, clock.incrementAndGet(), 0, 1));
command.setAccepted(ballot(1, clock.incrementAndGet(), 0, 1));
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index 54ac4736ac..67683661e1 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.service.accord;
-import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
@@ -37,10 +36,11 @@ import accord.messages.Accept;
import accord.messages.Commit;
import accord.messages.PreAccept;
import accord.primitives.Ballot;
+import accord.primitives.FullRoute;
+import accord.primitives.Keys;
import accord.primitives.PartialDeps;
import accord.primitives.PartialRoute;
import accord.primitives.PartialTxn;
-import accord.primitives.Route;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
@@ -49,7 +49,7 @@ import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
@@ -90,11 +90,11 @@ public class AccordCommandTest
TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
Txn txn = createTxn(1);
- Key key = txn.keys().get(0);
- RoutingKey homeKey = key.toRoutingKey();
- Route fullRoute = txn.keys().toRoute(homeKey);
- PartialRoute route = fullRoute.slice(fullRange(txn));
- PartialTxn partialTxn = txn.slice(route.covering, true);
+ Key key = (Key)txn.keys().get(0);
+ RoutingKey homeKey = key.toUnseekable();
+ FullRoute<?> fullRoute = txn.keys().toRoute(homeKey);
+ PartialRoute<?> route = fullRoute.slice(fullRange(txn));
+ PartialTxn partialTxn = txn.slice(route.covering(), true);
PreAccept preAccept = PreAccept.SerializerSupport.create(txnId, route, 1, 1, false, 1, partialTxn, fullRoute);
// Check preaccept
@@ -122,7 +122,7 @@ public class AccordCommandTest
// check accept
TxnId txnId2 = txnId(1, clock.incrementAndGet(), 0, 1);
Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 0, 1);
- PartialDeps.OrderedBuilder builder = PartialDeps.orderedBuilder(route.covering, false);
+ PartialDeps.OrderedBuilder builder = PartialDeps.orderedBuilder(route.covering(), false);
builder.add(key, txnId2);
PartialDeps deps = builder.build();
Accept accept = Accept.SerializerSupport.create(txnId, route, 1, 1, false, Ballot.ZERO, executeAt, partialTxn.keys(), deps, partialTxn.kind());
@@ -150,7 +150,7 @@ public class AccordCommandTest
Commit commit = Commit.SerializerSupport.create(txnId, route, 1, executeAt, partialTxn, deps, fullRoute, null);
commandStore.execute(commit, commit::apply).get();
- commandStore.execute(PreLoadContext.contextFor(txnId, Collections.singleton(key)),instance -> {
+ commandStore.execute(PreLoadContext.contextFor(txnId, Keys.of(key)), instance -> {
Command command = instance.command(txnId);
Assert.assertEquals(commit.executeAt, command.executeAt());
Assert.assertTrue(command.hasBeen(Status.Committed));
@@ -171,11 +171,11 @@ public class AccordCommandTest
TxnId txnId1 = txnId(1, clock.incrementAndGet(), 0, 1);
Txn txn = createTxn(2);
- Key key = txn.keys().get(0);
- RoutingKey homeKey = key.toRoutingKey();
- Route fullRoute = txn.keys().toRoute(homeKey);
- PartialRoute route = fullRoute.slice(fullRange(txn));
- PartialTxn partialTxn = txn.slice(route.covering, true);
+ Key key = (Key)txn.keys().get(0);
+ RoutingKey homeKey = key.toUnseekable();
+ FullRoute<?> fullRoute = txn.keys().toRoute(homeKey);
+ PartialRoute<?> route = fullRoute.slice(fullRange(txn));
+ PartialTxn partialTxn = txn.slice(route.covering(), true);
PreAccept preAccept1 = PreAccept.SerializerSupport.create(txnId1, route, 1, 1, false, 1, partialTxn, fullRoute);
commandStore.execute(preAccept1, preAccept1::apply).get();
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index ad74eaa467..65b273fdb9 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -47,13 +47,15 @@ import accord.local.PreLoadContext;
import accord.local.Status.Known;
import accord.primitives.AbstractKeys;
import accord.primitives.Ballot;
-import accord.primitives.KeyRange;
-import accord.primitives.KeyRanges;
+import accord.primitives.Ranges;
+import accord.primitives.Keys;
import accord.primitives.PartialTxn;
-import accord.primitives.RoutingKeys;
+import accord.primitives.Range;
+import accord.primitives.Routables;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
import accord.primitives.Writes;
import accord.topology.Shard;
import accord.topology.Topology;
@@ -61,7 +63,7 @@ import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.accord.api.AccordAgent;
-import org.apache.cassandra.service.accord.api.AccordKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.service.accord.db.AccordData;
import org.apache.cassandra.service.accord.db.AccordRead;
import org.apache.cassandra.utils.FBUtilities;
@@ -87,9 +89,9 @@ public class AccordTestUtils
@Override public void executed(Command command, ProgressShard progressShard) {}
@Override public void invalidated(Command command, ProgressShard progressShard) {}
@Override public void durable(Command command, Set<Id> persistedOn) {}
- @Override public void durable(TxnId txnId, @Nullable RoutingKeys someKeys, ProgressShard shard) {}
+ @Override public void durable(TxnId txnId, @Nullable Unseekables<?, ?> someKeys, ProgressShard shard) {}
@Override public void durableLocal(TxnId txnId) {}
- @Override public void waiting(TxnId blockedBy, Known blockedUntil, RoutingKeys blockedOnKeys) {}
+ @Override public void waiting(TxnId blockedBy, Known blockedUntil, Unseekables<?, ?> blockedOn) {}
};
public static Topology simpleTopology(TableId... tables)
@@ -102,7 +104,7 @@ public class AccordTestUtils
Set<Id> fastPath = Sets.newHashSet(node);
for (int i=0; i<tables.length; i++)
{
- KeyRange range = TokenRange.fullRange(tables[i]);
+ Range range = TokenRange.fullRange(tables[i]);
shards[i] = new Shard(range, nodes, fastPath, Collections.emptySet());
}
@@ -156,7 +158,7 @@ public class AccordTestUtils
})
.reduce(null, AccordData::merge);
Write write = txn.update().apply(readData);
- ((AccordCommand)command).setWrites(new Writes(command.executeAt(), txn.keys(), write));
+ ((AccordCommand)command).setWrites(new Writes(command.executeAt(), (Keys)txn.keys(), write));
((AccordCommand)command).setResult(txn.query().compute(command.txnId(), readData, txn.read(), txn.update()));
}).get();
}
@@ -175,43 +177,43 @@ public class AccordTestUtils
return createTxn(key, key);
}
- public static KeyRanges fullRange(Txn txn)
+ public static Ranges fullRange(Txn txn)
{
- TableId tableId = ((AccordKey)txn.keys().get(0)).tableId();
- return KeyRanges.of(TokenRange.fullRange(tableId));
+ TableId tableId = ((PartitionKey)txn.keys().get(0)).tableId();
+ return Ranges.of(TokenRange.fullRange(tableId));
}
public static PartialTxn createPartialTxn(int key)
{
Txn txn = createTxn(key, key);
- KeyRanges ranges = fullRange(txn);
+ Ranges ranges = fullRange(txn);
return new PartialTxn.InMemory(ranges, txn.kind(), txn.keys(), txn.read(), txn.query(), txn.update());
}
private static class SingleEpochRanges implements CommandStore.RangesForEpoch
{
- private final KeyRanges ranges;
+ private final Ranges ranges;
- public SingleEpochRanges(KeyRanges ranges)
+ public SingleEpochRanges(Ranges ranges)
{
this.ranges = ranges;
}
@Override
- public KeyRanges at(long epoch)
+ public Ranges at(long epoch)
{
assert epoch == 1;
return ranges;
}
@Override
- public KeyRanges between(long fromInclusive, long toInclusive)
+ public Ranges between(long fromInclusive, long toInclusive)
{
return ranges;
}
@Override
- public KeyRanges since(long epoch)
+ public Ranges since(long epoch)
{
assert epoch == 1;
return ranges;
@@ -222,13 +224,6 @@ public class AccordTestUtils
{
return ranges.contains(key);
}
-
- @Override
- public boolean intersects(long epoch, AbstractKeys<?, ?> keys)
- {
- assert epoch == 1;
- return ranges.intersects(keys);
- }
}
public static InMemoryCommandStore.Synchronized createInMemoryCommandStore(LongSupplier now, String keyspace, String table)
@@ -249,7 +244,7 @@ public class AccordTestUtils
new AccordAgent(),
null,
cs -> null,
- new SingleEpochRanges(KeyRanges.of(range)));
+ new SingleEpochRanges(Ranges.of(range)));
}
public static AccordCommandStore createAccordCommandStore(Node.Id node, LongSupplier now, Topology topology)
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java b/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java
index 4dd94847a9..a7a015a7c1 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.accord.api.AccordKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
@@ -62,10 +62,10 @@ public class AccordTopologyTest
Token maxToken = partitioner.getMaximumToken();
// topology.forKey(new AccordKey.TokenKey(tableId, minToken.minKeyBound()));
- topology.forKey(new AccordKey.PartitionKey(tableId, new BufferDecoratedKey(minToken, ByteBufferUtil.bytes(0))));
+ topology.forKey(new PartitionKey(tableId, new BufferDecoratedKey(minToken, ByteBufferUtil.bytes(0))).toUnseekable());
// topology.forKey(new AccordKey.TokenKey(tableId, minToken.maxKeyBound()));
// topology.forKey(new AccordKey.TokenKey(tableId, maxToken.minKeyBound()));
- topology.forKey(new AccordKey.PartitionKey(tableId, new BufferDecoratedKey(maxToken, ByteBufferUtil.bytes(0))));
+ topology.forKey(new PartitionKey(tableId, new BufferDecoratedKey(maxToken, ByteBufferUtil.bytes(0))).toUnseekable());
// topology.forKey(new AccordKey.TokenKey(tableId, maxToken.maxKeyBound()));
}
}
diff --git a/test/unit/org/apache/cassandra/service/accord/api/AccordKeyTest.java b/test/unit/org/apache/cassandra/service/accord/api/AccordKeyTest.java
index 81f9b49a22..f520a6bb6c 100644
--- a/test/unit/org/apache/cassandra/service/accord/api/AccordKeyTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/api/AccordKeyTest.java
@@ -28,7 +28,6 @@ import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.service.accord.api.AccordKey.*;
import org.apache.cassandra.service.accord.api.AccordRoutingKey.SentinelKey;
import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
import org.apache.cassandra.utils.ByteBufferUtil;
diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
index 83f3d8058e..fbbb53ee31 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
@@ -41,8 +41,7 @@ import org.apache.cassandra.service.accord.AccordCommandStore;
import org.apache.cassandra.service.accord.AccordCommandsForKey;
import org.apache.cassandra.service.accord.AccordKeyspace;
import org.apache.cassandra.service.accord.AccordStateCache;
-import org.apache.cassandra.service.accord.api.AccordKey;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
@@ -90,7 +89,7 @@ public class AsyncLoaderTest
AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), singleton(key));
// everything is cached, so the loader should return immediately
- commandStore.processBlocking(() -> {
+ commandStore.executeBlocking(() -> {
boolean result = loader.load(context, (o, t) -> Assert.fail());
Assert.assertTrue(result);
});
@@ -124,7 +123,7 @@ public class AsyncLoaderTest
AsyncContext context = new AsyncContext();
AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), singleton(key));
AsyncPromise<Void> cbFired = new AsyncPromise<>();
- commandStore.processBlocking(() -> {
+ commandStore.executeBlocking(() -> {
boolean result = loader.load(context, (o, t) -> {
Assert.assertNull(t);
cbFired.setSuccess(null);
@@ -135,7 +134,7 @@ public class AsyncLoaderTest
cbFired.awaitUninterruptibly(1, TimeUnit.SECONDS);
// then return immediately after the callback has fired
- commandStore.processBlocking(() -> {
+ commandStore.executeBlocking(() -> {
boolean result = loader.load(context, (o, t) -> Assert.fail());
Assert.assertTrue(result);
});
@@ -166,7 +165,7 @@ public class AsyncLoaderTest
AsyncContext context = new AsyncContext();
AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), singleton(key));
AsyncPromise<Void> cbFired = new AsyncPromise<>();
- commandStore.processBlocking(() -> {
+ commandStore.executeBlocking(() -> {
boolean result = loader.load(context, (o, t) -> {
Assert.assertNull(t);
cbFired.setSuccess(null);
@@ -177,7 +176,7 @@ public class AsyncLoaderTest
cbFired.awaitUninterruptibly(1, TimeUnit.SECONDS);
// then return immediately after the callback has fired
- commandStore.processBlocking(() -> {
+ commandStore.executeBlocking(() -> {
boolean result = loader.load(context, (o, t) -> Assert.fail());
Assert.assertTrue(result);
});
@@ -212,7 +211,7 @@ public class AsyncLoaderTest
commandCache.setLoadFuture(command.txnId(), readFuture);
AsyncPromise<Void> cbFired = new AsyncPromise<>();
- commandStore.processBlocking(() -> {
+ commandStore.executeBlocking(() -> {
boolean result = loader.load(context, (o, t) -> {
Assert.assertNull(t);
cbFired.setSuccess(null);
@@ -226,7 +225,7 @@ public class AsyncLoaderTest
Assert.assertTrue(cbFired.isSuccess());
// then return immediately after the callback has fired
- commandStore.processBlocking(() -> {
+ commandStore.executeBlocking(() -> {
boolean result = loader.load(context, (o, t) -> Assert.fail());
Assert.assertTrue(result);
});
@@ -242,7 +241,7 @@ public class AsyncLoaderTest
TxnId blockApply = txnId(1, clock.incrementAndGet(), 0, 1);
TxnId blockCommit = txnId(1, clock.incrementAndGet(), 0, 1);
PartialTxn txn = createPartialTxn(0);
- AccordKey.PartitionKey key = (AccordKey.PartitionKey) getOnlyElement(txn.keys());
+ PartitionKey key = (PartitionKey) getOnlyElement(txn.keys());
AccordCommand command = new AccordCommand(txnId).initialize();
command.setPartialTxn(txn);
diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
index ebf272aac9..30edd5f8af 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
@@ -30,10 +30,10 @@ import org.junit.BeforeClass;
import org.junit.Test;
import accord.local.Command;
-import accord.local.CommandStore;
import accord.local.CommandsForKey;
import accord.local.SafeCommandStore;
import accord.local.Status;
+import accord.primitives.Keys;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
@@ -48,9 +48,8 @@ import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.accord.AccordCommand;
import org.apache.cassandra.service.accord.AccordCommandStore;
import org.apache.cassandra.service.accord.AccordKeyspace;
-import org.apache.cassandra.service.accord.AccordPartialCommand;
import org.apache.cassandra.service.accord.AccordStateCache;
-import org.apache.cassandra.service.accord.api.AccordKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.utils.FBUtilities;
import static accord.local.PreLoadContext.contextFor;
@@ -92,7 +91,7 @@ public class AsyncOperationTest
AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
Txn txn = createTxn((int)clock.incrementAndGet());
- AccordKey.PartitionKey key = (AccordKey.PartitionKey) Iterables.getOnlyElement(txn.keys());
+ PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys());
commandStore.execute(contextFor(txnId), instance -> {
Command command = instance.ifPresent(txnId);
@@ -108,9 +107,9 @@ public class AsyncOperationTest
{
AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
Txn txn = createTxn((int)clock.incrementAndGet());
- AccordKey.PartitionKey key = (AccordKey.PartitionKey) Iterables.getOnlyElement(txn.keys());
+ PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys());
- commandStore.execute(contextFor(Collections.emptyList(), Collections.singleton(key)), instance -> {
+ commandStore.execute(contextFor(Collections.emptyList(), Keys.of(key)),instance -> {
CommandsForKey cfk = commandStore.maybeCommandsForKey(key);
Assert.assertNull(cfk);
}).get();
@@ -173,7 +172,7 @@ public class AsyncOperationTest
}
@Override
- AsyncLoader createAsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Iterable<AccordKey.PartitionKey> keys)
+ AsyncLoader createAsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Iterable<PartitionKey> keys)
{
return new AsyncLoader(commandStore, txnIds, keys) {
diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
index 37d1b5f764..9d2e1e5a2b 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
@@ -27,7 +27,7 @@ import org.junit.Test;
import accord.local.Command;
import accord.local.Status;
-import accord.primitives.KeyRanges;
+import accord.primitives.Ranges;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
@@ -39,7 +39,7 @@ import org.apache.cassandra.service.accord.AccordCommandStore;
import org.apache.cassandra.service.accord.AccordCommandsForKey;
import org.apache.cassandra.service.accord.AccordKeyspace;
import org.apache.cassandra.service.accord.AccordPartialCommand;
-import org.apache.cassandra.service.accord.api.AccordKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import static accord.local.PreLoadContext.contextFor;
import static com.google.common.collect.Iterables.getOnlyElement;
@@ -85,7 +85,7 @@ public class AsyncWriterTest
TxnId blockingId = txnId(1, clock.incrementAndGet(), 0, 1);
TxnId waitingId = txnId(1, clock.incrementAndGet(), 0, 1);
Txn txn = createTxn(0);
- KeyRanges ranges = fullRange(txn);
+ Ranges ranges = fullRange(txn);
AccordCommand blocking = new AccordCommand(blockingId).initialize();
blocking.setPartialTxn(txn.slice(ranges, true));
blocking.setExecuteAt(blockingId);
@@ -135,8 +135,8 @@ public class AsyncWriterTest
TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 0, 1);
Txn txn = createTxn(0);
- KeyRanges ranges = fullRange(txn);
- AccordKey.PartitionKey key = (AccordKey.PartitionKey) getOnlyElement(txn.keys());
+ Ranges ranges = fullRange(txn);
+ PartitionKey key = (PartitionKey) getOnlyElement(txn.keys());
AccordCommandsForKey cfk = new AccordCommandsForKey(commandStore, key).initialize();
AccordKeyspace.getCommandsForKeyMutation(commandStore, cfk, commandStore.nextSystemTimestampMicros()).apply();
@@ -200,7 +200,7 @@ public class AsyncWriterTest
TxnId blockingId = txnId(1, clock.incrementAndGet(), 0, 1);
TxnId waitingId = txnId(1, clock.incrementAndGet(), 0, 1);
Txn txn = createTxn(0);
- KeyRanges ranges = fullRange(txn);
+ Ranges ranges = fullRange(txn);
{
AccordCommand blocking = new AccordCommand(blockingId).initialize();
diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandSerializersTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandSerializersTest.java
index 2d034b5734..0997d234c0 100644
--- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandSerializersTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandSerializersTest.java
@@ -21,17 +21,15 @@ package org.apache.cassandra.service.accord.serializers;
import org.junit.BeforeClass;
import org.junit.Test;
-import accord.primitives.KeyRange;
-import accord.primitives.KeyRanges;
import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
import accord.primitives.Txn;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.accord.AccordTxnBuilder;
import org.apache.cassandra.service.accord.TokenRange;
-import org.apache.cassandra.service.accord.api.AccordKey;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey.SentinelKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.utils.SerializerTestUtils;
import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
@@ -56,8 +54,8 @@ public class CommandSerializersTest
txnBuilder.withWrite("INSERT INTO ks.tbl (k, c, v) VALUES (0, 0, 1)");
txnBuilder.withCondition("ks", "tbl", 0, 0, NOT_EXISTS);
Txn txn = txnBuilder.build();
- TableId tableId = ((AccordKey.PartitionKey) txn.keys().get(0)).tableId();
- PartialTxn expected = txn.slice(KeyRanges.of(TokenRange.fullRange(tableId)), true);
+ TableId tableId = ((PartitionKey) txn.keys().get(0)).tableId();
+ PartialTxn expected = txn.slice(Ranges.of(TokenRange.fullRange(tableId)), true);
SerializerTestUtils.assertSerializerIOEquality(expected, CommandSerializers.partialTxn);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org