You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2022/12/02 22:49:33 UTC

[cassandra] branch cep-15-accord updated: CEP-15: Routables - Integrate accord-core changes for CASSANDRA-18087

This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new aef9979502 CEP-15: Routables - Integrate accord-core changes for CASSANDRA-18087
aef9979502 is described below

commit aef9979502347ad0784749dafe2a8b9279e6a0fc
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Sat Oct 22 00:05:54 2022 +0100

    CEP-15: Routables
      - Integrate accord-core changes for CASSANDRA-18087
    
    patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-18087
---
 .build/include-accord.sh                           |   4 +-
 .../cassandra/db/SinglePartitionReadCommand.java   |   2 -
 .../cassandra/service/accord/AccordCommand.java    |  25 +-
 .../service/accord/AccordCommandStore.java         | 149 ++++++++--
 .../service/accord/AccordCommandsForKey.java       |   2 +-
 .../cassandra/service/accord/AccordKeyspace.java   |  13 +-
 .../service/accord/AccordObjectSizes.java          |  98 +++++--
 .../cassandra/service/accord/AccordTxnBuilder.java |  16 +-
 .../cassandra/service/accord/ListenerProxy.java    |  18 +-
 .../cassandra/service/accord/TokenRange.java       |   5 +-
 .../cassandra/service/accord/api/AccordAgent.java  |   9 +-
 .../cassandra/service/accord/api/AccordKey.java    | 217 ---------------
 .../service/accord/api/AccordRoutableKey.java      |  80 ++++++
 .../service/accord/api/AccordRoutingKey.java       | 129 ++-------
 .../cassandra/service/accord/api/PartitionKey.java | 177 ++++++++++++
 .../service/accord/async/AsyncContext.java         |   2 +-
 .../service/accord/async/AsyncLoader.java          |   2 +-
 .../service/accord/async/AsyncOperation.java       |  16 +-
 .../service/accord/async/AsyncWriter.java          |  10 +-
 .../service/accord/db/AbstractKeyIndexed.java      |  14 +-
 .../cassandra/service/accord/db/AccordData.java    |   6 +-
 .../cassandra/service/accord/db/AccordRead.java    |  10 +-
 .../cassandra/service/accord/db/AccordUpdate.java  |   9 +-
 .../cassandra/service/accord/db/AccordWrite.java   |   5 +-
 .../accord/serializers/AcceptSerializers.java      |   8 +-
 .../serializers/BeginInvalidationSerializers.java  |  15 +-
 .../accord/serializers/CheckStatusSerializers.java |  19 +-
 .../accord/serializers/CommandSerializers.java     |  12 +-
 .../accord/serializers/CommitSerializers.java      |  19 +-
 .../service/accord/serializers/DepsSerializer.java |   4 +-
 .../accord/serializers/GetDepsSerializers.java     |  10 +-
 .../service/accord/serializers/KeySerializers.java | 307 ++++++++++++++++-----
 .../accord/serializers/PreacceptSerializers.java   |  12 +-
 .../accord/serializers/ReadDataSerializers.java    |   7 +-
 .../accord/serializers/RecoverySerializers.java    |  10 +-
 .../accord/serializers/TxnRequestSerializer.java   |   8 +-
 .../accord/serializers/WaitOnCommitSerializer.java |   9 +-
 .../test/accord/AccordIntegrationTest.java         |   6 +-
 .../service/accord/AccordCommandStoreTest.java     |   8 +-
 .../service/accord/AccordCommandTest.java          |  30 +-
 .../cassandra/service/accord/AccordTestUtils.java  |  45 ++-
 .../service/accord/AccordTopologyTest.java         |   6 +-
 .../service/accord/api/AccordKeyTest.java          |   1 -
 .../service/accord/async/AsyncLoaderTest.java      |  19 +-
 .../service/accord/async/AsyncOperationTest.java   |  13 +-
 .../service/accord/async/AsyncWriterTest.java      |  12 +-
 .../accord/serializers/CommandSerializersTest.java |  10 +-
 47 files changed, 946 insertions(+), 662 deletions(-)

diff --git a/.build/include-accord.sh b/.build/include-accord.sh
index ea1e0544df..0dc01bf2ba 100755
--- a/.build/include-accord.sh
+++ b/.build/include-accord.sh
@@ -24,8 +24,8 @@ set -o nounset
 
 bin="$(cd "$(dirname "$0")" > /dev/null; pwd)"
 
-accord_repo='https://github.com/belliottsmith/cassandra-accord.git'
-accord_branch='bugfix-invalidation'
+accord_repo='https://github.com/apache/cassandra-accord.git'
+accord_branch='trunk'
 accord_src="$bin/cassandra-accord"
 
 checkout() {
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 96af39708a..b9e595dcfa 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -48,10 +48,8 @@ import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.IndexMetadata;
-import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.*;
-import org.apache.cassandra.service.accord.api.AccordKey;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.btree.BTreeSet;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommand.java b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
index 45885f657d..878e4209f9 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
@@ -1,3 +1,4 @@
+/*
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -32,7 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.Data;
-import accord.api.Key;
 import accord.api.Result;
 import accord.api.RoutingKey;
 import accord.local.Command;
@@ -45,18 +45,18 @@ import accord.local.SaveStatus;
 import accord.local.Status;
 import accord.local.Status.Durability;
 import accord.local.Status.Known;
-import accord.primitives.AbstractRoute;
 import accord.primitives.Ballot;
-import accord.primitives.KeyRanges;
-import accord.primitives.Keys;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.Route;
+import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.utils.DeterministicIdentitySet;
-import org.apache.cassandra.service.accord.api.AccordKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.async.AsyncContext;
 import org.apache.cassandra.service.accord.db.AccordData;
 import org.apache.cassandra.service.accord.store.StoredNavigableMap;
@@ -114,7 +114,7 @@ public class AccordCommand extends Command implements AccordState<TxnId>
 
     private final TxnId txnId;
     private final int instanceCount = INSTANCE_COUNTER.getAndIncrement();
-    public final StoredValue<AbstractRoute> route;
+    public final StoredValue<Route<?>> route;
     public final StoredValue<RoutingKey> homeKey;
     public final StoredValue<RoutingKey> progressKey;
     public final StoredValue<PartialTxn> partialTxn;
@@ -475,13 +475,13 @@ public class AccordCommand extends Command implements AccordState<TxnId>
     }
 
     @Override
-    public AbstractRoute route()
+    public Route<?> route()
     {
         return route.get();
     }
 
     @Override
-    protected void setRoute(AbstractRoute newRoute)
+    protected void setRoute(Route<?> newRoute)
     {
         route.set(newRoute);
     }
@@ -632,16 +632,17 @@ public class AccordCommand extends Command implements AccordState<TxnId>
 
     private boolean canApplyWithCurrentScope(SafeCommandStore safeStore)
     {
-        KeyRanges ranges = safeStore.ranges().at(executeAt().epoch);
-        Keys keys = partialTxn().keys();
+        Ranges ranges = safeStore.ranges().at(executeAt().epoch);
+        Seekables<?, ?> keys = partialTxn().keys();
         for (int i=0,mi=keys.size(); i<mi; i++)
         {
-            Key key = keys.get(i);
-            if (((AccordCommandStore)safeStore).isCommandsForKeyInContext((AccordKey.PartitionKey) key))
+            PartitionKey key = (PartitionKey) keys.get(i);
+            if (((AccordCommandStore)safeStore).isCommandsForKeyInContext(key))
                 continue;
 
             if (!safeStore.commandStore().hashIntersects(key))
                 continue;
+
             if (!ranges.contains(key))
                 continue;
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index c7348a149f..068b08eff8 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -23,6 +23,7 @@ import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BinaryOperator;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -40,9 +41,14 @@ import accord.local.NodeTimeService;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
 import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.primitives.Routable;
+import accord.primitives.Routables;
+import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
+import accord.primitives.AbstractKeys;
 import accord.primitives.TxnId;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.async.AsyncContext;
 import org.apache.cassandra.service.accord.async.AsyncOperation;
 import org.apache.cassandra.utils.Clock;
@@ -254,6 +260,60 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
         });
     }
 
+    public <T> T mapReduce(Routables<?, ?> keysOrRanges, Ranges slice, Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)
+    {
+        switch (keysOrRanges.kindOfContents()) {
+            default:
+                throw new AssertionError();
+            case Key:
+                // TODO: efficiency
+                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
+                return keys.stream()
+                           .filter(slice::contains)
+                           .filter(this::hashIntersects)
+                           .map(this::commandsForKey)
+                           .map(map)
+                           .reduce(initialValue, reduce);
+            case Range:
+                // TODO:
+                throw new UnsupportedOperationException();
+        }
+    }
+
+    public void forEach(Routables<?, ?> keysOrRanges, Ranges slice, Consumer<CommandsForKey> forEach)
+    {
+        switch (keysOrRanges.kindOfContents()) {
+            default:
+                throw new AssertionError();
+            case Key:
+                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
+                keys.forEach(slice, key -> {
+                    if (hashIntersects(key))
+                        forEach.accept(commandsForKey(key));
+                });
+                break;
+            case Range:
+                // TODO:
+                throw new UnsupportedOperationException();
+        }
+    }
+
+    public void forEach(Routable keyOrRange, Ranges slice, Consumer<CommandsForKey> forEach)
+    {
+        switch (keyOrRange.kind())
+        {
+            default: throw new AssertionError();
+            case Key:
+                Key key = (Key) keyOrRange;
+                if (slice.contains(key))
+                    forEach.accept(commandsForKey(key));
+                break;
+            case Range:
+                // TODO:
+                throw new UnsupportedOperationException();
+        }
+    }
+
     @Override
     public CommandStore commandStore()
     {
@@ -266,22 +326,6 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
         return dataStore;
     }
 
-    public void processBlocking(Runnable runnable)
-    {
-        try
-        {
-            executor.submit(runnable).get();
-        }
-        catch (InterruptedException e)
-        {
-            throw new UncheckedInterruptedException(e);
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e.getCause());
-        }
-    }
-
     @Override
     public <T> Future<T> submit(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function)
     {
@@ -315,7 +359,7 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
     }
 
     @Override
-    public Timestamp preaccept(TxnId txnId, Keys keys)
+    public Timestamp preaccept(TxnId txnId, Seekables<?, ?> keys)
     {
         Timestamp max = maxConflict(keys);
         long epoch = latestEpoch();
@@ -331,10 +375,11 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
         return time;
     }
 
-    public Timestamp maxConflict(Keys keys)
+    public Timestamp maxConflict(Seekables<?, ?> keys)
     {
+        // TODO: Seekables
         // TODO: efficiency
-        return keys.stream()
+        return ((Keys)keys).stream()
                    .map(this::maybeCommandsForKey)
                    .filter(Objects::nonNull)
                    .map(CommandsForKey::max)
@@ -350,6 +395,70 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
         return operation;
     }
 
+    public void executeBlocking(Runnable runnable)
+    {
+        try
+        {
+            executor.submit(runnable).get();
+        }
+        catch (InterruptedException e)
+        {
+            throw new UncheckedInterruptedException(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public <T> T mapReduce(Routables<?, ?> keysOrRanges, Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue) {
+        switch (keysOrRanges.kindOfContents()) {
+            default:
+                throw new AssertionError();
+            case Key:
+                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
+                return keys.stream()
+                           .filter(this::hashIntersects)
+                           .map(this::commandsForKey)
+                           .map(map)
+                           .reduce(initialValue, reduce);
+            case Range:
+                // TODO: implement
+                throw new UnsupportedOperationException();
+        }
+    }
+
+    public void forEach(Routables<?, ?> keysOrRanges, Consumer<CommandsForKey> forEach)
+    {
+        switch (keysOrRanges.kindOfContents()) {
+            default:
+                throw new AssertionError();
+            case Key:
+                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
+                keys.forEach(key -> {
+                    if (hashIntersects(key))
+                        forEach.accept(commandsForKey(key));
+                });
+                break;
+            case Range:
+                // TODO: implement
+                throw new UnsupportedOperationException();
+        }
+    }
+
+    public void forEach(Routable keyOrRange, Consumer<CommandsForKey> forEach)
+    {
+        switch (keyOrRange.kind())
+        {
+            default: throw new AssertionError();
+            case Key:
+                forEach.accept(commandsForKey((Key) keyOrRange));
+                break;
+            case Range:
+                // TODO: implement
+                throw new UnsupportedOperationException();
+        }
+    }
 
     @Override
     public void shutdown()
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
index 49941dbfb3..8a1715384e 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
@@ -40,7 +40,7 @@ import accord.local.CommandsForKey;
 import accord.local.Status;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.store.StoredLong;
 import org.apache.cassandra.service.accord.store.StoredNavigableMap;
 import org.apache.cassandra.service.accord.store.StoredSet;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index 8677dd8fea..3313567ee2 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -41,10 +41,10 @@ import accord.local.CommandStore;
 import accord.local.Node;
 import accord.local.SaveStatus;
 import accord.local.Status;
-import accord.primitives.AbstractRoute;
 import accord.primitives.Ballot;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
+import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
@@ -97,8 +97,7 @@ import org.apache.cassandra.schema.Types;
 import org.apache.cassandra.schema.Views;
 import org.apache.cassandra.serializers.UUIDSerializer;
 import org.apache.cassandra.service.accord.AccordCommandsForKey.SeriesKind;
-import org.apache.cassandra.service.accord.api.AccordKey;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey;
 import org.apache.cassandra.service.accord.db.AccordData;
 import org.apache.cassandra.service.accord.serializers.CommandSerializers;
@@ -161,7 +160,7 @@ public class AccordKeyspace
     // TODO: naming is not very clearly distinct from the base serializers
     private static class CommandsSerializers
     {
-        static final LocalVersionedSerializer<AbstractRoute> abstractRoute = localSerializer(KeySerializers.abstractRoute);
+        static final LocalVersionedSerializer<Route<?>> route = localSerializer(KeySerializers.route);
         static final LocalVersionedSerializer<AccordRoutingKey> routingKey = localSerializer(AccordRoutingKey.serializer);
         static final LocalVersionedSerializer<PartialTxn> partialTxn = localSerializer(CommandSerializers.partialTxn);
         static final LocalVersionedSerializer<PartialDeps> partialDeps = localSerializer(DepsSerializer.partialDeps);
@@ -442,7 +441,7 @@ public class AccordKeyspace
                 builder.addCell(live(CommandsColumns.progress_key, timestampMicros, serializeOrNull((AccordRoutingKey) command.progressKey.get(), CommandsSerializers.routingKey)));
 
             if (command.route.hasModifications())
-                builder.addCell(live(CommandsColumns.route, timestampMicros, serializeOrNull(command.route.get(), CommandsSerializers.abstractRoute)));
+                builder.addCell(live(CommandsColumns.route, timestampMicros, serializeOrNull(command.route.get(), CommandsSerializers.route)));
 
             if (command.durability.hasModifications())
                 builder.addCell(live(CommandsColumns.durability, timestampMicros, accessor.valueOf(command.durability.get().ordinal())));
@@ -450,7 +449,7 @@ public class AccordKeyspace
             if (command.partialTxn.hasModifications())
                 builder.addCell(live(CommandsColumns.txn, timestampMicros, serializeOrNull(command.partialTxn.get(), CommandsSerializers.partialTxn)));
 
-            if (command.kind.hasModifications())
+            if (command.kind.hasModifications() && command.kind.get() != null) // initialize sets hasModification(), and don't want to persist null
                 builder.addCell(live(CommandsColumns.kind, timestampMicros, accessor.valueOf(command.kind.get().ordinal())));
 
             if (command.executeAt.hasModifications())
@@ -595,7 +594,7 @@ public class AccordKeyspace
             command.status.load(SaveStatus.values()[row.getInt("status")]);
             command.homeKey.load(deserializeOrNull(row.getBlob("home_key"), CommandsSerializers.routingKey));
             command.progressKey.load(deserializeOrNull(row.getBlob("progress_key"), CommandsSerializers.routingKey));
-            command.route.load(deserializeOrNull(row.getBlob("route"), CommandsSerializers.abstractRoute));
+            command.route.load(deserializeOrNull(row.getBlob("route"), CommandsSerializers.route));
             // TODO: something less brittle than ordinal, more efficient than values()
             command.durability.load(Status.Durability.values()[row.getInt("durability", 0)]);
             command.partialTxn.load(deserializeOrNull(row.getBlob("txn"), CommandsSerializers.partialTxn));
diff --git a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index 85da64ae8c..1f5263ede6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -23,20 +23,25 @@ import java.util.Map;
 import accord.api.Key;
 import accord.api.RoutingKey;
 import accord.local.Node;
-import accord.primitives.AbstractRoute;
+import accord.primitives.AbstractKeys;
+import accord.primitives.AbstractRanges;
 import accord.primitives.Deps;
-import accord.primitives.KeyRange;
-import accord.primitives.KeyRanges;
+import accord.primitives.FullKeyRoute;
+import accord.primitives.FullRangeRoute;
 import accord.primitives.Keys;
-import accord.primitives.PartialRoute;
+import accord.primitives.PartialKeyRoute;
+import accord.primitives.PartialRangeRoute;
 import accord.primitives.PartialTxn;
-import accord.primitives.Route;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
 import accord.primitives.RoutingKeys;
+import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
 import accord.primitives.Writes;
 import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.service.accord.api.AccordKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
 import org.apache.cassandra.service.accord.db.AccordQuery;
@@ -49,7 +54,7 @@ public class AccordObjectSizes
 {
     public static long key(Key key)
     {
-        return ((AccordKey.PartitionKey) key).estimatedSizeOnHeap();
+        return ((PartitionKey) key).estimatedSizeOnHeap();
     }
 
     public static long key(RoutingKey key)
@@ -58,13 +63,13 @@ public class AccordObjectSizes
     }
 
     private static final long EMPTY_KEY_RANGE_SIZE = ObjectSizes.measure(TokenRange.fullRange(TableId.generate()));
-    public static long range(KeyRange range)
+    public static long range(Range range)
     {
         return EMPTY_KEY_RANGE_SIZE + key(range.start()) + key(range.end());
     }
 
-    private static final long EMPTY_KEY_RANGES_SIZE = ObjectSizes.measure(KeyRanges.of());
-    public static long ranges(KeyRanges ranges)
+    private static final long EMPTY_KEY_RANGES_SIZE = ObjectSizes.measure(Ranges.of());
+    public static long ranges(Ranges ranges)
     {
         long size = EMPTY_KEY_RANGES_SIZE;
         size += ObjectSizes.sizeOfReferenceArray(ranges.size());
@@ -84,7 +89,17 @@ public class AccordObjectSizes
         return size;
     }
 
-    private static long routingKeysOnly(RoutingKeys keys)
+    public static long seekables(Seekables<?, ?> seekables)
+    {
+        switch (seekables.kindOfContents())
+        {
+            default: throw new AssertionError();
+            case Key: return keys((Keys) seekables);
+            case Range: return ranges((Ranges) seekables);
+        }
+    }
+
+    private static long routingKeysOnly(AbstractKeys<RoutingKey, ?> keys)
     {
         // TODO: many routing keys are fixed size, can compute by multiplication
         long size = ObjectSizes.sizeOfReferenceArray(keys.size());
@@ -96,37 +111,70 @@ public class AccordObjectSizes
     private static final long EMPTY_ROUTING_KEYS_SIZE = ObjectSizes.measure(RoutingKeys.of());
     public static long routingKeys(RoutingKeys keys)
     {
-        return routingKeysOnly(keys) + EMPTY_ROUTING_KEYS_SIZE;
+        return EMPTY_ROUTING_KEYS_SIZE + routingKeysOnly(keys);
     }
 
-    private static final long EMPTY_ROUTE_SIZE = ObjectSizes.measure(new Route(new TokenKey(null, null), new RoutingKey[0]));
-    public static long route(Route route)
+    private static final long EMPTY_FULL_KEY_ROUTE_SIZE = ObjectSizes.measure(new FullKeyRoute(new TokenKey(null, null), new RoutingKey[0]));
+    public static long fullKeyRoute(FullKeyRoute route)
     {
-        return EMPTY_ROUTE_SIZE
+        return EMPTY_FULL_KEY_ROUTE_SIZE
                + routingKeysOnly(route)
-               + key(route.homeKey); // TODO: we will probably dedup homeKey, serializer dependent, but perhaps this is an acceptable error
+               + key(route.homeKey()); // TODO: we will probably dedup homeKey, serializer dependent, but perhaps this is an acceptable error
     }
 
-    private static final long EMPTY_PARTIAL_ROUTE_KEYS_SIZE = ObjectSizes.measure(new PartialRoute(KeyRanges.EMPTY, new TokenKey(null, null), new RoutingKey[0]));
-    public static long route(PartialRoute route)
+    private static final long EMPTY_PARTIAL_KEY_ROUTE_KEYS_SIZE = ObjectSizes.measure(new PartialKeyRoute(Ranges.EMPTY, new TokenKey(null, null), new RoutingKey[0]));
+    public static long partialKeyRoute(PartialKeyRoute route)
     {
-        return EMPTY_PARTIAL_ROUTE_KEYS_SIZE
+        return EMPTY_PARTIAL_KEY_ROUTE_KEYS_SIZE
                + routingKeysOnly(route)
-               + ranges(route.covering)
-               + key(route.homeKey);
+               + ranges(route.covering())
+               + key(route.homeKey());
+    }
+
+    private static long rangesOnly(AbstractRanges<?> ranges)
+    {
+        long size = ObjectSizes.sizeOfReferenceArray(ranges.size());
+        for (int i=0, mi=ranges.size(); i<mi; i++)
+            size += range(ranges.get(i));
+        return size;
     }
 
-    public static long route(AbstractRoute route)
+    private static final long EMPTY_FULL_RANGE_ROUTE_SIZE = ObjectSizes.measure(new FullRangeRoute(new TokenKey(null, null), new Range[0]));
+    public static long fullRangeRoute(FullRangeRoute route)
     {
-        if (route instanceof Route) return route((Route) route);
-        else return route((PartialRoute) route);
+        return EMPTY_FULL_RANGE_ROUTE_SIZE
+               + rangesOnly(route)
+               + key(route.homeKey()); // TODO: we will probably dedup homeKey, serializer dependent, but perhaps this is an acceptable error
+    }
+
+    private static final long EMPTY_PARTIAL_RANGE_ROUTE_KEYS_SIZE = ObjectSizes.measure(new PartialRangeRoute(Ranges.EMPTY, new TokenKey(null, null), new Range[0]));
+    public static long partialRangeRoute(PartialRangeRoute route)
+    {
+        return EMPTY_PARTIAL_RANGE_ROUTE_KEYS_SIZE
+               + rangesOnly(route)
+               + ranges(route.covering())
+               + key(route.homeKey());
+    }
+
+    public static long route(Unseekables<?, ?> unseekables)
+    {
+        switch (unseekables.kind())
+        {
+            default: throw new AssertionError();
+            case RoutingKeys: return routingKeys((RoutingKeys) unseekables);
+            case PartialKeyRoute: return partialKeyRoute((PartialKeyRoute) unseekables);
+            case FullKeyRoute: return fullKeyRoute((FullKeyRoute) unseekables);
+            case RoutingRanges: return ranges((Ranges) unseekables);
+            case PartialRangeRoute: return partialRangeRoute((PartialRangeRoute) unseekables);
+            case FullRangeRoute: return fullRangeRoute((FullRangeRoute) unseekables);
+        }
     }
 
     private static final long EMPTY_TXN = ObjectSizes.measure(new PartialTxn.InMemory(null, null, null, null, null, null));
     public static long txn(PartialTxn txn)
     {
         long size = EMPTY_TXN;
-        size += keys(txn.keys());
+        size += seekables(txn.keys());
         size += ((AccordRead) txn.read()).estimatedSizeOnHeap();
         if (txn.update() != null)
             size += ((AccordUpdate) txn.update()).estimatedSizeOnHeap();
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTxnBuilder.java b/src/java/org/apache/cassandra/service/accord/AccordTxnBuilder.java
index f32940345e..680a8ba52b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTxnBuilder.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTxnBuilder.java
@@ -51,8 +51,8 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.accord.api.AccordKey;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.service.accord.api.AccordRoutableKey;
 import org.apache.cassandra.service.accord.db.AccordQuery;
 import org.apache.cassandra.service.accord.db.AccordRead;
 import org.apache.cassandra.service.accord.db.AccordUpdate;
@@ -61,7 +61,7 @@ import org.apache.cassandra.service.accord.db.AccordUpdate.UpdatePredicate.Type;
 
 public class AccordTxnBuilder
 {
-    private Set<AccordKey.PartitionKey> keys = new HashSet<>();
+    private Set<PartitionKey> keys = new HashSet<>();
     private List<SinglePartitionReadCommand> reads = new ArrayList<>();
     private AccordQuery query = AccordQuery.ALL;
     private List<AccordUpdate.AbstractUpdate> updates = new ArrayList<>();
@@ -77,7 +77,7 @@ public class AccordTxnBuilder
         SinglePartitionReadQuery.Group<SinglePartitionReadCommand> selectQuery = (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) readQuery;
         for (SinglePartitionReadCommand command : selectQuery.queries)
         {
-            keys.add(AccordKey.of(command));
+            keys.add(PartitionKey.of(command));
             reads.add(command);
         }
         return this;
@@ -95,7 +95,7 @@ public class AccordTxnBuilder
         {
             for (PartitionUpdate update : mutation.getPartitionUpdates())
             {
-                keys.add(new AccordKey.PartitionKey(update.metadata().id, update.partitionKey()));
+                keys.add(new PartitionKey(update.metadata().id, update.partitionKey()));
                 updates.add(new AccordUpdate.SimpleUpdate(update));
             }
         }
@@ -108,7 +108,7 @@ public class AccordTxnBuilder
         Preconditions.checkNotNull(metadata);
 
         DecoratedKey partitionKey = metadata.partitioner.decorateKey(decompose(metadata.partitionKeyType, key));
-        AccordKey.PartitionKey accordKey = new AccordKey.PartitionKey(metadata.id, partitionKey);
+        PartitionKey accordKey = new PartitionKey(metadata.id, partitionKey);
 
         keys.add(accordKey);
         updates.add(new AccordUpdate.AppendingUpdate(accordKey, appends));
@@ -129,7 +129,7 @@ public class AccordTxnBuilder
         Preconditions.checkNotNull(metadata);
 
         DecoratedKey partitionKey = metadata.partitioner.decorateKey(decompose(metadata.partitionKeyType, key));
-        AccordKey.PartitionKey accordKey = new AccordKey.PartitionKey(metadata.id, partitionKey);
+        PartitionKey accordKey = new PartitionKey(metadata.id, partitionKey);
 
         keys.add(accordKey);
         updates.add(new AccordUpdate.IncrementingUpdate(accordKey, increments));
@@ -185,7 +185,7 @@ public class AccordTxnBuilder
     public Txn build()
     {
         Key[] keyArray = keys.toArray(new Key[0]);
-        Arrays.sort(keyArray, AccordRoutingKey::compareKeys);
+        Arrays.sort(keyArray, Key::compareTo);
         predicates.sort(Comparator.comparing(UpdatePredicate::partitionKey));
         if (updates.isEmpty())
         {
diff --git a/src/java/org/apache/cassandra/service/accord/ListenerProxy.java b/src/java/org/apache/cassandra/service/accord/ListenerProxy.java
index e0397e2651..bd9061d350 100644
--- a/src/java/org/apache/cassandra/service/accord/ListenerProxy.java
+++ b/src/java/org/apache/cassandra/service/accord/ListenerProxy.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.service.accord;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.Objects;
 
 import com.google.common.collect.ImmutableList;
@@ -31,10 +30,11 @@ import accord.local.Command;
 import accord.local.CommandListener;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
+import accord.primitives.Keys;
 import accord.primitives.TxnId;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.db.marshal.ValueAccessor;
-import org.apache.cassandra.service.accord.api.AccordKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.async.AsyncContext;
 import org.apache.cassandra.service.accord.serializers.CommandSerializers;
 import org.apache.cassandra.utils.ObjectSizes;
@@ -130,7 +130,7 @@ public abstract class ListenerProxy implements CommandListener, Comparable<Liste
             AccordCommand command = (AccordCommand) c;
             AccordCommandStore commandStore = (AccordCommandStore) safeStore;
             AsyncContext context = commandStore.getContext();
-            PreLoadContext loadCtx = PreLoadContext.contextFor(ImmutableList.of(command.txnId(), txnId), Collections.emptyList());
+            PreLoadContext loadCtx = PreLoadContext.contextFor(ImmutableList.of(command.txnId(), txnId), Keys.EMPTY);
             if (context.containsScopedItems(loadCtx))
             {
                 // TODO (soon): determine if this can break anything by not waiting for the current operation to denormalize it's data
@@ -163,9 +163,9 @@ public abstract class ListenerProxy implements CommandListener, Comparable<Liste
     static class CommandsForKeyListenerProxy extends ListenerProxy
     {
         private static final long EMPTY_SIZE = ObjectSizes.measure(new CommandsForKeyListenerProxy(null));
-        private final AccordKey.PartitionKey key;
+        private final PartitionKey key;
 
-        public CommandsForKeyListenerProxy(AccordKey.PartitionKey key)
+        public CommandsForKeyListenerProxy(PartitionKey key)
         {
             this.key = key;
         }
@@ -218,9 +218,9 @@ public abstract class ListenerProxy implements CommandListener, Comparable<Liste
         @Override
         public ByteBuffer identifier()
         {
-            ByteBuffer bytes = ByteBuffer.allocate((int) (1 + AccordKey.PartitionKey.serializer.serializedSize(key)));
+            ByteBuffer bytes = ByteBuffer.allocate((int) (1 + PartitionKey.serializer.serializedSize(key)));
             ByteBufferAccessor.instance.putByte(bytes, 0, (byte) kind().ordinal());
-            AccordKey.PartitionKey.serializer.serialize(key, bytes, ByteBufferAccessor.instance, 1);
+            PartitionKey.serializer.serialize(key, bytes, ByteBufferAccessor.instance, 1);
             return bytes;
         }
 
@@ -230,7 +230,7 @@ public abstract class ListenerProxy implements CommandListener, Comparable<Liste
             AccordCommand command = (AccordCommand) c;
             AccordCommandStore commandStore = (AccordCommandStore) safeStore;
             AsyncContext context = commandStore.getContext();
-            PreLoadContext loadCtx = PreLoadContext.contextFor(ImmutableList.of(command.txnId()), ImmutableList.of(key));
+            PreLoadContext loadCtx = PreLoadContext.contextFor(ImmutableList.of(command.txnId()), Keys.of(key));
             if (context.containsScopedItems(loadCtx))
             {
                 logger.trace("{}: synchronously updating listening cfk {}", c.txnId(), key);
@@ -265,7 +265,7 @@ public abstract class ListenerProxy implements CommandListener, Comparable<Liste
                 TxnId txnId = CommandSerializers.txnId.deserialize(src, accessor, offset);
                 return new CommandListenerProxy(txnId);
             case COMMANDS_FOR_KEY:
-                AccordKey.PartitionKey key = AccordKey.PartitionKey.serializer.deserialize(src, accessor, offset);
+                PartitionKey key = PartitionKey.serializer.deserialize(src, accessor, offset);
                 return new CommandsForKeyListenerProxy(key);
             default:
                 throw new IOException("Unknown kind ordinal " + ordinal);
diff --git a/src/java/org/apache/cassandra/service/accord/TokenRange.java b/src/java/org/apache/cassandra/service/accord/TokenRange.java
index 020197d995..22683d4b8e 100644
--- a/src/java/org/apache/cassandra/service/accord/TokenRange.java
+++ b/src/java/org/apache/cassandra/service/accord/TokenRange.java
@@ -21,16 +21,15 @@ package org.apache.cassandra.service.accord;
 import java.io.IOException;
 
 import accord.api.RoutingKey;
-import accord.primitives.KeyRange;
+import accord.primitives.Range;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.service.accord.api.AccordKey;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey.SentinelKey;
 
-public class TokenRange extends KeyRange.EndInclusive
+public class TokenRange extends Range.EndInclusive
 {
     public TokenRange(AccordRoutingKey start, AccordRoutingKey end)
     {
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
index 6375bdc7b9..f8a8832a5d 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.service.accord.api;
 
+import java.util.concurrent.TimeUnit;
+
 import accord.api.Agent;
 import accord.api.Result;
 import accord.local.Command;
@@ -25,6 +27,9 @@ import accord.local.Node;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static org.apache.cassandra.config.DatabaseDescriptor.getReadRpcTimeout;
+
 public class AccordAgent implements Agent
 {
     @Override
@@ -54,7 +59,7 @@ public class AccordAgent implements Agent
     @Override
     public boolean isExpired(TxnId initiated, long now)
     {
-        // TODO: this
-        return false;
+        // TODO: distinguish reads from writes; every transaction currently expires on the read RPC timeout
+        return now - initiated.real > getReadRpcTimeout(MICROSECONDS);
     }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordKey.java b/src/java/org/apache/cassandra/service/accord/api/AccordKey.java
deleted file mode 100644
index 05a47fb443..0000000000
--- a/src/java/org/apache/cassandra/service/accord/api/AccordKey.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service.accord.api;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import com.google.common.base.Preconditions;
-
-import accord.api.Key;
-import accord.api.RoutingKey;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.PartitionPosition;
-import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.marshal.ByteBufferAccessor;
-import org.apache.cassandra.db.marshal.ValueAccessor;
-import org.apache.cassandra.db.partitions.Partition;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.ObjectSizes;
-
-public interface AccordKey extends Key
-{
-    TableId tableId();
-    PartitionPosition partitionKey();
-
-    static AccordKey of(Key key)
-    {
-        return (AccordKey) key;
-    }
-
-    static PartitionKey of(Partition partition)
-    {
-        return new PartitionKey(partition.metadata().id, partition.partitionKey());
-    }
-
-    static PartitionKey of(SinglePartitionReadCommand command)
-    {
-        return new PartitionKey(command.metadata().id, command.partitionKey());
-    }
-
-    abstract class AbstractKey<T extends PartitionPosition> extends AccordRoutingKey.AbstractRoutingKey implements AccordKey
-    {
-        private final T key;
-
-        public AbstractKey(TableId tableId, T key)
-        {
-            super(tableId);
-            this.key = key;
-        }
-
-        @Override
-        public Token token()
-        {
-            return partitionKey().getToken();
-        }
-
-        @Override
-        public Kind kind()
-        {
-            return Kind.TOKEN;
-        }
-
-        @Override
-        public T partitionKey()
-        {
-            return key;
-        }
-
-        @Override
-        public RoutingKey toRoutingKey()
-        {
-            return this;
-        }
-    }
-
-    class PartitionKey extends AbstractKey<DecoratedKey>
-    {
-        private static final long EMPTY_SIZE;
-
-        static
-        {
-            DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER);
-            EMPTY_SIZE = ObjectSizes.measureDeep(new PartitionKey(null, key));
-        }
-
-        public PartitionKey(TableId tableId, DecoratedKey key)
-        {
-            super(tableId, key);
-        }
-
-        @Override
-        public String toString()
-        {
-            return "PartitionKey{" +
-                   "tableId=" + tableId() +
-                   ", key=" + partitionKey() +
-                   '}';
-        }
-
-        @Override
-        public RoutingKey toRoutingKey()
-        {
-            return new TokenKey(tableId(), token());
-        }
-
-        public long estimatedSizeOnHeap()
-        {
-            return EMPTY_SIZE + ByteBufferAccessor.instance.size(partitionKey().getKey());
-        }
-
-        public static final Serializer serializer = new Serializer();
-        public static class Serializer implements IVersionedSerializer<PartitionKey>
-        {
-            // TODO: add vint to value accessor and use vints
-            private Serializer() {}
-
-            @Override
-            public void serialize(PartitionKey key, DataOutputPlus out, int version) throws IOException
-            {
-                key.tableId().serialize(out);
-                ByteBufferUtil.writeWithShortLength(key.partitionKey().getKey(), out);
-            }
-
-            public <V> int serialize(PartitionKey key, V dst, ValueAccessor<V> accessor, int offset)
-            {
-                int position = offset;
-                position += key.tableId().serialize(dst, accessor, position);
-                ByteBuffer bytes = key.partitionKey().getKey();
-                int numBytes = ByteBufferAccessor.instance.size(bytes);
-                Preconditions.checkState(numBytes <= Short.MAX_VALUE);
-                position += accessor.putShort(dst, position, (short) numBytes);
-                position += accessor.copyByteBufferTo(bytes, 0, dst, position, numBytes);
-                return position - offset;
-
-            }
-
-            @Override
-            public PartitionKey deserialize(DataInputPlus in, int version) throws IOException
-            {
-                TableId tableId = TableId.deserialize(in);
-                TableMetadata metadata = Schema.instance.getExistingTableMetadata(tableId);
-                DecoratedKey key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
-                return new PartitionKey(tableId, key);
-            }
-
-            public <V> PartitionKey deserialize(V src, ValueAccessor<V> accessor, int offset) throws IOException
-            {
-                TableId tableId = TableId.deserialize(src, accessor, offset);
-                offset += TableId.serializedSize();
-                TableMetadata metadata = Schema.instance.getTableMetadata(tableId);
-                int numBytes = accessor.getShort(src, offset);
-                offset += TypeSizes.SHORT_SIZE;
-                ByteBuffer bytes = ByteBuffer.allocate(numBytes);
-                accessor.copyTo(src, offset, bytes, ByteBufferAccessor.instance, 0, numBytes);
-                DecoratedKey key = metadata.partitioner.decorateKey(bytes);
-                return new PartitionKey(tableId, key);
-            }
-
-            @Override
-            public long serializedSize(PartitionKey key, int version)
-            {
-                return serializedSize(key);
-            }
-
-            public long serializedSize(PartitionKey key)
-            {
-                return key.tableId().serializedSize() + ByteBufferUtil.serializedSizeWithShortLength(key.partitionKey().getKey());
-            }
-        }
-    }
-
-    IVersionedSerializer<AccordKey> serializer = new IVersionedSerializer<AccordKey>()
-    {
-        @Override
-        public void serialize(AccordKey key, DataOutputPlus out, int version) throws IOException
-        {
-            PartitionKey.serializer.serialize((PartitionKey) key, out, version);
-        }
-
-        @Override
-        public AccordKey deserialize(DataInputPlus in, int version) throws IOException
-        {
-            return PartitionKey.serializer.deserialize(in, version);
-        }
-
-        @Override
-        public long serializedSize(AccordKey key, int version)
-        {
-            return PartitionKey.serializer.serializedSize((PartitionKey) key, version);
-        }
-    };
-}
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java b/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java
new file mode 100644
index 0000000000..f4066e9c1c
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.api;
+
+import java.util.Objects;
+
+import accord.primitives.RoutableKey;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.TableId;
+
+public abstract class AccordRoutableKey implements RoutableKey
+{
+    final TableId tableId;
+
+    protected AccordRoutableKey(TableId tableId)
+    {
+        this.tableId = tableId;
+    }
+
+    public final TableId tableId() { return tableId; }
+    public abstract Token token();
+
+    @Override
+    public final int routingHash()
+    {
+        return token().tokenHash();
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(tableId, routingHash());
+    }
+
+    public final int compareTo(RoutableKey that)
+    {
+        return compareTo((AccordRoutableKey) that);
+    }
+
+    public final int compareTo(AccordRoutableKey that)
+    {
+        int cmp = this.tableId().compareTo(that.tableId());
+        if (cmp != 0)
+            return cmp;
+
+        if (this instanceof AccordRoutingKey.SentinelKey || that instanceof AccordRoutingKey.SentinelKey)
+        {
+            int leftInt = this instanceof AccordRoutingKey.SentinelKey ? ((AccordRoutingKey.SentinelKey) this).asInt() : 0;
+            int rightInt = that instanceof AccordRoutingKey.SentinelKey ? ((AccordRoutingKey.SentinelKey) that).asInt() : 0;
+            return Integer.compare(leftInt, rightInt);
+        }
+
+        return this.token().compareTo(that.token());
+    }
+
+    @Override
+    public final boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        AccordRoutableKey that = (AccordRoutableKey) o;
+        return compareTo(that) == 0;
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
index 543e497c07..ed9924395d 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
@@ -35,77 +35,38 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
 
-public interface AccordRoutingKey extends RoutingKey
+public abstract class AccordRoutingKey extends AccordRoutableKey implements RoutingKey
 {
-    enum Kind
+    enum RoutingKeyKind
     {
         TOKEN, SENTINEL;
     }
 
-    TableId tableId();
-    Token token();
-    Kind kind();
-    long estimatedSizeOnHeap();
-
-    static AccordRoutingKey of(Key key)
+    protected AccordRoutingKey(TableId tableId)
     {
-        return (AccordRoutingKey) key;
+        super(tableId);
     }
 
-    static int compare(AccordRoutingKey left, AccordRoutingKey right)
-    {
-        int cmp = left.tableId().compareTo(right.tableId());
-        if (cmp != 0)
-            return cmp;
-
-        if (left instanceof SentinelKey || right instanceof SentinelKey)
-        {
-            int leftInt = left instanceof SentinelKey ? ((SentinelKey) left).asInt() : 0;
-            int rightInt = right instanceof SentinelKey ? ((SentinelKey) right).asInt() : 0;
-            return Integer.compare(leftInt, rightInt);
-        }
-
-        return left.token().compareTo(right.token());
-    }
-
-    static int compareKeys(Key left, Key right)
-    {
-        return compare((AccordRoutingKey) left, (AccordRoutingKey) right);
-    }
-
-    default int compareTo(AccordRoutingKey that)
-    {
-        return compare(this, that);
-    }
+    public abstract RoutingKeyKind kindOfRoutingKey();
+    public abstract long estimatedSizeOnHeap();
 
-    @Override
-    default int routingHash()
+    public static AccordRoutingKey of(Key key)
     {
-        return token().tokenHash();
+        return (AccordRoutingKey) key;
     }
 
-    class SentinelKey implements AccordRoutingKey
+    public static class SentinelKey extends AccordRoutingKey
     {
         private static final long EMPTY_SIZE = ObjectSizes.measure(new SentinelKey(null, true));
 
-        private final TableId tableId;
         private final boolean isMin;
 
         private SentinelKey(TableId tableId, boolean isMin)
         {
-            this.tableId = tableId;
+            super(tableId);
             this.isMin = isMin;
         }
 
-        @Override
-        public boolean equals(Object o)
-        {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            SentinelKey that = (SentinelKey) o;
-            return isMin == that.isMin && tableId.equals(that.tableId);
-        }
-
         @Override
         public int hashCode()
         {
@@ -113,15 +74,9 @@ public interface AccordRoutingKey extends RoutingKey
         }
 
         @Override
-        public int compareTo(RoutingKey that)
+        public RoutingKeyKind kindOfRoutingKey()
         {
-            return compare(this, (AccordRoutingKey) that);
-        }
-
-        @Override
-        public Kind kind()
-        {
-            return Kind.SENTINEL;
+            return RoutingKeyKind.SENTINEL;
         }
 
         @Override
@@ -140,12 +95,6 @@ public interface AccordRoutingKey extends RoutingKey
             return new SentinelKey(tableId, false);
         }
 
-        @Override
-        public TableId tableId()
-        {
-            return tableId;
-        }
-
         @Override
         public Token token()
         {
@@ -191,44 +140,7 @@ public interface AccordRoutingKey extends RoutingKey
         };
     }
 
-    abstract class AbstractRoutingKey implements AccordRoutingKey
-    {
-        private final TableId tableId;
-
-        public AbstractRoutingKey(TableId tableId)
-        {
-            this.tableId = tableId;
-        }
-
-        @Override
-        public boolean equals(Object o)
-        {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            AbstractRoutingKey that = (AbstractRoutingKey) o;
-            return tableId.equals(that.tableId) && token().equals(that.token());
-        }
-
-        @Override
-        public int hashCode()
-        {
-            return Objects.hash(tableId, token());
-        }
-
-        @Override
-        public int compareTo(RoutingKey that)
-        {
-            return compare(this, (AccordRoutingKey) that);
-        }
-
-        @Override
-        public TableId tableId()
-        {
-            return tableId;
-        }
-    }
-
-    class TokenKey extends AbstractRoutingKey
+    public static class TokenKey extends AccordRoutingKey
     {
         private static final long EMPTY_SIZE;
 
@@ -252,9 +164,9 @@ public interface AccordRoutingKey extends RoutingKey
         }
 
         @Override
-        public Kind kind()
+        public RoutingKeyKind kindOfRoutingKey()
         {
-            return Kind.TOKEN;
+            return RoutingKeyKind.TOKEN;
         }
 
         @Override
@@ -300,13 +212,14 @@ public interface AccordRoutingKey extends RoutingKey
         }
     }
 
-    IVersionedSerializer<AccordRoutingKey> serializer = new IVersionedSerializer<AccordRoutingKey>()
+    public static final IVersionedSerializer<AccordRoutingKey> serializer = new IVersionedSerializer<AccordRoutingKey>()
     {
+        final RoutingKeyKind[] kinds = RoutingKeyKind.values();
         @Override
         public void serialize(AccordRoutingKey key, DataOutputPlus out, int version) throws IOException
         {
-            out.write(key.kind().ordinal());
-            switch (key.kind())
+            out.write(key.kindOfRoutingKey().ordinal());
+            switch (key.kindOfRoutingKey())
             {
                 case TOKEN:
                     TokenKey.serializer.serialize((TokenKey) key, out, version);
@@ -322,7 +235,7 @@ public interface AccordRoutingKey extends RoutingKey
         @Override
         public AccordRoutingKey deserialize(DataInputPlus in, int version) throws IOException
         {
-            Kind kind = Kind.values()[in.readByte()];
+            RoutingKeyKind kind = kinds[in.readByte()];
             switch (kind)
             {
                 case TOKEN:
@@ -338,7 +251,7 @@ public interface AccordRoutingKey extends RoutingKey
         public long serializedSize(AccordRoutingKey key, int version)
         {
             long size = TypeSizes.BYTE_SIZE; // kind ordinal
-            switch (key.kind())
+            switch (key.kindOfRoutingKey())
             {
                 case TOKEN:
                     size += TokenKey.serializer.serializedSize((TokenKey) key, version);
diff --git a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
new file mode 100644
index 0000000000..18296d4adc
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.api;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+
+import accord.api.Key;
+import accord.api.RoutingKey;
+import accord.primitives.Routable;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+
+public class PartitionKey extends AccordRoutableKey implements Key
+{
+    private static final long EMPTY_SIZE;
+
+    static
+    {
+        DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+        EMPTY_SIZE = ObjectSizes.measureDeep(new PartitionKey(null, key));
+    }
+
+    final DecoratedKey key;
+
+    public PartitionKey(TableId tableId, DecoratedKey key)
+    {
+        super(tableId);
+        this.key = key;
+    }
+
+    public static PartitionKey of(Key key)
+    {
+        return (PartitionKey) key;
+    }
+
+    public static PartitionKey of(Partition partition)
+    {
+        return new PartitionKey(partition.metadata().id, partition.partitionKey());
+    }
+
+    public static PartitionKey of(SinglePartitionReadCommand command)
+    {
+        return new PartitionKey(command.metadata().id, command.partitionKey());
+    }
+
+    @Override
+    public Token token()
+    {
+        return partitionKey().getToken();
+    }
+
+    public DecoratedKey partitionKey()
+    {
+        return key;
+    }
+
+    @Override
+    public RoutingKey toUnseekable()
+    {
+        return new TokenKey(tableId(), token());
+    }
+
+    public long estimatedSizeOnHeap()
+    {
+        return EMPTY_SIZE + ByteBufferAccessor.instance.size(partitionKey().getKey());
+    }
+
+    @Override
+    public String toString()
+    {
+        return "PartitionKey{" +
+               "tableId=" + tableId() +
+               ", key=" + partitionKey() +
+               '}';
+    }
+
+    // TODO: callers of this method do not handle ranges correctly (the cast below assumes a PartitionKey)
+    public static PartitionKey toPartitionKey(Routable routable)
+    {
+        return (PartitionKey) routable;
+    }
+
+    public static final Serializer serializer = new Serializer();
+    public static class Serializer implements IVersionedSerializer<PartitionKey>
+    {
+        // TODO: add vint to value accessor and use vints
+        private Serializer() {}
+
+        @Override
+        public void serialize(PartitionKey key, DataOutputPlus out, int version) throws IOException
+        {
+            key.tableId().serialize(out);
+            ByteBufferUtil.writeWithShortLength(key.partitionKey().getKey(), out);
+        }
+
+        public <V> int serialize(PartitionKey key, V dst, ValueAccessor<V> accessor, int offset)
+        {
+            int position = offset;
+            position += key.tableId().serialize(dst, accessor, position);
+            ByteBuffer bytes = key.partitionKey().getKey();
+            int numBytes = ByteBufferAccessor.instance.size(bytes);
+            Preconditions.checkState(numBytes <= Short.MAX_VALUE);
+            position += accessor.putShort(dst, position, (short) numBytes);
+            position += accessor.copyByteBufferTo(bytes, 0, dst, position, numBytes);
+            return position - offset;
+
+        }
+
+        @Override
+        public PartitionKey deserialize(DataInputPlus in, int version) throws IOException
+        {
+            TableId tableId = TableId.deserialize(in);
+            TableMetadata metadata = Schema.instance.getExistingTableMetadata(tableId);
+            DecoratedKey key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
+            return new PartitionKey(tableId, key);
+        }
+
+        public <V> PartitionKey deserialize(V src, ValueAccessor<V> accessor, int offset) throws IOException
+        {
+            TableId tableId = TableId.deserialize(src, accessor, offset);
+            offset += TableId.serializedSize();
+            TableMetadata metadata = Schema.instance.getTableMetadata(tableId);
+            int numBytes = accessor.getShort(src, offset);
+            offset += TypeSizes.SHORT_SIZE;
+            ByteBuffer bytes = ByteBuffer.allocate(numBytes);
+            accessor.copyTo(src, offset, bytes, ByteBufferAccessor.instance, 0, numBytes);
+            DecoratedKey key = metadata.partitioner.decorateKey(bytes);
+            return new PartitionKey(tableId, key);
+        }
+
+        @Override
+        public long serializedSize(PartitionKey key, int version)
+        {
+            return serializedSize(key);
+        }
+
+        public long serializedSize(PartitionKey key)
+        {
+            return key.tableId().serializedSize() + ByteBufferUtil.serializedSizeWithShortLength(key.partitionKey().getKey());
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncContext.java b/src/java/org/apache/cassandra/service/accord/async/AsyncContext.java
index 0b3c87536f..4a307333a4 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncContext.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncContext.java
@@ -37,7 +37,7 @@ import org.apache.cassandra.service.accord.AccordPartialCommand;
 import org.apache.cassandra.service.accord.AccordState;
 import org.apache.cassandra.service.accord.AccordStateCache;
 import org.apache.cassandra.service.accord.AccordState.WriteOnly;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 
 public class AsyncContext
 {
diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
index 74c25739e1..3bbc9c2828 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
@@ -38,7 +38,7 @@ import org.apache.cassandra.service.accord.AccordCommandsForKey;
 import org.apache.cassandra.service.accord.AccordKeyspace;
 import org.apache.cassandra.service.accord.AccordStateCache;
 import org.apache.cassandra.service.accord.AccordState;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.FutureCombiner;
 
diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index 7f5bc1a1c5..d52fcb1439 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -27,13 +27,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
-import accord.api.Key;
 import accord.local.CommandStore;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
+import accord.primitives.Seekables;
 import accord.primitives.TxnId;
 import org.apache.cassandra.service.accord.AccordCommandStore;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 
 public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runnable, Function<SafeCommandStore, R>, BiConsumer<Object, Throwable>
@@ -208,9 +208,17 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna
         }
     }
 
-    private static Iterable<PartitionKey> toPartitionKeys(Iterable<? extends Key> iterable)
+    private static Iterable<PartitionKey> toPartitionKeys(Seekables<?, ?> keys)
     {
-        return (Iterable<PartitionKey>) iterable;
+        switch (keys.kindOfContents()) // dispatch on what the collection reports it contains
+        {
+            default: throw new AssertionError();
+            case Key:
+                return (Iterable<PartitionKey>) keys; // unchecked cast: presumably every Key-kind element is a PartitionKey -- verify against callers
+            case Range:
+                // TODO: implement
+                throw new UnsupportedOperationException(); // Range contents are rejected until the TODO above is done
+        }
     }
 
     static class ForFunction<R> extends AsyncOperation<R>
diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
index 64f6bdeab0..cd8aca5ab5 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
@@ -32,7 +32,8 @@ import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.api.Key;
+import accord.primitives.Routable;
+import accord.primitives.Seekable;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import org.apache.cassandra.concurrent.Stage;
@@ -44,7 +45,7 @@ import org.apache.cassandra.service.accord.AccordKeyspace;
 import org.apache.cassandra.service.accord.AccordPartialCommand;
 import org.apache.cassandra.service.accord.AccordStateCache;
 import org.apache.cassandra.service.accord.AccordState;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.store.StoredSet;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.FutureCombiner;
@@ -259,8 +260,11 @@ public class AsyncWriter
         //             might not reflect the update timestamp in the map? Probably best addressed following Blake's refactor.
         if (command.known().isDefinitionKnown() && AccordPartialCommand.serializer.needsUpdate(command))
         {
-            for (Key key : command.partialTxn().keys())
+            for (Seekable key : command.partialTxn().keys())
             {
+                // TODO: implement
+                if (key.kind() == Routable.Kind.Range)
+                    throw new UnsupportedOperationException();
                 PartitionKey partitionKey = (PartitionKey) key;
                 AccordCommandsForKey cfk = cfkForDenormalization(partitionKey, context);
                 cfk.updateSummaries(command);
diff --git a/src/java/org/apache/cassandra/service/accord/db/AbstractKeyIndexed.java b/src/java/org/apache/cassandra/service/accord/db/AbstractKeyIndexed.java
index 9e1fb75a9e..ec4b260934 100644
--- a/src/java/org/apache/cassandra/service/accord/db/AbstractKeyIndexed.java
+++ b/src/java/org/apache/cassandra/service/accord/db/AbstractKeyIndexed.java
@@ -22,23 +22,15 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Objects;
-import java.util.TreeMap;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import com.google.common.base.Preconditions;
-
 import accord.api.Key;
-import accord.primitives.KeyRange;
-import accord.primitives.KeyRanges;
 import accord.primitives.Keys;
+import accord.primitives.Ranges;
 import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -46,7 +38,7 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.accord.AccordObjectSizes;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
@@ -148,7 +140,7 @@ public abstract class AbstractKeyIndexed<T>
             serialized[this.keys.indexOf(keyFunction.apply(items.get(i)))] = serialize(items.get(i));
     }
 
-    protected <V> V slice(KeyRanges ranges, BiFunction<Keys, ByteBuffer[], V> constructor)
+    protected <V> V slice(Ranges ranges, BiFunction<Keys, ByteBuffer[], V> constructor)
     {
         // TODO: Routables patch permits us to do this more efficiently
         Keys keys = this.keys.slice(ranges);
diff --git a/src/java/org/apache/cassandra/service/accord/db/AccordData.java b/src/java/org/apache/cassandra/service/accord/db/AccordData.java
index b5feba8901..59fefe930b 100644
--- a/src/java/org/apache/cassandra/service/accord/db/AccordData.java
+++ b/src/java/org/apache/cassandra/service/accord/db/AccordData.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.TreeMap;
 
 import com.google.common.base.Preconditions;
 
@@ -43,8 +42,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.accord.api.AccordKey;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.ObjectSizes;
 
 public class AccordData extends AbstractKeyIndexed<FilteredPartition> implements Data, Result, Iterable<FilteredPartition>
@@ -58,7 +56,7 @@ public class AccordData extends AbstractKeyIndexed<FilteredPartition> implements
 
     public AccordData(FilteredPartition partition)
     {
-        this(Keys.of(AccordKey.of(partition)), new ByteBuffer[] { serialize(partition, partitionSerializer) });
+        this(Keys.of(PartitionKey.of(partition)), new ByteBuffer[] { serialize(partition, partitionSerializer) });
     }
 
     public AccordData(List<FilteredPartition> items)
diff --git a/src/java/org/apache/cassandra/service/accord/db/AccordRead.java b/src/java/org/apache/cassandra/service/accord/db/AccordRead.java
index a0a7532df3..4df86b07c9 100644
--- a/src/java/org/apache/cassandra/service/accord/db/AccordRead.java
+++ b/src/java/org/apache/cassandra/service/accord/db/AccordRead.java
@@ -30,10 +30,9 @@ import accord.api.Data;
 import accord.api.DataStore;
 import accord.api.Key;
 import accord.api.Read;
-import accord.local.CommandStore;
 import accord.local.SafeCommandStore;
-import accord.primitives.KeyRanges;
 import accord.primitives.Keys;
+import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import org.apache.cassandra.concurrent.Stage;
@@ -48,8 +47,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.accord.AccordCommandsForKey;
-import org.apache.cassandra.service.accord.api.AccordKey;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.ImmediateFuture;
@@ -60,7 +58,7 @@ public class AccordRead extends AbstractKeyIndexed<SinglePartitionReadCommand> i
 
     public AccordRead(List<SinglePartitionReadCommand> items)
     {
-        super(items, AccordKey::of);
+        super(items, PartitionKey::of);
     }
 
     public AccordRead(Keys keys, ByteBuffer[] serialized)
@@ -128,7 +126,7 @@ public class AccordRead extends AbstractKeyIndexed<SinglePartitionReadCommand> i
     }
 
     @Override
-    public Read slice(KeyRanges ranges)
+    public Read slice(Ranges ranges)
     {
         return super.slice(ranges, AccordRead::new);
     }
diff --git a/src/java/org/apache/cassandra/service/accord/db/AccordUpdate.java b/src/java/org/apache/cassandra/service/accord/db/AccordUpdate.java
index 6ea2c4de41..a25ce6c874 100644
--- a/src/java/org/apache/cassandra/service/accord/db/AccordUpdate.java
+++ b/src/java/org/apache/cassandra/service/accord/db/AccordUpdate.java
@@ -42,8 +42,8 @@ import accord.api.Data;
 import accord.api.Key;
 import accord.api.Update;
 import accord.api.Write;
-import accord.primitives.KeyRanges;
 import accord.primitives.Keys;
+import accord.primitives.Ranges;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.Columns;
@@ -71,8 +71,7 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.accord.api.AccordKey;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.serializers.KeySerializers;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
@@ -761,7 +760,7 @@ public class AccordUpdate implements Update
         for (AbstractUpdate updater : deserialize(updates, AbstractUpdate.serializer))
         {
             PartitionUpdate update = updater.apply(read.get(updater.partitionKey()));
-            PartitionKey key = AccordKey.of(update);
+            PartitionKey key = PartitionKey.of(update);
             if (updateMap.containsKey(key))
                 update = PartitionUpdate.merge(Lists.newArrayList(updateMap.get(key), update));
             updateMap.put(key, update);
@@ -770,7 +769,7 @@ public class AccordUpdate implements Update
     }
 
     @Override
-    public Update slice(KeyRanges ranges)
+    public Update slice(Ranges ranges)
     {
         Keys keys = this.keys.slice(ranges);
         return new AccordUpdate(keys, select(this.keys, keys, updates), select(this.keys, keys, predicates));
diff --git a/src/java/org/apache/cassandra/service/accord/db/AccordWrite.java b/src/java/org/apache/cassandra/service/accord/db/AccordWrite.java
index 5584b7c3cc..ddaf89e813 100644
--- a/src/java/org/apache/cassandra/service/accord/db/AccordWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/db/AccordWrite.java
@@ -38,8 +38,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.accord.AccordCommandsForKey;
-import org.apache.cassandra.service.accord.api.AccordKey;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.ImmediateFuture;
@@ -52,7 +51,7 @@ public class AccordWrite extends AbstractKeyIndexed<PartitionUpdate> implements
 
     public AccordWrite(List<PartitionUpdate> items)
     {
-        super(items, AccordKey::of);
+        super(items, PartitionKey::of);
     }
 
     public AccordWrite(Keys keys, ByteBuffer[] serialized)
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
index a635ccc671..91bf1fed10 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
@@ -43,18 +43,18 @@ public class AcceptSerializers
         {
             CommandSerializers.ballot.serialize(accept.ballot, out, version);
             CommandSerializers.timestamp.serialize(accept.executeAt, out, version);
-            KeySerializers.keys.serialize(accept.keys, out, version);
+            KeySerializers.seekables.serialize(accept.keys, out, version);
             DepsSerializer.partialDeps.serialize(accept.partialDeps, out, version);
             CommandSerializers.kind.serialize(accept.kind, out, version);
         }
 
         @Override
-        public Accept deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey) throws IOException
+        public Accept deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey) throws IOException
         {
             return create(txnId, scope, waitForEpoch, minEpoch, doNotComputeProgressKey,
                           CommandSerializers.ballot.deserialize(in, version),
                           CommandSerializers.timestamp.deserialize(in, version),
-                          KeySerializers.keys.deserialize(in, version),
+                          KeySerializers.seekables.deserialize(in, version),
                           DepsSerializer.partialDeps.deserialize(in, version),
                           CommandSerializers.kind.deserialize(in, version));
         }
@@ -64,7 +64,7 @@ public class AcceptSerializers
         {
             return CommandSerializers.ballot.serializedSize(accept.ballot, version)
                    + CommandSerializers.timestamp.serializedSize(accept.executeAt, version)
-                   + KeySerializers.keys.serializedSize(accept.keys, version)
+                   + KeySerializers.seekables.serializedSize(accept.keys, version)
                    + DepsSerializer.partialDeps.serializedSize(accept.partialDeps, version)
                    + CommandSerializers.kind.serializedSize(accept.kind, version);
         }
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
index 79d9b1b2b0..e14902e7a4 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
@@ -21,12 +21,11 @@ package org.apache.cassandra.service.accord.serializers;
 import java.io.IOException;
 
 import accord.api.RoutingKey;
-import accord.local.SaveStatus;
 import accord.local.Status;
 import accord.messages.BeginInvalidation;
 import accord.messages.BeginInvalidation.InvalidateReply;
-import accord.primitives.AbstractRoute;
 import accord.primitives.Ballot;
+import accord.primitives.Route;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -44,7 +43,7 @@ public class BeginInvalidationSerializers
         public void serialize(BeginInvalidation begin, DataOutputPlus out, int version) throws IOException
         {
             CommandSerializers.txnId.serialize(begin.txnId, out, version);
-            KeySerializers.routingKeys.serialize(begin.someKeys, out, version);
+            KeySerializers.unseekables.serialize(begin.someUnseekables, out, version);
             CommandSerializers.ballot.serialize(begin.ballot, out, version);
         }
 
@@ -52,7 +51,7 @@ public class BeginInvalidationSerializers
         public BeginInvalidation deserialize(DataInputPlus in, int version) throws IOException
         {
             return new BeginInvalidation(CommandSerializers.txnId.deserialize(in, version),
-                                       KeySerializers.routingKeys.deserialize(in, version),
+                                       KeySerializers.unseekables.deserialize(in, version),
                                        CommandSerializers.ballot.deserialize(in, version));
         }
 
@@ -60,7 +59,7 @@ public class BeginInvalidationSerializers
         public long serializedSize(BeginInvalidation begin, int version)
         {
             return CommandSerializers.txnId.serializedSize(begin.txnId, version)
-                   + KeySerializers.routingKeys.serializedSize(begin.someKeys, version)
+                   + KeySerializers.unseekables.serializedSize(begin.someUnseekables, version)
                    + CommandSerializers.ballot.serializedSize(begin.ballot, version);
         }
     };
@@ -74,7 +73,7 @@ public class BeginInvalidationSerializers
             CommandSerializers.ballot.serialize(reply.accepted, out, version);
             CommandSerializers.status.serialize(reply.status, out, version);
             out.writeBoolean(reply.acceptedFastPath);
-            serializeNullable(KeySerializers.abstractRoute, reply.route, out, version);
+            serializeNullable(KeySerializers.route, reply.route, out, version);
             serializeNullable(KeySerializers.routingKey, reply.homeKey, out, version);
         }
 
@@ -85,7 +84,7 @@ public class BeginInvalidationSerializers
             Ballot accepted = CommandSerializers.ballot.deserialize(in, version);
             Status status = CommandSerializers.status.deserialize(in, version);
             boolean acceptedFastPath = in.readBoolean();
-            AbstractRoute route = deserializeNullable(KeySerializers.abstractRoute, in, version);
+            Route<?> route = deserializeNullable(KeySerializers.route, in, version);
             RoutingKey homeKey = deserializeNullable(KeySerializers.routingKey, in, version);
             return new InvalidateReply(supersededBy, accepted, status, acceptedFastPath, route, homeKey);
         }
@@ -97,7 +96,7 @@ public class BeginInvalidationSerializers
                     + CommandSerializers.ballot.serializedSize(reply.accepted, version)
                     + CommandSerializers.status.serializedSize(reply.status, version)
                     + TypeSizes.BOOL_SIZE
-                    + serializedSizeNullable(KeySerializers.abstractRoute, reply.route, version)
+                    + serializedSizeNullable(KeySerializers.route, reply.route, version)
                     + serializedSizeNullable(KeySerializers.routingKey, reply.homeKey, version);
         }
     };
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
index 7076dcd1cb..9e359f8e01 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
@@ -29,13 +29,14 @@ import accord.messages.CheckStatus.CheckStatusNack;
 import accord.messages.CheckStatus.CheckStatusOk;
 import accord.messages.CheckStatus.CheckStatusOkFull;
 import accord.messages.CheckStatus.CheckStatusReply;
-import accord.primitives.AbstractRoute;
 import accord.primitives.Ballot;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
-import accord.primitives.RoutingKeys;
+import accord.primitives.Routables;
+import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
 import accord.primitives.Writes;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -58,7 +59,7 @@ public class CheckStatusSerializers
         public void serialize(CheckStatus check, DataOutputPlus out, int version) throws IOException
         {
             CommandSerializers.txnId.serialize(check.txnId, out, version);
-            KeySerializers.routingKeys.serialize(check.someKeys, out, version);
+            KeySerializers.unseekables.serialize(check.query, out, version);
             out.writeUnsignedVInt(check.startEpoch);
             out.writeUnsignedVInt(check.endEpoch - check.startEpoch);
             out.writeByte(check.includeInfo.ordinal());
@@ -68,18 +69,18 @@ public class CheckStatusSerializers
         public CheckStatus deserialize(DataInputPlus in, int version) throws IOException
         {
             TxnId txnId = CommandSerializers.txnId.deserialize(in, version);
-            RoutingKeys someKeys = KeySerializers.routingKeys.deserialize(in, version);
+            Unseekables<?, ?> query = KeySerializers.unseekables.deserialize(in, version);
             long startEpoch = in.readUnsignedVInt();
             long endEpoch = in.readUnsignedVInt() + startEpoch;
             CheckStatus.IncludeInfo info = infos[in.readByte()];
-            return new CheckStatus(txnId, someKeys, startEpoch, endEpoch, info);
+            return new CheckStatus(txnId, query, startEpoch, endEpoch, info);
         }
 
         @Override
         public long serializedSize(CheckStatus check, int version)
         {
             return CommandSerializers.txnId.serializedSize(check.txnId, version)
-                   + KeySerializers.routingKeys.serializedSize(check.someKeys, version)
+                   + KeySerializers.unseekables.serializedSize(check.query, version)
                    + TypeSizes.sizeofUnsignedVInt(check.startEpoch)
                    + TypeSizes.sizeofUnsignedVInt(check.endEpoch - check.startEpoch)
                    + TypeSizes.BYTE_SIZE;
@@ -109,7 +110,7 @@ public class CheckStatusSerializers
             serializeNullable(ok.executeAt, out, version, CommandSerializers.timestamp);
             out.writeBoolean(ok.isCoordinating);
             CommandSerializers.durability.serialize(ok.durability, out, version);
-            serializeNullable(ok.route, out, version, KeySerializers.abstractRoute);
+            serializeNullable(ok.route, out, version, KeySerializers.route);
             serializeNullable(ok.homeKey, out, version, KeySerializers.routingKey);
 
             if (!(reply instanceof CheckStatusOkFull))
@@ -139,7 +140,7 @@ public class CheckStatusSerializers
                     Timestamp executeAt = deserializeNullable(in, version, CommandSerializers.timestamp);
                     boolean isCoordinating = in.readBoolean();
                     Durability durability = CommandSerializers.durability.deserialize(in, version);
-                    AbstractRoute route = deserializeNullable(in, version, KeySerializers.abstractRoute);
+                    Route<?> route = deserializeNullable(in, version, KeySerializers.route);
                     RoutingKey homeKey = deserializeNullable(in, version, KeySerializers.routingKey);
 
                     if (kind == OK)
@@ -169,7 +170,7 @@ public class CheckStatusSerializers
             size += TypeSizes.BOOL_SIZE;
             size += CommandSerializers.durability.serializedSize(ok.durability, version);
             size += serializedSizeNullable(ok.homeKey, version, KeySerializers.routingKey);
-            size += serializedSizeNullable(ok.route, version, KeySerializers.abstractRoute);
+            size += serializedSizeNullable(ok.route, version, KeySerializers.route);
 
             if (!(reply instanceof CheckStatusOkFull))
                 return size;
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
index ef19dbe2d0..9c74cbb29e 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
@@ -27,9 +27,9 @@ import accord.local.SaveStatus;
 import accord.local.Status;
 import accord.local.Status.Durability;
 import accord.primitives.Ballot;
-import accord.primitives.KeyRanges;
-import accord.primitives.Keys;
 import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
@@ -131,7 +131,7 @@ public class CommandSerializers
         {
             CommandSerializers.kind.serialize(txn.kind(), out, version);
             KeySerializers.ranges.serialize(txn.covering(), out, version);
-            KeySerializers.keys.serialize(txn.keys(), out, version);
+            KeySerializers.seekables.serialize(txn.keys(), out, version);
             AccordRead.serializer.serialize((AccordRead) txn.read(), out, version);
             AccordQuery.serializer.serialize((AccordQuery) txn.query(), out, version);
             out.writeBoolean(txn.update() != null);
@@ -143,8 +143,8 @@ public class CommandSerializers
         public PartialTxn deserialize(DataInputPlus in, int version) throws IOException
         {
             Txn.Kind kind = CommandSerializers.kind.deserialize(in, version);
-            KeyRanges covering = KeySerializers.ranges.deserialize(in, version);
-            Keys keys = KeySerializers.keys.deserialize(in, version);
+            Ranges covering = KeySerializers.ranges.deserialize(in, version);
+            Seekables<?, ?> keys = KeySerializers.seekables.deserialize(in, version);
             AccordRead read = AccordRead.serializer.deserialize(in, version);
             AccordQuery query = AccordQuery.serializer.deserialize(in, version);
             AccordUpdate update = in.readBoolean() ? AccordUpdate.serializer.deserialize(in, version) : null;
@@ -156,7 +156,7 @@ public class CommandSerializers
         {
             long size = CommandSerializers.kind.serializedSize(txn.kind(), version);
             size += KeySerializers.ranges.serializedSize(txn.covering(), version);
-            size += KeySerializers.keys.serializedSize(txn.keys(), version);
+            size += KeySerializers.seekables.serializedSize(txn.keys(), version);
             size += AccordRead.serializer.serializedSize((AccordRead) txn.read(), version);
             size += AccordQuery.serializer.serializedSize((AccordQuery) txn.query(), version);
             size += TypeSizes.sizeof(txn.update() != null);
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
index 0093e8095e..c43489fb83 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
@@ -22,8 +22,9 @@ import java.io.IOException;
 
 import accord.messages.Commit;
 import accord.primitives.PartialRoute;
-import accord.primitives.RoutingKeys;
+import accord.primitives.Routables;
 import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -43,18 +44,18 @@ public class CommitSerializers
             CommandSerializers.timestamp.serialize(msg.executeAt, out, version);
             serializeNullable(msg.partialTxn, out, version, CommandSerializers.partialTxn);
             DepsSerializer.partialDeps.serialize(msg.partialDeps, out, version);
-            serializeNullable(msg.route, out, version, KeySerializers.route);
+            serializeNullable(msg.route, out, version, KeySerializers.fullRoute);
             serializeNullable(msg.read, out, version, ReadDataSerializers.request);
         }
 
         @Override
-        public Commit deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute scope, long waitForEpoch) throws IOException
+        public Commit deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute<?> scope, long waitForEpoch) throws IOException
         {
             return Commit.SerializerSupport.create(txnId, scope, waitForEpoch,
                                                    CommandSerializers.timestamp.deserialize(in, version),
                                                    deserializeNullable(in, version, CommandSerializers.partialTxn),
                                                    DepsSerializer.partialDeps.deserialize(in, version),
-                                                   deserializeNullable(in, version, KeySerializers.route),
+                                                   deserializeNullable(in, version, KeySerializers.fullRoute),
                                                    deserializeNullable(in, version, ReadDataSerializers.request)
             );
         }
@@ -65,7 +66,7 @@ public class CommitSerializers
             return CommandSerializers.timestamp.serializedSize(msg.executeAt, version)
                    + serializedSizeNullable(msg.partialTxn, version, CommandSerializers.partialTxn)
                    + DepsSerializer.partialDeps.serializedSize(msg.partialDeps, version)
-                   + serializedSizeNullable(msg.route, version, KeySerializers.route)
+                   + serializedSizeNullable(msg.route, version, KeySerializers.fullRoute)
                    + serializedSizeNullable(msg.read, version, ReadDataSerializers.request);
         }
     };
@@ -76,7 +77,7 @@ public class CommitSerializers
         public void serialize(Commit.Invalidate invalidate, DataOutputPlus out, int version) throws IOException
         {
             CommandSerializers.txnId.serialize(invalidate.txnId, out, version);
-            KeySerializers.routingKeys.serialize(invalidate.scope, out, version);
+            KeySerializers.unseekables.serialize(invalidate.scope, out, version);
             out.writeUnsignedVInt(invalidate.waitForEpoch);
             out.writeUnsignedVInt(invalidate.invalidateUntilEpoch - invalidate.waitForEpoch);
         }
@@ -85,17 +86,17 @@ public class CommitSerializers
         public Commit.Invalidate deserialize(DataInputPlus in, int version) throws IOException
         {
             TxnId txnId = CommandSerializers.txnId.deserialize(in, version);
-            RoutingKeys routingKeys = KeySerializers.routingKeys.deserialize(in, version);
+            Unseekables<?, ?> scope = KeySerializers.unseekables.deserialize(in, version);
             long waitForEpoch = in.readUnsignedVInt();
             long invalidateUntilEpoch = in.readUnsignedVInt() + waitForEpoch;
-            return Commit.Invalidate.SerializerSupport.create(txnId, routingKeys, waitForEpoch, invalidateUntilEpoch);
+            return Commit.Invalidate.SerializerSupport.create(txnId, scope, waitForEpoch, invalidateUntilEpoch);
         }
 
         @Override
         public long serializedSize(Commit.Invalidate invalidate, int version)
         {
             return CommandSerializers.txnId.serializedSize(invalidate.txnId, version)
-                   + KeySerializers.routingKeys.serializedSize(invalidate.scope, version)
+                   + KeySerializers.unseekables.serializedSize(invalidate.scope, version)
                    + TypeSizes.sizeofUnsignedVInt(invalidate.waitForEpoch)
                    + TypeSizes.sizeofUnsignedVInt(invalidate.invalidateUntilEpoch - invalidate.waitForEpoch);
         }
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
index 2917862c13..22b80554f1 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
@@ -21,9 +21,9 @@ package org.apache.cassandra.service.accord.serializers;
 import java.io.IOException;
 
 import accord.primitives.Deps;
-import accord.primitives.KeyRanges;
 import accord.primitives.Keys;
 import accord.primitives.PartialDeps;
+import accord.primitives.Ranges;
 import accord.primitives.TxnId;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -49,7 +49,7 @@ public abstract class DepsSerializer<D extends Deps> implements IVersionedSerial
         @Override
         PartialDeps deserialize(Keys keys, TxnId[] txnIds, int[] keyToTxnIds, DataInputPlus in, int version) throws IOException
         {
-            KeyRanges covering = KeySerializers.ranges.deserialize(in, version);
+            Ranges covering = KeySerializers.ranges.deserialize(in, version);
             return PartialDeps.SerializerSupport.create(covering, keys, txnIds, keyToTxnIds);
         }
 
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/GetDepsSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/GetDepsSerializers.java
index eb8acf1a67..1de5b96a5b 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/GetDepsSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/GetDepsSerializers.java
@@ -22,8 +22,8 @@ import java.io.IOException;
 
 import accord.messages.GetDeps;
 import accord.messages.GetDeps.GetDepsOk;
-import accord.primitives.Keys;
 import accord.primitives.PartialRoute;
+import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
@@ -38,15 +38,15 @@ public class GetDepsSerializers
         @Override
         public void serializeBody(GetDeps msg, DataOutputPlus out, int version) throws IOException
         {
-            KeySerializers.keys.serialize(msg.keys, out, version);
+            KeySerializers.seekables.serialize(msg.keys, out, version);
             CommandSerializers.timestamp.serialize(msg.executeAt, out, version);
             CommandSerializers.kind.serialize(msg.kind, out, version);
         }
 
         @Override
-        public GetDeps deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey) throws IOException
+        public GetDeps deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey) throws IOException
         {
-            Keys keys = KeySerializers.keys.deserialize(in, version);
+            Seekables<?, ?> keys = KeySerializers.seekables.deserialize(in, version);
             Timestamp executeAt = CommandSerializers.timestamp.deserialize(in, version);
             Txn.Kind kind = CommandSerializers.kind.deserialize(in, version);
             return GetDeps.SerializationSupport.create(txnId, scope, waitForEpoch, minEpoch, doNotComputeProgressKey, keys, executeAt, kind);
@@ -55,7 +55,7 @@ public class GetDepsSerializers
         @Override
         public long serializedBodySize(GetDeps msg, int version)
         {
-            return KeySerializers.keys.serializedSize(msg.keys, version)
+            return KeySerializers.seekables.serializedSize(msg.keys, version)
                    + CommandSerializers.timestamp.serializedSize(msg.executeAt, version)
                    + CommandSerializers.kind.serializedSize(msg.kind, version);
         }
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
index 58f62254aa..646abd3b3d 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
@@ -19,60 +19,49 @@
 package org.apache.cassandra.service.accord.serializers;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.function.IntFunction;
 
 import accord.api.Key;
 import accord.api.RoutingKey;
 import accord.primitives.AbstractKeys;
-import accord.primitives.AbstractRoute;
-import accord.primitives.KeyRange;
-import accord.primitives.KeyRanges;
+import accord.primitives.AbstractRanges;
+import accord.primitives.FullKeyRoute;
+import accord.primitives.FullRangeRoute;
+import accord.primitives.FullRoute;
 import accord.primitives.Keys;
+import accord.primitives.PartialKeyRoute;
+import accord.primitives.PartialRangeRoute;
 import accord.primitives.PartialRoute;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.primitives.RoutableKey;
+import accord.primitives.Routables;
 import accord.primitives.Route;
 import accord.primitives.RoutingKeys;
+import accord.primitives.Seekables;
+import accord.primitives.Unseekables;
+import accord.primitives.Unseekables.UnseekablesKind;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.accord.TokenRange;
-import org.apache.cassandra.service.accord.api.AccordKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey;
 
 public class KeySerializers
 {
     private KeySerializers() {}
 
-    public static final IVersionedSerializer<Key> key = (IVersionedSerializer<Key>) (IVersionedSerializer<?>) AccordKey.serializer;
+    public static final IVersionedSerializer<Key> key = (IVersionedSerializer<Key>) (IVersionedSerializer<?>) PartitionKey.serializer;
     public static final IVersionedSerializer<RoutingKey> routingKey = (IVersionedSerializer<RoutingKey>) (IVersionedSerializer<?>) AccordRoutingKey.serializer;
 
-
-    public static final IVersionedSerializer<KeyRanges> ranges = new IVersionedSerializer<KeyRanges>()
+    public static final IVersionedSerializer<RoutingKeys> routingKeys = new AbstractKeysSerializer<RoutingKey, RoutingKeys>(routingKey, RoutingKey[]::new)
     {
-        @Override
-        public void serialize(KeyRanges ranges, DataOutputPlus out, int version) throws IOException
-        {
-            out.writeUnsignedVInt(ranges.size());
-            for (int i=0, mi=ranges.size(); i<mi; i++)
-                TokenRange.serializer.serialize((TokenRange) ranges.get(i), out, version);
-        }
-
-        @Override
-        public KeyRanges deserialize(DataInputPlus in, int version) throws IOException
-        {
-            KeyRange[] ranges = new KeyRange[(int)in.readUnsignedVInt()];
-            for (int i=0; i<ranges.length; i++)
-                ranges[i] = TokenRange.serializer.deserialize(in, version);
-            return KeyRanges.ofSortedAndDeoverlapped(ranges);
-        }
-
-        @Override
-        public long serializedSize(KeyRanges ranges, int version)
+        @Override RoutingKeys deserialize(DataInputPlus in, int version, RoutingKey[] keys)
         {
-            long size = TypeSizes.sizeofUnsignedVInt(ranges.size());
-            for (int i=0, mi=ranges.size(); i<mi; i++)
-                size += TokenRange.serializer.serializedSize((TokenRange) ranges.get(i), version);
-            return size;
+            return RoutingKeys.SerializationSupport.create(keys);
         }
     };
 
@@ -83,24 +72,27 @@ public class KeySerializers
             return Keys.SerializationSupport.create(keys);
         }
     };
-    public static final IVersionedSerializer<RoutingKeys> routingKeys = new AbstractKeysSerializer<RoutingKey, RoutingKeys>(routingKey, RoutingKey[]::new)
+
+    public static final IVersionedSerializer<Ranges> ranges = new AbstractRangesSerializer<Ranges>()
     {
-        @Override RoutingKeys deserialize(DataInputPlus in, int version, RoutingKey[] keys)
+        @Override
+        public Ranges deserialize(DataInputPlus in, int version, Range[] ranges)
         {
-            return RoutingKeys.SerializationSupport.create(keys);
+            return Ranges.ofSortedAndDeoverlapped(ranges);
         }
     };
-    public static final IVersionedSerializer<PartialRoute> partialRoute = new AbstractKeysSerializer<RoutingKey, PartialRoute>(routingKey, RoutingKey[]::new)
+
+    public static final IVersionedSerializer<PartialKeyRoute> partialKeyRoute = new AbstractKeysSerializer<RoutingKey, PartialKeyRoute>(routingKey, RoutingKey[]::new)
     {
-        @Override PartialRoute deserialize(DataInputPlus in, int version, RoutingKey[] keys) throws IOException
+        @Override PartialKeyRoute deserialize(DataInputPlus in, int version, RoutingKey[] keys) throws IOException
         {
-            KeyRanges covering = ranges.deserialize(in, version);
+            Ranges covering = ranges.deserialize(in, version);
             RoutingKey homeKey = routingKey.deserialize(in, version);
-            return PartialRoute.SerializationSupport.create(covering, homeKey, keys);
+            return PartialKeyRoute.SerializationSupport.create(covering, homeKey, keys);
         }
 
         @Override
-        public void serialize(PartialRoute keys, DataOutputPlus out, int version) throws IOException
+        public void serialize(PartialKeyRoute keys, DataOutputPlus out, int version) throws IOException
         {
             super.serialize(keys, out, version);
             ranges.serialize(keys.covering, out, version);
@@ -108,80 +100,236 @@ public class KeySerializers
         }
 
         @Override
-        public long serializedSize(PartialRoute keys, int version)
+        public long serializedSize(PartialKeyRoute keys, int version)
         {
             return super.serializedSize(keys, version)
                    + ranges.serializedSize(keys.covering, version)
                    + routingKey.serializedSize(keys.homeKey, version);
         }
     };
-    public static final IVersionedSerializer<Route> route = new AbstractKeysSerializer<RoutingKey, Route>(routingKey, RoutingKey[]::new)
+
+    public static final IVersionedSerializer<FullKeyRoute> fullKeyRoute = new AbstractKeysSerializer<RoutingKey, FullKeyRoute>(routingKey, RoutingKey[]::new)
     {
-        @Override Route deserialize(DataInputPlus in, int version, RoutingKey[] keys) throws IOException
+        @Override FullKeyRoute deserialize(DataInputPlus in, int version, RoutingKey[] keys) throws IOException
         {
             RoutingKey homeKey = routingKey.deserialize(in, version);
-            return Route.SerializationSupport.create(homeKey, keys);
+            return FullKeyRoute.SerializationSupport.create(homeKey, keys);
         }
 
         @Override
-        public void serialize(Route keys, DataOutputPlus out, int version) throws IOException
+        public void serialize(FullKeyRoute keys, DataOutputPlus out, int version) throws IOException
         {
             super.serialize(keys, out, version);
             routingKey.serialize(keys.homeKey, out, version);
         }
 
         @Override
-        public long serializedSize(Route keys, int version)
+        public long serializedSize(FullKeyRoute keys, int version)
         {
             return super.serializedSize(keys, version)
                  + routingKey.serializedSize(keys.homeKey, version);
         }
     };
 
-    public static final IVersionedSerializer<AbstractRoute> abstractRoute = new IVersionedSerializer<AbstractRoute>()
+    public static final IVersionedSerializer<PartialRangeRoute> partialRangeRoute = new AbstractRangesSerializer<PartialRangeRoute>()
     {
+        @Override PartialRangeRoute deserialize(DataInputPlus in, int version, Range[] rs) throws IOException
+        {
+            Ranges covering = ranges.deserialize(in, version);
+            RoutingKey homeKey = routingKey.deserialize(in, version);
+            return PartialRangeRoute.SerializationSupport.create(covering, homeKey, rs);
+        }
+
+        @Override
+        public void serialize(PartialRangeRoute rs, DataOutputPlus out, int version) throws IOException
+        {
+            super.serialize(rs, out, version);
+            ranges.serialize(rs.covering, out, version);
+            routingKey.serialize(rs.homeKey, out, version);
+        }
+
         @Override
-        public void serialize(AbstractRoute t, DataOutputPlus out, int version) throws IOException
+        public long serializedSize(PartialRangeRoute rs, int version)
+        {
+            return super.serializedSize(rs, version)
+                   + ranges.serializedSize(rs.covering, version)
+                   + routingKey.serializedSize(rs.homeKey, version);
+        }
+    };
+
+    public static final IVersionedSerializer<FullRangeRoute> fullRangeRoute = new AbstractRangesSerializer<FullRangeRoute>()
+    {
+        @Override FullRangeRoute deserialize(DataInputPlus in, int version, Range[] rs) throws IOException
         {
-            if (t instanceof Route)
+            RoutingKey homeKey = routingKey.deserialize(in, version); // the home key follows the range list on the wire
+            return FullRangeRoute.SerializationSupport.create(homeKey, rs);
+        }
+
+        @Override
+        public void serialize(FullRangeRoute rs, DataOutputPlus out, int version) throws IOException
+        {
+            super.serialize(rs, out, version); // count-prefixed ranges first
+            routingKey.serialize(rs.homeKey, out, version);
+        }
+
+        @Override
+        public long serializedSize(FullRangeRoute rs, int version)
+        {
+            return super.serializedSize(rs, version)
+                 + routingKey.serializedSize(rs.homeKey, version);
+        }
+    };
+
+    public static final IVersionedSerializer<Route<?>> route = new AbstractRoutablesSerializer<>(
+        EnumSet.of(UnseekablesKind.PartialKeyRoute, UnseekablesKind.FullKeyRoute, UnseekablesKind.PartialRangeRoute, UnseekablesKind.FullRangeRoute)
+    );
+
+    public static final IVersionedSerializer<PartialRoute<?>> partialRoute = new AbstractRoutablesSerializer<>(
+        EnumSet.of(UnseekablesKind.PartialKeyRoute, UnseekablesKind.PartialRangeRoute)
+    );
+
+    public static final IVersionedSerializer<FullRoute<?>> fullRoute = new AbstractRoutablesSerializer<>(
+        EnumSet.of(UnseekablesKind.FullKeyRoute, UnseekablesKind.FullRangeRoute)
+    );
+
+    public static final IVersionedSerializer<Unseekables<?, ?>> unseekables = new AbstractRoutablesSerializer<>(
+        EnumSet.allOf(UnseekablesKind.class)
+    );
+
+    static class AbstractRoutablesSerializer<RS extends Unseekables<?, ?>> implements IVersionedSerializer<RS>
+    {
+        final EnumSet<UnseekablesKind> permitted; // kinds this instance may write or read; any other kind is rejected
+        protected AbstractRoutablesSerializer(EnumSet<UnseekablesKind> permitted)
+        {
+            this.permitted = permitted;
+        }
+
+        @Override
+        public void serialize(RS t, DataOutputPlus out, int version) throws IOException
+        {
+            UnseekablesKind kind = t.kind();
+            if (!permitted.contains(kind))
+                throw new IllegalArgumentException("Kind " + kind + " is not permitted; expected one of " + permitted);
+
+            switch (kind) // each case writes a one-byte tag, then delegates to the concrete serializer
             {
-                out.writeByte(1);
-                route.serialize((Route)t, out, version);
+                default: throw new AssertionError();
+                case RoutingKeys:
+                    out.writeByte(1);
+                    routingKeys.serialize((RoutingKeys)t, out, version);
+                    break;
+                case PartialKeyRoute:
+                    out.writeByte(2);
+                    partialKeyRoute.serialize((PartialKeyRoute)t, out, version);
+                    break;
+                case FullKeyRoute:
+                    out.writeByte(3);
+                    fullKeyRoute.serialize((FullKeyRoute)t, out, version);
+                    break;
+                case RoutingRanges:
+                    out.writeByte(4);
+                    ranges.serialize((Ranges)t, out, version);
+                    break;
+                case PartialRangeRoute:
+                    out.writeByte(5);
+                    partialRangeRoute.serialize((PartialRangeRoute)t, out, version);
+                    break;
+                case FullRangeRoute:
+                    out.writeByte(6);
+                    fullRangeRoute.serialize((FullRangeRoute)t, out, version);
+                    break;
             }
-            else
+        }
+
+        @Override
+        public RS deserialize(DataInputPlus in, int version) throws IOException
+        {
+            byte b = in.readByte(); // tag byte written by serialize(); selects the concrete decoder below
+            UnseekablesKind kind;
+            RS result;
+            switch (b)
+            {
+                default: throw new IOException("Corrupted input: expected byte 1, 2, 3, 4, 5 or 6; received " + b);
+                case 1: kind = UnseekablesKind.RoutingKeys; result = (RS)routingKeys.deserialize(in, version); break;
+                case 2: kind = UnseekablesKind.PartialKeyRoute; result = (RS)partialKeyRoute.deserialize(in, version); break;
+                case 3: kind = UnseekablesKind.FullKeyRoute; result = (RS)fullKeyRoute.deserialize(in, version); break;
+                case 4: kind = UnseekablesKind.RoutingRanges; result = (RS)ranges.deserialize(in, version); break;
+                case 5: kind = UnseekablesKind.PartialRangeRoute; result = (RS)partialRangeRoute.deserialize(in, version); break;
+                case 6: kind = UnseekablesKind.FullRangeRoute; result = (RS)fullRangeRoute.deserialize(in, version); break;
+            }
+            if (!permitted.contains(kind)) // checked only after the payload has been read
+                throw new IllegalStateException("Kind " + kind + " is not permitted; expected one of " + permitted);
+            return result;
+        }
+
+        @Override
+        public long serializedSize(RS t, int version)
+        {
+            switch (t.kind()) // the leading 1 in each case is the tag byte written by serialize()
+            {
+                default: throw new AssertionError();
+                case RoutingKeys:
+                    return 1 + routingKeys.serializedSize((RoutingKeys)t, version);
+                case PartialKeyRoute:
+                    return 1 + partialKeyRoute.serializedSize((PartialKeyRoute)t, version);
+                case FullKeyRoute:
+                    return 1 + fullKeyRoute.serializedSize((FullKeyRoute)t, version);
+                case RoutingRanges:
+                    return 1 + ranges.serializedSize((Ranges)t, version);
+                case PartialRangeRoute:
+                    return 1 + partialRangeRoute.serializedSize((PartialRangeRoute)t, version);
+                case FullRangeRoute:
+                    return 1 + fullRangeRoute.serializedSize((FullRangeRoute)t, version);
+            }
+        }
+    }
+
+    public static final IVersionedSerializer<Seekables<?, ?>> seekables = new IVersionedSerializer<Seekables<?, ?>>()
+    {
+        @Override
+        public void serialize(Seekables<?, ?> t, DataOutputPlus out, int version) throws IOException
+        {
+            switch (t.kindOfContents()) // one tag byte (1 = Keys, 2 = Ranges), then the matching serializer
             {
-                out.writeByte(2);
-                partialRoute.serialize((PartialRoute)t, out, version);
+                default: throw new AssertionError();
+                case Key:
+                    out.writeByte(1);
+                    keys.serialize((Keys)t, out, version);
+                    break;
+                case Range:
+                    out.writeByte(2);
+                    ranges.serialize((Ranges)t, out, version);
+                    break;
             }
         }
 
         @Override
-        public AbstractRoute deserialize(DataInputPlus in, int version) throws IOException
+        public Seekables<?, ?> deserialize(DataInputPlus in, int version) throws IOException
         {
             byte b = in.readByte();
             switch (b)
             {
                 default: throw new IOException("Corrupted input: expected byte 1 or 2, received " + b);
-                case 1: return route.deserialize(in, version);
-                case 2: return partialRoute.deserialize(in, version);
+                case 1: return keys.deserialize(in, version); // tag 1 was written for Key contents
+                case 2: return ranges.deserialize(in, version); // tag 2 was written for Range contents
             }
         }
 
         @Override
-        public long serializedSize(AbstractRoute t, int version)
+        public long serializedSize(Seekables<?, ?> t, int version)
         {
-            if (t instanceof Route)
-            {
-                return 1 + route.serializedSize((Route)t, version);
-            }
-            else
+            switch (t.kindOfContents()) // the leading 1 in each case is the tag byte
             {
-                return 1 + partialRoute.serializedSize((PartialRoute)t, version);
+                default: throw new AssertionError();
+                case Key:
+                    return 1 + keys.serializedSize((Keys)t, version);
+                case Range:
+                    return 1 + ranges.serializedSize((Ranges)t, version);
             }
         }
     };
 
-    public static abstract class AbstractKeysSerializer<K extends RoutingKey, KS extends AbstractKeys<K, ?>> implements IVersionedSerializer<KS>
+    public static abstract class AbstractKeysSerializer<K extends RoutableKey, KS extends AbstractKeys<K, ?>> implements IVersionedSerializer<KS>
     {
         final IVersionedSerializer<K> keySerializer;
         final IntFunction<K[]> allocate;
@@ -219,5 +367,36 @@ public class KeySerializers
                 size += keySerializer.serializedSize(keys.get(i), version);
             return size;
         }
-    };
+    }
+
+    public static abstract class AbstractRangesSerializer<RS extends AbstractRanges<?>> implements IVersionedSerializer<RS>
+    {
+        @Override
+        public void serialize(RS ranges, DataOutputPlus out, int version) throws IOException
+        {
+            out.writeUnsignedVInt(ranges.size()); // count prefix, then each range in index order
+            for (int i=0, mi=ranges.size(); i<mi; i++)
+                TokenRange.serializer.serialize((TokenRange) ranges.get(i), out, version); // every element is cast to TokenRange
+        }
+
+        @Override
+        public RS deserialize(DataInputPlus in, int version) throws IOException
+        {
+            Range[] ranges = new Range[(int)in.readUnsignedVInt()]; // NOTE(review): count is narrowed to int with no bounds check
+            for (int i=0; i<ranges.length; i++)
+                ranges[i] = TokenRange.serializer.deserialize(in, version);
+            return deserialize(in, version, ranges); // subclass hook builds the concrete type and may read further fields from 'in'
+        }
+
+        abstract RS deserialize(DataInputPlus in, int version, Range[] ranges) throws IOException; // called after the range array has been read
+
+        @Override
+        public long serializedSize(RS ranges, int version)
+        {
+            long size = TypeSizes.sizeofUnsignedVInt(ranges.size()); // mirrors serialize(): count prefix plus each range
+            for (int i=0, mi=ranges.size(); i<mi; i++)
+                size += TokenRange.serializer.serializedSize((TokenRange) ranges.get(i), version);
+            return size;
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
index 43dd3f8c90..d4748d7a7f 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
@@ -25,9 +25,9 @@ import javax.annotation.Nullable;
 import accord.messages.PreAccept;
 import accord.messages.PreAccept.PreAcceptOk;
 import accord.messages.PreAccept.PreAcceptReply;
+import accord.primitives.FullRoute;
 import accord.primitives.PartialRoute;
 import accord.primitives.PartialTxn;
-import accord.primitives.Route;
 import accord.primitives.TxnId;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -49,25 +49,25 @@ public class PreacceptSerializers
         public void serializeBody(PreAccept msg, DataOutputPlus out, int version) throws IOException
         {
             CommandSerializers.partialTxn.serialize(msg.partialTxn, out, version);
-            serializeNullable(msg.route, out, version, KeySerializers.route);
+            serializeNullable(msg.route, out, version, KeySerializers.fullRoute);
             out.writeUnsignedVInt(msg.maxEpoch - msg.minEpoch);
         }
 
         @Override
-        public PreAccept deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey) throws IOException
+        public PreAccept deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey) throws IOException
         {
             PartialTxn partialTxn = CommandSerializers.partialTxn.deserialize(in, version);
-            @Nullable Route route = deserializeNullable(in, version, KeySerializers.route);
+            @Nullable FullRoute<?> fullRoute = deserializeNullable(in, version, KeySerializers.fullRoute);
             long maxEpoch = in.readUnsignedVInt() + minEpoch;
             return PreAccept.SerializerSupport.create(txnId, scope, waitForEpoch, minEpoch, doNotComputeProgressKey,
-                                                      maxEpoch, partialTxn, route);
+                                                      maxEpoch, partialTxn, fullRoute);
         }
 
         @Override
         public long serializedBodySize(PreAccept msg, int version)
         {
             return CommandSerializers.partialTxn.serializedSize(msg.partialTxn, version)
-                   + serializedSizeNullable(msg.route, version, KeySerializers.route)
+                   + serializedSizeNullable(msg.route, version, KeySerializers.fullRoute)
                    + TypeSizes.sizeofUnsignedVInt(msg.maxEpoch - msg.minEpoch);
         }
     };
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
index 85e633e47e..1e43221cef 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
@@ -25,6 +25,7 @@ import accord.messages.ReadData.ReadNack;
 import accord.messages.ReadData.ReadOk;
 import accord.messages.ReadData.ReadReply;
 import accord.primitives.Keys;
+import accord.primitives.Seekables;
 import accord.primitives.TxnId;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -40,7 +41,7 @@ public class ReadDataSerializers
         public void serialize(ReadData read, DataOutputPlus out, int version) throws IOException
         {
             CommandSerializers.txnId.serialize(read.txnId, out, version);
-            KeySerializers.keys.serialize(read.readScope, out, version);
+            KeySerializers.seekables.serialize(read.readScope, out, version);
             out.writeUnsignedVInt(read.waitForEpoch());
             out.writeUnsignedVInt(read.executeAtEpoch - read.waitForEpoch());
         }
@@ -49,7 +50,7 @@ public class ReadDataSerializers
         public ReadData deserialize(DataInputPlus in, int version) throws IOException
         {
             TxnId txnId = CommandSerializers.txnId.deserialize(in, version);
-            Keys readScope = KeySerializers.keys.deserialize(in, version);
+            Seekables<?, ?> readScope = KeySerializers.seekables.deserialize(in, version);
             long waitForEpoch = in.readUnsignedVInt();
             long executeAtEpoch = in.readUnsignedVInt() + waitForEpoch;
             return ReadData.SerializerSupport.create(txnId, readScope, executeAtEpoch, waitForEpoch);
@@ -59,7 +60,7 @@ public class ReadDataSerializers
         public long serializedSize(ReadData read, int version)
         {
             return CommandSerializers.txnId.serializedSize(read.txnId, version)
-                   + KeySerializers.keys.serializedSize(read.readScope, version)
+                   + KeySerializers.seekables.serializedSize(read.readScope, version)
                    + TypeSizes.sizeofUnsignedVInt(read.waitForEpoch())
                    + TypeSizes.sizeofUnsignedVInt(read.executeAtEpoch - read.waitForEpoch());
         }
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
index 3acd76e36e..8c75fc584d 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
@@ -30,10 +30,10 @@ import accord.messages.BeginRecovery.RecoverOk;
 import accord.messages.BeginRecovery.RecoverReply;
 import accord.primitives.Ballot;
 import accord.primitives.Deps;
+import accord.primitives.FullRoute;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialRoute;
 import accord.primitives.PartialTxn;
-import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
@@ -56,15 +56,15 @@ public class RecoverySerializers
         {
             CommandSerializers.partialTxn.serialize(recover.partialTxn, out, version);
             CommandSerializers.ballot.serialize(recover.ballot, out, version);
-            serializeNullable(KeySerializers.route, recover.route, out, version);
+            serializeNullable(KeySerializers.fullRoute, recover.route, out, version);
         }
 
         @Override
-        public BeginRecovery deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute scope, long waitForEpoch) throws IOException
+        public BeginRecovery deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute<?> scope, long waitForEpoch) throws IOException
         {
             PartialTxn partialTxn = CommandSerializers.partialTxn.deserialize(in, version);
             Ballot ballot = CommandSerializers.ballot.deserialize(in, version);
-            @Nullable Route route = deserializeNullable(KeySerializers.route, in, version);
+            @Nullable FullRoute<?> route = deserializeNullable(KeySerializers.fullRoute, in, version);
             return BeginRecovery.SerializationSupport.create(txnId, scope, waitForEpoch, partialTxn, ballot, route);
         }
 
@@ -73,7 +73,7 @@ public class RecoverySerializers
         {
             return CommandSerializers.partialTxn.serializedSize(recover.partialTxn, version)
                    + CommandSerializers.ballot.serializedSize(recover.ballot, version)
-                   + serializedSizeNullable(KeySerializers.route, recover.route, version);
+                   + serializedSizeNullable(KeySerializers.fullRoute, recover.route, version);
         }
     };
 
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
index 0d133f78af..1db73e38ab 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
@@ -46,13 +46,13 @@ public abstract class TxnRequestSerializer<T extends TxnRequest<?>> implements I
         serializeBody(msg, out, version);
     }
 
-    public abstract T deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute scope, long waitForEpoch) throws IOException;
+    public abstract T deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute<?> scope, long waitForEpoch) throws IOException;
 
     @Override
     public final T deserialize(DataInputPlus in, int version) throws IOException
     {
         TxnId txnId = CommandSerializers.txnId.deserialize(in, version);
-        PartialRoute scope = KeySerializers.partialRoute.deserialize(in, version);
+        PartialRoute<?> scope = KeySerializers.partialRoute.deserialize(in, version);
         // TODO: there should be a base epoch
         long waitForEpoch = in.readUnsignedVInt();
         return deserializeBody(in, version, txnId, scope, waitForEpoch);
@@ -83,10 +83,10 @@ public abstract class TxnRequestSerializer<T extends TxnRequest<?>> implements I
             out.writeBoolean(msg.doNotComputeProgressKey);
         }
 
-        public abstract T deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey) throws IOException;
+        public abstract T deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey) throws IOException;
 
         @Override
-        public final T deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute scope, long waitForEpoch) throws IOException
+        public final T deserializeBody(DataInputPlus in, int version, TxnId txnId, PartialRoute<?> scope, long waitForEpoch) throws IOException
         {
             long minEpoch = in.readUnsignedVInt();
             boolean doNotComputeProgressKey = in.readBoolean();
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/WaitOnCommitSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/WaitOnCommitSerializer.java
index 2203914a01..742da8d462 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/WaitOnCommitSerializer.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/WaitOnCommitSerializer.java
@@ -22,8 +22,9 @@ import java.io.IOException;
 
 import accord.messages.WaitOnCommit;
 import accord.messages.WaitOnCommit.WaitOnCommitOk;
-import accord.primitives.RoutingKeys;
+import accord.primitives.Routables;
 import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -36,14 +37,14 @@ public class WaitOnCommitSerializer
         public void serialize(WaitOnCommit wait, DataOutputPlus out, int version) throws IOException
         {
             CommandSerializers.txnId.serialize(wait.txnId, out, version);
-            KeySerializers.routingKeys.serialize(wait.scope, out, version);
+            KeySerializers.unseekables.serialize(wait.scope, out, version);
         }
 
         @Override
         public WaitOnCommit deserialize(DataInputPlus in, int version) throws IOException
         {
             TxnId txnId = CommandSerializers.txnId.deserialize(in, version);
-            RoutingKeys scope = KeySerializers.routingKeys.deserialize(in, version);
+            Unseekables<?, ?> scope = KeySerializers.unseekables.deserialize(in, version);
             return WaitOnCommit.SerializerSupport.create(txnId, scope);
         }
 
@@ -51,7 +52,7 @@ public class WaitOnCommitSerializer
         public long serializedSize(WaitOnCommit wait, int version)
         {
             return CommandSerializers.txnId.serializedSize(wait.txnId, version)
-                   + KeySerializers.routingKeys.serializedSize(wait.scope, version);
+                   + KeySerializers.unseekables.serializedSize(wait.scope, version);
         }
     };
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
index 3fb6584d84..d1027c080e 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
@@ -45,8 +45,10 @@ import accord.coordinate.Preempted;
 import accord.local.Status;
 import accord.messages.Commit;
 import accord.primitives.Keys;
+import accord.primitives.Routables;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
 import accord.topology.Topologies;
 import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.Clustering;
@@ -230,8 +232,8 @@ public class AccordIntegrationTest extends TestBaseImpl
                              .withWrite("INSERT INTO " + keyspace + ".tbl (k, c, v) VALUES (?, 0, ?)", key, i++)
                              .withCondition(keyspace, "tbl", key, 0, NOT_EXISTS);
                 }
-                Keys keySet = txn.build().keys();
-                Topologies topology = AccordService.instance().node.topology().withUnsyncedEpochs(keySet, 1);
+                Unseekables<?, ?> routables = txn.build().keys().toUnseekables();
+                Topologies topology = AccordService.instance().node.topology().withUnsyncedEpochs(routables, 1);
                 // currently we don't detect out-of-bounds read/write, so need this logic to validate we reach different
                 // shards
                 Assertions.assertThat(topology.totalShards()).isEqualTo(2);
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
index 53b762de2f..2cd933c7b4 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
@@ -43,7 +43,7 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 
 import static accord.local.Status.Durability.Durable;
 import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
@@ -78,7 +78,7 @@ public class AccordCommandStoreTest
     {
         AtomicLong clock = new AtomicLong(0);
         PartialTxn depTxn = createPartialTxn(0);
-        Key key = depTxn.keys().get(0);
+        Key key = (Key)depTxn.keys().get(0);
         AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
 
         PartialDeps.OrderedBuilder builder = PartialDeps.orderedBuilder(depTxn.covering(), false);
@@ -92,8 +92,8 @@ public class AccordCommandStoreTest
         TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
         AccordCommand command = new AccordCommand(txnId).initialize();
         command.setPartialTxn(createPartialTxn(0));
-        command.homeKey(key.toRoutingKey());
-        command.progressKey(key.toRoutingKey());
+        command.homeKey(key.toUnseekable());
+        command.progressKey(key.toUnseekable());
         command.setDurability(Durable);
         command.setPromised(ballot(1, clock.incrementAndGet(), 0, 1));
         command.setAccepted(ballot(1, clock.incrementAndGet(), 0, 1));
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index 54ac4736ac..67683661e1 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.service.accord;
 
-import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -37,10 +36,11 @@ import accord.messages.Accept;
 import accord.messages.Commit;
 import accord.messages.PreAccept;
 import accord.primitives.Ballot;
+import accord.primitives.FullRoute;
+import accord.primitives.Keys;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialRoute;
 import accord.primitives.PartialTxn;
-import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
@@ -49,7 +49,7 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
@@ -90,11 +90,11 @@ public class AccordCommandTest
 
         TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
         Txn txn = createTxn(1);
-        Key key = txn.keys().get(0);
-        RoutingKey homeKey = key.toRoutingKey();
-        Route fullRoute = txn.keys().toRoute(homeKey);
-        PartialRoute route = fullRoute.slice(fullRange(txn));
-        PartialTxn partialTxn = txn.slice(route.covering, true);
+        Key key = (Key)txn.keys().get(0);
+        RoutingKey homeKey = key.toUnseekable();
+        FullRoute<?> fullRoute = txn.keys().toRoute(homeKey);
+        PartialRoute<?> route = fullRoute.slice(fullRange(txn));
+        PartialTxn partialTxn = txn.slice(route.covering(), true);
         PreAccept preAccept = PreAccept.SerializerSupport.create(txnId, route, 1, 1, false, 1, partialTxn, fullRoute);
 
         // Check preaccept
@@ -122,7 +122,7 @@ public class AccordCommandTest
         // check accept
         TxnId txnId2 = txnId(1, clock.incrementAndGet(), 0, 1);
         Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 0, 1);
-        PartialDeps.OrderedBuilder builder = PartialDeps.orderedBuilder(route.covering, false);
+        PartialDeps.OrderedBuilder builder = PartialDeps.orderedBuilder(route.covering(), false);
         builder.add(key, txnId2);
         PartialDeps deps = builder.build();
         Accept accept = Accept.SerializerSupport.create(txnId, route, 1, 1, false, Ballot.ZERO, executeAt, partialTxn.keys(), deps, partialTxn.kind());
@@ -150,7 +150,7 @@ public class AccordCommandTest
         Commit commit = Commit.SerializerSupport.create(txnId, route, 1, executeAt, partialTxn, deps, fullRoute, null);
         commandStore.execute(commit, commit::apply).get();
 
-        commandStore.execute(PreLoadContext.contextFor(txnId, Collections.singleton(key)),instance -> {
+        commandStore.execute(PreLoadContext.contextFor(txnId, Keys.of(key)), instance -> {
             Command command = instance.command(txnId);
             Assert.assertEquals(commit.executeAt, command.executeAt());
             Assert.assertTrue(command.hasBeen(Status.Committed));
@@ -171,11 +171,11 @@ public class AccordCommandTest
 
         TxnId txnId1 = txnId(1, clock.incrementAndGet(), 0, 1);
         Txn txn = createTxn(2);
-        Key key = txn.keys().get(0);
-        RoutingKey homeKey = key.toRoutingKey();
-        Route fullRoute = txn.keys().toRoute(homeKey);
-        PartialRoute route = fullRoute.slice(fullRange(txn));
-        PartialTxn partialTxn = txn.slice(route.covering, true);
+        Key key = (Key)txn.keys().get(0);
+        RoutingKey homeKey = key.toUnseekable();
+        FullRoute<?> fullRoute = txn.keys().toRoute(homeKey);
+        PartialRoute<?> route = fullRoute.slice(fullRange(txn));
+        PartialTxn partialTxn = txn.slice(route.covering(), true);
         PreAccept preAccept1 = PreAccept.SerializerSupport.create(txnId1, route, 1, 1, false, 1, partialTxn, fullRoute);
 
         commandStore.execute(preAccept1, preAccept1::apply).get();
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index ad74eaa467..65b273fdb9 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -47,13 +47,15 @@ import accord.local.PreLoadContext;
 import accord.local.Status.Known;
 import accord.primitives.AbstractKeys;
 import accord.primitives.Ballot;
-import accord.primitives.KeyRange;
-import accord.primitives.KeyRanges;
+import accord.primitives.Ranges;
+import accord.primitives.Keys;
 import accord.primitives.PartialTxn;
-import accord.primitives.RoutingKeys;
+import accord.primitives.Range;
+import accord.primitives.Routables;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
 import accord.primitives.Writes;
 import accord.topology.Shard;
 import accord.topology.Topology;
@@ -61,7 +63,7 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.accord.api.AccordAgent;
-import org.apache.cassandra.service.accord.api.AccordKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.db.AccordData;
 import org.apache.cassandra.service.accord.db.AccordRead;
 import org.apache.cassandra.utils.FBUtilities;
@@ -87,9 +89,9 @@ public class AccordTestUtils
         @Override public void executed(Command command, ProgressShard progressShard) {}
         @Override public void invalidated(Command command, ProgressShard progressShard) {}
         @Override public void durable(Command command, Set<Id> persistedOn) {}
-        @Override public void durable(TxnId txnId, @Nullable RoutingKeys someKeys, ProgressShard shard) {}
+        @Override public void durable(TxnId txnId, @Nullable Unseekables<?, ?> someKeys, ProgressShard shard) {}
         @Override public void durableLocal(TxnId txnId) {}
-        @Override public void waiting(TxnId blockedBy, Known blockedUntil, RoutingKeys blockedOnKeys) {}
+        @Override public void waiting(TxnId blockedBy, Known blockedUntil, Unseekables<?, ?> blockedOn) {}
     };
 
     public static Topology simpleTopology(TableId... tables)
@@ -102,7 +104,7 @@ public class AccordTestUtils
         Set<Id> fastPath = Sets.newHashSet(node);
         for (int i=0; i<tables.length; i++)
         {
-            KeyRange range = TokenRange.fullRange(tables[i]);
+            Range range = TokenRange.fullRange(tables[i]);
             shards[i] = new Shard(range, nodes, fastPath, Collections.emptySet());
         }
 
@@ -156,7 +158,7 @@ public class AccordTestUtils
                                 })
                                 .reduce(null, AccordData::merge);
             Write write = txn.update().apply(readData);
-            ((AccordCommand)command).setWrites(new Writes(command.executeAt(), txn.keys(), write));
+            ((AccordCommand)command).setWrites(new Writes(command.executeAt(), (Keys)txn.keys(), write));
             ((AccordCommand)command).setResult(txn.query().compute(command.txnId(), readData, txn.read(), txn.update()));
         }).get();
     }
@@ -175,43 +177,43 @@ public class AccordTestUtils
         return createTxn(key, key);
     }
 
-    public static KeyRanges fullRange(Txn txn)
+    public static Ranges fullRange(Txn txn)
     {
-        TableId tableId = ((AccordKey)txn.keys().get(0)).tableId();
-        return KeyRanges.of(TokenRange.fullRange(tableId));
+        TableId tableId = ((PartitionKey)txn.keys().get(0)).tableId();
+        return Ranges.of(TokenRange.fullRange(tableId));
     }
 
     public static PartialTxn createPartialTxn(int key)
     {
         Txn txn = createTxn(key, key);
-        KeyRanges ranges = fullRange(txn);
+        Ranges ranges = fullRange(txn);
         return new PartialTxn.InMemory(ranges, txn.kind(), txn.keys(), txn.read(), txn.query(), txn.update());
     }
 
     private static class SingleEpochRanges implements CommandStore.RangesForEpoch
     {
-        private final KeyRanges ranges;
+        private final Ranges ranges;
 
-        public SingleEpochRanges(KeyRanges ranges)
+        public SingleEpochRanges(Ranges ranges)
         {
             this.ranges = ranges;
         }
 
         @Override
-        public KeyRanges at(long epoch)
+        public Ranges at(long epoch)
         {
             assert epoch == 1;
             return ranges;
         }
 
         @Override
-        public KeyRanges between(long fromInclusive, long toInclusive)
+        public Ranges between(long fromInclusive, long toInclusive)
         {
             return ranges;
         }
 
         @Override
-        public KeyRanges since(long epoch)
+        public Ranges since(long epoch)
         {
             assert epoch == 1;
             return ranges;
@@ -222,13 +224,6 @@ public class AccordTestUtils
         {
             return ranges.contains(key);
         }
-
-        @Override
-        public boolean intersects(long epoch, AbstractKeys<?, ?> keys)
-        {
-            assert epoch == 1;
-            return ranges.intersects(keys);
-        }
     }
 
     public static InMemoryCommandStore.Synchronized createInMemoryCommandStore(LongSupplier now, String keyspace, String table)
@@ -249,7 +244,7 @@ public class AccordTestUtils
                                                      new AccordAgent(),
                                                      null,
                                                      cs -> null,
-                                                     new SingleEpochRanges(KeyRanges.of(range)));
+                                                     new SingleEpochRanges(Ranges.of(range)));
     }
 
     public static AccordCommandStore createAccordCommandStore(Node.Id node, LongSupplier now, Topology topology)
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java b/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java
index 4dd94847a9..a7a015a7c1 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.accord.api.AccordKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
@@ -62,10 +62,10 @@ public class AccordTopologyTest
         Token maxToken = partitioner.getMaximumToken();
 
 //        topology.forKey(new AccordKey.TokenKey(tableId, minToken.minKeyBound()));
-        topology.forKey(new AccordKey.PartitionKey(tableId, new BufferDecoratedKey(minToken, ByteBufferUtil.bytes(0))));
+        topology.forKey(new PartitionKey(tableId, new BufferDecoratedKey(minToken, ByteBufferUtil.bytes(0))).toUnseekable());
 //        topology.forKey(new AccordKey.TokenKey(tableId, minToken.maxKeyBound()));
 //        topology.forKey(new AccordKey.TokenKey(tableId, maxToken.minKeyBound()));
-        topology.forKey(new AccordKey.PartitionKey(tableId, new BufferDecoratedKey(maxToken, ByteBufferUtil.bytes(0))));
+        topology.forKey(new PartitionKey(tableId, new BufferDecoratedKey(maxToken, ByteBufferUtil.bytes(0))).toUnseekable());
 //        topology.forKey(new AccordKey.TokenKey(tableId, maxToken.maxKeyBound()));
     }
 }
diff --git a/test/unit/org/apache/cassandra/service/accord/api/AccordKeyTest.java b/test/unit/org/apache/cassandra/service/accord/api/AccordKeyTest.java
index 81f9b49a22..f520a6bb6c 100644
--- a/test/unit/org/apache/cassandra/service/accord/api/AccordKeyTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/api/AccordKeyTest.java
@@ -28,7 +28,6 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.service.accord.api.AccordKey.*;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey.SentinelKey;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
index 83f3d8058e..fbbb53ee31 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
@@ -41,8 +41,7 @@ import org.apache.cassandra.service.accord.AccordCommandStore;
 import org.apache.cassandra.service.accord.AccordCommandsForKey;
 import org.apache.cassandra.service.accord.AccordKeyspace;
 import org.apache.cassandra.service.accord.AccordStateCache;
-import org.apache.cassandra.service.accord.api.AccordKey;
-import org.apache.cassandra.service.accord.api.AccordKey.PartitionKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.Future;
 
@@ -90,7 +89,7 @@ public class AsyncLoaderTest
         AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), singleton(key));
 
         // everything is cached, so the loader should return immediately
-        commandStore.processBlocking(() -> {
+        commandStore.executeBlocking(() -> {
             boolean result = loader.load(context, (o, t) -> Assert.fail());
             Assert.assertTrue(result);
         });
@@ -124,7 +123,7 @@ public class AsyncLoaderTest
         AsyncContext context = new AsyncContext();
         AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), singleton(key));
         AsyncPromise<Void> cbFired = new AsyncPromise<>();
-        commandStore.processBlocking(() -> {
+        commandStore.executeBlocking(() -> {
             boolean result = loader.load(context, (o, t) -> {
                 Assert.assertNull(t);
                 cbFired.setSuccess(null);
@@ -135,7 +134,7 @@ public class AsyncLoaderTest
         cbFired.awaitUninterruptibly(1, TimeUnit.SECONDS);
 
         // then return immediately after the callback has fired
-        commandStore.processBlocking(() -> {
+        commandStore.executeBlocking(() -> {
             boolean result = loader.load(context, (o, t) -> Assert.fail());
             Assert.assertTrue(result);
         });
@@ -166,7 +165,7 @@ public class AsyncLoaderTest
         AsyncContext context = new AsyncContext();
         AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), singleton(key));
         AsyncPromise<Void> cbFired = new AsyncPromise<>();
-        commandStore.processBlocking(() -> {
+        commandStore.executeBlocking(() -> {
             boolean result = loader.load(context, (o, t) -> {
                 Assert.assertNull(t);
                 cbFired.setSuccess(null);
@@ -177,7 +176,7 @@ public class AsyncLoaderTest
         cbFired.awaitUninterruptibly(1, TimeUnit.SECONDS);
 
         // then return immediately after the callback has fired
-        commandStore.processBlocking(() -> {
+        commandStore.executeBlocking(() -> {
             boolean result = loader.load(context, (o, t) -> Assert.fail());
             Assert.assertTrue(result);
         });
@@ -212,7 +211,7 @@ public class AsyncLoaderTest
         commandCache.setLoadFuture(command.txnId(), readFuture);
 
         AsyncPromise<Void> cbFired = new AsyncPromise<>();
-        commandStore.processBlocking(() -> {
+        commandStore.executeBlocking(() -> {
             boolean result = loader.load(context, (o, t) -> {
                 Assert.assertNull(t);
                 cbFired.setSuccess(null);
@@ -226,7 +225,7 @@ public class AsyncLoaderTest
         Assert.assertTrue(cbFired.isSuccess());
 
         // then return immediately after the callback has fired
-        commandStore.processBlocking(() -> {
+        commandStore.executeBlocking(() -> {
             boolean result = loader.load(context, (o, t) -> Assert.fail());
             Assert.assertTrue(result);
         });
@@ -242,7 +241,7 @@ public class AsyncLoaderTest
         TxnId blockApply = txnId(1, clock.incrementAndGet(), 0, 1);
         TxnId blockCommit = txnId(1, clock.incrementAndGet(), 0, 1);
         PartialTxn txn = createPartialTxn(0);
-        AccordKey.PartitionKey key = (AccordKey.PartitionKey) getOnlyElement(txn.keys());
+        PartitionKey key = (PartitionKey) getOnlyElement(txn.keys());
 
         AccordCommand command = new AccordCommand(txnId).initialize();
         command.setPartialTxn(txn);
diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
index ebf272aac9..30edd5f8af 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
@@ -30,10 +30,10 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import accord.local.Command;
-import accord.local.CommandStore;
 import accord.local.CommandsForKey;
 import accord.local.SafeCommandStore;
 import accord.local.Status;
+import accord.primitives.Keys;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
@@ -48,9 +48,8 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.accord.AccordCommand;
 import org.apache.cassandra.service.accord.AccordCommandStore;
 import org.apache.cassandra.service.accord.AccordKeyspace;
-import org.apache.cassandra.service.accord.AccordPartialCommand;
 import org.apache.cassandra.service.accord.AccordStateCache;
-import org.apache.cassandra.service.accord.api.AccordKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static accord.local.PreLoadContext.contextFor;
@@ -92,7 +91,7 @@ public class AsyncOperationTest
         AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
         TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
         Txn txn = createTxn((int)clock.incrementAndGet());
-        AccordKey.PartitionKey key = (AccordKey.PartitionKey) Iterables.getOnlyElement(txn.keys());
+        PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys());
 
         commandStore.execute(contextFor(txnId), instance -> {
             Command command = instance.ifPresent(txnId);
@@ -108,9 +107,9 @@ public class AsyncOperationTest
     {
         AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
         Txn txn = createTxn((int)clock.incrementAndGet());
-        AccordKey.PartitionKey key = (AccordKey.PartitionKey) Iterables.getOnlyElement(txn.keys());
+        PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys());
 
-        commandStore.execute(contextFor(Collections.emptyList(), Collections.singleton(key)), instance -> {
+        commandStore.execute(contextFor(Collections.emptyList(), Keys.of(key)),instance -> {
             CommandsForKey cfk = commandStore.maybeCommandsForKey(key);
             Assert.assertNull(cfk);
         }).get();
@@ -173,7 +172,7 @@ public class AsyncOperationTest
             }
 
             @Override
-            AsyncLoader createAsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Iterable<AccordKey.PartitionKey> keys)
+            AsyncLoader createAsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Iterable<PartitionKey> keys)
             {
                 return new AsyncLoader(commandStore, txnIds, keys) {
 
diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
index 37d1b5f764..9d2e1e5a2b 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
@@ -27,7 +27,7 @@ import org.junit.Test;
 
 import accord.local.Command;
 import accord.local.Status;
-import accord.primitives.KeyRanges;
+import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
@@ -39,7 +39,7 @@ import org.apache.cassandra.service.accord.AccordCommandStore;
 import org.apache.cassandra.service.accord.AccordCommandsForKey;
 import org.apache.cassandra.service.accord.AccordKeyspace;
 import org.apache.cassandra.service.accord.AccordPartialCommand;
-import org.apache.cassandra.service.accord.api.AccordKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 
 import static accord.local.PreLoadContext.contextFor;
 import static com.google.common.collect.Iterables.getOnlyElement;
@@ -85,7 +85,7 @@ public class AsyncWriterTest
         TxnId blockingId = txnId(1, clock.incrementAndGet(), 0, 1);
         TxnId waitingId = txnId(1, clock.incrementAndGet(), 0, 1);
         Txn txn = createTxn(0);
-        KeyRanges ranges = fullRange(txn);
+        Ranges ranges = fullRange(txn);
         AccordCommand blocking = new AccordCommand(blockingId).initialize();
         blocking.setPartialTxn(txn.slice(ranges, true));
         blocking.setExecuteAt(blockingId);
@@ -135,8 +135,8 @@ public class AsyncWriterTest
         TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
         Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 0, 1);
         Txn txn = createTxn(0);
-        KeyRanges ranges = fullRange(txn);
-        AccordKey.PartitionKey key = (AccordKey.PartitionKey) getOnlyElement(txn.keys());
+        Ranges ranges = fullRange(txn);
+        PartitionKey key = (PartitionKey) getOnlyElement(txn.keys());
 
         AccordCommandsForKey cfk = new AccordCommandsForKey(commandStore, key).initialize();
         AccordKeyspace.getCommandsForKeyMutation(commandStore, cfk, commandStore.nextSystemTimestampMicros()).apply();
@@ -200,7 +200,7 @@ public class AsyncWriterTest
         TxnId blockingId = txnId(1, clock.incrementAndGet(), 0, 1);
         TxnId waitingId = txnId(1, clock.incrementAndGet(), 0, 1);
         Txn txn = createTxn(0);
-        KeyRanges ranges = fullRange(txn);
+        Ranges ranges = fullRange(txn);
 
         {
             AccordCommand blocking = new AccordCommand(blockingId).initialize();
diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandSerializersTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandSerializersTest.java
index 2d034b5734..0997d234c0 100644
--- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandSerializersTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandSerializersTest.java
@@ -21,17 +21,15 @@ package org.apache.cassandra.service.accord.serializers;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import accord.primitives.KeyRange;
-import accord.primitives.KeyRanges;
 import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
 import accord.primitives.Txn;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.accord.AccordTxnBuilder;
 import org.apache.cassandra.service.accord.TokenRange;
-import org.apache.cassandra.service.accord.api.AccordKey;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey.SentinelKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.SerializerTestUtils;
 
 import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
@@ -56,8 +54,8 @@ public class CommandSerializersTest
         txnBuilder.withWrite("INSERT INTO ks.tbl (k, c, v) VALUES (0, 0, 1)");
         txnBuilder.withCondition("ks", "tbl", 0, 0, NOT_EXISTS);
         Txn txn = txnBuilder.build();
-        TableId tableId = ((AccordKey.PartitionKey) txn.keys().get(0)).tableId();
-        PartialTxn expected = txn.slice(KeyRanges.of(TokenRange.fullRange(tableId)), true);
+        TableId tableId = ((PartitionKey) txn.keys().get(0)).tableId();
+        PartialTxn expected = txn.slice(Ranges.of(TokenRange.fullRange(tableId)), true);
         SerializerTestUtils.assertSerializerIOEquality(expected, CommandSerializers.partialTxn);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org