You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2021/11/01 19:29:19 UTC

[cassandra-accord] branch trunk updated: Instance refactor

This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aec91ad  Instance refactor
aec91ad is described below

commit aec91ad7e2ca439d857d928658346d93c342f51f
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Tue Oct 26 04:26:16 2021 -0500

    Instance refactor
    
    * add topology decoupled command shard
    * add range splitting
    * add locally sharded command metadata classes
    * rework topology changes to include local topology
    * make shard collections immutable
    * add multi-shard response tracker
    
    Co-authored-by: Blake Eggleston <be...@apple.com>
    Co-authored-by: Benedict Elliott Smith <be...@apple.com>
---
 accord-core/src/main/java/accord/api/KeyRange.java | 104 ++++-
 accord-core/src/main/java/accord/api/Read.java     |   4 +-
 accord-core/src/main/java/accord/api/Write.java    |   3 +-
 .../main/java/accord/coordinate/AcceptPhase.java   |  27 +-
 .../src/main/java/accord/coordinate/Agree.java     |  78 ++--
 .../src/main/java/accord/coordinate/Execute.java   |  83 ++--
 .../src/main/java/accord/coordinate/Preempted.java |   2 +-
 .../src/main/java/accord/coordinate/Recover.java   |  87 ++--
 .../src/main/java/accord/coordinate/Timeout.java   |   2 +-
 .../coordinate/tracking/AbstractQuorumTracker.java |  83 ++++
 .../tracking/AbstractResponseTracker.java          |  80 ++++
 .../coordinate/tracking/FastPathTracker.java       |  46 +++
 .../accord/coordinate/tracking/QuorumTracker.java  |  17 +
 .../accord/coordinate/tracking/ReadTracker.java    | 146 +++++++
 .../src/main/java/accord/local/Command.java        |  22 +-
 .../src/main/java/accord/local/CommandStore.java   | 449 +++++++++++++++++++++
 .../src/main/java/accord/local/CommandStores.java  | 154 +++++++
 .../src/main/java/accord/local/CommandsForKey.java |  10 +-
 .../src/main/java/accord/local/Instance.java       |  58 ---
 accord-core/src/main/java/accord/local/Node.java   |  41 +-
 .../main/java/accord/messages/BeginRecovery.java   |   2 -
 .../src/main/java/accord/messages/PreAccept.java   |   8 +-
 .../src/main/java/accord/messages/ReadData.java    |  20 +-
 .../main/java/accord/messages/WaitOnCommit.java    |   9 +-
 .../src/main/java/accord/topology/KeyRanges.java   | 148 ++++++-
 .../src/main/java/accord/topology/Shard.java       |  14 +-
 .../src/main/java/accord/topology/Topology.java    |  99 ++++-
 .../src/main/java/accord/txn/Dependencies.java     |  11 +-
 accord-core/src/main/java/accord/txn/Txn.java      |  48 ++-
 accord-core/src/main/java/accord/txn/Writes.java   |   6 +-
 .../main/java/accord/utils/IndexedPredicate.java   |   6 +
 accord-core/src/test/java/accord/Utils.java        |  14 +
 .../src/test/java/accord/burn/BurnTest.java        |   1 -
 .../java/accord/coordinate/CoordinateTest.java     |  56 ++-
 .../accord/coordinate/PreacceptTrackerTest.java    | 135 +++++++
 .../test/java/accord/coordinate/RecoverTest.java   |  86 ++--
 .../coordinate/tracking/QuorumTrackerTest.java     | 107 +++++
 .../coordinate/tracking/ReadTrackerTest.java       | 131 ++++++
 .../src/test/java/accord/impl/IntHashKey.java      |  30 ++
 accord-core/src/test/java/accord/impl/IntKey.java  |  35 ++
 .../src/test/java/accord/impl/TopologyFactory.java |  26 +-
 .../src/test/java/accord/impl/TopologyUtils.java   |  57 +++
 .../test/java/accord/impl/TopologyUtilsTest.java   |  17 +
 .../src/test/java/accord/impl/basic/Cluster.java   |  43 +-
 .../src/test/java/accord/impl/basic/NodeSink.java  |   2 +-
 .../src/test/java/accord/impl/list/ListRead.java   |  18 +-
 .../src/test/java/accord/impl/list/ListWrite.java  |  14 +-
 .../test/java/accord/impl/mock/MockCluster.java    |  40 +-
 .../test/java/accord/local/CommandStoreTest.java   |  92 +++++
 .../test/java/accord/messages/PreAcceptTest.java   | 110 ++---
 .../test/java/accord/topology/TopologyTest.java    |  25 +-
 .../src/test/java/accord/utils/KeyRangeTest.java   | 159 +++++++-
 .../src/test/java/accord/utils/KeyRangesTest.java  |  52 +++
 .../src/main/java/accord/maelstrom/Cluster.java    |  40 +-
 .../main/java/accord/maelstrom/MaelstromKey.java   |  33 ++
 .../main/java/accord/maelstrom/MaelstromRead.java  |  18 +-
 .../main/java/accord/maelstrom/MaelstromWrite.java |  14 +-
 .../src/main/java/accord/maelstrom/Main.java       |  66 +--
 58 files changed, 2717 insertions(+), 571 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/KeyRange.java b/accord-core/src/main/java/accord/api/KeyRange.java
index 9e6b850..fb11c1e 100644
--- a/accord-core/src/main/java/accord/api/KeyRange.java
+++ b/accord-core/src/main/java/accord/api/KeyRange.java
@@ -1,5 +1,6 @@
 package accord.api;
 
+import accord.topology.KeyRanges;
 import accord.txn.Keys;
 import com.google.common.base.Preconditions;
 
@@ -39,6 +40,12 @@ public abstract class KeyRange<K extends Key<K>>
         {
             return true;
         }
+
+        @Override
+        public KeyRange<K> tryMerge(KeyRange<K> that)
+        {
+            return KeyRange.tryMergeExclusiveInclusive(this, that);
+        }
     }
 
     public static abstract class StartInclusive<K extends Key<K>> extends KeyRange<K>
@@ -69,6 +76,34 @@ public abstract class KeyRange<K extends Key<K>>
         {
             return false;
         }
+
+        @Override
+        public KeyRange<K> tryMerge(KeyRange<K> that)
+        {
+            return KeyRange.tryMergeExclusiveInclusive(this, that);
+        }
+    }
+
+    private static <K extends Key<K>> KeyRange<K> tryMergeExclusiveInclusive(KeyRange<K> left, KeyRange<K> right)
+    {
+        if (left.getClass() != right.getClass())
+            return null;
+
+        Preconditions.checkArgument(left instanceof EndInclusive || left instanceof StartInclusive);
+
+        int cmp = left.compareIntersecting(right);
+
+        if (cmp == 0)
+            return left.subRange(left.start.compareTo(right.start) < 0 ? left.start : right.start,
+                                 left.end.compareTo(right.end) > 0 ? left.end : right.end);
+
+        if (cmp > 0 && right.end.equals(left.start))
+            return left.subRange(right.start, left.end);
+
+        if (cmp < 0 && left.end.equals(right.start))
+            return left.subRange(left.start, right.end);
+
+        return null;
     }
 
     private final K start;
@@ -95,6 +130,20 @@ public abstract class KeyRange<K extends Key<K>>
 
     public abstract boolean endInclusive();
 
+    /**
+     * Return a new range covering this and the given range if the ranges are intersecting or touching. That is,
+     * no keys can exist between the touching ends of the range.
+     */
+    public abstract KeyRange<K> tryMerge(KeyRange<K> that);
+
+    public abstract KeyRange<K> subRange(K start, K end);
+
+    /**
+     * Split this range into roughly equally sized subranges
+     * @param count the number of subranges to create
+     */
+    public abstract KeyRanges split(int count);
+
     @Override
     public boolean equals(Object o)
     {
@@ -127,6 +176,44 @@ public abstract class KeyRange<K extends Key<K>>
         return compareKey(key) == 0;
     }
 
+
+    /**
+     * Returns a negative integer, zero, or a positive integer if both points of the provided range are less than, the
+     * range intersects this range, or both points are greater than this range
+     */
+    public int compareIntersecting(KeyRange<K> that)
+    {
+        if (this.start.compareTo(that.end) >= 0)
+            return 1;
+        if (this.end.compareTo(that.start) <= 0)
+            return -1;
+        return 0;
+    }
+
+    public boolean fullyContains(KeyRange<K> that)
+    {
+        return that.start.compareTo(this.start) >= 0 && that.end.compareTo(this.end) <= 0;
+    }
+
+    public boolean intersects(Keys keys)
+    {
+        return lowKeyIndex(keys) >= 0;
+    }
+
+    /**
+     * Returns a range covering the overlapping parts of this and the provided range, returns
+     * null if the ranges do not overlap
+     */
+    public KeyRange<K> intersection(KeyRange<K> that)
+    {
+        if (this.compareIntersecting(that) != 0)
+            return null;
+
+        K start = this.start.compareTo(that.start) > 0 ? this.start : that.start;
+        K end = this.end.compareTo(that.end) < 0 ? this.end : that.end;
+        return subRange(start, end);
+    }
+
     /**
      * returns the index of the first key larger than what's covered by this range
      */
@@ -144,20 +231,25 @@ public abstract class KeyRange<K extends Key<K>>
     }
 
     /**
-     * returns the index of the lowest key contained in this range
+     * returns the index of the lowest key contained in this range. If the keys object contains no intersecting
+     * keys, <code>(-(<i>insertion point</i>) - 1)</code> is returned. Where <i>insertion point</i> is where an
+     * intersecting key would be inserted into the keys array
      * @param keys
      */
-    public int lowKeyIndex(Keys keys)
+    public int lowKeyIndex(Keys keys, int lowerBound, int upperBound)
     {
         if (keys.isEmpty()) return -1;
 
-        int i = keys.search(0, keys.size(), this,
+        int i = keys.search(lowerBound, upperBound, this,
                             (k, r) -> ((KeyRange) r).compareKey((Key) k) < 0 ? -1 : 1);
 
-        if (i < 0) i = -1 - i;
+        int minIdx = -1 - i;
 
-        if (i == 0 && !containsKey((K) keys.get(0))) i = -1;
+        return (minIdx < keys.size() && containsKey((K) keys.get(minIdx))) ? minIdx : i;
+    }
 
-        return i;
+    public int lowKeyIndex(Keys keys)
+    {
+        return lowKeyIndex(keys, 0, keys.size());
     }
 }
diff --git a/accord-core/src/main/java/accord/api/Read.java b/accord-core/src/main/java/accord/api/Read.java
index 2149418..fa2780a 100644
--- a/accord-core/src/main/java/accord/api/Read.java
+++ b/accord-core/src/main/java/accord/api/Read.java
@@ -1,5 +1,7 @@
 package accord.api;
 
+import accord.topology.KeyRanges;
+
 /**
  * A read to be performed on potentially multiple shards, the inputs of which may be fed to a {@link Query}
  *
@@ -7,5 +9,5 @@ package accord.api;
  */
 public interface Read
 {
-    Data read(KeyRange range, Store store);
+    Data read(KeyRanges ranges, Store store);
 }
diff --git a/accord-core/src/main/java/accord/api/Write.java b/accord-core/src/main/java/accord/api/Write.java
index eba020c..1479c86 100644
--- a/accord-core/src/main/java/accord/api/Write.java
+++ b/accord-core/src/main/java/accord/api/Write.java
@@ -1,5 +1,6 @@
 package accord.api;
 
+import accord.topology.KeyRanges;
 import accord.txn.Timestamp;
 
 /**
@@ -9,5 +10,5 @@ import accord.txn.Timestamp;
  */
 public interface Write
 {
-    void apply(KeyRange range, Timestamp executeAt, Store store);
+    void apply(KeyRanges range, Timestamp executeAt, Store store);
 }
diff --git a/accord-core/src/main/java/accord/coordinate/AcceptPhase.java b/accord-core/src/main/java/accord/coordinate/AcceptPhase.java
index eb79af7..29f9331 100644
--- a/accord-core/src/main/java/accord/coordinate/AcceptPhase.java
+++ b/accord-core/src/main/java/accord/coordinate/AcceptPhase.java
@@ -4,7 +4,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-import accord.messages.Preempted;
+import accord.coordinate.tracking.QuorumTracker;
 import accord.txn.Ballot;
 import accord.messages.Callback;
 import accord.local.Node;
@@ -24,13 +24,11 @@ class AcceptPhase extends CompletableFuture<Agreed>
     final Ballot ballot;
     final TxnId txnId;
     final Txn txn;
-    final Shards shards;
+    final Shards shards; // TODO: remove, hide in participants
 
     private List<AcceptOk> acceptOks;
     private Timestamp proposed;
-    private int[] accepts;
-    private int[] failures;
-    private int acceptQuorums;
+    private QuorumTracker acceptTracker;
 
     AcceptPhase(Node node, Ballot ballot, TxnId txnId, Txn txn, Shards shards)
     {
@@ -45,9 +43,8 @@ class AcceptPhase extends CompletableFuture<Agreed>
     {
         this.proposed = executeAt;
         this.acceptOks = new ArrayList<>();
-        this.accepts = new int[shards.size()];
-        this.failures = new int[shards.size()];
-        node.send(shards, new Accept(ballot, txnId, txn, executeAt, deps), new Callback<AcceptReply>()
+        this.acceptTracker = new QuorumTracker(shards);
+        node.send(acceptTracker.nodes(), new Accept(ballot, txnId, txn, executeAt, deps), new Callback<AcceptReply>()
         {
             @Override
             public void onSuccess(Id from, AcceptReply response)
@@ -58,10 +55,9 @@ class AcceptPhase extends CompletableFuture<Agreed>
             @Override
             public void onFailure(Id from, Throwable throwable)
             {
-                shards.forEachOn(from, (i, shard) -> {
-                    if (++failures[i] >= shard.slowPathQuorumSize)
-                        completeExceptionally(new accord.messages.Timeout());
-                });
+                acceptTracker.recordFailure(from);
+                if (acceptTracker.hasFailed())
+                    completeExceptionally(new Timeout());
             }
         });
     }
@@ -79,12 +75,9 @@ class AcceptPhase extends CompletableFuture<Agreed>
 
         AcceptOk ok = (AcceptOk) reply;
         acceptOks.add(ok);
-        shards.forEachOn(from, txn.keys(), (i, shard) -> {
-            if (++accepts[i] == shard.slowPathQuorumSize)
-                ++acceptQuorums;
-        });
+        acceptTracker.recordSuccess(from);
 
-        if (acceptQuorums == shards.size())
+        if (acceptTracker.hasReachedQuorum())
             onAccepted();
     }
 
diff --git a/accord-core/src/main/java/accord/coordinate/Agree.java b/accord-core/src/main/java/accord/coordinate/Agree.java
index 0d726b8..b12f118 100644
--- a/accord-core/src/main/java/accord/coordinate/Agree.java
+++ b/accord-core/src/main/java/accord/coordinate/Agree.java
@@ -4,8 +4,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletionStage;
 
-import accord.messages.Preempted;
-import accord.messages.Timeout;
+import accord.coordinate.tracking.FastPathTracker;
+import accord.topology.Shard;
 import accord.txn.Ballot;
 import accord.messages.Callback;
 import accord.local.Node;
@@ -25,19 +25,32 @@ import accord.messages.PreAccept.PreAcceptReply;
  */
 class Agree extends AcceptPhase implements Callback<PreAcceptReply>
 {
+    static class ShardTracker extends FastPathTracker.FastPathShardTracker
+    {
+        public ShardTracker(Shard shard)
+        {
+            super(shard);
+        }
+
+        @Override
+        public boolean includeInFastPath(Node.Id node, boolean withFastPathTimestamp)
+        {
+            return withFastPathTimestamp && shard.fastPathElectorate.contains(node);
+        }
+
+        @Override
+        public boolean hasMetFastPathCriteria()
+        {
+            return fastPathAccepts >= shard.fastPathQuorumSize;
+        }
+    }
+
     final Keys keys;
 
     public enum PreacceptOutcome { COMMIT, ACCEPT }
 
-    // TODO: handle reconfigurations
-    private int[] preAccepts;
-    private int[] fastPathPreAccepts;
-    private int[] failures;
-    private int[] responsesOutstanding;
+    private final FastPathTracker<ShardTracker> tracker;
 
-    private int preAccepted;
-    private int fastPathAccepted;
-    private int noOutstandingResponses;
     private PreacceptOutcome preacceptOutcome;
     private final List<PreAcceptOk> preAcceptOks = new ArrayList<>();
 
@@ -48,22 +61,9 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply>
     {
         super(node, Ballot.ZERO, txnId, txn, node.cluster().forKeys(txn.keys()));
         this.keys = txn.keys();
-        this.failures = new int[shards.size()];
-        this.preAccepts = new int[shards.size()];
-        this.fastPathPreAccepts = new int[shards.size()];
-        this.responsesOutstanding = new int[shards.size()];
-        shards.forEach((i, shard) -> {
-            this.responsesOutstanding[i] = shard.nodes.size();
-        });
-
+        tracker = new FastPathTracker<>(shards, ShardTracker[]::new, ShardTracker::new);
 
-        node.send(shards, new PreAccept(txnId, txn), this);
-    }
-
-    private void messageReceived(int shard)
-    {
-        if (--responsesOutstanding[shard] == 0)
-            noOutstandingResponses++;
+        node.send(tracker.nodes(), new PreAccept(txnId, txn), this);
     }
 
     @Override
@@ -78,11 +78,9 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply>
         if (isDone() || isPreAccepted())
             return;
 
-        shards.forEachOn(from, (i, shard) -> {
-            messageReceived(i);
-            if (++failures[i] >= shard.slowPathQuorumSize)
-                completeExceptionally(new Timeout());
-        });
+        tracker.recordFailure(from);
+        if (tracker.hasFailed())
+            completeExceptionally(new Timeout());
 
         // if no other responses are expected and the slow quorum has been satisfied, proceed
         if (shouldSlowPathAccept())
@@ -105,22 +103,15 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply>
         preAcceptOks.add(ok);
 
         boolean fastPath = ok.witnessedAt.compareTo(txnId) == 0;
-        shards.forEachOn(from, (i, shard) -> {
-            messageReceived(i);
-            if (fastPath && shard.fastPathElectorate.contains(from) && ++fastPathPreAccepts[i] == shard.fastPathQuorumSize)
-                ++fastPathAccepted;
+        tracker.recordSuccess(from, fastPath);
 
-            if (++preAccepts[i] == shard.slowPathQuorumSize)
-                ++preAccepted;
-        });
-
-        if (isFastPathAccepted() || shouldSlowPathAccept())
+        if (tracker.hasMetFastPathCriteria() || shouldSlowPathAccept())
             onPreAccepted();
     }
 
     private void onPreAccepted()
     {
-        if (isFastPathAccepted())
+        if (tracker.hasMetFastPathCriteria())
         {
             preacceptOutcome = PreacceptOutcome.COMMIT;
             Dependencies deps = new Dependencies();
@@ -150,14 +141,9 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply>
         }
     }
 
-    private boolean isFastPathAccepted()
-    {
-        return fastPathAccepted == shards.size();
-    }
-
     private boolean shouldSlowPathAccept()
     {
-        return noOutstandingResponses == shards.size() && preAccepted == shards.size();
+        return !tracker.hasInFlight() && tracker.hasReachedQuorum();
     }
 
     private boolean isPreAccepted()
diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java b/accord-core/src/main/java/accord/coordinate/Execute.java
index 502e2e0..1ca01e0 100644
--- a/accord-core/src/main/java/accord/coordinate/Execute.java
+++ b/accord-core/src/main/java/accord/coordinate/Execute.java
@@ -1,10 +1,12 @@
 package accord.coordinate;
 
+import java.util.Collection;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 
 import accord.api.Data;
-import accord.messages.Preempted;
+import accord.coordinate.tracking.ReadTracker;
 import accord.api.Result;
 import accord.messages.Callback;
 import accord.local.Node;
@@ -12,7 +14,6 @@ import accord.txn.Dependencies;
 import accord.messages.Apply;
 import accord.messages.ReadData.ReadReply;
 import accord.messages.ReadData.ReadWaiting;
-import accord.topology.Shard;
 import accord.topology.Shards;
 import accord.local.Node.Id;
 import accord.txn.Timestamp;
@@ -22,6 +23,7 @@ import accord.txn.Keys;
 import accord.messages.Commit;
 import accord.messages.ReadData;
 import accord.messages.ReadData.ReadOk;
+import com.google.common.base.Preconditions;
 
 class Execute extends CompletableFuture<Result> implements Callback<ReadReply>
 {
@@ -32,12 +34,9 @@ class Execute extends CompletableFuture<Result> implements Callback<ReadReply>
     final Shards shards;
     final Keys keys;
     final Dependencies deps;
-    final int[] attempts;
-    final int[] inFlight;
-    final boolean[] hasData;
+    final ReadTracker tracker;
     private Data data;
     final int replicaIndex;
-    int count = 0;
 
     private Execute(Node node, Agreed agreed)
     {
@@ -48,9 +47,7 @@ class Execute extends CompletableFuture<Result> implements Callback<ReadReply>
         this.deps = agreed.deps;
         this.executeAt = agreed.executeAt;
         this.shards = agreed.shards;
-        this.attempts = new int[shards.size()];
-        this.inFlight = new int[shards.size()];
-        this.hasData = new boolean[shards.size()];
+        this.tracker = new ReadTracker(shards);
         this.replicaIndex = node.random().nextInt(shards.get(0).nodes.size());
 
         // TODO: perhaps compose these different behaviours differently?
@@ -62,25 +59,21 @@ class Execute extends CompletableFuture<Result> implements Callback<ReadReply>
         }
         else
         {
-            // TODO: we're sending duplicate commits
-            shards.forEach((i, shard) -> {
-                for (int n = 0 ; n < shard.nodes.size() ; ++n)
+            Set<Id> readSet = tracker.computeMinimalReadSetAndMarkInflight();
+            for (Node.Id to : tracker.nodes())
+            {
+                boolean read = readSet.contains(to);
+                Commit send = new Commit(txnId, txn, executeAt, agreed.deps, read);
+                if (read)
                 {
-                    Id to = shard.nodes.get(n);
-                    // TODO: Topology needs concept of locality/distance
-                    boolean read = n == replicaIndex % shard.nodes.size();
-                    Commit send = new Commit(txnId, txn, executeAt, agreed.deps, read);
-                    if (read)
-                    {
-                        node.send(to, send, this);
-                        shards.forEachOn(to, (j, s) -> ++inFlight[j]);
-                    }
-                    else
-                    {
-                        node.send(to, send);
-                    }
+                    node.send(to, send, this);
                 }
-            });
+                else
+                {
+                    node.send(to, send);
+                }
+
+            }
         }
     }
 
@@ -108,13 +101,9 @@ class Execute extends CompletableFuture<Result> implements Callback<ReadReply>
         data = data == null ? ((ReadOk) reply).data
                             : data.merge(((ReadOk) reply).data);
 
-        shards.forEachOn(from, (i, shard) -> {
-            --inFlight[i];
-            if (!hasData[i]) ++count;
-            hasData[i] = true;
-        });
+        tracker.recordReadSuccess(from);
 
-        if (count == shards.size())
+        if (tracker.hasCompletedRead())
         {
             Result result = txn.result(data);
             node.send(shards, new Apply(txnId, txn, executeAt, deps, txn.execute(executeAt, data), result));
@@ -127,28 +116,18 @@ class Execute extends CompletableFuture<Result> implements Callback<ReadReply>
     {
         // try again with another random node
         // TODO: API hooks
-        if (!(throwable instanceof accord.messages.Timeout))
+        if (!(throwable instanceof Timeout))
             throwable.printStackTrace();
 
-        shards.forEachOn(from, (i, shard) -> {
-            // TODO: less naive selection of replica to consult
-            if (--inFlight[i] == 0 && !hasData[i])
-                read(i);
-            if (inFlight[i] == 0 && !hasData[i])
-                completeExceptionally(throwable);
-        });
-    }
-
-    private void read(int shardIndex)
-    {
-        Shard shard = shards.get(shardIndex);
-        if (attempts[shardIndex] == shard.nodes.size())
-            return;
-
-        int nodeIndex = (replicaIndex + attempts[shardIndex]++) % shard.nodes.size();
-        Node.Id to = shard.nodes.get(nodeIndex);
-        shards.forEachOn(to, (i, s) -> ++inFlight[i]);
-        node.send(to, new ReadData(txnId, txn), this);
+        tracker.recordReadFailure(from);
+        Set<Id> readFrom = tracker.computeMinimalReadSetAndMarkInflight();
+        if (readFrom == null)
+        {
+            Preconditions.checkState(tracker.hasFailed());
+            completeExceptionally(throwable);
+        }
+        else
+            node.send(readFrom, new ReadData(txnId, txn), this);
     }
 
     static CompletionStage<Result> execute(Node instance, Agreed agreed)
diff --git a/accord-core/src/main/java/accord/coordinate/Preempted.java b/accord-core/src/main/java/accord/coordinate/Preempted.java
index ad96801..0157376 100644
--- a/accord-core/src/main/java/accord/coordinate/Preempted.java
+++ b/accord-core/src/main/java/accord/coordinate/Preempted.java
@@ -1,4 +1,4 @@
-package accord.messages;
+package accord.coordinate;
 
 /**
  * Thrown when a coordinator is preempted by another recovery
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java b/accord-core/src/main/java/accord/coordinate/Recover.java
index 816c5a3..2f261ab 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -4,7 +4,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import accord.messages.Preempted;
+import accord.coordinate.tracking.FastPathTracker;
+import accord.coordinate.tracking.QuorumTracker;
+import accord.topology.Shard;
 import accord.txn.Ballot;
 import accord.messages.Callback;
 import accord.local.Node;
@@ -27,14 +29,11 @@ class Recover extends AcceptPhase implements Callback<RecoverReply>
 {
     class RetryAfterCommits implements Callback<WaitOnCommitOk>
     {
-        final int[] failures;
-        final int[] commits;
-        int commitQuorums;
+        final QuorumTracker retryTracker;
 
         RetryAfterCommits(Dependencies waitOn)
         {
-            commits = new int[waitOn.size()];
-            failures = new int[waitOn.size()];
+            retryTracker = new QuorumTracker(shards);
             for (Map.Entry<TxnId, Txn> e : waitOn)
                 node.send(shards, new WaitOnCommit(e.getKey(), e.getValue().keys()), this);
         }
@@ -44,15 +43,12 @@ class Recover extends AcceptPhase implements Callback<RecoverReply>
         {
             synchronized (Recover.this)
             {
-                if (isDone() || commitQuorums == commits.length)
+                if (isDone() || retryTracker.hasReachedQuorum())
                     return;
 
-                shards.forEachOn(from, (i, shard) -> {
-                    if (++commits[i] == shard.slowPathQuorumSize)
-                        ++commitQuorums;
-                });
+                retryTracker.recordSuccess(from);
 
-                if (commitQuorums == commits.length)
+                if (retryTracker.hasReachedQuorum())
                 {
                     new Recover(node, ballot, txnId, txn, shards).handle((success, failure) -> {
                         if (success != null) complete(success);
@@ -71,20 +67,42 @@ class Recover extends AcceptPhase implements Callback<RecoverReply>
                 if (isDone())
                     return;
 
-                shards.forEachOn(from, (i, shard) -> {
-                    if (++failures[i] >= shard.slowPathQuorumSize)
-                        completeExceptionally(new accord.messages.Timeout());
-                });
+                retryTracker.recordFailure(from);
+                if (retryTracker.hasFailed())
+                    completeExceptionally(new Timeout());
             }
         }
     }
 
+    // TODO: not sure it makes sense to extend FastPathTracker, as intent here is a bit different
+    static class ShardTracker extends FastPathTracker.FastPathShardTracker
+    {
+        int responsesFromElectorate;
+        public ShardTracker(Shard shard)
+        {
+            super(shard);
+        }
+
+        @Override
+        public boolean includeInFastPath(Node.Id node, boolean withFastPathTimestamp)
+        {
+            if (!shard.fastPathElectorate.contains(node))
+                return false;
+
+            ++responsesFromElectorate;
+            return withFastPathTimestamp;
+        }
+
+        @Override
+        public boolean hasMetFastPathCriteria()
+        {
+            int fastPathRejections = responsesFromElectorate - fastPathAccepts;
+            return fastPathRejections <= shard.fastPathElectorate.size() - shard.fastPathQuorumSize;
+        }
+    }
+
     final List<RecoverOk> recoverOks = new ArrayList<>();
-    int[] failure;
-    int[] recovery;
-    int[] recoveryWithFastPath;
-    int recoveryWithFastPathQuorums = 0;
-    int recoveryQuorums = 0;
+    final FastPathTracker<ShardTracker> tracker;
 
     public Recover(Node node, Ballot ballot, TxnId txnId, Txn txn)
     {
@@ -94,16 +112,14 @@ class Recover extends AcceptPhase implements Callback<RecoverReply>
     private Recover(Node node, Ballot ballot, TxnId txnId, Txn txn, Shards shards)
     {
         super(node, ballot, txnId, txn, shards);
-        this.failure = new int[this.shards.size()];
-        this.recovery = new int[this.shards.size()];
-        this.recoveryWithFastPath = new int[this.shards.size()];
-        node.send(this.shards, new BeginRecovery(txnId, txn, ballot), this);
+        tracker = new FastPathTracker<>(shards, ShardTracker[]::new, ShardTracker::new);
+        node.send(tracker.nodes(), new BeginRecovery(txnId, txn, ballot), this);
     }
 
     @Override
     public synchronized void onSuccess(Id from, RecoverReply response)
     {
-        if (isDone() || recoveryQuorums == shards.size())
+        if (isDone() || tracker.hasReachedQuorum())
             return;
 
         if (!response.isOK())
@@ -115,15 +131,9 @@ class Recover extends AcceptPhase implements Callback<RecoverReply>
         RecoverOk ok = (RecoverOk) response;
         recoverOks.add(ok);
         boolean fastPath = ok.executeAt.compareTo(txnId) == 0;
-        shards.forEachOn(from, (i, shard) -> {
-            if (fastPath && ++recoveryWithFastPath[i] == shard.recoveryFastPathSize)
-                ++recoveryWithFastPathQuorums;
-
-            if (++recovery[i] == shard.slowPathQuorumSize)
-                ++recoveryQuorums;
-        });
+        tracker.recordSuccess(from, fastPath);
 
-        if (recoveryQuorums == shards.size())
+        if (tracker.hasReachedQuorum())
             recover();
     }
 
@@ -173,7 +183,7 @@ class Recover extends AcceptPhase implements Callback<RecoverReply>
         }
 
         Timestamp executeAt;
-        if (rejectsFastPath || recoveryWithFastPathQuorums < shards.size())
+        if (rejectsFastPath || !tracker.hasMetFastPathCriteria())
         {
             executeAt = maxExecuteAt;
         }
@@ -197,9 +207,8 @@ class Recover extends AcceptPhase implements Callback<RecoverReply>
         if (isDone())
             return;
 
-        shards.forEachOn(from, (i, shard) -> {
-            if (++failure[i] >= shard.slowPathQuorumSize)
-                completeExceptionally(new accord.messages.Timeout());
-        });
+        tracker.recordFailure(from);
+        if (tracker.hasFailed())
+            completeExceptionally(new Timeout());
     }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/Timeout.java b/accord-core/src/main/java/accord/coordinate/Timeout.java
index d405fa7..c706ff2 100644
--- a/accord-core/src/main/java/accord/coordinate/Timeout.java
+++ b/accord-core/src/main/java/accord/coordinate/Timeout.java
@@ -1,4 +1,4 @@
-package accord.messages;
+package accord.coordinate;
 
 /**
  * Thrown when a transaction exceeds its specified timeout for obtaining a result for a client
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AbstractQuorumTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AbstractQuorumTracker.java
new file mode 100644
index 0000000..b48e1a3
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/AbstractQuorumTracker.java
@@ -0,0 +1,114 @@
+package accord.coordinate.tracking;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+
+import accord.local.Node;
+import accord.topology.Shard;
+import accord.topology.Shards;
+
+/**
+ * Tracks success and failure responses per shard, answering whether every shard has reached a
+ * slow-path quorum of successes, or whether any shard can no longer reach one.
+ */
+public class AbstractQuorumTracker<T extends AbstractQuorumTracker.QuorumShardTracker> extends AbstractResponseTracker<T>
+{
+    /**
+     * Per-shard state: the replicas not yet heard from, and the number of successes so far.
+     * Only the first response from each replica of the shard is counted.
+     */
+    static class QuorumShardTracker extends ShardTracker
+    {
+        private final Set<Node.Id> inflight;
+        private int success = 0;
+        public QuorumShardTracker(Shard shard)
+        {
+            super(shard);
+            this.inflight = new HashSet<>(shard.nodes);
+        }
+
+        /**
+         * Records a success from {@code id}.
+         *
+         * @return true if the response was counted; false if {@code id} is not a replica of this
+         *         shard or has already responded
+         */
+        public boolean onSuccess(Node.Id id)
+        {
+            if (!inflight.remove(id))
+                return false;
+            success++;
+            return true;
+        }
+
+        /**
+         * Records a failure from {@code id}: it is no longer awaited, which may rule out a quorum.
+         *
+         * @return true if the response was counted; false if {@code id} is not a replica of this
+         *         shard or has already responded
+         */
+        boolean onFailure(Node.Id id)
+        {
+            return inflight.remove(id);
+        }
+
+        /**
+         * Returns true once this shard can no longer reach a slow-path quorum, i.e. the successes so
+         * far plus the replicas still outstanding are fewer than the quorum size.
+         *
+         * Comparing the failure count against the quorum size instead is only equivalent when ruling
+         * out a quorum takes exactly that many failures (e.g. a majority of an odd replica count);
+         * otherwise a shard could have every replica answer yet be neither failed nor at quorum,
+         * leaving the coordinator waiting forever.
+         */
+        boolean hasFailed()
+        {
+            return success + inflight.size() < shard.slowPathQuorumSize;
+        }
+
+        /** Returns true once at least a slow-path quorum of replicas have responded successfully. */
+        boolean hasReachedQuorum()
+        {
+            return success >= shard.slowPathQuorumSize;
+        }
+
+        /** Returns true while some replica of this shard has not yet responded. */
+        boolean hasInFlight()
+        {
+            return !inflight.isEmpty();
+        }
+    }
+
+    public AbstractQuorumTracker(Shards shards, IntFunction<T[]> arrayFactory, Function<Shard, T> trackerFactory)
+    {
+        super(shards, arrayFactory, trackerFactory);
+    }
+
+    // TODO: refactor to return true if this call caused the state change to failed
+    /** Records a failure from {@code node} with the tracker of each shard it is reported on. */
+    public void recordFailure(Node.Id node)
+    {
+        forEachTrackerForNode(node, QuorumShardTracker::onFailure);
+    }
+
+    /** Returns true once every shard has reached a slow-path quorum of successes. */
+    public boolean hasReachedQuorum()
+    {
+        return all(QuorumShardTracker::hasReachedQuorum);
+    }
+
+    /** Returns true once any shard can no longer reach a slow-path quorum. */
+    public boolean hasFailed()
+    {
+        return any(QuorumShardTracker::hasFailed);
+    }
+
+    /** Returns true while any shard still has a replica that has not responded. */
+    public boolean hasInFlight()
+    {
+        return any(QuorumShardTracker::hasInFlight);
+    }
+
+}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AbstractResponseTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AbstractResponseTracker.java
new file mode 100644
index 0000000..6989e9f
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/AbstractResponseTracker.java
@@ -0,0 +1,97 @@
+package accord.coordinate.tracking;
+
+import accord.local.Node;
+import accord.topology.Shard;
+import accord.topology.Shards;
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.*;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.Predicate;
+
+/**
+ * Base class keeping one tracker per shard a request involves, with helpers that route a node's
+ * response to the trackers of its shards and that query or fold over all trackers.
+ */
+abstract class AbstractResponseTracker<T extends AbstractResponseTracker.ShardTracker>
+{
+    private final Shards shards;
+    private final T[] trackers;
+
+    /** Base per-shard state: just the shard being tracked. */
+    static class ShardTracker
+    {
+        public final Shard shard;
+
+        public ShardTracker(Shard shard)
+        {
+            this.shard = shard;
+        }
+    }
+
+    public AbstractResponseTracker(Shards shards, IntFunction<T[]> arrayFactory, Function<Shard, T> trackerFactory)
+    {
+        this.shards = shards;
+        this.trackers = arrayFactory.apply(shards.size());
+        // trackers[idx] is created for the shard that Shards.forEach reports at index idx
+        shards.forEach((idx, shard) -> trackers[idx] = trackerFactory.apply(shard));
+    }
+
+    /** Calls {@code consumer} with {@code node} and the tracker of each shard Shards.forEachOn visits for it. */
+    void forEachTrackerForNode(Node.Id node, BiConsumer<T, Node.Id> consumer)
+    {
+        shards.forEachOn(node, (idx, shard) -> consumer.accept(trackers[idx], node));
+    }
+
+    /** Returns Shards.matchesOn for {@code node}, testing the tracker of each of its shards with {@code predicate}. */
+    int matchingTrackersForNode(Node.Id node, Predicate<T> predicate)
+    {
+        return shards.matchesOn(node, (idx, shard) -> predicate.test(trackers[idx]));
+    }
+
+    /** Returns true if every tracker satisfies {@code predicate}, stopping at the first that does not. */
+    boolean all(Predicate<T> predicate)
+    {
+        for (int idx = 0; idx < trackers.length; idx++)
+        {
+            if (!predicate.test(trackers[idx]))
+                return false;
+        }
+        return true;
+    }
+
+    /** Returns true if some tracker satisfies {@code predicate}, stopping at the first that does. */
+    boolean any(Predicate<T> predicate)
+    {
+        for (int idx = 0; idx < trackers.length; idx++)
+        {
+            if (predicate.test(trackers[idx]))
+                return true;
+        }
+        return false;
+    }
+
+    /** Folds {@code function} over the trackers in index order, threading the result from {@code start}. */
+    <V> V accumulate(BiFunction<T, V, V> function, V start)
+    {
+        V result = start;
+        for (int idx = 0; idx < trackers.length; idx++)
+            result = function.apply(trackers[idx], result);
+        return result;
+    }
+
+    /** Returns the nodes of the tracked shards, as reported by Shards.nodes(). */
+    public Set<Node.Id> nodes()
+    {
+        return shards.nodes();
+    }
+
+    @VisibleForTesting
+    public T unsafeGet(int i)
+    {
+        return trackers[i];
+    }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
new file mode 100644
index 0000000..b10ae83
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
@@ -0,0 +1,63 @@
+package accord.coordinate.tracking;
+
+import java.util.function.Function;
+import java.util.function.IntFunction;
+
+import accord.local.Node;
+import accord.topology.Shard;
+import accord.topology.Shards;
+
+/**
+ * A quorum tracker that also counts, per shard, successful responses that qualify for the fast
+ * path; subclasses decide which responses qualify and when a shard's fast-path criteria are met.
+ */
+public class FastPathTracker<T extends FastPathTracker.FastPathShardTracker> extends AbstractQuorumTracker<T>
+{
+    // Extends QuorumShardTracker through its declaring class, not via the QuorumTracker subclass
+    // (which only resolved through inheritance and needlessly coupled the two trackers).
+    public abstract static class FastPathShardTracker extends AbstractQuorumTracker.QuorumShardTracker
+    {
+        protected int fastPathAccepts = 0;
+
+        public FastPathShardTracker(Shard shard)
+        {
+            super(shard);
+        }
+
+        /**
+         * Returns true if a counted success from {@code node} should also count towards the fast
+         * path, given whether it was reported with the fast-path timestamp.
+         */
+        public abstract boolean includeInFastPath(Node.Id node, boolean withFastPathTimestamp);
+
+        /**
+         * Records a success from {@code node}; if it was counted (see {@code onSuccess(Node.Id)}) and
+         * {@link #includeInFastPath} accepts it, also increments {@code fastPathAccepts}.
+         */
+        public void onSuccess(Node.Id node, boolean withFastPathTimestamp)
+        {
+            if (onSuccess(node) && includeInFastPath(node, withFastPathTimestamp))
+                fastPathAccepts++;
+        }
+
+        /** Returns true once this shard's fast-path requirements, as defined by the subclass, are met. */
+        public abstract boolean hasMetFastPathCriteria();
+    }
+
+    public FastPathTracker(Shards shards, IntFunction<T[]> arrayFactory, Function<Shard, T> trackerFactory)
+    {
+        super(shards, arrayFactory, trackerFactory);
+    }
+
+    /** Records a success from {@code node} with the tracker of each shard it is reported on. */
+    public void recordSuccess(Node.Id node, boolean withFastPathTimestamp)
+    {
+        forEachTrackerForNode(node, (tracker, n) -> tracker.onSuccess(n, withFastPathTimestamp));
+    }
+
+    /** Returns true once every shard has met its fast-path criteria. */
+    public boolean hasMetFastPathCriteria()
+    {
+        return all(FastPathShardTracker::hasMetFastPathCriteria);
+    }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
new file mode 100644
index 0000000..b36b1ef
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
@@ -0,0 +1,21 @@
+package accord.coordinate.tracking;
+
+import accord.local.Node;
+import accord.topology.Shards;
+
+/**
+ * Quorum tracker with no fast path: tracks whether every shard reaches a slow-path quorum of successes.
+ */
+public class QuorumTracker extends AbstractQuorumTracker<AbstractQuorumTracker.QuorumShardTracker>
+{
+    public QuorumTracker(Shards shards)
+    {
+        super(shards, QuorumShardTracker[]::new, QuorumShardTracker::new);
+    }
+
+    /** Records a successful response from {@code node} with the tracker of each shard it is reported on. */
+    public void recordSuccess(Node.Id node)
+    {
+        forEachTrackerForNode(node, (tracker, from) -> tracker.onSuccess(from));
+    }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
new file mode 100644
index 0000000..d24f5b7
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
@@ -0,0 +1,186 @@
+package accord.coordinate.tracking;
+
+import accord.local.Node.Id;
+import accord.topology.Shard;
+import accord.topology.Shards;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.util.*;
+
+/**
+ * Tracks reads across shards: which replicas have a read in flight, which shards have received
+ * data, and which nodes to contact next so that every shard gets data from some replica.
+ */
+public class ReadTracker extends AbstractResponseTracker<ReadTracker.ReadShardTracker>
+{
+    /** Per-shard read state. */
+    static class ReadShardTracker extends AbstractResponseTracker.ShardTracker
+    {
+        private final Set<Id> inflight = new HashSet<>();
+        private boolean hasData = false;
+        // number of reads sent to replicas of this shard so far
+        private int contacted;
+
+        public ReadShardTracker(Shard shard)
+        {
+            super(shard);
+        }
+
+        /**
+         * Marks a read to {@code node} as in flight.
+         *
+         * @throws IllegalStateException if a read to {@code node} is already in flight
+         */
+        public void recordInflightRead(Id node)
+        {
+            if (!inflight.add(node))
+                throw new IllegalStateException();
+            ++contacted;
+        }
+
+        /**
+         * Records that {@code node} returned data for this shard.
+         *
+         * @throws IllegalArgumentException if {@code node} is not a replica of this shard
+         */
+        public void recordReadSuccess(Id node)
+        {
+            Preconditions.checkArgument(shard.nodes.contains(node));
+            inflight.remove(node);
+            hasData = true;
+        }
+
+        /** Returns true if this shard has no data and no read outstanding, so a new read is needed. */
+        public boolean shouldRead()
+        {
+            return !hasData && inflight.isEmpty();
+        }
+
+        /** Records that the read to {@code node} failed; it is no longer counted as in flight. */
+        public void recordReadFailure(Id node)
+        {
+            inflight.remove(node);
+        }
+
+        /** Returns true once some replica has returned data for this shard. */
+        public boolean hasCompletedRead()
+        {
+            return hasData;
+        }
+
+        /**
+         * Returns true if this shard has no data and no read outstanding, and as many reads have been
+         * sent as it has replicas (i.e. every replica has been tried, as each is contacted at most once).
+         */
+        public boolean hasFailed()
+        {
+            return !hasData && inflight.isEmpty() && contacted == shard.nodes.size();
+        }
+    }
+
+    // nodes not yet chosen to read from; re-sorted on every selection round
+    private final List<Id> candidates;
+
+    public ReadTracker(Shards shards)
+    {
+        super(shards, ReadShardTracker[]::new, ReadShardTracker::new);
+        candidates = new ArrayList<>(shards.nodes());
+    }
+
+    @VisibleForTesting
+    void recordInflightRead(Id node)
+    {
+        forEachTrackerForNode(node, ReadShardTracker::recordInflightRead);
+    }
+
+    /** Records that {@code node} returned data, with the tracker of each shard it is reported on. */
+    public void recordReadSuccess(Id node)
+    {
+        forEachTrackerForNode(node, ReadShardTracker::recordReadSuccess);
+    }
+
+    /** Records that the read to {@code node} failed, with the tracker of each shard it is reported on. */
+    public void recordReadFailure(Id node)
+    {
+        forEachTrackerForNode(node, ReadShardTracker::recordReadFailure);
+    }
+
+    /** Returns true once every shard has received data. */
+    public boolean hasCompletedRead()
+    {
+        return all(ReadShardTracker::hasCompletedRead);
+    }
+
+    /** Returns true if any shard has tried every replica without receiving data. */
+    public boolean hasFailed()
+    {
+        return any(ReadShardTracker::hasFailed);
+    }
+
+    // number of node's shards whose trackers are in target
+    private int intersectionSize(Id node, Set<ReadShardTracker> target)
+    {
+        return matchingTrackersForNode(node, target::contains);
+    }
+
+    private int compareIntersections(Id left, Id right, Set<ReadShardTracker> target)
+    {
+        return Integer.compare(intersectionSize(left, target), intersectionSize(right, target));
+    }
+
+    /**
+     * Chooses nodes to read from so that every shard needing a read (see {@code shouldRead}) is
+     * covered, marks a read in flight to each chosen node, and returns them. The choice is greedy:
+     * repeatedly take the candidate covering the most still-uncovered shards, so the result is small
+     * but not guaranteed minimal.
+     *
+     * Returns an empty set if no shard needs a read, and null if the read cannot be completed because
+     * some shard needing a read has no replica left that has not already been chosen. In that case,
+     * nodes chosen earlier in the same call remain marked in flight.
+     */
+    public Set<Id> computeMinimalReadSetAndMarkInflight()
+    {
+        Set<ReadShardTracker> toRead = accumulate((tracker, acc) -> {
+            if (!tracker.shouldRead())
+                return acc;
+
+            if (acc == null)
+                acc = new HashSet<>();
+
+            acc.add(tracker);
+            return acc;
+        }, null);
+
+        if (toRead == null)
+            return Collections.emptySet();
+
+        assert !toRead.isEmpty();
+        Set<Id> nodes = new HashSet<>();
+        while (!toRead.isEmpty())
+        {
+            if (candidates.isEmpty())
+                return null;
+
+            // TODO: Topology needs concept of locality/distance
+            candidates.sort((a, b) -> compareIntersections(a, b, toRead));
+
+            int i = candidates.size() - 1;
+            Id node = candidates.get(i);
+            // Candidates are sorted by coverage, so if the best one covers none of the remaining
+            // shards, none can: give up now rather than marking useless nodes in flight, which would
+            // leave their shards waiting on reads that are never sent.
+            if (intersectionSize(node, toRead) == 0)
+                return null;
+
+            nodes.add(node);
+            recordInflightRead(node);
+            candidates.remove(i);
+            forEachTrackerForNode(node, (tracker, ignore) -> toRead.remove(tracker));
+        }
+
+        return nodes;
+    }
+
+}
diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java
index e8d6891..9eca55c 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -22,7 +22,7 @@ import static accord.local.Status.ReadyToExecute;
 
 public class Command implements Listener, Consumer<Listener>
 {
-    public final Instance instance;
+    public final CommandStore commandStore;
     private final TxnId txnId;
     private Txn txn;
     private Ballot promised = Ballot.ZERO, accepted = Ballot.ZERO;
@@ -38,9 +38,9 @@ public class Command implements Listener, Consumer<Listener>
 
     private final Listeners listeners = new Listeners();
 
-    public Command(Instance instance, TxnId id)
+    public Command(CommandStore commandStore, TxnId id)
     {
-        this.instance = instance;
+        this.commandStore = commandStore;
         this.txnId = id;
     }
 
@@ -109,17 +109,17 @@ public class Command implements Listener, Consumer<Listener>
         if (hasBeen(PreAccepted))
             return true;
 
-        Timestamp max = txn.maxConflict(instance);
+        Timestamp max = txn.maxConflict(commandStore);
         // unlike in the Accord paper, we partition shards within a node, so that to ensure a total order we must either:
         //  - use a global logical clock to issue new timestamps; or
         //  - assign each shard _and_ process a unique id, and use both as components of the timestamp
-        Timestamp witnessed = txnId.compareTo(max) > 0 ? txnId : instance.node().uniqueNow(max);
+        Timestamp witnessed = txnId.compareTo(max) > 0 ? txnId : commandStore.uniqueNow(max);
 
         this.txn = txn;
         this.executeAt = witnessed;
         this.status = PreAccepted;
 
-        txn.register(instance, this);
+        txn.register(commandStore, this);
         listeners.forEach(this);
         return true;
     }
@@ -149,7 +149,7 @@ public class Command implements Listener, Consumer<Listener>
             if (executeAt.equals(this.executeAt))
                 return false;
 
-            instance.node().agent().onInconsistentTimestamp(this, this.executeAt, executeAt);
+            commandStore.agent().onInconsistentTimestamp(this, this.executeAt, executeAt);
         }
 
         witness(txn);
@@ -159,9 +159,9 @@ public class Command implements Listener, Consumer<Listener>
         this.waitingOnCommit = new TreeMap<>();
         this.waitingOnApply = new TreeMap<>();
 
-        for (TxnId id : savedDeps().on(instance.shard))
+        for (TxnId id : savedDeps().on(commandStore))
         {
-            Command command = instance.command(id);
+            Command command = commandStore.command(id);
             switch (command.status)
             {
                 default:
@@ -204,7 +204,7 @@ public class Command implements Listener, Consumer<Listener>
         else if (!hasBeen(Committed))
             commit(txn, deps, executeAt);
         else if (!executeAt.equals(this.executeAt))
-            instance.node().agent().onInconsistentTimestamp(this, this.executeAt, executeAt);
+            commandStore.agent().onInconsistentTimestamp(this, this.executeAt, executeAt);
 
         this.executeAt = executeAt;
         this.writes = writes;
@@ -281,7 +281,7 @@ public class Command implements Listener, Consumer<Listener>
                 listeners.forEach(this);
                 break;
             case Executed:
-                writes.apply(instance);
+                writes.apply(commandStore);
                 status = Applied;
                 listeners.forEach(this);
         }
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java
new file mode 100644
index 0000000..f064e5e
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -0,0 +1,506 @@
+package accord.local;
+
+import accord.api.Agent;
+import accord.api.Key;
+import accord.api.KeyRange;
+import accord.api.Store;
+import accord.topology.KeyRanges;
+import accord.topology.Shard;
+import accord.topology.Shards;
+import accord.topology.Topology;
+import accord.txn.Keys;
+import accord.txn.Timestamp;
+import accord.txn.TxnId;
+import com.google.common.base.Preconditions;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Single threaded internal shard of accord transaction metadata.
+ *
+ * Holds the {@link Command}s and {@link CommandsForKey}s for the key ranges currently mapped to
+ * this store. Both are kept in plain TreeMaps, so access is expected to go through {@link #process}:
+ * {@link Synchronized} runs each task while holding the store's monitor, and {@link SingleThread}
+ * runs tasks on one dedicated thread.
+ */
+public abstract class CommandStore
+{
+    /**
+     * Creates a store; the constants select the concurrency strategy used by {@link #process}.
+     */
+    public interface Factory
+    {
+        CommandStore create(int index, Node.Id nodeId, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store);
+        Factory SYNCHRONIZED = Synchronized::new;
+        Factory SINGLE_THREAD = SingleThread::new;
+        Factory SINGLE_THREAD_DEBUG = SingleThreadDebug::new;
+    }
+
+    private final int index;
+    private final Node.Id nodeId;
+    private final Function<Timestamp, Timestamp> uniqueNow;
+    private final Agent agent;
+    private final Store store;
+
+    /**
+     * Maps the ranges handled by this command store to their current shards by index:
+     * {@code shards[i]} is the shard for {@code ranges.get(i)}. {@link #updateTopology} builds and
+     * installs a new mapping rather than modifying an existing one.
+     */
+    static class RangeMapping
+    {
+        private static final RangeMapping EMPTY = new RangeMapping(KeyRanges.EMPTY, new Shard[0], Shards.EMPTY);
+        final KeyRanges ranges;
+        final Shard[] shards;
+        final Topology topology;
+
+        public RangeMapping(KeyRanges ranges, Shard[] shards, Topology topology)
+        {
+            Preconditions.checkArgument(ranges.size() == shards.length);
+            this.ranges = ranges;
+            this.shards = shards;
+            this.topology = topology;
+        }
+
+        // Collects (range, shard) pairs in order; addMapping requires each range to lie within its shard.
+        private static class Builder
+        {
+            private final Topology localTopology;
+            private final List<KeyRange> ranges;
+            private final List<Shard> shards;
+
+            public Builder(int minSize, Topology localTopology)
+            {
+                this.localTopology = localTopology;
+                this.ranges = new ArrayList<>(minSize);
+                this.shards = new ArrayList<>(minSize);
+            }
+
+            public void addMapping(KeyRange range, Shard shard)
+            {
+                Preconditions.checkArgument(shard.range.fullyContains(range));
+                ranges.add(range);
+                shards.add(shard);
+            }
+
+            public RangeMapping build()
+            {
+                return new RangeMapping(new KeyRanges(ranges), shards.toArray(Shard[]::new), localTopology);
+            }
+        }
+    }
+
+    public CommandStore(int index, Node.Id nodeId, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
+    {
+        this.index = index;
+        this.nodeId = nodeId;
+        this.uniqueNow = uniqueNow;
+        this.agent = agent;
+        this.store = store;
+    }
+
+    // Replaced wholesale by updateTopology. NOTE(review): presumably volatile so that readers outside
+    // process() (ranges(), intersects(), nodesFor()) see the latest mapping -- confirm.
+    private volatile RangeMapping rangeMap = RangeMapping.EMPTY;
+
+
+    // Plain TreeMaps (not thread-safe); expected to be accessed only from within process().
+    private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+    private final NavigableMap<Key, CommandsForKey> commandsForKey = new TreeMap<>();
+
+    /** Returns the command for {@code txnId}, creating and storing a new one if absent. */
+    public Command command(TxnId txnId)
+    {
+        return commands.computeIfAbsent(txnId, id -> new Command(this, id));
+    }
+
+    public boolean hasCommand(TxnId txnId)
+    {
+        return commands.containsKey(txnId);
+    }
+
+    /** Returns the CommandsForKey for {@code key}, creating and storing an empty one if absent. */
+    public CommandsForKey commandsForKey(Key key)
+    {
+        return commandsForKey.computeIfAbsent(key, ignore -> new CommandsForKey());
+    }
+
+    public boolean hasCommandsForKey(Key key)
+    {
+        return commandsForKey.containsKey(key);
+    }
+
+    public Store store()
+    {
+        return store;
+    }
+
+    /** Delegates to the uniqueNow function supplied at construction. */
+    public Timestamp uniqueNow(Timestamp atLeast)
+    {
+        return uniqueNow.apply(atLeast);
+    }
+
+    public Agent agent()
+    {
+        return agent;
+    }
+
+    public Node.Id nodeId()
+    {
+        return nodeId;
+    }
+
+    /** Returns the key ranges currently mapped to this store. */
+    public KeyRanges ranges()
+    {
+        // TODO: check thread safety of callers
+        return rangeMap.ranges;
+    }
+
+    /**
+     * Returns the replicas of every mapped shard whose range (per KeyRange.lowKeyIndex) contains a key
+     * of {@code command}'s transaction. Ranges are visited in order and each key search resumes from
+     * {@code lowerBound}, which presumes both the keys and the ranges are sorted.
+     */
+    public Set<Node.Id> nodesFor(Command command)
+    {
+        RangeMapping mapping = rangeMap;
+        Keys keys = command.txn().keys;
+
+        Set<Node.Id> result = new HashSet<>();
+        int lowerBound = 0;
+        for (int i=0; i<mapping.ranges.size(); i++)
+        {
+            KeyRange range = mapping.ranges.get(i);
+            int lowKeyIdx = range.lowKeyIndex(keys, lowerBound, keys.size());
+
+            // NOTE(review): negative results look like binarySearch-style insertion points (-1 - idx);
+            // this case presumably means no remaining key can fall in this or any later range.
+            if (lowKeyIdx < -keys.size())
+                break;
+
+            if (lowKeyIdx < 0)
+            {
+                // all remaining keys are greater than this range, so go to the next one
+                lowerBound = -1 - lowKeyIdx;
+                continue;
+            }
+
+            // otherwise this range intersects with the txn, so add its shard's endpoints
+            // TODO: filter pending nodes for reads
+            result.addAll(mapping.shards[i].nodes);
+            lowerBound = lowKeyIdx;
+        }
+
+        return result;
+    }
+
+    /**
+     * Splits each of {@code mergedRanges} along the shard boundaries of {@code localTopology},
+     * pairing every piece with its shard. Both are walked forwards once, so both are presumed sorted.
+     * Throws IllegalStateException if {@code compareIntersecting} is positive for the current shard.
+     * NOTE(review): if the shards run out before a range is fully mapped, the unmapped remainder is
+     * silently dropped -- confirm that cannot happen.
+     */
+    static RangeMapping mapRanges(KeyRanges mergedRanges, Topology localTopology)
+    {
+        RangeMapping.Builder builder = new RangeMapping.Builder(mergedRanges.size(), localTopology);
+        int shardIdx = 0;
+        for (int rangeIdx=0; rangeIdx<mergedRanges.size(); rangeIdx++)
+        {
+            KeyRange mergedRange = mergedRanges.get(rangeIdx);
+            while (shardIdx < localTopology.size())
+            {
+                Shard shard = localTopology.get(shardIdx);
+
+                int cmp = shard.range.compareIntersecting(mergedRange);
+                if (cmp > 0)
+                    throw new IllegalStateException("mapped shards should always be intersecting or greater than the current shard");
+
+                if (cmp < 0)
+                {
+                    shardIdx++;
+                    continue;
+                }
+
+                if (shard.range.fullyContains(mergedRange))
+                {
+                    builder.addMapping(mergedRange, shard);
+                    break;
+                }
+                else
+                {
+                    KeyRange intersection = mergedRange.intersection(shard.range);
+                    Preconditions.checkState(intersection.start().equals(mergedRange.start()));
+                    builder.addMapping(intersection, shard);
+                    mergedRange = mergedRange.subRange(intersection.end(), mergedRange.end());
+                    shardIdx++;
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Installs a new range mapping built from the current ranges minus {@code removed} plus
+     * {@code added} (touching ranges merged), split along {@code topology}'s shards. Then, for every
+     * key in a removed range, drops its CommandsForKey together with each of its commands whose
+     * transaction is known and no longer intersects the new ranges.
+     * NOTE(review): mutates the command maps without synchronizing itself; confirm callers run it
+     * within process() (or otherwise serialize it with command processing).
+     */
+    void updateTopology(Topology topology, KeyRanges added, KeyRanges removed)
+    {
+        KeyRanges newRanges = rangeMap.ranges.difference(removed).union(added).mergeTouching();
+        rangeMap = mapRanges(newRanges, topology);
+
+        for (KeyRange range : removed)
+        {
+            NavigableMap<Key, CommandsForKey> subMap = commandsForKey.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive());
+            Iterator<Key> keyIterator = subMap.keySet().iterator();
+            while (keyIterator.hasNext())
+            {
+                Key key = keyIterator.next();
+                CommandsForKey forKey = commandsForKey.get(key);
+                if (forKey != null)
+                {
+                    for (Command command : forKey)
+                        if (command.txn() != null && !rangeMap.ranges.intersects(command.txn().keys))
+                            commands.remove(command.txnId());
+                }
+                keyIterator.remove();
+            }
+        }
+    }
+
+    public int index()
+    {
+        return index;
+    }
+
+    /** Returns true if the ranges currently mapped to this store intersect {@code keys}. */
+    public boolean intersects(Keys keys)
+    {
+        return rangeMap.ranges.intersects(keys);
+    }
+
+    /** Submits {@code consumer} to each store via process(); the returned stages are not awaited. */
+    public static void onEach(Collection<CommandStore> stores, Consumer<? super CommandStore> consumer)
+    {
+        for (CommandStore store : stores)
+            store.process(consumer);
+    }
+
+    // Runs function against this store, completing future with its result or with whatever it throws.
+    <R> void processInternal(Function<? super CommandStore, R> function, CompletableFuture<R> future)
+    {
+        try
+        {
+            future.complete(function.apply(this));
+        }
+        catch (Throwable e)
+        {
+            future.completeExceptionally(e);
+        }
+    }
+
+    // As above, for a Consumer: completes future with null on success or with whatever it throws.
+    void processInternal(Consumer<? super CommandStore> consumer, CompletableFuture<Void> future)
+    {
+        try
+        {
+            consumer.accept(this);
+            future.complete(null);
+        }
+        catch (Throwable e)
+        {
+            future.completeExceptionally(e);
+        }
+    }
+
+    /**
+     * Runs {@code function} against this store using the subclass's concurrency strategy; the
+     * returned stage completes with its result, or exceptionally with whatever it threw.
+     */
+    public abstract <R> CompletionStage<R> process(Function<? super CommandStore, R> function);
+
+    /** Like {@link #process(Function)}, for a Consumer; the returned stage completes with null. */
+    public abstract CompletionStage<Void> process(Consumer<? super CommandStore> consumer);
+
+    /** Releases resources held by this store, such as an executor thread. */
+    public abstract void shutdown();
+
+    /** Runs each task immediately on the calling thread while holding this store's monitor. */
+    public static class Synchronized extends CommandStore
+    {
+        public Synchronized(int index, Node.Id nodeId, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
+        {
+            super(index, nodeId, uniqueNow, agent, store);
+        }
+
+        @Override
+        public synchronized <R> CompletionStage<R> process(Function<? super CommandStore, R> func)
+        {
+            CompletableFuture<R> future = new CompletableFuture<>();
+            processInternal(func, future);
+            return future;
+        }
+
+        @Override
+        public synchronized CompletionStage<Void> process(Consumer<? super CommandStore> consumer)
+        {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            processInternal(consumer, future);
+            return future;
+        }
+
+        @Override
+        public void shutdown() {}
+    }
+
+    /** Runs tasks in submission order on a dedicated single-thread executor. */
+    public static class SingleThread extends CommandStore
+    {
+        private final ExecutorService executor;
+
+        // A future that, when run by the executor, completes itself with the function's result.
+        private class FunctionWrapper<R> extends CompletableFuture<R> implements Runnable
+        {
+            private final Function<? super CommandStore, R> function;
+
+            public FunctionWrapper(Function<? super CommandStore, R> function)
+            {
+                this.function = function;
+            }
+
+            @Override
+            public void run()
+            {
+                processInternal(function, this);
+            }
+        }
+
+        // A future that, when run by the executor, completes itself once the consumer has run.
+        private class ConsumerWrapper extends CompletableFuture<Void> implements Runnable
+        {
+            private final Consumer<? super CommandStore> consumer;
+
+            public ConsumerWrapper(Consumer<? super CommandStore> consumer)
+            {
+                this.consumer = consumer;
+            }
+
+            @Override
+            public void run()
+            {
+                processInternal(consumer, this);
+            }
+        }
+
+        public SingleThread(int index, Node.Id nodeId, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
+        {
+            super(index, nodeId, uniqueNow, agent, store);
+            executor = Executors.newSingleThreadExecutor(r -> {
+                Thread thread = new Thread(r);
+                thread.setName(CommandStore.class.getSimpleName() + '[' + nodeId + ':' + index + ']');
+                return thread;
+            });
+        }
+
+        @Override
+        public <R> CompletionStage<R> process(Function<? super CommandStore, R> function)
+        {
+            FunctionWrapper<R> future = new FunctionWrapper<>(function);
+            executor.execute(future);
+            return future;
+        }
+
+        @Override
+        public CompletionStage<Void> process(Consumer<? super CommandStore> consumer)
+        {
+            ConsumerWrapper future = new ConsumerWrapper(consumer);
+            executor.execute(future);
+            return future;
+        }
+
+        @Override
+        public void shutdown()
+        {
+            executor.shutdown();
+        }
+    }
+
+    /** SingleThread variant asserting that task execution and command lookups all run on one thread. */
+    public static class SingleThreadDebug extends SingleThread
+    {
+        private final AtomicReference<Thread> expectedThread = new AtomicReference<>();
+
+        public SingleThreadDebug(int index, Node.Id nodeId, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
+        {
+            super(index, nodeId, uniqueNow, agent, store);
+        }
+
+        // Records the first calling thread and fails (IllegalStateException) on calls from any other.
+        // The loop retries until expectedThread is set, whether by this call or a concurrent one.
+        private void assertThread()
+        {
+            Thread current = Thread.currentThread();
+            Thread expected;
+            while (true)
+            {
+                expected = expectedThread.get();
+                if (expected != null)
+                    break;
+                expectedThread.compareAndSet(null, Thread.currentThread());
+            }
+            Preconditions.checkState(expected == current);
+        }
+
+        @Override
+        public Command command(TxnId txnId)
+        {
+            assertThread();
+            return super.command(txnId);
+        }
+
+        @Override
+        public boolean hasCommand(TxnId txnId)
+        {
+            assertThread();
+            return super.hasCommand(txnId);
+        }
+
+        @Override
+        public CommandsForKey commandsForKey(Key key)
+        {
+            assertThread();
+            return super.commandsForKey(key);
+        }
+
+        @Override
+        public boolean hasCommandsForKey(Key key)
+        {
+            assertThread();
+            return super.hasCommandsForKey(key);
+        }
+
+        @Override
+        <R> void processInternal(Function<? super CommandStore, R> function, CompletableFuture<R> future)
+        {
+            assertThread();
+            super.processInternal(function, future);
+        }
+
+        @Override
+        void processInternal(Consumer<? super CommandStore> consumer, CompletableFuture<Void> future)
+        {
+            assertThread();
+            super.processInternal(consumer, future);
+        }
+    }
+}
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java
new file mode 100644
index 0000000..e28f6ba
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -0,0 +1,154 @@
+package accord.local;
+
+import accord.api.Agent;
+import accord.api.KeyRange;
+import accord.api.Store;
+import accord.topology.KeyRanges;
+import accord.topology.Shards;
+import accord.topology.Topology;
+import accord.txn.Keys;
+import accord.txn.Timestamp;
+import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * Manages this node's single threaded {@link CommandStore} shards.
+ * It creates them, splits newly added key ranges between them on topology change, and
+ * runs work against each of them via {@link #stream()}.
+ */
+public class CommandStores
+{
+    private Topology localTopology = Shards.EMPTY;
+    private final CommandStore[] commandStores;
+
+    /**
+     * Creates {@code num} command stores using {@code shardFactory}; store {@code i} is created with index {@code i}.
+     */
+    public CommandStores(int num, Node.Id nodeId, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store, CommandStore.Factory shardFactory)
+    {
+        this.commandStores = new CommandStore[num];
+        for (int i=0; i<num; i++)
+            commandStores[i] = shardFactory.create(i, nodeId, uniqueNow, agent, store);
+    }
+
+    /** Shuts down every command store. */
+    public synchronized void shutdown()
+    {
+        for (CommandStore commandStore : commandStores)
+            commandStore.shutdown();
+    }
+
+    /**
+     * Returns a sequential stream over the command stores.
+     * Each store's pipeline step is handed to {@code CommandStore.process}, and the calling thread
+     * blocks until that step completes. A forEach-style terminal operation submits to every remaining
+     * store before waiting on any of them. A failure in a step is rethrown as a RuntimeException
+     * wrapping its cause.
+     */
+    public Stream<CommandStore> stream()
+    {
+        return StreamSupport.stream(new ShardSpliterator(), false);
+    }
+
+    /** Returns the command stores whose {@code intersects(keys)} check passes. */
+    public Stream<CommandStore> forKeys(Keys keys)
+    {
+        // TODO: filter shards before sending to their thread?
+        return stream().filter(commandShard -> commandShard.intersects(keys));
+    }
+
+    /**
+     * Splits every range in {@code ranges} into at most {@code shards} pieces using {@code KeyRange.split}.
+     * The i-th piece of each range is assigned to the i-th result.
+     *
+     * @return a list of exactly {@code shards} KeyRanges, some of which may be empty
+     */
+    static List<KeyRanges> shardRanges(KeyRanges ranges, int shards)
+    {
+        List<List<KeyRange>> sharded = new ArrayList<>(shards);
+        for (int i=0; i<shards; i++)
+            sharded.add(new ArrayList<>(ranges.size()));
+
+        for (KeyRange range : ranges)
+        {
+            KeyRanges split = range.split(shards);
+            Preconditions.checkState(split.size() <= shards);
+            for (int i=0; i<split.size(); i++)
+                sharded.get(i).add(split.get(i));
+        }
+
+        List<KeyRanges> result = new ArrayList<>(shards);
+        for (int i=0; i<shards; i++)
+        {
+            result.add(new KeyRanges(sharded.get(i).toArray(KeyRange[]::new)));
+        }
+
+        return result;
+    }
+
+    /**
+     * Installs {@code newTopology} as the local topology.
+     * Ranges present in {@code newTopology.ranges()} but not in the previous topology's are split across
+     * the stores with {@link #shardRanges}. Each store then receives its share of the added ranges plus
+     * the full set of removed ranges.
+     */
+    public synchronized void updateTopology(Topology newTopology)
+    {
+        KeyRanges removed = localTopology.ranges().difference(newTopology.ranges());
+        KeyRanges added = newTopology.ranges().difference(localTopology.ranges());
+        List<KeyRanges> sharded = shardRanges(added, commandStores.length);
+        stream().forEach(commands -> commands.updateTopology(newTopology, sharded.get(commands.index()), removed));
+        localTopology = newTopology;
+    }
+
+    /** Unwraps an ExecutionException to the failure that caused it, when a cause is present. */
+    private static Throwable causeOf(ExecutionException e)
+    {
+        Throwable cause = e.getCause();
+        return cause != null ? cause : e;
+    }
+
+    /** Non-splittable spliterator that runs each stream step on the corresponding command store. */
+    private class ShardSpliterator implements Spliterator<CommandStore>
+    {
+        // index of the next command store to visit
+        int i = 0;
+
+        @Override
+        public boolean tryAdvance(Consumer<? super CommandStore> action)
+        {
+            if (i >= commandStores.length)
+                return false;
+
+            CommandStore shard = commandStores[i++];
+            try
+            {
+                shard.process(action).toCompletableFuture().get();
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+            catch (ExecutionException e)
+            {
+                throw new RuntimeException(causeOf(e));
+            }
+            // Spliterator contract: return true whenever an element was consumed, including the last one
+            return true;
+        }
+
+        @Override
+        public void forEachRemaining(Consumer<? super CommandStore> action)
+        {
+            if (i >= commandStores.length)
+                return;
+
+            // Submit to every remaining store first so they run concurrently, then wait for all of them.
+            // The futures array is indexed relative to `start`, because traversal may already be part-way through.
+            int start = i;
+            CompletableFuture<?>[] futures = new CompletableFuture<?>[commandStores.length - start];
+            for (; i < commandStores.length; i++)
+                futures[i - start] = commandStores[i].process(action).toCompletableFuture();
+
+            try
+            {
+                for (CompletableFuture<?> future : futures)
+                    future.get();
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+            catch (ExecutionException e)
+            {
+                throw new RuntimeException(causeOf(e));
+            }
+        }
+
+        @Override
+        public Spliterator<CommandStore> trySplit()
+        {
+            return null;
+        }
+
+        @Override
+        public long estimateSize()
+        {
+            // number of stores not yet visited
+            return commandStores.length - i;
+        }
+
+        @Override
+        public int characteristics()
+        {
+            return Spliterator.SIZED | Spliterator.NONNULL | Spliterator.DISTINCT | Spliterator.IMMUTABLE;
+        }
+    }
+}
diff --git a/accord-core/src/main/java/accord/local/CommandsForKey.java b/accord-core/src/main/java/accord/local/CommandsForKey.java
index 36802e9..8ee847b 100644
--- a/accord-core/src/main/java/accord/local/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/CommandsForKey.java
@@ -1,12 +1,14 @@
 package accord.local;
 
+import java.util.Iterator;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
 import accord.txn.Timestamp;
 import accord.txn.TxnId;
+import com.google.common.collect.Iterators;
 
-public class CommandsForKey implements Listener
+public class CommandsForKey implements Listener, Iterable<Command>
 {
     // TODO: efficiency
     public final NavigableMap<Timestamp, Command> uncommitted = new TreeMap<>();
@@ -43,4 +45,10 @@ public class CommandsForKey implements Listener
         uncommitted.put(command.txnId(), command);
         command.addListener(this);
     }
+
+    /**
+     * Iterates over every command for this key: all {@code uncommitted} entries first, then
+     * {@code committedByExecuteAt}, each in its map's iteration order.
+     */
+    @Override
+    public Iterator<Command> iterator()
+    {
+        Iterator<Command> pending = uncommitted.values().iterator();
+        Iterator<Command> committed = committedByExecuteAt.values().iterator();
+        return Iterators.concat(pending, committed);
+    }
 }
diff --git a/accord-core/src/main/java/accord/local/Instance.java b/accord-core/src/main/java/accord/local/Instance.java
deleted file mode 100644
index c842b1e..0000000
--- a/accord-core/src/main/java/accord/local/Instance.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package accord.local;
-
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-import accord.api.Key;
-import accord.api.Store;
-import accord.topology.Shard;
-import accord.txn.TxnId;
-
-/**
- * node-local accord metadata per shard
- */
-public class Instance
-{
-    public final Shard shard;
-    private final Node node;
-    private final Store store;
-    private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
-    private final NavigableMap<Key, CommandsForKey> commandsForKey = new TreeMap<>();
-
-    public Instance(Shard shard, Node node, Store store)
-    {
-        this.shard = shard;
-        this.node = node;
-        this.store = store;
-    }
-
-    public Command command(TxnId txnId)
-    {
-        return commands.computeIfAbsent(txnId, id -> new Command(this, id));
-    }
-
-    public boolean hasCommand(TxnId txnId)
-    {
-        return commands.containsKey(txnId);
-    }
-
-    public CommandsForKey commandsForKey(Key key)
-    {
-        return commandsForKey.computeIfAbsent(key, ignore -> new CommandsForKey());
-    }
-
-    public boolean hasCommandsForKey(Key key)
-    {
-        return commandsForKey.containsKey(key);
-    }
-
-    public Store store()
-    {
-        return store;
-    }
-
-    public Node node()
-    {
-        return node;
-    }
-}
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 7724186..05c9cb0 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -70,10 +70,14 @@ public class Node
         }
     }
 
+    /**
+     * Number of single threaded {@link CommandStore}s each node creates for its local ranges (currently fixed).
+     */
+    public static int numCommandShards()
+    {
+        final int fixedShardCount = 8; // TODO: make configurable
+        return fixedShardCount;
+    }
+
+    private final CommandStores commandStores;
     private final Id id;
     private final Topology cluster;
-    private final Shards local;
-    private final Instance[] instances;
     private final MessageSink messageSink;
     private final Random random;
 
@@ -87,20 +91,24 @@ public class Node
     private final Map<TxnId, CompletionStage<Result>> coordinating = new ConcurrentHashMap<>();
     private final Set<TxnId> pendingRecovery = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
-    public Node(Id id, Topology cluster, Shards local, MessageSink messageSink, Random random, LongSupplier nowSupplier, Supplier<Store> dataSupplier, Agent agent, Scheduler scheduler)
+    /**
+     * Creates a node together with its {@link CommandStores}.
+     * The stores are immediately seeded via {@code updateTopology(cluster.forNode(id))}.
+     */
+    public Node(Id id, Topology cluster, MessageSink messageSink, Random random, LongSupplier nowSupplier,
+                Supplier<Store> dataSupplier, Agent agent, Scheduler scheduler, CommandStore.Factory commandStoreFactory)
     {
         this.id = id;
         this.cluster = cluster;
         this.random = random;
         this.agent = agent;
         this.now = new AtomicReference<>(new Timestamp(nowSupplier.getAsLong(), 0, id));
-        this.local = local;
         this.messageSink = messageSink;
-        this.instances = new Instance[local.size()];
         this.nowSupplier = nowSupplier;
         this.scheduler = scheduler;
-        for (int i = 0 ; i < instances.length ; ++i)
-            instances[i] = new Instance(local.get(i), this, dataSupplier.get());
+        // NOTE(review): dataSupplier.get() is now called once, so all command stores share one Store
+        // (previously each Instance got its own) - confirm this is intended.
+        // NOTE(review): this::uniqueNow escapes before construction completes; confirm the stores do not use it
+        // in a way that depends on fields assigned after this point.
+        this.commandStores = new CommandStores(numCommandShards(), id, this::uniqueNow, agent, dataSupplier.get(), commandStoreFactory);
+        // blocks until every command store has applied this node's slice of the cluster topology
+        this.commandStores.updateTopology(cluster.forNode(id));
+    }
+
+    /** Shuts down every command store owned by this node. */
+    public void shutdown()
+    {
+        commandStores.shutdown();
     }
 
     public Timestamp uniqueNow()
@@ -136,13 +144,12 @@ public class Node
         return cluster;
     }
 
-    public Stream<Instance> local(Keys keys)
+    /**
+     * Returns the command stores on this node whose {@code intersects(keys)} check passes.
+     * Note: consuming the returned stream runs each step via the owning store's {@code process} and blocks
+     * until it completes.
+     */
+    public Stream<CommandStore> local(Keys keys)
     {
-        // TODO: efficiency
-        return Stream.of(local.select(keys, instances, Instance[]::new));
+        return commandStores.forKeys(keys);
     }
 
-    public Optional<Instance> local(Key key)
+    public Optional<CommandStore> local(Key key)
     {
         return local(Keys.of(key)).reduce((i1, i2) -> {
             throw new IllegalStateException("more than one instance encountered for key");
@@ -190,6 +197,18 @@ public class Node
         });
     }
 
+    /**
+     * Sends {@code send} to every node in {@code to}, delegating to the single-destination overload without a callback.
+     * (No type parameter: nothing here depends on a reply type.)
+     */
+    public void send(Collection<Id> to, Request send)
+    {
+        for (Id dst: to)
+            send(dst, send);
+    }
+
+    /**
+     * Sends {@code send} to each node in {@code to}, passing the same {@code callback} for every destination.
+     */
+    public <T> void send(Collection<Id> to, Request send, Callback<T> callback)
+    {
+        to.forEach(destination -> send(destination, send, callback));
+    }
+
     // send to a specific node
     public <T> void send(Id to, Request send, Callback<T> callback)
     {
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java
index 2c3aeb2..785968c 100644
--- a/accord-core/src/main/java/accord/messages/BeginRecovery.java
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -1,7 +1,5 @@
 package accord.messages;
 
-import accord.messages.Reply;
-import accord.messages.Request;
 import accord.api.Result;
 import accord.txn.Writes;
 import accord.txn.Ballot;
diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java b/accord-core/src/main/java/accord/messages/PreAccept.java
index 0ae2e76..d14c6fd 100644
--- a/accord-core/src/main/java/accord/messages/PreAccept.java
+++ b/accord-core/src/main/java/accord/messages/PreAccept.java
@@ -4,11 +4,9 @@ import java.util.NavigableMap;
 import java.util.Objects;
 import java.util.TreeMap;
 
-import accord.local.Instance;
+import accord.local.CommandStore;
 import accord.local.Node;
 import accord.local.Node.Id;
-import accord.messages.Reply;
-import accord.messages.Request;
 import accord.txn.Timestamp;
 import accord.local.Command;
 import accord.txn.Dependencies;
@@ -113,10 +111,10 @@ public class PreAccept implements Request
         }
     }
 
-    static Dependencies calculateDeps(Instance instance, TxnId txnId, Txn txn, Timestamp executeAt)
+    static Dependencies calculateDeps(CommandStore commandStore, TxnId txnId, Txn txn, Timestamp executeAt)
     {
         NavigableMap<TxnId, Txn> deps = new TreeMap<>();
-        txn.conflictsMayExecuteBefore(instance, executeAt).forEach(conflict -> {
+        txn.conflictsMayExecuteBefore(commandStore, executeAt).forEach(conflict -> {
             if (conflict.txnId().equals(txnId))
                 return;
 
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index 45f9f30..8ad2937 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -5,15 +5,9 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import accord.local.Instance;
-import accord.local.Node;
+import accord.local.*;
 import accord.local.Node.Id;
 import accord.api.Data;
-import accord.messages.Reply;
-import accord.messages.Request;
-import accord.local.Command;
-import accord.local.Listener;
-import accord.local.Status;
 import accord.txn.Txn;
 import accord.txn.TxnId;
 import accord.txn.Timestamp;
@@ -31,7 +25,7 @@ public class ReadData implements Request
 
         Data data;
         boolean isObsolete; // TODO: respond with the Executed result we have stored?
-        Set<Instance> waitingOn;
+        Set<CommandStore> waitingOn;
         Scheduled waitingOnReporter;
 
         LocalRead(TxnId txnId, Node node, Id replyToNode, long replyToMessage)
@@ -56,7 +50,7 @@ public class ReadData implements Request
             @Override
             public void run()
             {
-                Iterator<Instance> i = waitingOn.iterator();
+                Iterator<CommandStore> i = waitingOn.iterator();
                 Command blockedBy = null;
                 while (i.hasNext() && null == (blockedBy = i.next().command(txnId).blockedBy()));
                 if (blockedBy == null) return;
@@ -94,7 +88,7 @@ public class ReadData implements Request
             Data next = command.txn().read(command);
             data = data == null ? next : data.merge(next);
 
-            waitingOn.remove(command.instance);
+            waitingOn.remove(command.commandStore);
             if (waitingOn.isEmpty())
             {
                 waitingOnReporter.cancel();
@@ -108,7 +102,8 @@ public class ReadData implements Request
             {
                 isObsolete = true;
                 waitingOnReporter.cancel();
-                node.send(command.instance.shard, new Apply(command.txnId(), command.txn(), command.executeAt(), command.savedDeps(), command.writes(), command.result()));
+                // FIXME: this may result in redundant messages being sent when a shard is split across several command shards
+                node.send(command.commandStore.nodesFor(command), new Apply(command.txnId(), command.txn(), command.executeAt(), command.savedDeps(), command.writes(), command.result()));
                 node.reply(replyToNode, replyToMessage, new ReadNack());
             }
         }
@@ -117,7 +112,8 @@ public class ReadData implements Request
         {
             // TODO: simple hash set supporting concurrent modification, or else avoid concurrent modification
             waitingOn = txn.local(node).collect(Collectors.toCollection(() -> new DeterministicIdentitySet<>()));
-            waitingOn.forEach(instance -> {
+            // FIXME: fix/check thread safety
+            CommandStore.onEach(waitingOn, instance -> {
                 Command command = instance.command(txnId);
                 command.witness(txn);
                 switch (command.status())
diff --git a/accord-core/src/main/java/accord/messages/WaitOnCommit.java b/accord-core/src/main/java/accord/messages/WaitOnCommit.java
index 2007215..e825bc8 100644
--- a/accord-core/src/main/java/accord/messages/WaitOnCommit.java
+++ b/accord-core/src/main/java/accord/messages/WaitOnCommit.java
@@ -3,13 +3,8 @@ package accord.messages;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import accord.local.Instance;
-import accord.local.Node;
+import accord.local.*;
 import accord.local.Node.Id;
-import accord.messages.Reply;
-import accord.messages.Request;
-import accord.local.Command;
-import accord.local.Listener;
 import accord.txn.TxnId;
 import accord.txn.Keys;
 
@@ -60,7 +55,7 @@ public class WaitOnCommit implements Request
 
         synchronized void setup(TxnId txnId, Keys keys)
         {
-            List<Instance> instances = node.local(keys).collect(Collectors.toList());
+            List<CommandStore> instances = node.local(keys).collect(Collectors.toList());
             waitingOn = instances.size();
             instances.forEach(instance -> {
                 Command command = instance.command(txnId);
diff --git a/accord-core/src/main/java/accord/topology/KeyRanges.java b/accord-core/src/main/java/accord/topology/KeyRanges.java
index 90ed527..3702276 100644
--- a/accord-core/src/main/java/accord/topology/KeyRanges.java
+++ b/accord-core/src/main/java/accord/topology/KeyRanges.java
@@ -2,26 +2,57 @@ package accord.topology;
 
 import accord.api.Key;
 import accord.api.KeyRange;
+import accord.txn.Keys;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
 
-import java.util.Arrays;
+import java.util.*;
 
-public class KeyRanges
+public class KeyRanges implements Iterable<KeyRange>
 {
     public static final KeyRanges EMPTY = new KeyRanges(new KeyRange[0]);
 
+    // TODO: fix raw parameterized use
     private final KeyRange[] ranges;
 
+    /**
+     * Wraps {@code ranges} directly, without a defensive copy.
+     * NOTE(review): rangeIndexForKey binary-searches this array, and difference/union/mergeTouching walk it
+     * in order, so callers presumably must pass ranges that are sorted and non-overlapping - confirm.
+     */
     public KeyRanges(KeyRange[] ranges)
     {
+        Preconditions.checkNotNull(ranges);
         this.ranges = ranges;
     }
 
+    /**
+     * Creates a KeyRanges holding the elements of {@code ranges}, in list order.
+     */
+    public KeyRanges(List<KeyRange> ranges)
+    {
+        this(ranges.toArray(new KeyRange[0]));
+    }
+
+    /** Renders the wrapped ranges in array order using {@code Arrays.toString}. */
     @Override
     public String toString()
     {
         return Arrays.toString(ranges);
     }
 
+    /**
+     * Two KeyRanges are equal when they have the same runtime class and hold element-wise equal ranges in the same order.
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (o == this)
+            return true;
+        if (o == null || o.getClass() != getClass())
+            return false;
+        KeyRanges other = (KeyRanges) o;
+        return Arrays.equals(this.ranges, other.ranges);
+    }
+
+    /** Hash of the wrapped ranges in order, consistent with {@link #equals(Object)}. */
+    @Override
+    public int hashCode()
+    {
+        return Arrays.hashCode(this.ranges);
+    }
+
+    /** Iterates the ranges in array order; the iterator does not support removal. */
+    @Override
+    public Iterator<KeyRange> iterator()
+    {
+        return Iterators.forArray(this.ranges);
+    }
+
     public int rangeIndexForKey(int lowerBound, int upperBound, Key key)
     {
         return Arrays.binarySearch(ranges, lowerBound, upperBound, key,
@@ -38,11 +69,122 @@ public class KeyRanges
         return ranges.length;
     }
 
+    /** Returns the range at position {@code i}; throws ArrayIndexOutOfBoundsException if {@code i} is out of range. */
+    public KeyRange get(int i)
+    {
+        return this.ranges[i];
+    }
+
+    /** Returns true when this set holds no ranges. */
+    public boolean isEmpty()
+    {
+        return ranges.length == 0;
+    }
+
+    /**
+     * Returns a new KeyRanges whose i-th range is {@code ranges[indexes[i]]}, in the order given by {@code indexes}.
+     */
     public KeyRanges select(int[] indexes)
     {
         KeyRange[] selection = new KeyRange[indexes.length];
         for (int i=0; i<indexes.length; i++)
-            selection[i] = ranges[i];
+            selection[i] = ranges[indexes[i]];
         return new KeyRanges(selection);
     }
+
+    /**
+     * Returns true if at least one range in this set reports {@code intersects(keys)}.
+     */
+    public boolean intersects(Keys keys)
+    {
+        for (KeyRange range : ranges)
+        {
+            if (range.intersects(keys))
+                return true;
+        }
+        return false;
+    }
+
+    /**
+     * Subtracts {@code that} from this set of ranges.
+     * Both sets are walked in one merge pass, so each is presumably expected to be sorted and
+     * non-overlapping. NOTE(review): confirm callers guarantee this.
+     *
+     * @param that the ranges to remove
+     * @return a new KeyRanges holding the parts of this set not covered by {@code that}; ranges may be trimmed or split
+     */
+    public KeyRanges difference(KeyRanges that)
+    {
+        List<KeyRange> remaining = new ArrayList<>(this.size() + that.size());
+        int next = 0; // index into that.ranges of the first range that may still overlap
+
+        for (KeyRange original : this.ranges)
+        {
+            KeyRange current = original;
+            while (current != null && next < that.size())
+            {
+                KeyRange removal = that.ranges[next];
+                int cmp = current.compareIntersecting(removal);
+                // assumes compareIntersecting < 0 means `current` lies wholly before `removal`
+                if (cmp < 0)
+                    break;
+                // ...and > 0 means `removal` lies wholly before `current`, so it can be skipped
+                if (cmp > 0)
+                {
+                    ++next;
+                    continue;
+                }
+
+                boolean keepsPrefix = current.start().compareTo(removal.start()) < 0;
+                boolean fullyCovered = current.end().compareTo(removal.end()) <= 0;
+                if (keepsPrefix)
+                    remaining.add(current.subRange(current.start(), removal.start()));
+
+                if (fullyCovered)
+                {
+                    current = null;
+                }
+                else
+                {
+                    // keep the tail past `removal` and compare it against the next range to remove
+                    current = current.subRange(removal.end(), current.end());
+                    ++next;
+                }
+            }
+            if (current != null)
+                remaining.add(current);
+        }
+        return new KeyRanges(remaining.toArray(KeyRange[]::new));
+    }
+
+    /**
+     * Combines this set with {@code that} and returns all ranges ordered by start.
+     * The inputs must not overlap.
+     *
+     * @throws IllegalArgumentException if any two neighbouring ranges, after sorting, have {@code compareIntersecting == 0}
+     */
+    public KeyRanges union(KeyRanges that)
+    {
+        int mine = this.ranges.length;
+        int theirs = that.ranges.length;
+        KeyRange[] combined = new KeyRange[mine + theirs];
+        System.arraycopy(this.ranges, 0, combined, 0, mine);
+        System.arraycopy(that.ranges, 0, combined, mine, theirs);
+        Arrays.sort(combined, Comparator.comparing(KeyRange::start));
+
+        for (int i = 0; i + 1 < combined.length; i++)
+            Preconditions.checkArgument(combined[i + 1].compareIntersecting(combined[i]) != 0);
+
+        return new KeyRanges(combined);
+    }
+
+    /** Convenience overload of {@link #union(KeyRanges)} for a single range. */
+    public KeyRanges union(KeyRange range)
+    {
+        KeyRanges single = new KeyRanges(new KeyRange[]{ range });
+        return union(single);
+    }
+
+    /**
+     * Replaces each run of consecutive ranges that {@code tryMerge} can combine with the merged range.
+     * Returns this instance unchanged when it holds no ranges.
+     */
+    public KeyRanges mergeTouching()
+    {
+        if (ranges.length == 0)
+            return this;
+
+        List<KeyRange> merged = new ArrayList<>(ranges.length);
+        KeyRange pending = ranges[0];
+        for (int i = 1; i < ranges.length; i++)
+        {
+            KeyRange next = ranges[i];
+            KeyRange joined = pending.tryMerge(next);
+            if (joined == null)
+            {
+                // cannot combine: flush the accumulated range and start a new run
+                merged.add(pending);
+                pending = next;
+            }
+            else
+            {
+                pending = joined;
+            }
+        }
+        merged.add(pending);
+        return new KeyRanges(merged.toArray(KeyRange[]::new));
+    }
+
 }
diff --git a/accord-core/src/main/java/accord/topology/Shard.java b/accord-core/src/main/java/accord/topology/Shard.java
index fcd7ee2..99357ce 100644
--- a/accord-core/src/main/java/accord/topology/Shard.java
+++ b/accord-core/src/main/java/accord/topology/Shard.java
@@ -8,11 +8,15 @@ import accord.local.Node.Id;
 import accord.api.Key;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 public class Shard
 {
     public final KeyRange range;
+    // TODO: use BTreeSet to combine these two (or introduce version that operates over long values)
     public final List<Id> nodes;
+    public final Set<Id> nodeSet;
     public final Set<Id> fastPathElectorate;
     public final int recoveryFastPathSize;
     public final int fastPathQuorumSize;
@@ -21,9 +25,10 @@ public class Shard
     public Shard(KeyRange range, List<Id> nodes, Set<Id> fastPathElectorate)
     {
         this.range = range;
-        this.nodes = nodes;
+        this.nodes = ImmutableList.copyOf(nodes);
+        this.nodeSet = ImmutableSet.copyOf(nodes);
         int f = maxToleratedFailures(nodes.size());
-        this.fastPathElectorate = fastPathElectorate;
+        this.fastPathElectorate = ImmutableSet.copyOf(fastPathElectorate);
         int e = fastPathElectorate.size();
         this.recoveryFastPathSize = (f+1)/2;
         this.slowPathQuorumSize = f + 1;
@@ -43,6 +48,11 @@ public class Shard
         return (f + electorate)/2 + 1;
     }
 
+    /** Replication factor: the number of entries in {@link #nodes}. */
+    public int rf()
+    {
+        return this.nodes.size();
+    }
+
     public boolean contains(Key key)
     {
         return range.containsKey(key);
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
index f568307..5f645f2 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -7,6 +7,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.IntFunction;
 import java.util.stream.IntStream;
 
@@ -15,6 +16,7 @@ import accord.local.Node.Id;
 import accord.api.Key;
 import accord.txn.Keys;
 import accord.utils.IndexedConsumer;
+import accord.utils.IndexedPredicate;
 
 public class Topology extends AbstractCollection<Shard>
 {
@@ -22,7 +24,7 @@ public class Topology extends AbstractCollection<Shard>
     final KeyRanges ranges;
     final Map<Id, Shards.NodeInfo> nodeLookup;
     final KeyRanges subsetOfRanges;
-    final int[] supersetIndexes;
+    final int[] supersetRangeIndexes;
 
     static class NodeInfo
     {
@@ -47,7 +49,7 @@ public class Topology extends AbstractCollection<Shard>
         this.ranges = new KeyRanges(Arrays.stream(shards).map(shard -> shard.range).toArray(KeyRange[]::new));
         this.shards = shards;
         this.subsetOfRanges = ranges;
-        this.supersetIndexes = IntStream.range(0, shards.length).toArray();
+        this.supersetRangeIndexes = IntStream.range(0, shards.length).toArray();
         this.nodeLookup = new HashMap<>();
         Map<Id, List<Integer>> build = new HashMap<>();
         for (int i = 0 ; i < shards.length ; ++i)
@@ -69,7 +71,7 @@ public class Topology extends AbstractCollection<Shard>
         this.ranges = ranges;
         this.nodeLookup = nodeLookup;
         this.subsetOfRanges = subsetOfRanges;
-        this.supersetIndexes = supersetIndexes;
+        this.supersetRangeIndexes = supersetIndexes;
     }
 
     public Shards forNode(Id node)
@@ -99,7 +101,7 @@ public class Topology extends AbstractCollection<Shard>
             subsetIndex = subsetOfRanges.rangeIndexForKey(subsetIndex, subsetOfRanges.size(), select.get(i));
             if (subsetIndex < 0 || subsetIndex >= subsetOfRanges.size())
                 throw new IllegalArgumentException("Range not found for " + select.get(i));
-            int supersetIndex = supersetIndexes[subsetIndex];
+            int supersetIndex = supersetRangeIndexes[subsetIndex];
             newSubset[count++] = supersetIndex;
             Shard shard = shards[supersetIndex];
             // find the first key outside this range
@@ -108,6 +110,14 @@ public class Topology extends AbstractCollection<Shard>
         if (count != newSubset.length)
             newSubset = Arrays.copyOf(newSubset, count);
         KeyRanges rangeSubset = ranges.select(newSubset);
+
+        // TODO: more efficient sharing of nodeLookup state
+        Map<Id, NodeInfo> nodeLookup = new HashMap<>();
+        for (Map.Entry<Id, NodeInfo> e : this.nodeLookup.entrySet())
+        {
+            if (intersects(newSubset, e.getValue().supersetIndexes))
+                nodeLookup.put(e.getKey(), e.getValue());
+        }
         return new Shards(shards, ranges, nodeLookup, rangeSubset, newSubset);
     }
 
@@ -118,11 +128,11 @@ public class Topology extends AbstractCollection<Shard>
     public void forEachOn(Id on, Keys select, IndexedConsumer<Shard> consumer)
     {
         Shards.NodeInfo info = nodeLookup.get(on);
-        for (int i = 0, j = 0, k = 0 ; i < select.size() && j < supersetIndexes.length && k < info.supersetIndexes.length ;)
+        for (int i = 0, j = 0, k = 0 ; i < select.size() && j < supersetRangeIndexes.length && k < info.supersetIndexes.length ;)
         {
             Key key = select.get(i);
-            Shard shard = shards[supersetIndexes[j]];
-            int c = supersetIndexes[j] - info.supersetIndexes[k];
+            Shard shard = shards[supersetRangeIndexes[j]];
+            int c = supersetRangeIndexes[j] - info.supersetIndexes[k];
             if (c < 0) ++j;
             else if (c > 0) ++k;
             else
@@ -139,7 +149,9 @@ public class Topology extends AbstractCollection<Shard>
     {
         // TODO: this can be done by divide-and-conquer splitting of the lists and recursion, which should be more efficient
         Shards.NodeInfo info = nodeLookup.get(on);
-        int[] a = supersetIndexes, b = info.supersetIndexes;
+        if (info == null)
+            return;
+        int[] a = supersetRangeIndexes, b = info.supersetIndexes;
         int ai = 0, bi = 0;
         while (ai < a.length && bi < b.length)
         {
@@ -161,19 +173,48 @@ public class Topology extends AbstractCollection<Shard>
         }
     }
 
+    /**
+     * Counts the shards of this topology that are also listed in {@code on}'s node info and that satisfy
+     * {@code consumer}. The predicate receives the shard's position within this topology and the shard itself.
+     *
+     * @return the number of matching shards, or 0 if {@code on} has no entry in this topology's node lookup
+     */
+    public int matchesOn(Id on, IndexedPredicate<Shard> consumer)
+    {
+        // TODO: this can be done by divide-and-conquer splitting of the lists and recursion, which should be more efficient
+        Shards.NodeInfo info = nodeLookup.get(on);
+        // guard: a node absent from this topology has no shards to match (would otherwise NPE below)
+        if (info == null)
+            return 0;
+
+        int count = 0;
+        int[] a = supersetRangeIndexes, b = info.supersetIndexes;
+        int ai = 0, bi = 0;
+        // merge-walk both index arrays (assumed sorted ascending, as binarySearch requires), skipping ahead on mismatch
+        while (ai < a.length && bi < b.length)
+        {
+            if (a[ai] == b[bi])
+            {
+                if (consumer.test(ai, shards[a[ai]]))
+                    ++count;
+                ++ai; ++bi;
+            }
+            else if (a[ai] < b[bi])
+            {
+                ai = Arrays.binarySearch(a, ai + 1, a.length, b[bi]);
+                if (ai < 0) ai = -1 -ai;
+            }
+            else
+            {
+                bi = Arrays.binarySearch(b, bi + 1, b.length, a[ai]);
+                if (bi < 0) bi = -1 -bi;
+            }
+        }
+        return count;
+    }
+
+    /**
+     * Invokes {@code consumer} with each shard of this topology and the shard's position within this topology.
+     */
     public void forEach(IndexedConsumer<Shard> consumer)
     {
-        for (int i = 0 ; i < supersetIndexes.length ; ++i)
-            consumer.accept(i, shards[supersetIndexes[i]]);
+        for (int i = 0; i < supersetRangeIndexes.length ; ++i)
+            consumer.accept(i, shards[supersetRangeIndexes[i]]);
     }
 
     public <T> T[] select(Keys select, T[] indexedByShard, IntFunction<T[]> constructor)
     {
         List<T> selection = new ArrayList<>();
-        for (int i = 0, j = 0 ; i < select.size() && j < supersetIndexes.length ;)
+        for (int i = 0, j = 0 ; i < select.size() && j < supersetRangeIndexes.length ;)
         {
             Key k = select.get(i);
-            Shard shard = shards[supersetIndexes[j]];
+            Shard shard = shards[supersetRangeIndexes[j]];
 
             int c = shard.range.compareKey(k);
             if (c < 0) ++i;
@@ -187,7 +228,7 @@ public class Topology extends AbstractCollection<Shard>
+    /** Iterates the shards of this topology in {@code supersetRangeIndexes} order. */
     @Override
     public Iterator<Shard> iterator()
     {
-        return IntStream.of(supersetIndexes).mapToObj(i -> shards[i]).iterator();
+        return IntStream.of(supersetRangeIndexes).mapToObj(i -> shards[i]).iterator();
     }
 
     @Override
@@ -196,8 +237,38 @@ public class Topology extends AbstractCollection<Shard>
         return subsetOfRanges.size();
     }
 
+    /**
+     * Returns the largest replication factor among this topology's shards.
+     * NOTE(review): returns Integer.MIN_VALUE when the topology has no shards - confirm callers handle that.
+     */
+    public int maxRf()
+    {
+        return IntStream.of(supersetRangeIndexes)
+                        .map(i -> shards[i].rf())
+                        .max()
+                        .orElse(Integer.MIN_VALUE);
+    }
+
+    /** Returns the shard at position {@code index} within this topology (mapped through {@code supersetRangeIndexes}). */
     public Shard get(int index)
     {
-        return shards[supersetIndexes[index]];
+        return shards[supersetRangeIndexes[index]];
+    }
+
+    /**
+     * Returns the ids of the nodes present in this topology's node lookup, as a read-only live view.
+     */
+    public Set<Id> nodes()
+    {
+        // A raw HashMap keySet would let callers remove entries from the internal nodeLookup map.
+        return java.util.Collections.unmodifiableSet(nodeLookup.keySet());
+    }
+
+    /**
+     * Returns the key ranges covered by this topology's shards, i.e. the ranges enumerated by
+     * {@link #size()}, {@link #get(int)} and {@link #iterator()}.
+     * For a topology selected from a larger one this is the selected subset, not the parent's ranges.
+     */
+    public KeyRanges ranges()
+    {
+        // subsetOfRanges matches size()/get()/iterator(); for an unrestricted topology it is the same object as `ranges`
+        return subsetOfRanges;
+    }
+
+    /**
+     * Returns true if the two index arrays share at least one value.
+     * Assumes both arrays are sorted ascending, as the merge walk requires.
+     */
+    private static boolean intersects(int[] is, int[] js)
+    {
+        for (int i = 0, j = 0 ; i < is.length && j < js.length ;)
+        {
+            // compare directly rather than by subtraction, which can overflow for large values of opposite sign
+            if (is[i] < js[j]) ++i;
+            else if (is[i] > js[j]) ++j;
+            else return true;
+        }
+        return false;
     }
 }
diff --git a/accord-core/src/main/java/accord/txn/Dependencies.java b/accord-core/src/main/java/accord/txn/Dependencies.java
index 0c6fa83..4e9e114 100644
--- a/accord-core/src/main/java/accord/txn/Dependencies.java
+++ b/accord-core/src/main/java/accord/txn/Dependencies.java
@@ -7,7 +7,7 @@ import java.util.Objects;
 import java.util.TreeMap;
 
 import accord.local.Command;
-import accord.topology.Shard;
+import accord.local.CommandStore;
 import com.google.common.annotations.VisibleForTesting;
 
 // TODO: do not send Txn
@@ -64,13 +64,12 @@ public class Dependencies implements Iterable<Entry<TxnId, Txn>>
         return deps.get(txnId);
     }
 
-    public Iterable<TxnId> on(Shard shard)
+    public Iterable<TxnId> on(CommandStore commandStore)
     {
-        // TODO: efficiency
         return deps.entrySet()
-                   .stream()
-                   .filter(e -> e.getValue().keys().stream().anyMatch(shard::contains))
-                   .map(Entry::getKey)::iterator;
+                .stream()
+                .filter(e -> commandStore.intersects(e.getValue().keys()))
+                .map(Entry::getKey)::iterator;
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/txn/Txn.java b/accord-core/src/main/java/accord/txn/Txn.java
index 1249d97..e67a6a2 100644
--- a/accord-core/src/main/java/accord/txn/Txn.java
+++ b/accord-core/src/main/java/accord/txn/Txn.java
@@ -4,10 +4,8 @@ import java.util.Comparator;
 import java.util.stream.Stream;
 
 import accord.api.*;
-import accord.local.Command;
-import accord.local.CommandsForKey;
-import accord.local.Instance;
-import accord.local.Node;
+import accord.local.*;
+import accord.topology.KeyRanges;
 
 public class Txn
 {
@@ -74,32 +72,32 @@ public class Txn
         return "read:" + read.toString() + (update != null ? ", update:" + update : "");
     }
 
-    public Data read(KeyRange range, Store store)
+    public Data read(KeyRanges range, Store store)
     {
         return read.read(range, store);
     }
 
     public Data read(Command command)
     {
-        Instance instance = command.instance;
-        return read(instance.shard.range, instance.store());
+        CommandStore commandStore = command.commandStore;
+        return read(commandStore.ranges(), commandStore.store());
     }
 
     // TODO: move these somewhere else?
-    public Stream<Instance> local(Node node)
+    public Stream<CommandStore> local(Node node)
     {
         return node.local(keys());
     }
 
-    public Timestamp maxConflict(Instance instance)
+    public Timestamp maxConflict(CommandStore commandStore)
     {
-        return maxConflict(instance, keys());
+        return maxConflict(commandStore, keys());
     }
 
-    public Stream<Command> conflictsMayExecuteBefore(Instance instance, Timestamp mayExecuteBefore)
+    public Stream<Command> conflictsMayExecuteBefore(CommandStore commandStore, Timestamp mayExecuteBefore)
     {
         return keys().stream().flatMap(key -> {
-            CommandsForKey forKey = instance.commandsForKey(key);
+            CommandsForKey forKey = commandStore.commandsForKey(key);
             return Stream.concat(
             forKey.uncommitted.headMap(mayExecuteBefore, false).values().stream(),
             // TODO: only return latest of Committed?
@@ -108,48 +106,48 @@ public class Txn
         });
     }
 
-    public Stream<Command> uncommittedStartedBefore(Instance instance, TxnId startedBefore)
+    public Stream<Command> uncommittedStartedBefore(CommandStore commandStore, TxnId startedBefore)
     {
         return keys().stream().flatMap(key -> {
-            CommandsForKey forKey = instance.commandsForKey(key);
+            CommandsForKey forKey = commandStore.commandsForKey(key);
             return forKey.uncommitted.headMap(startedBefore, false).values().stream();
         });
     }
 
-    public Stream<Command> committedStartedBefore(Instance instance, TxnId startedBefore)
+    public Stream<Command> committedStartedBefore(CommandStore commandStore, TxnId startedBefore)
     {
         return keys().stream().flatMap(key -> {
-            CommandsForKey forKey = instance.commandsForKey(key);
+            CommandsForKey forKey = commandStore.commandsForKey(key);
             return forKey.committedById.headMap(startedBefore, false).values().stream();
         });
     }
 
-    public Stream<Command> uncommittedStartedAfter(Instance instance, TxnId startedAfter)
+    public Stream<Command> uncommittedStartedAfter(CommandStore commandStore, TxnId startedAfter)
     {
         return keys().stream().flatMap(key -> {
-            CommandsForKey forKey = instance.commandsForKey(key);
+            CommandsForKey forKey = commandStore.commandsForKey(key);
             return forKey.uncommitted.tailMap(startedAfter, false).values().stream();
         });
     }
 
-    public Stream<Command> committedExecutesAfter(Instance instance, TxnId startedAfter)
+    public Stream<Command> committedExecutesAfter(CommandStore commandStore, TxnId startedAfter)
     {
         return keys().stream().flatMap(key -> {
-            CommandsForKey forKey = instance.commandsForKey(key);
+            CommandsForKey forKey = commandStore.commandsForKey(key);
             return forKey.committedByExecuteAt.tailMap(startedAfter, false).values().stream();
         });
     }
 
-    public void register(Instance instance, Command command)
+    public void register(CommandStore commandStore, Command command)
     {
-        assert instance == command.instance;
-        keys().forEach(key -> instance.commandsForKey(key).register(command));
+        assert commandStore == command.commandStore;
+        keys().forEach(key -> commandStore.commandsForKey(key).register(command));
     }
 
-    protected Timestamp maxConflict(Instance instance, Keys keys)
+    protected Timestamp maxConflict(CommandStore commandStore, Keys keys)
     {
         return keys.stream()
-                   .map(instance::commandsForKey)
+                   .map(commandStore::commandsForKey)
                    .map(CommandsForKey::max)
                    .max(Comparator.naturalOrder())
                    .orElse(Timestamp.NONE);
diff --git a/accord-core/src/main/java/accord/txn/Writes.java b/accord-core/src/main/java/accord/txn/Writes.java
index 4913e77..42d7a88 100644
--- a/accord-core/src/main/java/accord/txn/Writes.java
+++ b/accord-core/src/main/java/accord/txn/Writes.java
@@ -1,7 +1,7 @@
 package accord.txn;
 
 import accord.api.Write;
-import accord.local.Instance;
+import accord.local.CommandStore;
 
 public class Writes
 {
@@ -16,10 +16,10 @@ public class Writes
         this.write = write;
     }
 
-    public void apply(Instance instance)
+    public void apply(CommandStore commandStore)
     {
         if (write != null)
-            write.apply(instance.shard.range, executeAt, instance.store());
+            write.apply(commandStore.ranges(), executeAt, commandStore.store());
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/utils/IndexedPredicate.java b/accord-core/src/main/java/accord/utils/IndexedPredicate.java
new file mode 100644
index 0000000..358f490
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/IndexedPredicate.java
@@ -0,0 +1,6 @@
+package accord.utils;
+
+public interface IndexedPredicate<V>
+{
+    boolean test(int i, V v);
+}
diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java
index 3421753..51beff3 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -1,7 +1,11 @@
 package accord;
 
+import accord.api.KeyRange;
 import accord.local.Node;
 import accord.impl.mock.MockStore;
+import accord.topology.KeyRanges;
+import accord.topology.Shard;
+import accord.topology.Shards;
 import accord.txn.Txn;
 import accord.txn.Keys;
 import com.google.common.base.Preconditions;
@@ -36,6 +40,16 @@ public class Utils
         return rlist;
     }
 
+    public static KeyRanges ranges(KeyRange... ranges)
+    {
+        return new KeyRanges(ranges);
+    }
+
+    public static Shards shards(Shard... shards)
+    {
+        return new Shards(shards);
+    }
+
     public static Txn writeTxn(Keys keys)
     {
         return new Txn(keys, MockStore.READ, MockStore.QUERY, MockStore.UPDATE);
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java
index 8f3821e..7c8444a 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -234,7 +234,6 @@ public class BurnTest
         while (true)
         {
             long seed = ThreadLocalRandom.current().nextLong();
-//            long seed = 5844871443302548687L;
             System.out.println("Seed " + seed);
             Random random = new Random(seed);
             List<Id> clients =  generateIds(true, 1 + random.nextInt(4));
diff --git a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
index d819b94..70ec398 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
@@ -2,9 +2,9 @@ package accord.coordinate;
 
 import accord.local.Node;
 import accord.impl.mock.MockCluster;
-import accord.impl.IntKey;
 import accord.api.Result;
 import accord.impl.mock.MockStore;
+import accord.txn.Keys;
 import accord.txn.Txn;
 import accord.txn.TxnId;
 import org.junit.jupiter.api.Assertions;
@@ -12,34 +12,62 @@ import org.junit.jupiter.api.Test;
 
 import static accord.Utils.ids;
 import static accord.Utils.writeTxn;
+import static accord.impl.IntKey.keys;
 
 public class CoordinateTest
 {
     @Test
     void simpleTest() throws Throwable
     {
-        MockCluster cluster = MockCluster.builder().build();
-        Node node = cluster.get(1);
-        Assertions.assertNotNull(node);
+        try (MockCluster cluster = MockCluster.builder().build())
+        {
+            Node node = cluster.get(1);
+            Assertions.assertNotNull(node);
 
-        TxnId txnId = new TxnId(100, 0, node.id());
-        Txn txn = writeTxn(IntKey.keys(10));
-        Result result = Coordinate.execute(node, txnId, txn).toCompletableFuture().get();
-        Assertions.assertEquals(MockStore.RESULT, result);
+            TxnId txnId = new TxnId(100, 0, node.id());
+            Txn txn = writeTxn(keys(10));
+            Result result = Coordinate.execute(node, txnId, txn).toCompletableFuture().get();
+            Assertions.assertEquals(MockStore.RESULT, result);
+        }
     }
 
     @Test
     void slowPathTest() throws Throwable
     {
-        MockCluster cluster = MockCluster.builder().nodes(7).replication(7).build();
-        cluster.networkFilter.isolate(ids(5, 7));
+        try (MockCluster cluster = MockCluster.builder().nodes(7).replication(7).build())
+        {
+            cluster.networkFilter.isolate(ids(5, 7));
 
-        Node node = cluster.get(1);
-        Assertions.assertNotNull(node);
+            Node node = cluster.get(1);
+            Assertions.assertNotNull(node);
 
-        TxnId txnId = new TxnId(100, 0, node.id());
-        Txn txn = writeTxn(IntKey.keys(10));
+            TxnId txnId = new TxnId(100, 0, node.id());
+            Txn txn = writeTxn(keys(10));
+            Result result = Coordinate.execute(node, txnId, txn).toCompletableFuture().get();
+            Assertions.assertEquals(MockStore.RESULT, result);
+        }
+    }
+
+    private TxnId coordinate(Node node, long clock, Keys keys) throws Throwable
+    {
+        TxnId txnId = new TxnId(clock, 0, node.id());
+        Txn txn = writeTxn(keys);
         Result result = Coordinate.execute(node, txnId, txn).toCompletableFuture().get();
         Assertions.assertEquals(MockStore.RESULT, result);
+        return txnId;
+    }
+
+    @Test
+    void multiKeyTest() throws Throwable
+    {
+        try (MockCluster cluster = MockCluster.builder().nodes(6).maxKey(600).build())
+        {
+            Node node = cluster.get(1);
+            Assertions.assertNotNull(node);
+
+            coordinate(node, 100, keys(50, 350, 550)); // coordinate() asserts each txn returns MockStore.RESULT
+            coordinate(node, 150, keys(250, 350, 450));
+            coordinate(node, 125, keys(50, 60, 70, 80, 350, 550));
+        }
     }
 }
diff --git a/accord-core/src/test/java/accord/coordinate/PreacceptTrackerTest.java b/accord-core/src/test/java/accord/coordinate/PreacceptTrackerTest.java
new file mode 100644
index 0000000..d2baf87
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/PreacceptTrackerTest.java
@@ -0,0 +1,135 @@
+package accord.coordinate;
+
+import accord.Utils;
+import accord.coordinate.tracking.FastPathTracker;
+import accord.impl.TopologyUtils;
+import accord.local.Node;
+import accord.topology.KeyRanges;
+import accord.topology.Shard;
+import accord.topology.Shards;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static accord.Utils.shards;
+
+public class PreacceptTrackerTest
+{
+    private static final Node.Id[] ids = Utils.ids(5).toArray(Node.Id[]::new);
+    private static final KeyRanges ranges = TopologyUtils.initialRanges(5, 500);
+    private static final Shards topology = TopologyUtils.initialTopology(ids, ranges, 3);
+        /*
+        (000, 100](100, 200](200, 300](300, 400](400, 500]
+        [1, 2, 3] [2, 3, 4] [3, 4, 5] [4, 5, 1] [5, 1, 2]
+         */
+
+    private static void assertResponseState(FastPathTracker<?> responses,
+                                            boolean quorumReached,
+                                            boolean fastPathAccepted,
+                                            boolean failed,
+                                            boolean hasOutstandingResponses)
+    {
+        Assertions.assertEquals(quorumReached, responses.hasReachedQuorum());
+        Assertions.assertEquals(fastPathAccepted, responses.hasMetFastPathCriteria());
+        Assertions.assertEquals(failed, responses.hasFailed());
+        Assertions.assertEquals(hasOutstandingResponses, responses.hasInFlight());
+    }
+
+    @Test
+    void singleShard() // one rf-3 shard, no fast-path votes: quorum after two successes, fast path never met
+    {
+        Shards subShards = shards(topology.get(0));
+        FastPathTracker<Agree.ShardTracker> responses = new FastPathTracker<>(subShards, Agree.ShardTracker[]::new, Agree.ShardTracker::new);
+
+        responses.recordSuccess(ids[0], false);
+        assertResponseState(responses, false, false, false, true);
+
+        responses.recordSuccess(ids[1], false);
+        assertResponseState(responses, true, false, false, true);
+
+        responses.recordSuccess(ids[2], false);
+        assertResponseState(responses, true, false, false, false);
+    }
+
+    @Test
+    void singleShardFastPath() // all replicas vote fast path: quorum after two, fast path only once all three respond
+    {
+        Shards subShards = shards(topology.get(0));
+        FastPathTracker<Agree.ShardTracker> responses = new FastPathTracker<>(subShards, Agree.ShardTracker[]::new, Agree.ShardTracker::new);
+
+        responses.recordSuccess(ids[0], true);
+        assertResponseState(responses, false, false, false, true);
+
+        responses.recordSuccess(ids[1], true);
+        assertResponseState(responses, true, false, false, true);
+
+        responses.recordSuccess(ids[2], true);
+        assertResponseState(responses, true, true, false, false);
+    }
+
+    /**
+     * A response from a node that is not a replica of the tracked shard must not change the tracker's state.
+     */
+    @Test
+    void unexpectedResponsesAreIgnored()
+    {
+        Shards subShards = shards(topology.get(0));
+        FastPathTracker<Agree.ShardTracker> responses = new FastPathTracker<>(subShards, Agree.ShardTracker[]::new, Agree.ShardTracker::new);
+
+        responses.recordSuccess(ids[0], false);
+        assertResponseState(responses, false, false, false, true);
+
+        responses.recordSuccess(ids[1], false);
+        assertResponseState(responses, true, false, false, true);
+
+        Assertions.assertFalse(subShards.get(0).nodes.contains(ids[4]));
+        responses.recordSuccess(ids[4], false); // ids[4] is not a replica of shard 0
+        assertResponseState(responses, true, false, false, true);
+    }
+
+    @Test
+    void failure()
+    {
+        Shards subShards = shards(topology.get(0));
+        FastPathTracker<?> responses = new FastPathTracker<>(subShards, Agree.ShardTracker[]::new, Agree.ShardTracker::new);
+
+        responses.recordSuccess(ids[0], true);
+        assertResponseState(responses, false, false, false, true);
+
+        responses.recordFailure(ids[1]);
+        assertResponseState(responses, false, false, false, true);
+
+        responses.recordFailure(ids[2]);
+        assertResponseState(responses, false, false, true, false);
+    }
+
+    @Test
+    void multiShard()
+    {
+        Shards subShards = new Shards(new Shard[]{topology.get(0), topology.get(1), topology.get(2)});
+        FastPathTracker<Agree.ShardTracker> responses = new FastPathTracker<>(subShards, Agree.ShardTracker[]::new, Agree.ShardTracker::new);
+        /*
+        (000, 100](100, 200](200, 300]
+        [1, 2, 3] [2, 3, 4] [3, 4, 5]
+         */
+
+        Assertions.assertSame(subShards.get(0), responses.unsafeGet(0).shard);
+        Assertions.assertSame(subShards.get(1), responses.unsafeGet(1).shard);
+        Assertions.assertSame(subShards.get(2), responses.unsafeGet(2).shard);
+
+        responses.recordSuccess(ids[1], true);
+        assertResponseState(responses, false, false, false, true);
+
+        responses.recordSuccess(ids[2], true);
+        assertResponseState(responses, false, false, false, true);
+
+        responses.recordSuccess(ids[3], true);
+        // the middle shard will have reached fast path
+        Assertions.assertTrue(responses.unsafeGet(1).hasMetFastPathCriteria());
+        // but since the others haven't, it won't report it as accepted
+        assertResponseState(responses, true, false, false, true);
+
+        responses.recordSuccess(ids[0], true);
+        responses.recordSuccess(ids[4], true);
+        assertResponseState(responses, true, true, false, false);
+    }
+}
diff --git a/accord-core/src/test/java/accord/coordinate/RecoverTest.java b/accord-core/src/test/java/accord/coordinate/RecoverTest.java
index 3f39b51..02795b4 100644
--- a/accord-core/src/test/java/accord/coordinate/RecoverTest.java
+++ b/accord-core/src/test/java/accord/coordinate/RecoverTest.java
@@ -1,11 +1,9 @@
 package accord.coordinate;
 
-import accord.local.Instance;
 import accord.local.Node;
 import accord.impl.mock.MockCluster;
 import accord.impl.IntKey;
 import accord.api.Key;
-import accord.messages.Timeout;
 import accord.local.*;
 import accord.txn.Keys;
 import accord.messages.PreAccept;
@@ -13,7 +11,6 @@ import accord.txn.Txn;
 import accord.txn.TxnId;
 
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
 
 import java.util.List;
 import java.util.concurrent.CompletionStage;
@@ -26,17 +23,16 @@ import static accord.Utils.writeTxn;
 
 public class RecoverTest
 {
-
-    private static Instance getInstance(Node node, Key key)
+    private static CommandStore getCommandShard(Node node, Key key)
     {
         return node.local(key).orElseThrow();
     }
 
     private static Command getCommand(Node node, Key key, TxnId txnId)
     {
-        Instance instance = getInstance(node, key);
-        Assertions.assertTrue(instance.hasCommand(txnId));
-        return instance.command(txnId);
+        CommandStore commandStore = getCommandShard(node, key);
+        Assertions.assertTrue(commandStore.hasCommand(txnId));
+        return commandStore.command(txnId);
     }
 
     private static void assertStatus(Node node, Key key, TxnId txnId, Status status)
@@ -49,8 +45,8 @@ public class RecoverTest
 
     private static void assertMissing(Node node, Key key, TxnId txnId)
     {
-        Instance instance = getInstance(node, key);
-        Assertions.assertFalse(instance.hasCommand(txnId));
+        CommandStore commandStore = getCommandShard(node, key);
+        Assertions.assertFalse(commandStore.hasCommand(txnId));
     }
 
     private static void assertTimeout(CompletionStage<?> f)
@@ -75,39 +71,41 @@ public class RecoverTest
     void conflictTest() throws Throwable
     {
         Key key = IntKey.key(10);
-        MockCluster cluster = MockCluster.builder().nodes(9).replication(9).build();
-        cluster.networkFilter.isolate(ids(7, 9));
-        cluster.networkFilter.addFilter(anyId(), isId(ids(5, 6)), notMessageType(PreAccept.class));
-
-        TxnId txnId1 = new TxnId(100, 0, id(100));
-        Txn txn1 = writeTxn(Keys.of(key));
-        assertTimeout(Coordinate.execute(cluster.get(1), txnId1, txn1));
-
-        TxnId txnId2 = new TxnId(50, 0, id(101));
-        Txn txn2 = writeTxn(Keys.of(key));
-        cluster.networkFilter.clear();
-        cluster.networkFilter.isolate(ids(1, 7));
-        assertTimeout(Coordinate.execute(cluster.get(9), txnId2, txn2));
-
-        cluster.nodes(ids(1, 4)).forEach(n -> assertStatus(n, key, txnId1, Status.Accepted));
-        cluster.nodes(ids(5, 6)).forEach(n -> assertStatus(n, key, txnId1, Status.PreAccepted));
-        cluster.nodes(ids(7, 9)).forEach(n -> assertMissing(n, key, txnId1));
-
-        cluster.nodes(ids(1, 7)).forEach(n -> assertMissing(n, key, txnId2));
-        cluster.nodes(ids(8, 9)).forEach(n -> assertStatus(n, key, txnId2, Status.PreAccepted));
-
-        //
-        cluster.networkFilter.clear();
-        cluster.networkFilter.isolate(ids(1, 4));
-        Coordinate.recover(cluster.get(8), txnId2, txn2).toCompletableFuture().get();
-
-        List<Node> nodes = cluster.nodes(ids(5, 9));
-        Assertions.assertTrue(txnId2.compareTo(txnId1) < 0);
-        nodes.forEach(n -> assertStatus(n, key, txnId2, Status.Applied));
-        nodes.forEach(n -> {
-            assertStatus(n, key, txnId2, Status.Applied);
-            Command command = getCommand(n, key, txnId2);
-            Assertions.assertEquals(txnId1, command.executeAt());
-        });
+        try (MockCluster cluster = MockCluster.builder().nodes(9).replication(9).build())
+        {
+            cluster.networkFilter.isolate(ids(7, 9));
+            cluster.networkFilter.addFilter(anyId(), isId(ids(5, 6)), notMessageType(PreAccept.class));
+
+            TxnId txnId1 = new TxnId(100, 0, id(100));
+            Txn txn1 = writeTxn(Keys.of(key));
+            assertTimeout(Coordinate.execute(cluster.get(1), txnId1, txn1));
+
+            TxnId txnId2 = new TxnId(50, 0, id(101));
+            Txn txn2 = writeTxn(Keys.of(key));
+            cluster.networkFilter.clear();
+            cluster.networkFilter.isolate(ids(1, 7));
+            assertTimeout(Coordinate.execute(cluster.get(9), txnId2, txn2));
+
+            cluster.nodes(ids(1, 4)).forEach(n -> assertStatus(n, key, txnId1, Status.Accepted));
+            cluster.nodes(ids(5, 6)).forEach(n -> assertStatus(n, key, txnId1, Status.PreAccepted));
+            cluster.nodes(ids(7, 9)).forEach(n -> assertMissing(n, key, txnId1));
+
+            cluster.nodes(ids(1, 7)).forEach(n -> assertMissing(n, key, txnId2));
+            cluster.nodes(ids(8, 9)).forEach(n -> assertStatus(n, key, txnId2, Status.PreAccepted));
+
+            //
+            cluster.networkFilter.clear();
+            cluster.networkFilter.isolate(ids(1, 4));
+            Coordinate.recover(cluster.get(8), txnId2, txn2).toCompletableFuture().get();
+
+            List<Node> nodes = cluster.nodes(ids(5, 9));
+            Assertions.assertTrue(txnId2.compareTo(txnId1) < 0);
+            nodes.forEach(n -> assertStatus(n, key, txnId2, Status.Applied));
+            nodes.forEach(n -> {
+                assertStatus(n, key, txnId2, Status.Applied);
+                Command command = getCommand(n, key, txnId2);
+                Assertions.assertEquals(txnId1, command.executeAt());
+            });
+        }
     }
 }
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerTest.java b/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerTest.java
new file mode 100644
index 0000000..832274a
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerTest.java
@@ -0,0 +1,107 @@
+package accord.coordinate.tracking;
+
+import accord.Utils;
+import accord.impl.TopologyUtils;
+import accord.local.Node;
+import accord.topology.KeyRanges;
+import accord.topology.Shard;
+import accord.topology.Shards;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static accord.Utils.shards;
+
+public class QuorumTrackerTest
+{
+    private static final Node.Id[] ids = Utils.ids(5).toArray(Node.Id[]::new);
+    private static final KeyRanges ranges = TopologyUtils.initialRanges(5, 500);
+    private static final Shards topology = TopologyUtils.initialTopology(ids, ranges, 3);
+        /*
+        (000, 100](100, 200](200, 300](300, 400](400, 500]
+        [1, 2, 3] [2, 3, 4] [3, 4, 5] [4, 5, 1] [5, 1, 2]
+         */
+
+    private static void assertResponseState(QuorumTracker responses,
+                                            boolean quorumReached,
+                                            boolean failed,
+                                            boolean hasOutstandingResponses)
+    {
+        Assertions.assertEquals(quorumReached, responses.hasReachedQuorum());
+        Assertions.assertEquals(failed, responses.hasFailed());
+        Assertions.assertEquals(hasOutstandingResponses, responses.hasInFlight());
+    }
+
+    @Test
+    void singleShard()
+    {
+        Shards subShards = shards(topology.get(0));
+        QuorumTracker responses = new QuorumTracker(subShards);
+
+        responses.recordSuccess(ids[0]);
+        assertResponseState(responses, false, false, true);
+
+        responses.recordSuccess(ids[1]);
+        assertResponseState(responses, true, false, true);
+
+        responses.recordSuccess(ids[2]);
+        assertResponseState(responses, true, false, false);
+    }
+
+    /**
+     * responses from unexpected endpoints should be ignored
+     */
+    @Test
+    void unexpectedResponsesAreIgnored()
+    {
+        Shards subShards = shards(topology.get(0));
+        QuorumTracker responses = new QuorumTracker(subShards);
+
+        responses.recordSuccess(ids[0]);
+        assertResponseState(responses, false, false, true);
+
+        responses.recordSuccess(ids[1]);
+        assertResponseState(responses, true, false, true);
+
+        Assertions.assertFalse(subShards.get(0).nodes.contains(ids[4]));
+        responses.recordSuccess(ids[4]);
+        assertResponseState(responses, true, false, true);
+    }
+
+    @Test
+    void failure()
+    {
+        Shards subShards = shards(topology.get(0));
+        QuorumTracker responses = new QuorumTracker(subShards);
+
+        responses.recordSuccess(ids[0]);
+        assertResponseState(responses, false, false, true);
+
+        responses.recordFailure(ids[1]);
+        assertResponseState(responses, false, false, true);
+
+        responses.recordFailure(ids[2]);
+        assertResponseState(responses, false, true, false);
+    }
+
+    @Test
+    void multiShard()
+    {
+        Shards subShards = new Shards(new Shard[]{topology.get(0), topology.get(1), topology.get(2)});
+        QuorumTracker responses = new QuorumTracker(subShards);
+        /*
+        (000, 100](100, 200](200, 300]
+        [1, 2, 3] [2, 3, 4] [3, 4, 5]
+         */
+
+        Assertions.assertSame(subShards.get(0), responses.unsafeGet(0).shard);
+        Assertions.assertSame(subShards.get(1), responses.unsafeGet(1).shard);
+        Assertions.assertSame(subShards.get(2), responses.unsafeGet(2).shard);
+
+        responses.recordSuccess(ids[1]);
+        assertResponseState(responses, false, false, true);
+        responses.recordSuccess(ids[2]);
+        assertResponseState(responses, false, false, true);
+        responses.recordSuccess(ids[3]);
+        assertResponseState(responses, true, false, true);
+    }
+}
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java b/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
new file mode 100644
index 0000000..0513dd1
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
@@ -0,0 +1,131 @@
+package accord.coordinate.tracking;
+
+import accord.Utils;
+import accord.impl.TopologyUtils;
+import accord.local.Node;
+import accord.topology.KeyRanges;
+import accord.topology.Shard;
+import accord.topology.Shards;
+import com.google.common.collect.Sets;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static accord.Utils.shards;
+
+public class ReadTrackerTest
+{
+    private static final Node.Id[] ids = Utils.ids(5).toArray(Node.Id[]::new);
+    private static final KeyRanges ranges = TopologyUtils.initialRanges(5, 500);
+    private static final Shards topology = TopologyUtils.initialTopology(ids, ranges, 3);
+        /*
+        (000, 100](100, 200](200, 300](300, 400](400, 500]
+        [1, 2, 3] [2, 3, 4] [3, 4, 5] [4, 5, 1] [5, 1, 2]
+         */
+
+    private static void assertResponseState(ReadTracker responses,
+                                            boolean complete,
+                                            boolean failed)
+    {
+        Assertions.assertEquals(complete, responses.hasCompletedRead());
+        Assertions.assertEquals(failed, responses.hasFailed());
+    }
+
+    @Test
+    void singleShard()
+    {
+        Shards subShards = shards(topology.get(0));
+        ReadTracker tracker = new ReadTracker(subShards);
+
+        tracker.recordInflightRead(ids[0]);
+        assertResponseState(tracker, false, false);
+
+        tracker.recordReadSuccess(ids[0]);
+        assertResponseState(tracker, true, false);
+    }
+
+    @Test
+    void singleShardRetry()
+    {
+        Shards subShards = shards(topology.get(0));
+        ReadTracker tracker = new ReadTracker(subShards);
+
+        tracker.recordInflightRead(ids[0]);
+        assertResponseState(tracker, false, false);
+
+        tracker.recordReadFailure(ids[0]);
+        assertResponseState(tracker, false, false);
+
+        tracker.recordInflightRead(ids[1]);
+        assertResponseState(tracker, false, false);
+
+        tracker.recordReadSuccess(ids[1]);
+        assertResponseState(tracker, true, false);
+    }
+
+    @Test
+    void singleShardFailure()
+    {
+        Shards subShards = shards(topology.get(0));
+        ReadTracker tracker = new ReadTracker(subShards);
+
+        tracker.recordInflightRead(ids[0]);
+        tracker.recordReadFailure(ids[0]);
+        assertResponseState(tracker, false, false);
+
+        tracker.recordInflightRead(ids[1]);
+        tracker.recordReadFailure(ids[1]);
+        assertResponseState(tracker, false, false);
+
+        tracker.recordInflightRead(ids[2]);
+        tracker.recordReadFailure(ids[2]);
+        assertResponseState(tracker, false, true);
+    }
+
+    @Test
+    void multiShardSuccess()
+    {
+        Shards subShards = new Shards(new Shard[]{topology.get(0), topology.get(1), topology.get(2)});
+        ReadTracker responses = new ReadTracker(subShards);
+        /*
+        (000, 100](100, 200](200, 300]
+        [1, 2, 3] [2, 3, 4] [3, 4, 5]
+         */
+
+        responses.recordInflightRead(ids[2]);
+        responses.recordReadSuccess(ids[2]);
+        assertResponseState(responses, true, false);
+    }
+
+    @Test
+    void multiShardRetryAndReadSet()
+    {
+        Shards subShards = new Shards(new Shard[]{topology.get(0), topology.get(1), topology.get(2)});
+        ReadTracker responses = new ReadTracker(subShards);
+        /*
+        (000, 100](100, 200](200, 300]
+        [1, 2, 3] [2, 3, 4] [3, 4, 5]
+         */
+
+        Assertions.assertEquals(Sets.newHashSet(ids[2]), responses.computeMinimalReadSetAndMarkInflight());
+
+        assertResponseState(responses, false, false);
+
+        responses.recordReadFailure(ids[2]);
+        assertResponseState(responses, false, false);
+
+        Assertions.assertEquals(Sets.newHashSet(ids[1], ids[3]), responses.computeMinimalReadSetAndMarkInflight());
+        assertResponseState(responses, false, false);
+
+        responses.recordReadFailure(ids[1]);
+        Assertions.assertEquals(Sets.newHashSet(ids[0]), responses.computeMinimalReadSetAndMarkInflight());
+
+        responses.recordReadSuccess(ids[3]);
+        assertResponseState(responses, false, false);
+        Assertions.assertEquals(Collections.emptySet(), responses.computeMinimalReadSetAndMarkInflight());
+
+        responses.recordReadSuccess(ids[0]);
+        assertResponseState(responses, true, false);
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/IntHashKey.java b/accord-core/src/test/java/accord/impl/IntHashKey.java
index 7d1c696..3a044a6 100644
--- a/accord-core/src/test/java/accord/impl/IntHashKey.java
+++ b/accord-core/src/test/java/accord/impl/IntHashKey.java
@@ -6,6 +6,7 @@ import java.util.zip.CRC32C;
 
 import accord.api.Key;
 import accord.api.KeyRange;
+import accord.topology.KeyRanges;
 import accord.txn.Keys;
 
 public class IntHashKey implements Key<IntHashKey>
@@ -16,6 +17,35 @@ public class IntHashKey implements Key<IntHashKey>
         {
             super(start, end);
         }
+
+        @Override // sub-ranges keep this class's concrete Range type
+        public KeyRange<IntHashKey> subRange(IntHashKey from, IntHashKey to)
+        {
+            return new Range(from, to);
+        }
+
+        @Override
+        public KeyRanges split(int count)
+        {
+            // count equal hash intervals, the last absorbing the remainder; returned unsplit if narrower than count
+            int startHash = start().hash;
+            int endHash = end().hash;
+            long currentSize = (long) endHash - startHash; // long: the int difference of two hashes can overflow
+            if (currentSize < count)
+                return new KeyRanges(new KeyRange[]{this});
+            long interval = currentSize / count;
+            int nextStart = startHash;
+            KeyRange[] ranges = new KeyRange[count];
+            for (int i=0; i<count; i++)
+            {
+                int subStart = nextStart;
+                int subEnd = i < count - 1 ? (int) (subStart + interval) : endHash; // cast is safe: stays below endHash
+                ranges[i] = new Range(new IntHashKey(Integer.MIN_VALUE, subStart),
+                                      new IntHashKey(Integer.MIN_VALUE, subEnd));
+                nextStart = subEnd;
+            }
+            return new KeyRanges(ranges);
+        }
     }
 
     public final int key;
diff --git a/accord-core/src/test/java/accord/impl/IntKey.java b/accord-core/src/test/java/accord/impl/IntKey.java
index eb57364..a57f995 100644
--- a/accord-core/src/test/java/accord/impl/IntKey.java
+++ b/accord-core/src/test/java/accord/impl/IntKey.java
@@ -3,9 +3,11 @@ package accord.impl;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.function.BiFunction;
 
 import accord.api.Key;
 import accord.api.KeyRange;
+import accord.topology.KeyRanges;
 import accord.txn.Keys;
 
 public class IntKey implements Key<IntKey>
@@ -16,6 +18,39 @@ public class IntKey implements Key<IntKey>
         {
             super(start, end);
         }
+
+        @Override // sub-ranges keep this class's concrete Range type
+        public KeyRange<IntKey> subRange(IntKey from, IntKey to)
+        {
+            return new Range(from, to);
+        }
+
+        @Override // equal-width pieces via the shared IntKey.splitRange helper
+        public KeyRanges split(int count)
+        {
+            return IntKey.splitRange(this, count, (lo, hi) -> new Range(lo, hi));
+        }
+    }
+
+    /** Splits range into count equal-width pieces built by ctor (the last absorbs the remainder); unsplit if narrower than count. */
+    public static KeyRanges splitRange(KeyRange<IntKey> range, int count, BiFunction<IntKey, IntKey, KeyRange<IntKey>> ctor)
+    {
+        int start = range.start().key;
+        int end = range.end().key;
+        long currentSize = (long) end - start; // long: end - start overflows an int for very wide ranges
+        if (currentSize < count)
+            return new KeyRanges(new KeyRange[]{range});
+        long interval = currentSize / count;
+        int nextStart = start;
+        KeyRange[] ranges = new KeyRange[count];
+        for (int i=0; i<count; i++)
+        {
+            int subStart = nextStart;
+            int subEnd = i < count - 1 ? (int) (subStart + interval) : end; // cast is safe: result stays below end
+            ranges[i] = ctor.apply(key(subStart), key(subEnd));
+            nextStart = subEnd;
+        }
+        return new KeyRanges(ranges);
     }
 
     public final int key;
diff --git a/accord-core/src/test/java/accord/impl/TopologyFactory.java b/accord-core/src/test/java/accord/impl/TopologyFactory.java
index ccdde7e..3cdd985 100644
--- a/accord-core/src/test/java/accord/impl/TopologyFactory.java
+++ b/accord-core/src/test/java/accord/impl/TopologyFactory.java
@@ -2,19 +2,17 @@ package accord.impl;
 
 
 import accord.local.Node;
-import accord.local.Node.Id;
 import accord.api.Key;
 import accord.api.KeyRange;
-import accord.topology.Shard;
+import accord.topology.KeyRanges;
 import accord.topology.Shards;
-import accord.utils.WrapAroundList;
-import accord.utils.WrapAroundSet;
 
 import java.util.*;
 
 public class TopologyFactory<K extends Key<K>>
 {
     public final int rf;
+    // TODO: convert to KeyRanges
     final KeyRange<K>[] ranges;
 
     public TopologyFactory(int rf, KeyRange<K>... ranges)
@@ -25,25 +23,7 @@ public class TopologyFactory<K extends Key<K>>
 
     public Shards toShards(Node.Id[] cluster)
     {
-        final Map<Node.Id, Integer> lookup = new HashMap<>();
-        for (int i = 0 ; i < cluster.length ; ++i)
-            lookup.put(cluster[i], i);
-
-        List<WrapAroundList<Id>> electorates = new ArrayList<>();
-        List<Set<Node.Id>> fastPathElectorates = new ArrayList<>();
-
-        for (int i = 0 ; i < cluster.length + rf - 1 ; ++i)
-        {
-            WrapAroundList<Node.Id> electorate = new WrapAroundList<>(cluster, i % cluster.length, (i + rf) % cluster.length);
-            Set<Node.Id> fastPathElectorate = new WrapAroundSet<>(lookup, electorate);
-            electorates.add(electorate);
-            fastPathElectorates.add(fastPathElectorate);
-        }
-
-        final List<Shard> shards = new ArrayList<>();
-        for (int i = 0 ; i < ranges.length ; ++i)
-            shards.add(new Shard(ranges[i], electorates.get(i % electorates.size()), fastPathElectorates.get(i % fastPathElectorates.size())));
-        return new Shards(shards.toArray(Shard[]::new));
+        return TopologyUtils.initialTopology(cluster, new KeyRanges(ranges), rf); // electorate construction shared with MockCluster via TopologyUtils
     }
 
     public Shards toShards(List<Node.Id> cluster)
diff --git a/accord-core/src/test/java/accord/impl/TopologyUtils.java b/accord-core/src/test/java/accord/impl/TopologyUtils.java
new file mode 100644
index 0000000..ade2863
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/TopologyUtils.java
@@ -0,0 +1,57 @@
+package accord.impl;
+
+import accord.api.Key;
+import accord.api.KeyRange;
+import accord.local.Node;
+import accord.topology.KeyRanges;
+import accord.topology.Shard;
+import accord.topology.Shards;
+import accord.utils.WrapAroundList;
+import accord.utils.WrapAroundSet;
+
+import java.util.*;
+
+public class TopologyUtils
+{
+    public static KeyRanges initialRanges(int num, int maxKey) // num contiguous IntKey ranges from 0 to maxKey
+    {
+        if (num <= 0 || maxKey < num) // otherwise: division by zero, negative array size, or zero-width ranges
+            throw new IllegalArgumentException("Cannot split keys up to " + maxKey + " into " + num + " non-empty ranges");
+        int rangeSize = maxKey / num;
+        KeyRange<IntKey>[] ranges = new KeyRange[num];
+        int end = 0;
+        for (int i=0; i<num; i++)
+        {
+            int start = end;
+            end = i == num - 1 ? maxKey : start + rangeSize; // the last range absorbs the remainder
+            ranges[i] = IntKey.range(start, end);
+        }
+        return new KeyRanges(ranges);
+    }
+
+    public static <K extends Key<K>> Shards initialTopology(Node.Id[] cluster, KeyRanges ranges, int rf)
+    {
+        final Map<Node.Id, Integer> lookup = new HashMap<>(); // node id -> index in cluster
+        for (int i = 0 ; i < cluster.length ; ++i)
+            lookup.put(cluster[i], i);
+        // candidate electorate j is WrapAroundList(cluster, j % cluster.length, (j + rf) % cluster.length): presumably rf nodes, wrapping
+        List<WrapAroundList<Node.Id>> electorates = new ArrayList<>();
+        List<Set<Node.Id>> fastPathElectorates = new ArrayList<>();
+        for (int i = 0 ; i < cluster.length + rf - 1 ; ++i)
+        {
+            WrapAroundList<Node.Id> electorate = new WrapAroundList<>(cluster, i % cluster.length, (i + rf) % cluster.length);
+            Set<Node.Id> fastPathElectorate = new WrapAroundSet<>(lookup, electorate);
+            electorates.add(electorate);
+            fastPathElectorates.add(fastPathElectorate);
+        }
+        final List<Shard> shards = new ArrayList<>(); // shard i is replicated by candidate electorate i % electorates.size()
+        for (int i = 0 ; i < ranges.size() ; ++i)
+            shards.add(new Shard(ranges.get(i), electorates.get(i % electorates.size()), fastPathElectorates.get(i % fastPathElectorates.size())));
+        return new Shards(shards.toArray(Shard[]::new));
+    }
+
+    public static <K extends Key<K>> Shards initialTopology(List<Node.Id> cluster, KeyRanges ranges, int rf)
+    {
+        return initialTopology(cluster.toArray(Node.Id[]::new), ranges, rf);
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/TopologyUtilsTest.java b/accord-core/src/test/java/accord/impl/TopologyUtilsTest.java
new file mode 100644
index 0000000..66f9bea
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/TopologyUtilsTest.java
@@ -0,0 +1,17 @@
+package accord.impl;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static accord.Utils.ranges;
+import static accord.impl.IntKey.range;
+
+public class TopologyUtilsTest
+{
+    @Test
+    void initialRangesTest()
+    {
+        // three ranges over keys up to 300 come out equal-width: 100 keys each
+        Assertions.assertEquals(ranges(range(0, 100), range(100, 200), range(200, 300)), TopologyUtils.initialRanges(3, 300));
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 5ffafe9..a00727b 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -17,6 +17,7 @@ import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
+import accord.local.CommandStore;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.Scheduler;
@@ -30,7 +31,6 @@ import accord.topology.Shards;
 
 public class Cluster implements Scheduler
 {
-
     final Function<Id, Node> lookup;
     final PendingQueue pending;
     final Consumer<Packet> responseSink;
@@ -143,23 +143,30 @@ public class Cluster implements Scheduler
     {
         Shards shards = topologyFactory.toShards(nodes);
         Map<Id, Node> lookup = new HashMap<>();
-        Cluster sinks = new Cluster(queueSupplier, lookup::get, responseSink, stderr);
-        for (Id node : nodes)
-            lookup.put(node, new Node(node, shards, shards.forNode(node), sinks.create(node, randomSupplier.get()),
-                                      randomSupplier.get(), nowSupplier.get(), ListStore::new, ListAgent.INSTANCE, sinks));
-
-        List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
-        sinks.recurring(() ->
+        try
         {
-            Collections.shuffle(nodesList, randomSupplier.get());
-            int partitionSize = randomSupplier.get().nextInt((topologyFactory.rf+1)/2);
-            sinks.partitionSet = new HashSet<>(nodesList.subList(0, partitionSize));
-        }, 5L, TimeUnit.SECONDS);
-
-        Packet next;
-        while ((next = in.get()) != null)
-            sinks.add(next);
-
-        while (sinks.processPending());
+            Cluster sinks = new Cluster(queueSupplier, lookup::get, responseSink, stderr);
+            for (Id node : nodes)
+                lookup.put(node, new Node(node, shards, sinks.create(node, randomSupplier.get()), randomSupplier.get(),
+                                          nowSupplier.get(), ListStore::new, ListAgent.INSTANCE, sinks, CommandStore.Factory.SYNCHRONIZED));
+
+            List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
+            sinks.recurring(() ->
+                            {
+                                Collections.shuffle(nodesList, randomSupplier.get());
+                                int partitionSize = randomSupplier.get().nextInt((topologyFactory.rf+1)/2);
+                                sinks.partitionSet = new HashSet<>(nodesList.subList(0, partitionSize));
+                            }, 5L, TimeUnit.SECONDS);
+
+            Packet next;
+            while ((next = in.get()) != null)
+                sinks.add(next);
+
+            while (sinks.processPending());
+        }
+        finally
+        {
+            lookup.values().forEach(Node::shutdown);
+        }
     }
 }
diff --git a/accord-core/src/test/java/accord/impl/basic/NodeSink.java b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
index 0482567..f48877e 100644
--- a/accord-core/src/test/java/accord/impl/basic/NodeSink.java
+++ b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
@@ -6,10 +6,10 @@ import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import accord.coordinate.Timeout;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.MessageSink;
-import accord.messages.Timeout;
 import accord.messages.Callback;
 import accord.messages.Reply;
 import accord.messages.Request;
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java b/accord-core/src/test/java/accord/impl/list/ListRead.java
index f70ffeb..57c5317 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRead.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -1,6 +1,7 @@
 package accord.impl.list;
 
 import accord.api.*;
+import accord.topology.KeyRanges;
 import accord.txn.Keys;
 
 import static java.lang.Math.max;
@@ -15,15 +16,20 @@ public class ListRead implements Read
     }
 
     @Override
-    public Data read(KeyRange range, Store store)
+    public Data read(KeyRanges ranges, Store store) // collects the stored value of each of keys that lies in one of ranges
     {
         ListStore s = (ListStore)store;
         ListData result = new ListData();
-        int lowIdx = range.lowKeyIndex(keys);
-        if (lowIdx < 0)
-            return result;
-        for (int i = lowIdx, limit = range.higherKeyIndex(keys) ; i < limit ; ++i)
-            result.put(keys.get(i), s.get(keys.get(i)));
+        for (KeyRange range : ranges)
+        {
+            int lowIdx = range.lowKeyIndex(keys);
+            if (lowIdx < -keys.size()) // presumably every key precedes this range; early return assumes ranges ascend (TODO confirm)
+                return result;
+            if (lowIdx < 0) // no key of this read lies inside this range
+                continue;
+            for (int i = lowIdx, limit = range.higherKeyIndex(keys) ; i < limit ; ++i)
+                result.put(keys.get(i), s.get(keys.get(i)));
+        }
         return result;
     }
 
diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java b/accord-core/src/test/java/accord/impl/list/ListWrite.java
index 20694f1..0f9aeaf 100644
--- a/accord-core/src/test/java/accord/impl/list/ListWrite.java
+++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java
@@ -8,18 +8,22 @@ import accord.api.Key;
 import accord.api.KeyRange;
 import accord.api.Store;
 import accord.api.Write;
+import accord.topology.KeyRanges;
 import accord.txn.Timestamp;
 import accord.utils.Timestamped;
 
 public class ListWrite extends TreeMap<Key, int[]> implements Write
 {
     @Override
-    public void apply(KeyRange range, Timestamp executeAt, Store store)
+    public void apply(KeyRanges ranges, Timestamp executeAt, Store store) // merges this write's entries inside ranges into the store, stamped with executeAt
     {
         ListStore s = (ListStore) store;
-        NavigableMap<Key, int[]> selection = subMap(range.start(), range.startInclusive(),
-                                                    range.end(), range.endInclusive());
-        for (Map.Entry<Key, int[]> e : selection.entrySet())
-            s.data.merge(e.getKey(), new Timestamped<>(executeAt, e.getValue()), Timestamped::merge);
+        for (KeyRange range : ranges)
+        {
+            NavigableMap<Key, int[]> selection = subMap(range.start(), range.startInclusive(), // honours the range's own bound inclusivity
+                                                        range.end(), range.endInclusive());
+            for (Map.Entry<Key, int[]> e : selection.entrySet())
+                s.data.merge(e.getKey(), new Timestamped<>(executeAt, e.getValue()), Timestamped::merge); // combined with any existing value via Timestamped::merge
+        }
     }
 }
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 8580448..a55b533 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -1,19 +1,20 @@
 package accord.impl.mock;
 
 import accord.NetworkFilter;
+import accord.coordinate.Timeout;
+import accord.impl.TopologyUtils;
+import accord.local.CommandStore;
 import accord.local.Node;
 import accord.local.Node.Id;
-import accord.messages.Timeout;
+import accord.topology.KeyRanges;
 import accord.utils.ThreadPoolScheduler;
 import accord.txn.TxnId;
 import accord.messages.Callback;
 import accord.messages.Reply;
 import accord.messages.Request;
-import accord.impl.IntKey;
 import accord.impl.TestAgent;
 import accord.topology.Shards;
 import accord.topology.Topology;
-import accord.impl.TopologyFactory;
 import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,7 +26,7 @@ import java.util.function.LongSupplier;
 
 import static accord.Utils.id;
 
-public class MockCluster implements Network
+public class MockCluster implements Network, AutoCloseable
 {
     private static final Logger logger = LoggerFactory.getLogger(MockCluster.class);
 
@@ -46,6 +47,12 @@ public class MockCluster implements Network
         init();
     }
 
+    @Override // AutoCloseable: shuts down every node in the cluster
+    public void close()
+    {
+        for (Node node : nodes.values()) node.shutdown();
+    }
+
     private synchronized Id nextNodeId()
     {
         return id(nextNodeId++);
@@ -61,35 +68,34 @@ public class MockCluster implements Network
         return System.currentTimeMillis();
     }
 
-    private Node createNode(Id id, Shards local, Topology topology)
+    private Node createNode(Id id, Topology topology) // local shards are no longer passed; presumably Node derives them from topology
     {
         MockStore store = new MockStore();
         return new Node(id,
-                topology,
-                local,
-                new SimpleMessageSink(id, this),
-                new Random(random.nextLong()),
-                this::now,
-                () -> store,
-                new TestAgent(),
-                new ThreadPoolScheduler());
+                        topology,
+                        new SimpleMessageSink(id, this),
+                        new Random(random.nextLong()),
+                        this::now,
+                        () -> store, // every call of this supplier returns the same MockStore
+                        new TestAgent(),
+                        new ThreadPoolScheduler(),
+                        CommandStore.Factory.SINGLE_THREAD);
     }
 
     private void init()
     {
-        Preconditions.checkArgument(config.initialNodes == config.replication, "TODO");
         List<Id> ids = new ArrayList<>(config.initialNodes);
         for (int i=0; i<config.initialNodes; i++)
         {
             Id nextId = nextNodeId();
             ids.add(nextId);
         }
-        TopologyFactory<IntKey> topologyFactory = new TopologyFactory<>(config.replication, IntKey.range(0, config.maxKey));
-        Shards topology = topologyFactory.toShards(ids);
+        KeyRanges ranges = TopologyUtils.initialRanges(config.initialNodes, config.maxKey); // one range per initial node, keys up to config.maxKey
+        Shards topology = TopologyUtils.initialTopology(ids, ranges, config.replication); // initialNodes no longer has to equal replication
         for (int i=0; i<config.initialNodes; i++)
         {
             Id id = ids.get(i);
-            Node node = createNode(id, topology.forNode(id), topology);
+            Node node = createNode(id, topology);
             nodes.put(id, node);
         }
     }
diff --git a/accord-core/src/test/java/accord/local/CommandStoreTest.java b/accord-core/src/test/java/accord/local/CommandStoreTest.java
new file mode 100644
index 0000000..8974475
--- /dev/null
+++ b/accord-core/src/test/java/accord/local/CommandStoreTest.java
@@ -0,0 +1,92 @@
+package accord.local;
+
+import accord.Utils;
+import accord.api.KeyRange;
+import accord.impl.IntKey;
+import accord.impl.TopologyUtils;
+import accord.topology.KeyRanges;
+import accord.topology.Shard;
+import accord.topology.Topology;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static accord.impl.IntKey.key;
+
+public class CommandStoreTest
+{
+    @Test
+    void topologyChangeTest()
+    {
+        List<Node.Id> nodes = Utils.ids(5);
+        Topology topology = TopologyUtils.initialTopology(nodes, TopologyUtils.initialRanges(5, 500), 3);
+        Topology local = topology.forNode(nodes.get(0));
+
+        // shardRanges(..., 10) divides the local ranges 10 ways; entry 0 gets the first tenth of each
+        KeyRanges firstStoreRanges = CommandStores.shardRanges(local.ranges(), 10).get(0);
+        Assertions.assertEquals(ranges(r(0, 10), r(300, 310), r(400, 410)), firstStoreRanges);
+
+        // extend the store with r(350, 360) and touch three keys inside it
+        CommandStore store = new CommandStore.Synchronized(0, nodes.get(0), null, null, null);
+        store.updateTopology(topology, firstStoreRanges.union(r(350, 360)), KeyRanges.EMPTY);
+        for (int k = 355; k <= 357; k++)
+            store.commandsForKey(key(k));
+
+        for (int k = 355; k <= 357; k++)
+            Assertions.assertTrue(store.hasCommandsForKey(key(k)));
+
+        // dropping r(355, 360) discards 356 and 357 but keeps 355, i.e. the range's start is exclusive
+        store.updateTopology(topology, KeyRanges.EMPTY, ranges(r(355, 360)));
+
+        Assertions.assertTrue(store.hasCommandsForKey(key(355)));
+        Assertions.assertFalse(store.hasCommandsForKey(key(356)));
+        Assertions.assertFalse(store.hasCommandsForKey(key(357)));
+    }
+
+    private static Shard[] shards(Topology topology, int... indexes)
+    {
+        Shard[] selected = new Shard[indexes.length];
+        int next = 0;
+        for (int index : indexes)
+            selected[next++] = topology.get(index);
+        return selected;
+    }
+
+    private static void assertMapping(KeyRanges expectedRanges, Shard[] expectedShards, CommandStore.RangeMapping mapping)
+    {
+        Assertions.assertEquals(expectedRanges, mapping.ranges);
+        Assertions.assertArrayEquals(expectedShards, mapping.shards);
+    }
+
+    private static KeyRanges ranges(KeyRange... members)
+    {
+        return new KeyRanges(members);
+    }
+
+    private static KeyRange<IntKey> r(int lo, int hi)
+    {
+        return IntKey.range(lo, hi);
+    }
+
+    @Test
+    void mapRangesTest()
+    {
+        List<Node.Id> nodes = Utils.ids(5);
+        Topology topology = TopologyUtils.initialTopology(nodes, TopologyUtils.initialRanges(5, 500), 3);
+        Topology local = topology.forNode(nodes.get(0));
+        KeyRanges firstStoreRanges = CommandStores.shardRanges(local.ranges(), 10).get(0);
+        Assertions.assertEquals(ranges(r(0, 10), r(300, 310), r(400, 410)), firstStoreRanges);
+        // each of these ranges already sits inside a single local shard
+        assertMapping(firstStoreRanges, shards(local, 0, 1, 2), CommandStore.mapRanges(firstStoreRanges, local));
+
+        // r(390, 410) crosses the shard boundary at 400, so it is split across local shards 1 and 2
+        KeyRanges straddling = ranges(r(0, 10), r(300, 310), r(390, 410));
+        assertMapping(ranges(r(0, 10), r(300, 310), r(390, 400), r(400, 410)), shards(local, 0, 1, 1, 2),
+                      CommandStore.mapRanges(straddling, local));
+
+        // r(300, 310) and r(350, 360) both fall within local shard 1
+        assertMapping(ranges(r(0, 10), r(300, 310), r(350, 360), r(400, 410)), shards(local, 0, 1, 1, 2),
+                      CommandStore.mapRanges(ranges(r(0, 10), r(300, 310), r(350, 360), r(400, 410)), local));
+    }
+}
diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
index 3082f70..cdba24b 100644
--- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
+++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
@@ -1,6 +1,5 @@
 package accord.messages;
 
-import accord.local.Instance;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.MessageSink;
@@ -42,7 +41,8 @@ public class PreAcceptTest
         Random random = new Random();
         MockStore store = new MockStore();
         Scheduler scheduler = new ThreadPoolScheduler();
-        return new Node(nodeId, TOPOLOGY, TOPOLOGY.forNode(nodeId), messageSink, random, clock, () -> store, new TestAgent(), scheduler);
+        return new Node(nodeId, TOPOLOGY, messageSink, random, clock, () -> store,
+                        new TestAgent(), scheduler, CommandStore.Factory.SINGLE_THREAD);
     }
 
     @Test
@@ -52,23 +52,30 @@ public class PreAcceptTest
         Clock clock = new Clock(100);
         Node node = createNode(ID1, messageSink, clock);
 
-        IntKey key = IntKey.key(10);
-        Instance instance = node.local(key).orElseThrow();
-        Assertions.assertFalse(instance.hasCommandsForKey(key));
-
-        TxnId txnId = clock.idForNode(ID2);
-        Txn txn = writeTxn(Keys.of(key));
-        PreAccept preAccept = new PreAccept(txnId, txn);
-        clock.increment(10);
-        preAccept.process(node, ID2, 0);
-
-        Command command = instance.commandsForKey(key).uncommitted.get(txnId);
-        Assertions.assertEquals(Status.PreAccepted, command.status());
-
-        messageSink.assertHistorySizes(0, 1);
-        Assertions.assertEquals(ID2, messageSink.responses.get(0).to);
-        Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId, new Dependencies()),
-                                messageSink.responses.get(0).payload);
+        try
+        {
+            IntKey key = IntKey.key(10);
+            CommandStore commandStore = node.local(key).orElseThrow();
+            Assertions.assertFalse(commandStore.hasCommandsForKey(key));
+
+            TxnId txnId = clock.idForNode(ID2);
+            Txn txn = writeTxn(Keys.of(key));
+            PreAccept preAccept = new PreAccept(txnId, txn);
+            clock.increment(10);
+            preAccept.process(node, ID2, 0);
+
+            Command command = commandStore.commandsForKey(key).uncommitted.get(txnId);
+            Assertions.assertEquals(Status.PreAccepted, command.status());
+
+            messageSink.assertHistorySizes(0, 1);
+            Assertions.assertEquals(ID2, messageSink.responses.get(0).to);
+            Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId, new Dependencies()),
+                                    messageSink.responses.get(0).payload);
+        }
+        finally
+        {
+            node.shutdown();
+        }
     }
 
     @Test
@@ -77,15 +84,21 @@ public class PreAcceptTest
         RecordingMessageSink messageSink = new RecordingMessageSink(ID1, Network.BLACK_HOLE);
         Clock clock = new Clock(100);
         Node node = createNode(ID1, messageSink, clock);
-
-        IntKey key = IntKey.key(10);
-        Instance instance = node.local(key).orElseThrow();
-        Assertions.assertFalse(instance.hasCommandsForKey(key));
-
-        TxnId txnId = clock.idForNode(ID2);
-        Txn txn = writeTxn(Keys.of(key));
-        PreAccept preAccept = new PreAccept(txnId, txn);
-        preAccept.process(node, ID2, 0);
+        try
+        {
+            IntKey key = IntKey.key(10);
+            CommandStore commandStore = node.local(key).orElseThrow();
+            Assertions.assertFalse(commandStore.hasCommandsForKey(key));
+
+            TxnId txnId = clock.idForNode(ID2);
+            Txn txn = writeTxn(Keys.of(key));
+            PreAccept preAccept = new PreAccept(txnId, txn);
+            preAccept.process(node, ID2, 0);
+        }
+        finally
+        {
+            node.shutdown();
+        }
     }
 
     @Test
@@ -99,22 +112,29 @@ public class PreAcceptTest
         RecordingMessageSink messageSink = new RecordingMessageSink(ID1, Network.BLACK_HOLE);
         Clock clock = new Clock(100);
         Node node = createNode(ID1, messageSink, clock);
-
-        IntKey key1 = IntKey.key(10);
-        PreAccept preAccept1 = new PreAccept(clock.idForNode(ID2), writeTxn(Keys.of(key1)));
-        preAccept1.process(node, ID2, 0);
-
-        messageSink.clearHistory();
-        IntKey key2 = IntKey.key(11);
-        TxnId txnId2 = new TxnId(50, 0, ID3);
-        PreAccept preAccept2 = new PreAccept(txnId2, writeTxn(Keys.of(key1, key2)));
-        clock.increment(10);
-        preAccept2.process(node, ID3, 0);
-
-        messageSink.assertHistorySizes(0, 1);
-        Assertions.assertEquals(ID3, messageSink.responses.get(0).to);
-        Dependencies expectedDeps = new Dependencies();
-        Assertions.assertEquals(new PreAccept.PreAcceptOk(new TxnId(110, 0, ID1), expectedDeps),
-                                messageSink.responses.get(0).payload);
+        try
+        {
+
+            IntKey key1 = IntKey.key(10);
+            PreAccept preAccept1 = new PreAccept(clock.idForNode(ID2), writeTxn(Keys.of(key1)));
+            preAccept1.process(node, ID2, 0);
+
+            messageSink.clearHistory();
+            IntKey key2 = IntKey.key(11);
+            TxnId txnId2 = new TxnId(50, 0, ID3);
+            PreAccept preAccept2 = new PreAccept(txnId2, writeTxn(Keys.of(key1, key2)));
+            clock.increment(10);
+            preAccept2.process(node, ID3, 0);
+
+            messageSink.assertHistorySizes(0, 1);
+            Assertions.assertEquals(ID3, messageSink.responses.get(0).to);
+            Dependencies expectedDeps = new Dependencies();
+            Assertions.assertEquals(new PreAccept.PreAcceptOk(new TxnId(110, 0, ID1), expectedDeps),
+                                    messageSink.responses.get(0).payload);
+        }
+        finally
+        {
+            node.shutdown();
+        }
     }
 }
diff --git a/accord-core/src/test/java/accord/topology/TopologyTest.java b/accord-core/src/test/java/accord/topology/TopologyTest.java
index 76a1fa9..154e0f6 100644
--- a/accord-core/src/test/java/accord/topology/TopologyTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyTest.java
@@ -18,7 +18,6 @@ import static accord.impl.IntKey.range;
 
 public class TopologyTest
 {
-
     private static void assertRangeForKey(Topology topology, int key, int start, int end)
     {
         Key expectedKey = key(key);
@@ -55,12 +54,28 @@ public class TopologyTest
         return IntKey.range(start, end);
     }
 
+    /**
+     * Asserts that {@code topology} has no shard covering {@code key}, i.e. that
+     * {@link Topology#forKey} rejects the key by throwing an {@link IllegalArgumentException}
+     * (or a subclass of it).
+     */
+    private static void assertNoRangeForKey(Topology topology, int key)
+    {
+        // fails if forKey returns normally or throws an exception of an unrelated type
+        Assertions.assertThrows(IllegalArgumentException.class,
+                                () -> topology.forKey(key(key)),
+                                "Expected exception");
+    }
+
     @Test
     void forKeyTest()
     {
-        Topology topology = topology(r(0, 100), r(100, 200), r(200, 300));
+        Topology topology = topology(r(0, 100), r(100, 200), r(300, 400)); // deliberately leaves no range between 200 and 300
+        assertNoRangeForKey(topology, -50); // below the first range
         assertRangeForKey(topology, 50, 0, 100);
         assertRangeForKey(topology, 100, 0, 100);
+        assertNoRangeForKey(topology, 250); // falls in the gap between r(100, 200) and r(300, 400)
+        assertRangeForKey(topology, 350, 300, 400); // lookup still succeeds past the gap
     }
 
     @Test
@@ -68,10 +83,4 @@ public class TopologyTest
     {
 
     }
-
-    @Test
-    void sequentialRanges()
-    {
-        // TODO: confirm non-sequential ranges are handled properly
-    }
 }
diff --git a/accord-core/src/test/java/accord/utils/KeyRangeTest.java b/accord-core/src/test/java/accord/utils/KeyRangeTest.java
index b6ce4a4..b14efa4 100644
--- a/accord-core/src/test/java/accord/utils/KeyRangeTest.java
+++ b/accord-core/src/test/java/accord/utils/KeyRangeTest.java
@@ -3,6 +3,7 @@ package accord.utils;
 import accord.api.Key;
 import accord.api.KeyRange;
 import accord.impl.IntKey;
+import accord.topology.KeyRanges;
 import accord.txn.Keys;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -14,14 +15,59 @@ public class KeyRangeTest
         return new IntKey(v);
     }
 
+    private static KeyRange<IntKey> r(int lo, int hi)
+    {
+        return IntKey.range(lo, hi); // IntKey's own Range type, unlike rangeEndIncl / rangeStartIncl
+    }
+
+    private static class EndInclusiveIntRange extends KeyRange.EndInclusive<IntKey> // concrete end-inclusive IntKey range for these tests
+    {
+        public EndInclusiveIntRange(IntKey start, IntKey end)
+        {
+            super(start, end);
+        }
+
+        @Override // split pieces keep the end-inclusive type
+        public KeyRanges split(int count)
+        {
+            return IntKey.splitRange(this, count, (lo, hi) -> new EndInclusiveIntRange(lo, hi));
+        }
+
+        @Override
+        public KeyRange<IntKey> subRange(IntKey start, IntKey end)
+        {
+            return new EndInclusiveIntRange(start, end);
+        }
+    }
+
+    private static class StartInclusiveIntRange extends KeyRange.StartInclusive<IntKey> // concrete start-inclusive IntKey range for these tests
+    {
+        public StartInclusiveIntRange(IntKey start, IntKey end)
+        {
+            super(start, end);
+        }
+
+        @Override // split pieces keep the start-inclusive type
+        public KeyRanges split(int count)
+        {
+            return IntKey.splitRange(this, count, (lo, hi) -> new StartInclusiveIntRange(lo, hi));
+        }
+
+        @Override
+        public KeyRange<IntKey> subRange(IntKey start, IntKey end)
+        {
+            return new StartInclusiveIntRange(start, end);
+        }
+    }
+
     static KeyRange<IntKey> rangeEndIncl(int start, int end)
     {
-        return new KeyRange.EndInclusive<>(k(start), k(end)) {};
+        return new EndInclusiveIntRange(k(start), k(end)); // named subclass, presumably because subRange/split must now be implemented
     }
 
     static KeyRange<IntKey> rangeStartIncl(int start, int end)
     {
-        return new KeyRange.StartInclusive<>(k(start), k(end)) {};
+        return new StartInclusiveIntRange(k(start), k(end)); // named subclass, presumably because subRange/split must now be implemented
     }
 
     static Keys keys(int... values)
@@ -108,7 +154,7 @@ public class KeyRangeTest
         assertHigherKeyIndex(7, rangeStartIncl(20, 25), keys);
     }
 
-    private static void assertLowKeyIndex(int expectedIdx, KeyRange range, Keys keys)
+    private static void assertLowKeyIndex(int expectedIdx, KeyRange range, Keys keys, int lowerBound, int upperBound)
     {
         if (expectedIdx >= 0 && expectedIdx < keys.size())
         {
@@ -116,14 +162,20 @@ public class KeyRangeTest
         }
         else
         {
-            Assertions.assertFalse(range.containsKey(keys.get(0)));
-            Assertions.assertFalse(range.containsKey(keys.get(keys.size() - 1)));
+            Assertions.assertFalse(range.containsKey(keys.get(lowerBound)));
+            Assertions.assertFalse(range.containsKey(keys.get(upperBound - 1)));
         }
 
-        int actualIdx = range.lowKeyIndex(keys);
+        int actualIdx = range.lowKeyIndex(keys, lowerBound, upperBound);
         Assertions.assertEquals(expectedIdx, actualIdx);
     }
 
+    /** Convenience overload that searches the whole of {@code keys}. */
+    private static void assertLowKeyIndex(int expectedIdx, KeyRange range, Keys keys)
+    {
+        int lowerBound = 0, upperBound = keys.size();
+        assertLowKeyIndex(expectedIdx, range, keys, lowerBound, upperBound);
+    }
+
     @Test
     void lowKeyIndexTest()
     {
@@ -142,9 +194,98 @@ public class KeyRangeTest
         assertLowKeyIndex(6, rangeEndIncl(15, 20), keys);
         assertLowKeyIndex(5, rangeStartIncl(15, 20), keys);
 
-        assertLowKeyIndex(7, rangeEndIncl(16, 20), keys);
+        assertLowKeyIndex(-8, rangeEndIncl(16, 20), keys);
         assertLowKeyIndex(6, rangeStartIncl(16, 20), keys);
-        assertLowKeyIndex(7, rangeEndIncl(20, 25), keys);
-        assertLowKeyIndex(7, rangeStartIncl(20, 25), keys);
+        assertLowKeyIndex(-8, rangeEndIncl(20, 25), keys);
+        assertLowKeyIndex(-8, rangeStartIncl(20, 25), keys);
+
+        // non-intersecting
+        assertLowKeyIndex(-2, rangeStartIncl(12, 14), keys(10, 16));
+        assertLowKeyIndex(-2, rangeStartIncl(12, 14), keys(10, 16, 18), 1, 3);
+        assertLowKeyIndex(-3, rangeStartIncl(12, 14), keys(10, 16, 18), 2, 3);
+    }
+
+    @Test
+    void fullyContainsTest()
+    {
+        KeyRange<IntKey> outer = r(100, 200);
+
+        // ranges lying within outer, including ones sharing one or both of its bounds
+        Assertions.assertTrue(outer.fullyContains(r(100, 200)));
+        Assertions.assertTrue(outer.fullyContains(r(150, 200)));
+        Assertions.assertTrue(outer.fullyContains(r(100, 150)));
+        Assertions.assertTrue(outer.fullyContains(r(125, 175)));
+
+        // ranges that are disjoint from outer or extend past either of its bounds
+        Assertions.assertFalse(outer.fullyContains(r(50, 60)));
+        Assertions.assertFalse(outer.fullyContains(r(100, 250)));
+        Assertions.assertFalse(outer.fullyContains(r(150, 250)));
+        Assertions.assertFalse(outer.fullyContains(r(50, 200)));
+        Assertions.assertFalse(outer.fullyContains(r(50, 150)));
+        Assertions.assertFalse(outer.fullyContains(r(250, 260)));
+    }
+
+    @Test
+    void compareIntersectingTest()
+    {
+        KeyRange<IntKey> range = r(100, 200);
+
+        // the other range ends at or before 100: result is 1
+        Assertions.assertEquals(1, range.compareIntersecting(r(0, 100)));
+        Assertions.assertEquals(1, range.compareIntersecting(r(0, 99)));
+
+        // any overlap, however small: result is 0
+        Assertions.assertEquals(0, range.compareIntersecting(r(0, 101)));
+        Assertions.assertEquals(0, range.compareIntersecting(r(99, 199)));
+        Assertions.assertEquals(0, range.compareIntersecting(r(99, 200)));
+        Assertions.assertEquals(0, range.compareIntersecting(r(99, 201)));
+        Assertions.assertEquals(0, range.compareIntersecting(r(101, 199)));
+        Assertions.assertEquals(0, range.compareIntersecting(r(125, 175)));
+        Assertions.assertEquals(0, range.compareIntersecting(r(100, 201)));
+        Assertions.assertEquals(0, range.compareIntersecting(r(101, 201)));
+        Assertions.assertEquals(0, range.compareIntersecting(r(199, 300)));
+
+        // the other range starts at or after 200: result is -1
+        Assertions.assertEquals(-1, range.compareIntersecting(r(200, 300)));
+        Assertions.assertEquals(-1, range.compareIntersecting(r(201, 300)));
+    }
+
+    /** Asserts that intersecting {@code a} and {@code b} yields {@code expected} in both argument orders. */
+    private static void assertIntersection(KeyRange<IntKey> expected, KeyRange<IntKey> a, KeyRange<IntKey> b)
+    {
+        assertOneWayIntersection(expected, a, b);
+        assertOneWayIntersection(expected, b, a);
+    }
+
+    /** Asserts {@code left.intersection(right)} equals {@code expected}. */
+    private static void assertOneWayIntersection(KeyRange<IntKey> expected, KeyRange<IntKey> left, KeyRange<IntKey> right)
+    {
+        Assertions.assertEquals(expected, left.intersection(right));
+    }
+
+    @Test
+    void intersectionTest()
+    {
+        // partial overlap
+        assertIntersection(r(25, 75), r(0, 75), r(25, 100));
+        // shared start bound
+        assertIntersection(r(0, 75), r(0, 75), r(0, 100));
+        // shared end bound
+        assertIntersection(r(25, 100), r(0, 100), r(25, 100));
+        // one range strictly inside the other
+        assertIntersection(r(25, 75), r(0, 100), r(25, 75));
+        // identical ranges
+        assertIntersection(r(0, 100), r(0, 100), r(0, 100));
+    }
+
+    @Test
+    void intersectsTest()
+    {
+        KeyRange<IntKey> window = r(100, 200);
+
+        // at least one key (150) falls inside the range
+        Assertions.assertTrue(window.intersects(keys(50, 150, 250)));
+        Assertions.assertTrue(window.intersects(keys(150, 250)));
+        Assertions.assertTrue(window.intersects(keys(50, 150)));
+
+        // no key inside: empty, all below, keys straddling the range, all above
+        Assertions.assertFalse(window.intersects(keys()));
+        Assertions.assertFalse(window.intersects(keys(50, 75)));
+        Assertions.assertFalse(window.intersects(keys(50, 75, 250, 300)));
+        Assertions.assertFalse(window.intersects(keys(250, 300)));
+    }
+
+    /** Asserts {@code a} and {@code b} merge into {@code expected}, whichever is the receiver. */
+    private static void assertMerges(KeyRange<IntKey> expected, KeyRange<IntKey> a, KeyRange<IntKey> b)
+    {
+        Assertions.assertEquals(expected, a.tryMerge(b));
+        Assertions.assertEquals(expected, b.tryMerge(a));
+    }
+
+    /** Asserts {@code tryMerge} yields {@code null} for {@code a} and {@code b} in both orders. */
+    private static void assertDoesNotMerge(KeyRange<IntKey> a, KeyRange<IntKey> b)
+    {
+        Assertions.assertNull(a.tryMerge(b));
+        Assertions.assertNull(b.tryMerge(a));
+    }
+
+    @Test
+    void tryMergeTest()
+    {
+        // touching ranges merge
+        assertMerges(r(0, 100), r(0, 50), r(50, 100));
+
+        // overlapping ranges merge
+        assertMerges(r(0, 100), r(0, 75), r(25, 100));
+
+        // ranges separated by a gap cannot merge
+        assertDoesNotMerge(r(0, 40), r(60, 100));
     }
 }
diff --git a/accord-core/src/test/java/accord/utils/KeyRangesTest.java b/accord-core/src/test/java/accord/utils/KeyRangesTest.java
index 4b3990e..421a73f 100644
--- a/accord-core/src/test/java/accord/utils/KeyRangesTest.java
+++ b/accord-core/src/test/java/accord/utils/KeyRangesTest.java
@@ -28,4 +28,56 @@ public class KeyRangesTest
         Assertions.assertEquals(1, ranges.rangeIndexForKey(IntKey.key(350)));
         Assertions.assertEquals(-3, ranges.rangeIndexForKey(IntKey.key(450)));
     }
+
+    @Test
+    void differenceTest()
+    {
+        // removing a hole from the middle leaves both ends
+        assertDifference(ranges(r(100, 125), r(175, 200)), ranges(r(100, 200)), ranges(r(125, 175)));
+        // removing both ends leaves the middle
+        assertDifference(ranges(r(125, 175)), ranges(r(100, 200)), ranges(r(100, 125), r(175, 200)));
+        // a subtrahend range outside the minuend has no effect
+        assertDifference(ranges(r(100, 175)), ranges(r(100, 200)), ranges(r(0, 75), r(175, 200)));
+        assertDifference(ranges(r(100, 200)), ranges(r(100, 200)), ranges(r(0, 75), r(200, 205)));
+
+        // multi-range minuends
+        assertDifference(ranges(r(125, 175), r(300, 350)),
+                         ranges(r(100, 200), r(250, 350)), ranges(r(0, 125), r(175, 300)));
+        assertDifference(ranges(r(125, 200), r(300, 350)),
+                         ranges(r(100, 200), r(250, 350)), ranges(r(0, 125), r(225, 300)));
+
+        // several holes, including adjacent subtrahend ranges
+        assertDifference(ranges(r(125, 135), r(140, 160), r(175, 200)),
+                         ranges(r(100, 200)), ranges(r(0, 125), r(135, 140), r(160, 170), r(170, 175)));
+    }
+
+    /** Asserts {@code from.difference(remove)} equals {@code expected}. */
+    private static void assertDifference(KeyRanges expected, KeyRanges from, KeyRanges remove)
+    {
+        Assertions.assertEquals(expected, from.difference(remove));
+    }
+
+    @Test
+    void addTest()
+    {
+        // disjoint inputs: the union keeps every range as its own ordered entry (touching ranges are not merged)
+        KeyRanges first = ranges(r(0, 50), r(100, 150));
+        KeyRanges second = ranges(r(50, 100), r(150, 200));
+        Assertions.assertEquals(ranges(r(0, 50), r(50, 100), r(100, 150), r(150, 200)), first.union(second));
+
+        // overlapping inputs are rejected
+        Assertions.assertThrows(IllegalArgumentException.class, () -> ranges(r(0, 50)).union(ranges(r(25, 75))));
+    }
+
+    @Test
+    void mergeTouchingTest()
+    {
+        // a fully contiguous run collapses into a single range
+        KeyRanges contiguous = ranges(r(0, 100), r(100, 200), r(200, 300), r(300, 400));
+        Assertions.assertEquals(ranges(r(0, 400)), contiguous.mergeTouching());
+
+        // a gap splits the result; only the touching neighbours merge
+        KeyRanges gapBeforeLast = ranges(r(0, 100), r(100, 200), r(300, 400));
+        Assertions.assertEquals(ranges(r(0, 200), r(300, 400)), gapBeforeLast.mergeTouching());
+
+        KeyRanges gapAfterFirst = ranges(r(0, 100), r(200, 300), r(300, 400));
+        Assertions.assertEquals(ranges(r(0, 100), r(200, 400)), gapAfterFirst.mergeTouching());
+    }
+
+    @Test
+    void selectTest()
+    {
+        KeyRanges testRanges = ranges(r(0, 100), r(100, 200), r(200, 300), r(300, 400), r(400, 500));
+        int[] selected = {1, 3};
+        // select should return exactly the ranges at the requested indexes, in order
+        KeyRanges expected = ranges(testRanges.get(1), testRanges.get(3));
+        Assertions.assertEquals(expected, testRanges.select(selected));
+    }
 }
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index d9585a2..e35235b 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -13,10 +13,11 @@ import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
+import accord.coordinate.Timeout;
+import accord.local.CommandStore;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.MessageSink;
-import accord.messages.Timeout;
 import accord.messages.Callback;
 import accord.messages.Reply;
 import accord.messages.Request;
@@ -254,23 +255,30 @@ public class Cluster implements Scheduler
     {
         Shards shards = topologyFactory.toShards(nodes);
         Map<Id, Node> lookup = new HashMap<>();
-        Cluster sinks = new Cluster(queueSupplier, lookup::get, responseSink, stderr);
-        for (Id node : nodes)
-            lookup.put(node, new Node(node, shards, shards.forNode(node), sinks.create(node, randomSupplier.get()),
-                                      randomSupplier.get(), nowSupplier.get(), MaelstromStore::new, MaelstromAgent.INSTANCE, sinks));
-
-        List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
-        sinks.recurring(() ->
+        try
         {
-            Collections.shuffle(nodesList, randomSupplier.get());
-            int partitionSize = randomSupplier.get().nextInt((topologyFactory.rf+1)/2);
-            sinks.partitionSet = new HashSet<>(nodesList.subList(0, partitionSize));
-        }, 5L, TimeUnit.SECONDS);
+            Cluster sinks = new Cluster(queueSupplier, lookup::get, responseSink, stderr);
+            for (Id node : nodes)
+                lookup.put(node, new Node(node, shards, sinks.create(node, randomSupplier.get()), randomSupplier.get(),
+                                          nowSupplier.get(), MaelstromStore::new, MaelstromAgent.INSTANCE, sinks, CommandStore.Factory.SINGLE_THREAD));
+
+            List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
+            sinks.recurring(() ->
+                            {
+                                Collections.shuffle(nodesList, randomSupplier.get());
+                                int partitionSize = randomSupplier.get().nextInt((topologyFactory.rf+1)/2);
+                                sinks.partitionSet = new HashSet<>(nodesList.subList(0, partitionSize));
+                            }, 5L, TimeUnit.SECONDS);
 
-        Packet next;
-        while ((next = in.get()) != null)
-            sinks.add(next);
+            Packet next;
+            while ((next = in.get()) != null)
+                sinks.add(next);
 
-        while (sinks.processPending());
+            while (sinks.processPending());
+        }
+        finally
+        {
+            lookup.values().forEach(Node::shutdown);
+        }
     }
 }
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
index 208df48..c557016 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
@@ -4,6 +4,9 @@ import java.io.IOException;
 
 import accord.api.Key;
 import accord.api.KeyRange;
+import accord.topology.KeyRanges;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
 import com.google.gson.TypeAdapter;
 import com.google.gson.stream.JsonReader;
 import com.google.gson.stream.JsonWriter;
@@ -16,6 +19,36 @@ public class MaelstromKey extends Datum<MaelstromKey> implements Key<MaelstromKe
         {
             super(start, end);
         }
+
+        @Override
+        public KeyRange<MaelstromKey> subRange(MaelstromKey start, MaelstromKey end)
+        {
+            // sub-ranges keep the concrete Range type
+            Range sub = new Range(start, end);
+            return sub;
+        }
+
+        /**
+         * Splits this hash range into {@code count} contiguous sub-ranges of roughly equal width.
+         * The last sub-range absorbs any remainder and reuses this range's original end key.
+         * If the range spans fewer than {@code count} hash values, it is returned unsplit.
+         *
+         * @param count number of sub-ranges to produce; must be positive
+         * @throws IllegalArgumentException if {@code count <= 0} or either bound is not a HASH key
+         */
+        @Override
+        public KeyRanges split(int count)
+        {
+            // without this guard, count == 0 divides by zero below and a negative count
+            // fails with NegativeArraySizeException
+            Preconditions.checkArgument(count > 0, "count must be positive: %s", count);
+            Preconditions.checkArgument(start().kind == Kind.HASH);
+            Preconditions.checkArgument(end().kind == Kind.HASH);
+            // NOTE(review): start().value is assumed non-null; only the end bound is null-checked
+            long startHash = ((Hash) start().value).hash;
+            // a HASH end key without a value is treated as extending up to Integer.MAX_VALUE
+            long endHash = end().value != null ? ((Hash) end().value).hash : Integer.MAX_VALUE;
+            long currentSize = endHash - startHash;
+            if (currentSize < count)
+                return new KeyRanges(new KeyRange[]{this});
+            long interval = currentSize / count;
+
+            long subEnd = 0;
+            KeyRange[] ranges = new KeyRange[count];
+            for (int i=0; i<count; i++)
+            {
+                long subStart = i > 0 ? subEnd : startHash;
+                subEnd = i < count - 1 ? subStart + interval : endHash;
+                ranges[i] = new Range(new MaelstromKey(Kind.HASH, new Hash(Ints.checkedCast(subStart))),
+                                      i < count - 1 ? new MaelstromKey(Kind.HASH, new Hash(Ints.checkedCast(subEnd))) : end());
+            }
+            return new KeyRanges(ranges);
+        }
     }
 
     public MaelstromKey(Kind kind, Object value)
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
index 7773ba8..142532e 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
@@ -1,6 +1,7 @@
 package accord.maelstrom;
 
 import accord.api.*;
+import accord.topology.KeyRanges;
 import accord.txn.Keys;
 
 import static java.lang.Math.max;
@@ -15,15 +16,20 @@ public class MaelstromRead implements Read
     }
 
     @Override
-    public Data read(KeyRange range, Store store)
+    public Data read(KeyRanges ranges, Store store)
     {
         MaelstromStore s = (MaelstromStore)store;
         MaelstromData result = new MaelstromData();
-        int lowIdx = range.lowKeyIndex(keys);
-        if (lowIdx < 0)
-            return result;
-        for (int i = lowIdx, limit = range.higherKeyIndex(keys) ; i < limit ; ++i)
-            result.put(keys.get(i), s.get(keys.get(i)));
+        for (KeyRange range : ranges)
+        {
+            int from = range.lowKeyIndex(keys);
+            // a result below -keys.size() ends the scan: no later range is consulted
+            // (presumably because ranges are ordered and no remaining key can match)
+            if (from < -keys.size())
+                break;
+            // any other negative result means this range holds none of the keys
+            if (from >= 0)
+            {
+                int to = range.higherKeyIndex(keys);
+                for (int i = from ; i < to ; ++i)
+                    result.put(keys.get(i), s.get(keys.get(i)));
+            }
+        }
         return result;
     }
 }
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
index 31ec395..b2216aa 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
@@ -8,18 +8,22 @@ import accord.api.Key;
 import accord.api.KeyRange;
 import accord.api.Store;
 import accord.api.Write;
+import accord.topology.KeyRanges;
 import accord.txn.Timestamp;
 import accord.utils.Timestamped;
 
 public class MaelstromWrite extends TreeMap<Key, Value> implements Write
 {
     @Override
-    public void apply(KeyRange range, Timestamp executeAt, Store store)
+    public void apply(KeyRanges ranges, Timestamp executeAt, Store store)
     {
         MaelstromStore s = (MaelstromStore) store;
-        NavigableMap<Key, Value> selection = subMap(range.start(), range.startInclusive(),
-                                                    range.end(), range.endInclusive());
-        for (Map.Entry<Key, Value> e : selection.entrySet())
-            s.data.merge(e.getKey(), new Timestamped<>(executeAt, e.getValue()), Timestamped::merge);
+        for (KeyRange range : ranges)
+        {
+            // apply only the written entries whose keys fall inside this range, honouring its inclusivity flags
+            subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive())
+                    .forEach((key, value) -> s.data.merge(key, new Timestamped<>(executeAt, value), Timestamped::merge));
+        }
     }
 }
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 19b1ee6..30e0e3a 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -13,10 +13,11 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
+import accord.coordinate.Timeout;
+import accord.local.CommandStore;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.Scheduler;
-import accord.messages.Timeout;
 import accord.utils.ThreadPoolScheduler;
 import accord.maelstrom.Packet.Type;
 import accord.api.MessageSink;
@@ -137,43 +138,50 @@ public class Main
             MaelstromInit init = (MaelstromInit) packet.body;
             shards = topologyFactory.toShards(init.cluster);
             sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err);
-            on = new Node(init.self, shards, shards.forNode(init.self), sink, new Random(), System::currentTimeMillis, MaelstromStore::new, MaelstromAgent.INSTANCE, scheduler);
+            on = new Node(init.self, shards, sink, new Random(), System::currentTimeMillis, MaelstromStore::new, MaelstromAgent.INSTANCE, scheduler, CommandStore.Factory.SINGLE_THREAD);
             err.println("Initialized node " + init.self);
             err.flush();
             sink.send(packet.src, new Body(Type.init_ok, Body.SENTINEL_MSG_ID, init.msg_id));
         }
-        while (true)
+        try
         {
-            String line = in.get();
-            if (line == null)
+            while (true)
             {
-                err.println("Received EOF; terminating");
-                err.flush();
-                scheduler.stop();
-                err.println("Terminated");
+                String line = in.get();
+                if (line == null)
+                {
+                    err.println("Received EOF; terminating");
+                    err.flush();
+                    scheduler.stop();
+                    err.println("Terminated");
+                    err.flush();
+                    return;
+                }
+                err.println("Received " + (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)) + " " + line);
                 err.flush();
-                return;
-            }
-            err.println("Received " + (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)) + " " + line);
-            err.flush();
-            Packet next = Packet.parse(line);
-            switch (next.body.type)
-            {
-                case txn:
-                    on.receive((MaelstromRequest)next.body, next.src, next.body.msg_id);
-                    break;
-                default:
-                    if (next.body.in_reply_to > Body.SENTINEL_MSG_ID)
-                    {
-                        Reply reply = (Reply)((Wrapper)next.body).body;
-                        CallbackInfo callback = reply.isFinal() ? sink.callbacks.remove(next.body.in_reply_to)
-                                                                : sink.callbacks.get(next.body.in_reply_to);
-                        if (callback != null)
-                            scheduler.now(() -> callback.callback.onSuccess(next.src, reply));
-                    }
-                    else on.receive((Request)((Wrapper)next.body).body, next.src, next.body.msg_id);
+                Packet next = Packet.parse(line);
+                switch (next.body.type)
+                {
+                    case txn:
+                        on.receive((MaelstromRequest)next.body, next.src, next.body.msg_id);
+                        break;
+                    default:
+                        if (next.body.in_reply_to > Body.SENTINEL_MSG_ID)
+                        {
+                            Reply reply = (Reply)((Wrapper)next.body).body;
+                            CallbackInfo callback = reply.isFinal() ? sink.callbacks.remove(next.body.in_reply_to)
+                                    : sink.callbacks.get(next.body.in_reply_to);
+                            if (callback != null)
+                                scheduler.now(() -> callback.callback.onSuccess(next.src, reply));
+                        }
+                        else on.receive((Request)((Wrapper)next.body).body, next.src, next.body.msg_id);
+                }
             }
         }
+        finally
+        {
+            on.shutdown();
+        }
     }
 
     public static void main(String[] args) throws IOException

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org