You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2022/11/22 10:08:29 UTC

[cassandra-accord] branch trunk updated: Refactor response tracking to improve efficiency and clarity; introduce dedicated property tests; re-activate fast-path during range movements

This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 05b3376  Refactor response tracking to improve efficiency and clarity; introduce dedicated property tests; re-activate fast-path during range movements
05b3376 is described below

commit 05b33767b538d41620910a417d27ed78ce4f8d9c
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Fri Nov 4 11:35:27 2022 +0000

    Refactor response tracking to improve efficiency and clarity; introduce dedicated property tests; re-activate fast-path during range movements
    
    patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-18056
---
 .../src/main/java/accord/api/ProgressLog.java      |   2 +-
 .../src/main/java/accord/api/TopologySorter.java   |  26 ++
 .../java/accord/coordinate/AnyReadCoordinator.java | 115 --------
 .../src/main/java/accord/coordinate/CheckOn.java   |  41 ++-
 .../main/java/accord/coordinate/CheckShards.java   |  22 +-
 .../main/java/accord/coordinate/CollectDeps.java   |   7 +-
 .../main/java/accord/coordinate/Coordinate.java    | 192 +++----------
 .../src/main/java/accord/coordinate/Execute.java   |  38 +--
 .../QuorumTracker.java => Exhausted.java}          |  31 +--
 .../src/main/java/accord/coordinate/FetchData.java |   9 +-
 .../main/java/accord/coordinate/FindHomeKey.java   |   4 +-
 .../src/main/java/accord/coordinate/FindRoute.java |   7 +-
 .../java/accord/coordinate/InformHomeOfTxn.java    |   9 +-
 .../main/java/accord/coordinate/Invalidate.java    |  71 +++--
 .../main/java/accord/coordinate/MaybeRecover.java  |  11 +-
 .../src/main/java/accord/coordinate/Persist.java   |   3 +-
 .../src/main/java/accord/coordinate/Propose.java   |  16 +-
 .../accord/coordinate/QuorumReadCoordinator.java   | 213 ---------------
 .../java/accord/coordinate/ReadCoordinator.java    | 187 +++++++++++++
 .../src/main/java/accord/coordinate/Recover.java   |  81 ++----
 .../java/accord/coordinate/RecoverWithHomeKey.java |  14 +-
 .../java/accord/coordinate/RecoverWithRoute.java   |  22 +-
 .../coordinate/tracking/AbstractQuorumTracker.java | 119 --------
 .../tracking/AbstractResponseTracker.java          | 189 -------------
 .../coordinate/tracking/AbstractTracker.java       | 203 ++++++++++++++
 .../coordinate/tracking/FastPathTracker.java       | 142 ++++++++--
 .../accord/coordinate/tracking/QuorumTracker.java  |  65 ++++-
 .../accord/coordinate/tracking/ReadTracker.java    | 303 ++++++++++++---------
 .../coordinate/tracking/RecoveryTracker.java       |  75 +++++
 .../accord/coordinate/tracking/RequestStatus.java  |   8 +
 .../accord/coordinate/tracking/ShardTracker.java   |  17 ++
 .../java/accord/impl/InMemoryCommandStore.java     |   1 +
 .../java/accord/impl/InMemoryCommandsForKey.java   |   1 +
 .../java/accord/impl/SizeOfIntersectionSorter.java |  51 ++++
 accord-core/src/main/java/accord/local/Node.java   |   4 +-
 .../src/main/java/accord/local/SaveStatus.java     |   1 -
 .../main/java/accord/local/SyncCommandStores.java  |   1 +
 .../java/accord/messages/BeginInvalidation.java    |   6 +-
 .../src/main/java/accord/messages/ReadData.java    |   1 +
 .../src/main/java/accord/topology/Shard.java       |   5 +
 .../main/java/accord/topology/ShardSelection.java  |   7 +
 .../src/main/java/accord/topology/Topologies.java  | 103 ++++---
 .../src/main/java/accord/topology/Topology.java    |  73 +++--
 .../main/java/accord/topology/TopologyManager.java |  35 ++-
 .../IndexedFunction.java}                          |  26 +-
 .../IndexedTriFunction.java}                       |  26 +-
 accord-core/src/test/java/accord/Utils.java        |   4 +-
 .../src/test/java/accord/burn/BurnTest.java        |   2 +-
 .../java/accord/coordinate/CoordinateTest.java     |   7 +-
 .../accord/coordinate/PreAcceptTrackerTest.java    |  18 +-
 .../java/accord/coordinate/TopologyChangeTest.java |  59 ----
 .../tracking/FastPathTrackerReconciler.java        |  59 ++++
 .../tracking/QuorumTrackerReconciler.java          |  56 ++++
 .../coordinate/tracking/QuorumTrackerTest.java     |  40 +--
 .../coordinate/tracking/ReadTrackerReconciler.java |  84 ++++++
 .../coordinate/tracking/ReadTrackerTest.java       |  84 ++++--
 .../tracking/RecoveryTrackerReconciler.java        |  58 ++++
 .../coordinate/tracking/TrackerReconciler.java     | 116 ++++++++
 .../coordinate/tracking/TrackerReconcilerTest.java |  49 ++++
 .../src/test/java/accord/impl/TopologyFactory.java |   8 +-
 .../src/test/java/accord/impl/basic/Cluster.java   |   4 +-
 .../test/java/accord/impl/list/ListRequest.java    |   8 +-
 .../src/test/java/accord/impl/mock/EpochSync.java  |   6 +-
 .../test/java/accord/impl/mock/MockCluster.java    |   7 +-
 .../src/test/java/accord/local/CommandTest.java    |   2 +-
 .../test/java/accord/messages/PreAcceptTest.java   |   1 +
 .../java/accord/messages/TxnRequestScopeTest.java  |   5 +-
 .../java/accord/topology/TopologyManagerTest.java  |  16 +-
 .../java/accord/topology/TopologyRandomizer.java   |  27 +-
 .../src/main/java/accord/maelstrom/Cluster.java    |   4 +-
 .../src/main/java/accord/maelstrom/Main.java       |   3 +-
 71 files changed, 1872 insertions(+), 1438 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/ProgressLog.java b/accord-core/src/main/java/accord/api/ProgressLog.java
index cf535d1..47dfb6a 100644
--- a/accord-core/src/main/java/accord/api/ProgressLog.java
+++ b/accord-core/src/main/java/accord/api/ProgressLog.java
@@ -46,7 +46,7 @@ import accord.primitives.*;
  *
  *  - Non-home shards may also be informed of transactions that are blocking the progress of other transactions.
  *    If the {@code waitingOn} transaction that is blocking progress is uncommitted it is required that the progress
- *    log invoke {@link CheckOn#checkOnUncommitted} for the transaction if no {@link #committed} is witnessed.
+ *    log invoke {@link accord.coordinate.FetchData#fetch} for the transaction if no {@link #committed} is witnessed.
  *
  *  - Members of the home shard will be informed of a transaction to monitor by the invocation of {@link #preaccepted} or
  *    {@link #accepted}. If this is not followed closely by {@link #committed}, {@link accord.coordinate.MaybeRecover} should
diff --git a/accord-core/src/main/java/accord/api/TopologySorter.java b/accord-core/src/main/java/accord/api/TopologySorter.java
new file mode 100644
index 0000000..c3587b2
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/TopologySorter.java
@@ -0,0 +1,26 @@
+package accord.api;
+
+import accord.local.Node;
+import accord.topology.ShardSelection;
+import accord.topology.Topologies;
+import accord.topology.Topology;
+
+public interface TopologySorter
+{
+    interface Supplier
+    {
+        TopologySorter get(Topology topologies);
+        TopologySorter get(Topologies topologies);
+    }
+
+    interface StaticSorter extends Supplier, TopologySorter
+    {
+        default TopologySorter get(Topology topologies) { return this; }
+        default TopologySorter get(Topologies topologies) { return this; }
+    }
+
+    /**
+     * Compare two nodes that occur in some topologies, so that the one that is *least* preferable sorts first
+     */
+    int compare(Node.Id node1, Node.Id node2, ShardSelection shards);
+}
diff --git a/accord-core/src/main/java/accord/coordinate/AnyReadCoordinator.java b/accord-core/src/main/java/accord/coordinate/AnyReadCoordinator.java
deleted file mode 100644
index 0a4bfa3..0000000
--- a/accord-core/src/main/java/accord/coordinate/AnyReadCoordinator.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package accord.coordinate;
-
-import java.util.Set;
-
-import accord.coordinate.tracking.ReadTracker;
-import accord.coordinate.tracking.ReadTracker.ReadShardTracker;
-import accord.local.Node;
-import accord.local.Node.Id;
-import accord.messages.Callback;
-import accord.primitives.TxnId;
-import accord.topology.Topologies;
-
-abstract class AnyReadCoordinator<Reply> implements Callback<Reply>
-{
-    enum Action { Abort, TryAlternative, Accept, Success }
-
-    final Node node;
-    final TxnId txnId;
-    final ReadTracker<ReadShardTracker> tracker;
-    private boolean isDone;
-    private Throwable failure;
-
-    AnyReadCoordinator(Node node, Topologies topologies, TxnId txnId)
-    {
-        this.node = node;
-        this.txnId = txnId;
-        this.tracker = new ReadTracker<>(topologies, ReadShardTracker[]::new, ReadShardTracker::new);
-    }
-
-    void start()
-    {
-        start(tracker.computeMinimalReadSetAndMarkInflight());
-    }
-
-    void start(Set<Id> nodes)
-    {
-        contact(nodes);
-    }
-
-    abstract void contact(Set<Id> nodes);
-    abstract Action process(Id from, Reply reply);
-    abstract void onSuccess();
-    abstract void onFailure(Throwable t);
-
-    @Override
-    public void onSuccess(Id from, Reply reply)
-    {
-        if (isDone)
-            return;
-
-        switch (process(from, reply))
-        {
-            default: throw new IllegalStateException();
-            case Abort:
-                isDone = true;
-                break;
-
-            case TryAlternative:
-                onSlowResponse(from);
-                break;
-
-            case Accept:
-                if (!(tracker.recordReadSuccess(from) && tracker.hasCompletedRead()))
-                    break;
-
-            case Success:
-                isDone = true;
-                onSuccess();
-                if (failure != null)
-                    node.agent().onHandledException(failure); // TODO: introduce dedicated Agent method for this case
-        }
-    }
-
-    @Override
-    public void onSlowResponse(Id from)
-    {
-        tracker.recordSlowRead(from);
-        Set<Id> readFrom = tracker.computeMinimalReadSetAndMarkInflight();
-        if (readFrom != null)
-            contact(readFrom);
-    }
-
-    @Override
-    public void onFailure(Id from, Throwable failure)
-    {
-        if (isDone)
-            return;
-
-        if (tracker.recordReadFailure(from))
-        {
-            if (this.failure == null) this.failure = failure;
-            else this.failure.addSuppressed(failure);
-
-            Set<Id> readFrom = tracker.computeMinimalReadSetAndMarkInflight();
-            if (readFrom != null)
-            {
-                contact(readFrom);
-            }
-            else if (tracker.hasFailed())
-            {
-                isDone = true;
-                onFailure(this.failure);
-            }
-        }
-    }
-
-    @Override
-    public void onCallbackFailure(Id from, Throwable failure)
-    {
-        isDone = true;
-        if (this.failure != null)
-            failure.addSuppressed(this.failure);
-        onFailure(failure);
-    }
-}
diff --git a/accord-core/src/main/java/accord/coordinate/CheckOn.java b/accord-core/src/main/java/accord/coordinate/CheckOn.java
index 81e6580..a20b7ff 100644
--- a/accord-core/src/main/java/accord/coordinate/CheckOn.java
+++ b/accord-core/src/main/java/accord/coordinate/CheckOn.java
@@ -76,14 +76,9 @@ public class CheckOn extends CheckShards
     // TODO: many callers only need to consult precisely executeAt.epoch remotely
     public static CheckOn checkOn(Known sufficientStatus, Node node, TxnId txnId, AbstractRoute route, long srcEpoch, long untilLocalEpoch, BiConsumer<? super CheckStatusOkFull, Throwable> callback)
     {
-        CheckOn checkOnCommitted = new CheckOn(node, sufficientStatus, txnId, route, srcEpoch, untilLocalEpoch, callback);
-        checkOnCommitted.start();
-        return checkOnCommitted;
-    }
-
-    public static CheckOn checkOnUncommitted(Node node, TxnId txnId, AbstractRoute route, long untilLocalEpoch, BiConsumer<? super CheckStatusOkFull, Throwable> callback)
-    {
-        return checkOn(ExecutionOrder, node, txnId, route, txnId.epoch, untilLocalEpoch, callback);
+        CheckOn checkOn = new CheckOn(node, sufficientStatus, txnId, route, srcEpoch, untilLocalEpoch, callback);
+        checkOn.start();
+        return checkOn;
     }
 
     protected AbstractRoute route()
@@ -94,23 +89,39 @@ public class CheckOn extends CheckShards
     @Override
     protected boolean isSufficient(Id from, CheckStatusOk ok)
     {
-        return ((CheckStatusOkFull)ok).sufficientFor(route()).compareTo(sufficient) >= 0;
+        KeyRanges rangesForNode = topologies().computeRangesForNode(from);
+        PartialRoute scope = this.route.slice(rangesForNode);
+        return isSufficient(scope, ok);
     }
 
     @Override
-    protected void onDone(Done done, Throwable failure)
+    protected boolean isSufficient(CheckStatusOk ok)
     {
+        return isSufficient(route, ok);
+    }
+
+    protected boolean isSufficient(AbstractRoute scope, CheckStatusOk ok)
+    {
+        return ((CheckStatusOkFull)ok).sufficientFor(scope).compareTo(sufficient) >= 0;
+    }
+
+    @Override
+    protected void onDone(Success success, Throwable failure)
+    {
+        Preconditions.checkState((success == null) != (failure == null));
         if (failure != null)
         {
             callback.accept(null, failure);
         }
-        else if (merged == null || merged.saveStatus == NotWitnessed)
-        {
-            callback.accept(CheckStatusOkFull.NOT_WITNESSED, null);
-        }
         else
         {
-            new OnDone().start();
+            if (success == Success.Success)
+                Preconditions.checkState(isSufficient(merged));
+
+            if (merged.saveStatus == NotWitnessed)
+                callback.accept(CheckStatusOkFull.NOT_WITNESSED, null);
+            else
+                new OnDone().start();
         }
     }
 
diff --git a/accord-core/src/main/java/accord/coordinate/CheckShards.java b/accord-core/src/main/java/accord/coordinate/CheckShards.java
index a642b9b..913db02 100644
--- a/accord-core/src/main/java/accord/coordinate/CheckShards.java
+++ b/accord-core/src/main/java/accord/coordinate/CheckShards.java
@@ -1,7 +1,5 @@
 package accord.coordinate;
 
-import java.util.Set;
-
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.messages.CheckStatus;
@@ -16,7 +14,7 @@ import accord.topology.Topologies;
  * A result of null indicates the transaction is globally persistent
  * A result of CheckStatusOk indicates the maximum status found for the transaction, which may be used to assess progress
  */
-public abstract class CheckShards extends QuorumReadCoordinator<CheckStatusReply>
+public abstract class CheckShards extends ReadCoordinator<CheckStatusReply>
 {
     final RoutingKeys contactKeys;
 
@@ -44,24 +42,22 @@ public abstract class CheckShards extends QuorumReadCoordinator<CheckStatusReply
     }
 
     @Override
-    protected void contact(Set<Id> nodes)
+    protected void contact(Id id)
     {
-        node.send(nodes, new CheckStatus(txnId, contactKeys, txnId.epoch, untilRemoteEpoch, includeInfo), this);
+        node.send(id, new CheckStatus(txnId, contactKeys, txnId.epoch, untilRemoteEpoch, includeInfo), this);
     }
 
-    protected abstract boolean isSufficient(Id from, CheckStatusOk ok);
+    protected boolean isSufficient(Id from, CheckStatusOk ok) { return isSufficient(ok); }
+    protected abstract boolean isSufficient(CheckStatusOk ok);
 
-    protected Action check(Id from, CheckStatusOk ok)
+    protected Action checkSufficient(Id from, CheckStatusOk ok)
     {
         if (isSufficient(from, ok))
-            return Action.Accept;
+            return Action.Approve;
 
-        return Action.AcceptQuorum;
+        return Action.ApproveIfQuorum;
     }
 
-    @Override
-    protected abstract void onDone(Done done, Throwable failure);
-
     @Override
     protected Action process(Id from, CheckStatusReply reply)
     {
@@ -72,7 +68,7 @@ public abstract class CheckShards extends QuorumReadCoordinator<CheckStatusReply
             if (merged == null) merged = ok;
             else merged = merged.merge(ok);
 
-            return check(from, ok);
+            return checkSufficient(from, ok);
         }
         else
         {
diff --git a/accord-core/src/main/java/accord/coordinate/CollectDeps.java b/accord-core/src/main/java/accord/coordinate/CollectDeps.java
index de55cef..5b615bd 100644
--- a/accord-core/src/main/java/accord/coordinate/CollectDeps.java
+++ b/accord-core/src/main/java/accord/coordinate/CollectDeps.java
@@ -17,6 +17,9 @@ import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
 
+import static accord.coordinate.tracking.RequestStatus.Failed;
+import static accord.coordinate.tracking.RequestStatus.Success;
+
 class CollectDeps implements Callback<GetDepsOk>
 {
     final Node node;
@@ -57,14 +60,14 @@ class CollectDeps implements Callback<GetDepsOk>
             return;
 
         oks.add(ok);
-        if (tracker.success(from))
+        if (tracker.recordSuccess(from) == Success)
             onQuorum();
     }
 
     @Override
     public void onFailure(Id from, Throwable failure)
     {
-        if (tracker.failure(from))
+        if (tracker.recordFailure(from) == Failed)
         {
             isDone = true;
             callback.accept(null, new Timeout(txnId, route.homeKey));
diff --git a/accord-core/src/main/java/accord/coordinate/Coordinate.java b/accord-core/src/main/java/accord/coordinate/Coordinate.java
index a126dc9..588f72c 100644
--- a/accord-core/src/main/java/accord/coordinate/Coordinate.java
+++ b/accord-core/src/main/java/accord/coordinate/Coordinate.java
@@ -18,16 +18,13 @@
 
 package accord.coordinate;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.function.BiConsumer;
 
 import accord.api.Result;
 import accord.coordinate.tracking.FastPathTracker;
+import accord.coordinate.tracking.RequestStatus;
 import accord.primitives.Route;
-import accord.topology.Shard;
 import accord.topology.Topologies;
 import accord.primitives.Ballot;
 import accord.messages.Callback;
@@ -40,7 +37,7 @@ import accord.messages.PreAccept.PreAcceptOk;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.messages.PreAccept.PreAcceptReply;
-import com.google.common.collect.Sets;
+import com.google.common.base.Preconditions;
 
 import org.apache.cassandra.utils.concurrent.AsyncFuture;
 import org.apache.cassandra.utils.concurrent.Future;
@@ -51,110 +48,19 @@ import static accord.messages.Commit.Invalidate.commitInvalidate;
 /**
  * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
  * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO: dedicated burn test to validate outcomes
  */
 public class Coordinate extends AsyncFuture<Result> implements Callback<PreAcceptReply>, BiConsumer<Result, Throwable>
 {
-    static class ShardTracker extends FastPathTracker.FastPathShardTracker
-    {
-        public ShardTracker(Shard shard)
-        {
-            super(shard);
-        }
-
-        @Override
-        public boolean includeInFastPath(Node.Id node, boolean withFastPathTimestamp)
-        {
-            return withFastPathTimestamp && shard.fastPathElectorate.contains(node);
-        }
-
-        @Override
-        public boolean hasMetFastPathCriteria()
-        {
-            return fastPathAccepts >= shard.fastPathQuorumSize;
-        }
-    }
-
-    static class PreacceptTracker extends FastPathTracker<ShardTracker>
-    {
-        volatile long supersedingEpoch = -1;
-        private final boolean fastPathPermitted;
-        private final Set<Id> successes = new HashSet<>();
-        private Set<Id> failures;
-
-        public PreacceptTracker(Topologies topologies, boolean fastPathPermitted)
-        {
-            super(topologies, Coordinate.ShardTracker[]::new, Coordinate.ShardTracker::new);
-            this.fastPathPermitted = fastPathPermitted;
-        }
-
-        public PreacceptTracker(Topologies topologies)
-        {
-            this(topologies, topologies.fastPathPermitted());
-        }
-
-        @Override
-        public boolean failure(Id node)
-        {
-            if (failures == null)
-                failures = new HashSet<>();
-            failures.add(node);
-            return super.failure(node);
-        }
-
-        @Override
-        public void recordSuccess(Id node, boolean withFastPathTimestamp)
-        {
-            successes.add(node);
-            super.recordSuccess(node, withFastPathTimestamp);
-        }
-
-        public void recordSuccess(Id node)
-        {
-            recordSuccess(node, false);
-        }
-
-        public synchronized boolean recordSupersedingEpoch(long epoch)
-        {
-            if (epoch <= supersedingEpoch)
-                return false;
-            supersedingEpoch = epoch;
-            return true;
-        }
-
-        public boolean hasSupersedingEpoch()
-        {
-            return supersedingEpoch > 0;
-        }
-
-        public PreacceptTracker withUpdatedTopologies(Topologies topologies)
-        {
-            PreacceptTracker tracker = new PreacceptTracker(topologies, false);
-            successes.forEach(tracker::recordSuccess);
-            if (failures != null)
-                failures.forEach(tracker::failure);
-            return tracker;
-        }
-
-        @Override
-        public boolean hasMetFastPathCriteria()
-        {
-            return fastPathPermitted && super.hasMetFastPathCriteria();
-        }
-
-        boolean shouldSlowPathAccept()
-        {
-            return (!fastPathPermitted || !hasInFlight()) && hasReachedQuorum();
-        }
-    }
-
     final Node node;
     final TxnId txnId;
     final Txn txn;
     final Route route;
 
-    private PreacceptTracker tracker;
-    private final List<PreAcceptOk> preAcceptOks = new ArrayList<>();
+    private FastPathTracker tracker;
     private boolean preAcceptIsDone;
+    private final List<PreAcceptOk> successes;
 
     private Coordinate(Node node, TxnId txnId, Txn txn, Route route)
     {
@@ -163,7 +69,8 @@ public class Coordinate extends AsyncFuture<Result> implements Callback<PreAccep
         this.txn = txn;
         this.route = route;
         Topologies topologies = node.topology().withUnsyncedEpochs(route, txnId, txnId);
-        this.tracker = new PreacceptTracker(topologies);
+        this.tracker = new FastPathTracker(topologies);
+        this.successes = new ArrayList<>(tracker.topologies().estimateUniqueNodes());
     }
 
     private void start()
@@ -186,15 +93,18 @@ public class Coordinate extends AsyncFuture<Result> implements Callback<PreAccep
         if (preAcceptIsDone)
             return;
 
-        if (tracker.failure(from))
+        switch (tracker.recordFailure(from))
         {
-            preAcceptIsDone = true;
-            tryFailure(new Timeout(txnId, route.homeKey));
+            default: throw new AssertionError();
+            case NoChange:
+                break;
+            case Failed:
+                preAcceptIsDone = true;
+                tryFailure(new Timeout(txnId, route.homeKey));
+                break;
+            case Success:
+                onPreAccepted();
         }
-
-        // if no other responses are expected and the slow quorum has been satisfied, proceed
-        if (tracker.shouldSlowPathAccept())
-            onPreAccepted();
     }
 
     @Override
@@ -203,29 +113,6 @@ public class Coordinate extends AsyncFuture<Result> implements Callback<PreAccep
         tryFailure(failure);
     }
 
-    // TODO (soon): I don't think we need to preaccept in later epochs? the Sync logic should take care of it for us,
-    //              since either we haven't synced between a majority and the earlier epochs are still involved for
-    //              later preaccepts, or they have been sync'd and the earlier transactions are known to the later epochs
-    //              (also, we aren't doing this on recovery, and everything works...)
-    private synchronized void onEpochUpdate()
-    {
-        if (!tracker.hasSupersedingEpoch())
-            return;
-        Topologies newTopologies = node.topology().withUnsyncedEpochs(txn.keys(), txnId.epoch, tracker.supersedingEpoch);
-        if (newTopologies.currentEpoch() < tracker.supersedingEpoch)
-            return;
-        Set<Id> previousNodes = tracker.nodes();
-        tracker = tracker.withUpdatedTopologies(newTopologies);
-
-        // send messages to new nodes
-        Set<Id> needMessages = Sets.difference(tracker.nodes(), previousNodes);
-        if (!needMessages.isEmpty())
-            node.send(needMessages, to -> new PreAccept(to, newTopologies, txnId, txn, route), this);
-
-        if (tracker.shouldSlowPathAccept())
-            onPreAccepted();
-    }
-
     public synchronized void onSuccess(Id from, PreAcceptReply reply)
     {
         if (preAcceptIsDone)
@@ -239,40 +126,32 @@ public class Coordinate extends AsyncFuture<Result> implements Callback<PreAccep
         }
 
         PreAcceptOk ok = (PreAcceptOk) reply;
-        preAcceptOks.add(ok);
+        successes.add(ok);
 
         boolean fastPath = ok.witnessedAt.compareTo(txnId) == 0;
-        tracker.recordSuccess(from, fastPath);
-
-        // TODO: we should only update epoch if we need to in order to reach quorum
-        if (!fastPath && ok.witnessedAt.epoch > txnId.epoch && tracker.recordSupersedingEpoch(ok.witnessedAt.epoch))
-        {
-            node.configService().fetchTopologyForEpoch(ok.witnessedAt.epoch);
-            node.topology().awaitEpoch(ok.witnessedAt.epoch).addListener(this::onEpochUpdate);
-        }
-
-        if (!tracker.hasSupersedingEpoch() && (tracker.hasMetFastPathCriteria() || tracker.shouldSlowPathAccept()))
-            onPreAccepted(); // note, can already have invoked onPreAccepted in onEpochUpdate
+        // TODO: update formalisation (and proof), as we do not seek additional pre-accepts from later epochs.
+        //       instead we rely on accept to do our work: a quorum of accept in the later epoch
+        //       and its effect on preaccepted timestamps and the deps it returns create our sync point.
+        if (tracker.recordSuccess(from, fastPath) == RequestStatus.Success)
+            onPreAccepted();
     }
 
-    private void onPreAccepted()
+    private synchronized void onPreAccepted()
     {
-        if (preAcceptIsDone)
-            return;
-
+        Preconditions.checkState(!preAcceptIsDone);
         preAcceptIsDone = true;
-        if (tracker.hasMetFastPathCriteria())
+
+        if (tracker.hasFastPathAccepted())
         {
-            preAcceptIsDone = true;
-            Deps deps = Deps.merge(preAcceptOks, ok -> ok.witnessedAt.equals(txnId) ? ok.deps : null);
+            Deps deps = Deps.merge(successes, ok -> ok.witnessedAt.equals(txnId) ? ok.deps : null);
             Execute.execute(node, txnId, txn, route, txnId, deps, this);
         }
         else
         {
-            Deps deps = Deps.merge(preAcceptOks, ok -> ok.deps);
+            Deps deps = Deps.merge(successes, ok -> ok.deps);
             Timestamp executeAt; {
                 Timestamp accumulate = Timestamp.NONE;
-                for (PreAcceptOk preAcceptOk : preAcceptOks)
+                for (PreAcceptOk preAcceptOk : successes)
                     accumulate = Timestamp.max(accumulate, preAcceptOk.witnessedAt);
                 executeAt = accumulate;
             }
@@ -296,7 +175,12 @@ public class Coordinate extends AsyncFuture<Result> implements Callback<PreAccep
             }
             else
             {
-                node.withEpoch(executeAt.epoch, () -> Propose.propose(node, tracker.topologies(), Ballot.ZERO, txnId, txn, route, executeAt, deps, this));
+                node.withEpoch(executeAt.epoch, () -> {
+                    Topologies topologies = tracker.topologies();
+                    if (executeAt.epoch > txnId.epoch)
+                        topologies = node.topology().withUnsyncedEpochs(txn, txnId.epoch, executeAt.epoch);
+                    Propose.propose(node, topologies, Ballot.ZERO, txnId, txn, route, executeAt, deps, this);
+                });
             }
         }
     }
diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java b/accord-core/src/main/java/accord/coordinate/Execute.java
index 9d4103e..3bac876 100644
--- a/accord-core/src/main/java/accord/coordinate/Execute.java
+++ b/accord-core/src/main/java/accord/coordinate/Execute.java
@@ -34,10 +34,10 @@ import accord.local.Node.Id;
 import accord.messages.Commit;
 import accord.topology.Topology;
 
-import static accord.coordinate.AnyReadCoordinator.Action.Accept;
+import static accord.coordinate.ReadCoordinator.Action.Approve;
 import static accord.messages.Commit.Kind.Maximal;
 
-class Execute extends AnyReadCoordinator<ReadReply>
+class Execute extends ReadCoordinator<ReadReply>
 {
     final Txn txn;
     final Keys readScope;
@@ -78,26 +78,26 @@ class Execute extends AnyReadCoordinator<ReadReply>
     }
 
     @Override
-    void start(Set<Id> readSet)
+    protected void start(Set<Id> readSet)
     {
         Commit.commitMinimalAndRead(node, applyTo, txnId, txn, route, readScope, executeAt, deps, readSet, this);
     }
 
     @Override
-    void contact(Set<Id> nodes)
+    public void contact(Id to)
     {
-        node.send(nodes, to -> new ReadData(to, tracker.topologies(), txnId, readScope, executeAt), this);
+        node.send(to, new ReadData(to, topologies(), txnId, readScope, executeAt), this);
     }
 
     @Override
-    Action process(Id from, ReadReply reply)
+    protected Action process(Id from, ReadReply reply)
     {
         if (reply.isOk())
         {
             Data next = ((ReadOk) reply).data;
             if (next != null)
                 data = data == null ? next : data.merge(next);
-            return Accept;
+            return Approve;
         }
 
         ReadNack nack = (ReadNack) reply;
@@ -120,19 +120,21 @@ class Execute extends AnyReadCoordinator<ReadReply>
         }
     }
 
-    @Override
-    void onSuccess()
-    {
-        Result result = txn.result(txnId, data);
-        callback.accept(result, null);
-        // avoid re-calculating topologies if it is unchanged
-        Topologies sendTo = txnId.epoch == executeAt.epoch ? applyTo : node.topology().preciseEpochs(route, txnId.epoch, executeAt.epoch);
-        Persist.persist(node, sendTo, applyTo, txnId, route, txn, executeAt, deps, txn.execute(executeAt, data), result);
-    }
 
     @Override
-    public void onFailure(Throwable failure)
+    protected void onDone(Success success, Throwable failure)
     {
-        callback.accept(null, failure);
+        if (failure == null)
+        {
+            Result result = txn.result(txnId, data);
+            callback.accept(result, null);
+            // avoid re-calculating topologies if it is unchanged
+            Topologies sendTo = txnId.epoch == executeAt.epoch ? applyTo : node.topology().preciseEpochs(route, txnId.epoch, executeAt.epoch);
+            Persist.persist(node, sendTo, applyTo, txnId, route, txn, executeAt, deps, txn.execute(executeAt, data), result);
+        }
+        else
+        {
+            callback.accept(null, failure);
+        }
     }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java b/accord-core/src/main/java/accord/coordinate/Exhausted.java
similarity index 51%
copy from accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
copy to accord-core/src/main/java/accord/coordinate/Exhausted.java
index 428c340..70bc02d 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/Exhausted.java
@@ -16,29 +16,20 @@
  * limitations under the License.
  */
 
-package accord.coordinate.tracking;
+package accord.coordinate;
 
-import accord.coordinate.tracking.AbstractQuorumTracker.QuorumShardTracker;
-import accord.local.Node;
-import accord.topology.Topologies;
-import accord.topology.Topologies.Single;
-import accord.topology.Topology;
+import accord.api.RoutingKey;
+import accord.primitives.TxnId;
 
-public class QuorumTracker extends AbstractQuorumTracker<QuorumShardTracker>
-{
-    public QuorumTracker(Topologies topologies)
-    {
-        super(topologies, QuorumShardTracker[]::new, QuorumShardTracker::new);
-    }
-
-    public QuorumTracker(Topology topology)
-    {
-        super(new Single(topology, false), QuorumShardTracker[]::new, QuorumShardTracker::new);
-    }
+import javax.annotation.Nullable;
 
-    // return true iff hasReachedQuorum()
-    public boolean success(Node.Id node)
+/**
+ * Thrown when coordination fails without an underlying exception to report, e.g. when every response received was rejected
+ */
+public class Exhausted extends CoordinateFailed
+{
+    public Exhausted(TxnId txnId, @Nullable RoutingKey homeKey)
     {
-        return allForNode(node, QuorumShardTracker::success) && hasReachedQuorum();
+        super(txnId, homeKey);
     }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/FetchData.java b/accord-core/src/main/java/accord/coordinate/FetchData.java
index 6037d07..8dd066f 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchData.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchData.java
@@ -67,15 +67,8 @@ public class FetchData
         return fetchInternal(ranges, phase, node, txnId, route.sliceStrict(ranges), executeAt, untilLocalEpoch, callback);
     }
 
-    private static Object fetchInternal(Known target, Node node, TxnId txnId, PartialRoute route, @Nullable Timestamp executeAt, long untilLocalEpoch, BiConsumer<Known, Throwable> callback)
+    private static Object fetchInternal(KeyRanges ranges, Known target, Node node, TxnId txnId, PartialRoute route, @Nullable Timestamp executeAt, long untilLocalEpoch, BiConsumer<Known, Throwable> callback)
     {
-        KeyRanges ranges = node.topology().localRangesForEpochs(txnId.epoch, untilLocalEpoch);
-        return fetchInternal(ranges, target, node, txnId, route, executeAt, untilLocalEpoch, callback);
-    }
-
-    private static Object fetchInternal(KeyRanges localRanges, Known target, Node node, TxnId txnId, PartialRoute route, @Nullable Timestamp executeAt, long untilLocalEpoch, BiConsumer<Known, Throwable> callback)
-    {
-        KeyRanges ranges = node.topology().localRangesForEpochs(txnId.epoch, untilLocalEpoch);
         PartialRoute fetch = route.sliceStrict(ranges);
         long srcEpoch = target == ExecutionOrder || executeAt == null ? txnId.epoch : executeAt.epoch;
         return CheckOn.checkOn(target, node, txnId, fetch, srcEpoch, untilLocalEpoch, (ok, fail) -> {
diff --git a/accord-core/src/main/java/accord/coordinate/FindHomeKey.java b/accord-core/src/main/java/accord/coordinate/FindHomeKey.java
index c7075de..0972c8e 100644
--- a/accord-core/src/main/java/accord/coordinate/FindHomeKey.java
+++ b/accord-core/src/main/java/accord/coordinate/FindHomeKey.java
@@ -30,13 +30,13 @@ public class FindHomeKey extends CheckShards
     }
 
     @Override
-    protected boolean isSufficient(Id from, CheckStatusOk ok)
+    protected boolean isSufficient(CheckStatusOk ok)
     {
         return ok.homeKey != null;
     }
 
     @Override
-    protected void onDone(Done done, Throwable failure)
+    protected void onDone(Success success, Throwable failure)
     {
         if (failure != null) callback.accept(null, failure);
         else callback.accept(merged == null ? null : merged.homeKey, null);
diff --git a/accord-core/src/main/java/accord/coordinate/FindRoute.java b/accord-core/src/main/java/accord/coordinate/FindRoute.java
index 1ce1bb6..721110e 100644
--- a/accord-core/src/main/java/accord/coordinate/FindRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/FindRoute.java
@@ -51,15 +51,16 @@ public class FindRoute extends CheckShards
     }
 
     @Override
-    protected boolean isSufficient(Id from, CheckStatusOk ok)
+    protected boolean isSufficient(CheckStatusOk ok)
     {
         return ok.route instanceof Route;
     }
 
     @Override
-    protected void onDone(Done done, Throwable failure)
+    protected void onDone(Success success, Throwable failure)
     {
         if (failure != null) callback.accept(null, failure);
-        else callback.accept(merged == null || merged.route == null ? null : new Result(merged), null);
+        else if (success == Success.Success) callback.accept(new Result(merged), null);
+        else callback.accept(null, null);
     }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java b/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
index 363e28f..9ab76fc 100644
--- a/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
+++ b/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
@@ -19,7 +19,7 @@
 package accord.coordinate;
 
 import accord.api.RoutingKey;
-import accord.coordinate.tracking.AbstractQuorumTracker.QuorumShardTracker;
+import accord.coordinate.tracking.QuorumTracker.QuorumShardTracker;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.messages.Callback;
@@ -30,6 +30,9 @@ import accord.primitives.TxnId;
 import org.apache.cassandra.utils.concurrent.AsyncFuture;
 import org.apache.cassandra.utils.concurrent.Future;
 
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail;
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Success;
+
 public class InformHomeOfTxn extends AsyncFuture<Void> implements Callback<SimpleReply>
 {
     final TxnId txnId;
@@ -61,7 +64,7 @@ public class InformHomeOfTxn extends AsyncFuture<Void> implements Callback<Simpl
         {
             default:
             case Ok:
-                if (tracker.success(from))
+                if (tracker.onSuccess(null) == Success)
                     trySuccess(null);
                 break;
 
@@ -78,7 +81,7 @@ public class InformHomeOfTxn extends AsyncFuture<Void> implements Callback<Simpl
         else this.failure.addSuppressed(failure);
 
         // TODO: if we fail and have an incorrect topology, trigger refresh
-        if (tracker.failure(from))
+        if (tracker.onFailure(null) == Fail)
             tryFailure(this.failure);
     }
 
diff --git a/accord-core/src/main/java/accord/coordinate/Invalidate.java b/accord-core/src/main/java/accord/coordinate/Invalidate.java
index 1aae425..fa056c8 100644
--- a/accord-core/src/main/java/accord/coordinate/Invalidate.java
+++ b/accord-core/src/main/java/accord/coordinate/Invalidate.java
@@ -22,11 +22,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.function.BiConsumer;
 
+import accord.coordinate.tracking.QuorumTracker;
+import accord.local.SaveStatus;
 import accord.primitives.*;
 import com.google.common.base.Preconditions;
 
 import accord.api.RoutingKey;
-import accord.coordinate.tracking.AbstractQuorumTracker.QuorumShardTracker;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.local.Status;
@@ -38,25 +39,28 @@ import accord.messages.Callback;
 import accord.topology.Shard;
 
 import static accord.coordinate.Propose.Invalidate.proposeInvalidate;
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail;
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Success;
 import static accord.local.PreLoadContext.contextFor;
-import static accord.local.Status.Accepted;
-import static accord.local.Status.PreAccepted;
+import static accord.local.Status.*;
+import static accord.local.Status.Known.Definition;
 import static accord.messages.Commit.Invalidate.commitInvalidate;
 import static accord.primitives.ProgressToken.INVALIDATED;
 
 public class Invalidate implements Callback<InvalidateReply>
 {
-    final Node node;
-    final Ballot ballot;
-    final TxnId txnId;
-    final RoutingKeys informKeys;
-    final RoutingKey invalidateWithKey;
-    final Status recoverIfAtLeast;
-    final BiConsumer<Outcome, Throwable> callback;
-
-    boolean isDone;
-    final List<InvalidateOk> invalidateOks = new ArrayList<>();
-    final QuorumShardTracker preacceptTracker;
+    private final Node node;
+    private final Ballot ballot;
+    private final TxnId txnId;
+    private final RoutingKeys informKeys;
+    private final RoutingKey invalidateWithKey;
+    private final Status recoverIfAtLeast;
+    private final BiConsumer<Outcome, Throwable> callback;
+
+    private boolean isDone;
+    private boolean isBallotPromised;
+    private final List<InvalidateOk> invalidateOks = new ArrayList<>();
+    private final QuorumTracker.QuorumShardTracker tracker;
 
     private Invalidate(Node node, Shard shard, Ballot ballot, TxnId txnId, RoutingKeys informKeys, RoutingKey invalidateWithKey, Status recoverIfAtLeast, BiConsumer<Outcome, Throwable> callback)
     {
@@ -68,15 +72,17 @@ public class Invalidate implements Callback<InvalidateReply>
         this.informKeys = informKeys;
         this.invalidateWithKey = invalidateWithKey;
         this.recoverIfAtLeast = recoverIfAtLeast;
-        this.preacceptTracker = new QuorumShardTracker(shard);
+        this.tracker = new QuorumTracker.QuorumShardTracker(shard);
     }
 
-    public static Invalidate invalidateIfNotWitnessed(Node node, TxnId txnId, RoutingKeys informKeys, RoutingKey invalidateWithKey, BiConsumer<Outcome, Throwable> callback)
+    public static Invalidate invalidate(Node node, TxnId txnId, RoutingKeys informKeys, RoutingKey invalidateWithKey, BiConsumer<Outcome, Throwable> callback)
     {
         return invalidate(node, txnId, informKeys, invalidateWithKey, PreAccepted, callback);
     }
 
-    public static Invalidate invalidate(Node node, TxnId txnId, RoutingKeys informKeys, RoutingKey invalidateWithKey, BiConsumer<Outcome, Throwable> callback)
+    // TODO (now): (file separately) this is a bug, as it is possible for a preaccept to still be in-flight.
+    //             It's not even safe to do it after an initial Invalidate round unless we ensure that recovery never pre-accepts with txnId (which we should also enforce)
+    public static Invalidate invalidateIfNotAccepted(Node node, TxnId txnId, RoutingKeys informKeys, RoutingKey invalidateWithKey, BiConsumer<Outcome, Throwable> callback)
     {
         return invalidate(node, txnId, informKeys, invalidateWithKey, Accepted, callback);
     }
@@ -93,7 +99,7 @@ public class Invalidate implements Callback<InvalidateReply>
     @Override
     public synchronized void onSuccess(Id from, InvalidateReply reply)
     {
-        if (isDone || preacceptTracker.hasReachedQuorum())
+        if (isDone || isBallotPromised)
             return;
 
         if (!reply.isOk())
@@ -113,19 +119,22 @@ public class Invalidate implements Callback<InvalidateReply>
 
         InvalidateOk ok = (InvalidateOk) reply;
         invalidateOks.add(ok);
-        if (preacceptTracker.success(from))
+        if (tracker.onSuccess(from) == Success)
             invalidate();
     }
 
     private void invalidate()
     {
+        Preconditions.checkState(!isBallotPromised);
+        isBallotPromised = true;
         // first look to see if it has already been
         {
-            Status maxStatus = invalidateOks.stream().map(ok -> ok.status).max(Comparable::compareTo).orElseThrow(IllegalStateException::new);
             Route route = InvalidateOk.findRoute(invalidateOks);
             RoutingKey homeKey = route != null ? route.homeKey : InvalidateOk.findHomeKey(invalidateOks);
+            SaveStatus maxStatus = invalidateOks.stream().map(ok -> ok.status).max(Comparable::compareTo).orElseThrow(IllegalStateException::new);
+            Known maxKnown = invalidateOks.stream().map(ok -> ok.status.known).max(Comparable::compareTo).orElseThrow(IllegalStateException::new);
 
-            switch (maxStatus)
+            switch (maxStatus.status)
             {
                 default: throw new IllegalStateException();
                 case AcceptedInvalidate:
@@ -133,13 +142,13 @@ public class Invalidate implements Callback<InvalidateReply>
                 case NotWitnessed:
                     break;
 
-                    case PreAccepted:
+                case PreAccepted:
                 case Accepted:
                     // note: we do not attempt to calculate PreAccept outcome here, we rely on the caller to tell us
                     // what is safe to do. If the caller knows no decision was reached with PreAccept, we can safely
                     // invalidate if we see PreAccept, and only need to recover if we see Accept
                     // TODO: if we see Accept, go straight to propose to save some unnecessary work
-                    if (recoverIfAtLeast.compareTo(maxStatus) > 0)
+                    if (recoverIfAtLeast.compareTo(maxStatus.status) > 0)
                         break;
 
                 case Committed:
@@ -151,12 +160,11 @@ public class Invalidate implements Callback<InvalidateReply>
                     {
                         RecoverWithRoute.recover(node, ballot, txnId, route, callback);
                     }
-                    else if (homeKey != null && homeKey.equals(invalidateWithKey))
-                    {
-                        throw new IllegalStateException("Received a reply from a node that must have known the route, but that did not include it");
-                    }
                     else if (homeKey != null)
                     {
+                        if (homeKey.equals(invalidateWithKey) && maxKnown.compareTo(Definition) >= 0)
+                            throw new IllegalStateException("Received a reply from a node that must have known the route, but that did not include it");
+
                         RecoverWithHomeKey.recover(node, txnId, homeKey, callback);
                     }
                     else
@@ -179,6 +187,11 @@ public class Invalidate implements Callback<InvalidateReply>
         // if we have witnessed the transaction, but are able to invalidate, do we want to proceed?
         // Probably simplest to do so, but perhaps better for user if we don't.
         proposeInvalidate(node, ballot, txnId, invalidateWithKey, (success, fail) -> {
+            /**
+             * We're now inside our *exactly once* callback we registered with proposeInvalidate, and we need to
+             * make sure we honour our own exactly once semantics with {@code callback}.
+             * So we are responsible for all exception handling.
+             */
             isDone = true;
             if (fail != null)
             {
@@ -211,10 +224,10 @@ public class Invalidate implements Callback<InvalidateReply>
     @Override
     public void onFailure(Id from, Throwable failure)
     {
-        if (isDone)
+        if (isDone || isBallotPromised)
             return;
 
-        if (preacceptTracker.failure(from))
+        if (tracker.onFailure(from) == Fail)
         {
             isDone = true;
             callback.accept(null, new Timeout(txnId, null));
diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
index d360f2d..067b258 100644
--- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
+++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
@@ -62,7 +62,7 @@ public class MaybeRecover extends CheckShards
     }
 
     @Override
-    protected boolean isSufficient(Id from, CheckStatusOk ok)
+    protected boolean isSufficient(CheckStatusOk ok)
     {
         return hasMadeProgress(ok);
     }
@@ -74,18 +74,15 @@ public class MaybeRecover extends CheckShards
     }
 
     @Override
-    protected void onDone(Done done, Throwable fail)
+    protected void onDone(Success success, Throwable fail)
     {
         if (fail != null)
         {
             callback.accept(null, fail);
         }
-        else if (merged == null)
-        {
-            callback.accept(null, new Timeout(txnId, homeKey));
-        }
         else
         {
+            Preconditions.checkState(merged != null);
             switch (merged.saveStatus.known)
             {
                 default: throw new AssertionError();
@@ -96,7 +93,7 @@ public class MaybeRecover extends CheckShards
                         RoutingKeys someKeys = reduceNonNull(RoutingKeys::union, this.contactKeys, merged.route, route);
                         // for correctness reasons, we have not necessarily preempted the initial pre-accept round and
                         // may have raced with it, so we must attempt to recover anything we see pre-accepted.
-                        Invalidate.invalidateIfNotWitnessed(node, txnId, someKeys, homeKey, callback);
+                        Invalidate.invalidate(node, txnId, someKeys, homeKey, callback);
                         break;
                     }
                 case ExecutionOrder:
diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java b/accord-core/src/main/java/accord/coordinate/Persist.java
index c8be9f5..c4c427e 100644
--- a/accord-core/src/main/java/accord/coordinate/Persist.java
+++ b/accord-core/src/main/java/accord/coordinate/Persist.java
@@ -40,6 +40,7 @@ import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
 
+import static accord.coordinate.tracking.RequestStatus.Success;
 import static accord.local.Status.Durability.Durable;
 import static accord.local.Status.Durability.Universal;
 
@@ -90,7 +91,7 @@ public class Persist implements Callback<ApplyReply>
             case Redundant:
             case Applied:
                 persistedOn.add(from);
-                if (tracker.success(from))
+                if (tracker.recordSuccess(from) == Success)
                 {
                     if (!isDone)
                     {
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java b/accord-core/src/main/java/accord/coordinate/Propose.java
index 6126e41..2c851d0 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -24,8 +24,10 @@ import java.util.function.BiConsumer;
 
 import accord.api.Result;
 import accord.api.RoutingKey;
-import accord.coordinate.tracking.AbstractQuorumTracker.QuorumShardTracker;
+import accord.coordinate.tracking.AbstractTracker.ShardOutcomes;
 import accord.coordinate.tracking.QuorumTracker;
+import accord.coordinate.tracking.QuorumTracker.QuorumShardTracker;
+import accord.coordinate.tracking.RequestStatus;
 import accord.messages.Callback;
 import accord.primitives.Route;
 import accord.topology.Shard;
@@ -40,7 +42,9 @@ import accord.primitives.TxnId;
 import accord.messages.Accept;
 import accord.messages.Accept.AcceptOk;
 import accord.messages.Accept.AcceptReply;
-import com.google.common.base.Preconditions;
+
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail;
+import static accord.coordinate.tracking.RequestStatus.Failed;
 
 class Propose implements Callback<AcceptReply>
 {
@@ -102,7 +106,7 @@ class Propose implements Callback<AcceptReply>
             case Success:
                 AcceptOk ok = (AcceptOk) reply;
                 acceptOks.add(ok);
-                if (acceptTracker.success(from))
+                if (acceptTracker.recordSuccess(from) == RequestStatus.Success)
                     onAccepted();
         }
     }
@@ -110,7 +114,7 @@ class Propose implements Callback<AcceptReply>
     @Override
     public void onFailure(Id from, Throwable failure)
     {
-        if (acceptTracker.failure(from))
+        if (acceptTracker.recordFailure(from) == Failed)
         {
             isDone = true;
             callback.accept(null, new Timeout(txnId, route.homeKey));
@@ -174,7 +178,7 @@ class Propose implements Callback<AcceptReply>
                 return;
             }
 
-            if (acceptTracker.success(from))
+            if (acceptTracker.onSuccess(from) == ShardOutcomes.Success)
             {
                 isDone = true;
                 callback.accept(null, null);
@@ -184,7 +188,7 @@ class Propose implements Callback<AcceptReply>
         @Override
         public void onFailure(Id from, Throwable failure)
         {
-            if (acceptTracker.failure(from))
+            if (acceptTracker.onFailure(from) == Fail)
             {
                 isDone = true;
                 callback.accept(null, new Timeout(txnId, null));
diff --git a/accord-core/src/main/java/accord/coordinate/QuorumReadCoordinator.java b/accord-core/src/main/java/accord/coordinate/QuorumReadCoordinator.java
deleted file mode 100644
index f084753..0000000
--- a/accord-core/src/main/java/accord/coordinate/QuorumReadCoordinator.java
+++ /dev/null
@@ -1,213 +0,0 @@
-package accord.coordinate;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Preconditions;
-
-import accord.coordinate.tracking.ReadTracker;
-import accord.coordinate.tracking.ReadTracker.ReadShardTracker;
-import accord.local.Node;
-import accord.local.Node.Id;
-import accord.messages.Callback;
-import accord.primitives.TxnId;
-import accord.topology.Shard;
-import accord.topology.Topologies;
-
-// TODO: this class needs cleaning up
-//       we should also escalate the number of nodes we contact on each failure to succeed
-abstract class QuorumReadCoordinator<Reply> implements Callback<Reply>
-{
-    protected enum Action { Abort, Reject, AcceptQuorum, Accept }
-
-    protected enum Done { Exhausted, ReachedQuorum, Success }
-
-    static class QuorumReadShardTracker extends ReadShardTracker
-    {
-        private int responseCount;
-        public QuorumReadShardTracker(Shard shard)
-        {
-            super(shard);
-        }
-
-        public boolean recordReadResponse(Id node)
-        {
-            Preconditions.checkArgument(shard.nodes.contains(node));
-            ++responseCount;
-            --inflight;
-            return true;
-        }
-
-        @Override
-        public boolean recordReadSuccess(Id node)
-        {
-            if (!super.recordReadSuccess(node))
-                return false;
-
-            ++responseCount;
-            return true;
-        }
-
-        public boolean hasFailed()
-        {
-            return super.hasFailed() && !hasReachedQuorum();
-        }
-
-        public boolean hasReachedQuorum()
-        {
-            return responseCount >= shard.slowPathQuorumSize;
-        }
-    }
-
-    static class Tracker extends ReadTracker<QuorumReadShardTracker>
-    {
-        public Tracker(Topologies topologies)
-        {
-            super(topologies, QuorumReadShardTracker[]::new, QuorumReadShardTracker::new);
-        }
-
-        void recordReadResponse(Id node)
-        {
-            recordResponse(node);
-            forEachTrackerForNode(node, QuorumReadShardTracker::recordReadResponse);
-        }
-
-        public boolean hasReachedQuorum()
-        {
-            return all(QuorumReadShardTracker::hasReachedQuorum);
-        }
-    }
-
-    final Node node;
-    final TxnId txnId;
-    protected final Tracker tracker;
-    private boolean isDone;
-    private Throwable failure;
-    Map<Id, Reply> debug = new HashMap<>();
-
-    QuorumReadCoordinator(Node node, Topologies topologies, TxnId txnId)
-    {
-        this.node = node;
-        this.txnId = txnId;
-        this.tracker = new Tracker(topologies);
-    }
-
-     protected void start()
-    {
-        contact(tracker.computeMinimalReadSetAndMarkInflight());
-    }
-
-    protected abstract void contact(Set<Id> nodes);
-    protected abstract Action process(Id from, Reply reply);
-
-    protected abstract void onDone(Done done, Throwable failure);
-
-    @Override
-    public void onSuccess(Id from, Reply reply)
-    {
-        debug.put(from, reply);
-        if (isDone)
-            return;
-
-        switch (process(from, reply))
-        {
-            default: throw new IllegalStateException();
-            case Abort:
-                isDone = true;
-                break;
-
-            case Reject:
-                tracker.recordReadFailure(from);
-                tryOneMore();
-                break;
-
-            case AcceptQuorum:
-                tracker.recordReadResponse(from);
-                if (!finishIfQuorum())
-                    tryOneMore();
-                break;
-
-            case Accept:
-                tracker.recordReadSuccess(from);
-                if (!tracker.hasCompletedRead())
-                {
-                    if (!finishIfQuorum())
-                        finishIfFailure();
-                    break;
-                }
-
-                isDone = true;
-                onDone(Done.Success, null);
-        }
-    }
-
-    @Override
-    public void onSlowResponse(Id from)
-    {
-        tracker.recordSlowRead(from);
-        Set<Id> readFrom = tracker.computeMinimalReadSetAndMarkInflight();
-        if (readFrom != null)
-            contact(readFrom);
-    }
-
-    @Override
-    public void onFailure(Id from, Throwable failure)
-    {
-        debug.put(from, null);
-        if (isDone)
-            return;
-
-        if (this.failure == null) this.failure = failure;
-        else this.failure.addSuppressed(failure);
-
-        if (tracker.recordReadFailure(from))
-            tryOneMore();
-    }
-
-    private void tryOneMore()
-    {
-        Set<Id> readFrom = tracker.computeMinimalReadSetAndMarkInflight();
-        if (readFrom != null) contact(readFrom);
-        else finishIfFailure();
-    }
-
-    private boolean finishIfQuorum()
-    {
-        if (!tracker.hasReachedQuorum())
-            return false;
-
-        isDone = true;
-        onDone(Done.ReachedQuorum, null);
-        return true;
-    }
-
-    private void finishIfFailure()
-    {
-        if (tracker.hasFailed())
-        {
-            isDone = true;
-            onDone(null, failure);
-        }
-        else if (!tracker.hasInFlight())
-        {
-            isDone = true;
-            onDone(Done.Exhausted, null);
-        }
-    }
-
-    @Override
-    public void onCallbackFailure(Id from, Throwable failure)
-    {
-        isDone = true;
-        if (this.failure != null)
-            failure.addSuppressed(this.failure);
-        onDone(null, failure);
-    }
-
-    protected Set<Id> nodes()
-    {
-        return tracker.nodes();
-    }
-
-}
diff --git a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
new file mode 100644
index 0000000..4154b21
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
@@ -0,0 +1,187 @@
+package accord.coordinate;
+
+import accord.coordinate.tracking.RequestStatus;
+import accord.messages.Callback;
+import com.google.common.base.Preconditions;
+
+import accord.coordinate.tracking.ReadTracker;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.primitives.TxnId;
+import accord.topology.Topologies;
+
+import java.util.*;
+
+import static com.google.common.collect.Sets.newHashSetWithExpectedSize;
+
+abstract class ReadCoordinator<Reply extends accord.messages.Reply> extends ReadTracker implements Callback<Reply>
+{
+    protected enum Action
+    {
+        /**
+         * Immediately fail the coordination
+         */
+        Abort,
+
+        /**
+         * This response is unsuitable for the purposes of this coordination, whether individually or as a quorum.
+         */
+        Reject,
+
+        /**
+         * An intermediate response has been received that suggests a full response may be delayed; another replica
+         * should be contacted for its response. This is currently used when a read is lacking necessary information
+         * (such as a commit) in order to serve the response, and so additional information is sent by the coordinator.
+         */
+        TryAlternative,
+
+        /**
+         * This response is unsuitable by itself, but if a quorum of such responses is received for the shard
+         * we will complete successfully with Success.Quorum
+         */
+        ApproveIfQuorum,
+
+        /**
+         * This response is suitable by itself; if we receive such a response from each shard we will complete
+         * successfully with Success.Success
+         */
+        Approve
+    }
+    protected enum Success { Quorum, Success }
+
+    final Node node;
+    final TxnId txnId;
+    private boolean isDone;
+    private Throwable failure;
+    Map<Id, Object> debug = new HashMap<>();
+
+    ReadCoordinator(Node node, Topologies topologies, TxnId txnId)
+    {
+        super(topologies);
+        this.node = node;
+        this.txnId = txnId;
+    }
+
+    protected abstract Action process(Id from, Reply reply);
+    protected abstract void onDone(Success success, Throwable failure);
+    protected abstract void contact(Id to);
+
+    @Override
+    public void onSuccess(Id from, Reply reply)
+    {
+        if (debug != null) debug.put(from, reply);
+        if (isDone)
+            return;
+
+        switch (process(from, reply))
+        {
+            default: throw new IllegalStateException();
+            case Abort:
+                isDone = true;
+                break;
+
+            case TryAlternative:
+                Preconditions.checkState(!reply.isFinal());
+                onSlowResponse(from);
+                break;
+
+            case Reject:
+                handle(recordReadFailure(from));
+                break;
+
+            case ApproveIfQuorum:
+                handle(recordQuorumReadSuccess(from));
+                break;
+
+            case Approve:
+                handle(recordReadSuccess(from));
+        }
+    }
+
+    public void onSlowResponse(Id from)
+    {
+        handle(recordSlowResponse(from));
+    }
+
+    @Override
+    public void onFailure(Id from, Throwable failure)
+    {
+        if (debug != null)
+            debug.put(from, null);
+
+        if (isDone)
+            return;
+
+        if (this.failure == null) this.failure = failure;
+        else this.failure.addSuppressed(failure);
+
+        handle(recordReadFailure(from));
+    }
+
+    @Override
+    public void onCallbackFailure(Id from, Throwable failure)
+    {
+        if (isDone)
+        {
+            node.agent().onUncaughtException(failure);
+            return;
+        }
+
+        if (this.failure != null)
+            failure.addSuppressed(this.failure);
+        this.failure = failure;
+        finishOnFailure();
+    }
+
+    protected void finishOnFailure()
+    {
+        Preconditions.checkState(!isDone);
+        isDone = true;
+        if (failure == null)
+            failure = new Exhausted(txnId, null);
+        onDone(null, failure);
+    }
+
+    private void handle(RequestStatus result)
+    {
+        switch (result)
+        {
+            default: throw new AssertionError();
+            case NoChange:
+                break;
+            case Success:
+                Preconditions.checkState(!isDone);
+                isDone = true;
+                onDone(waitingOnData == 0 ? Success.Success : Success.Quorum, null);
+                break;
+            case Failed:
+                finishOnFailure();
+        }
+    }
+
+    protected void start(Set<Id> to)
+    {
+        to.forEach(this::contact);
+    }
+
+    public void start()
+    {
+        Set<Id> contact = newHashSetWithExpectedSize(maxShardsPerEpoch());
+        if (trySendMore(Set::add, contact) != RequestStatus.NoChange)
+            throw new IllegalStateException();
+        start(contact);
+    }
+
+    @Override
+    protected RequestStatus trySendMore()
+    {
+        // TODO: contacting an unavailable node may invoke onFailure immediately, re-entering this method;
+        //  to tolerate this, for the moment we first copy the nodes to contact into an intermediate list.
+        //  would be better to prevent reentrancy either by detecting this inside trySendMore or else queueing
+        //  callbacks externally, so two may not be in-flight at once
+        List<Id> contact = new ArrayList<>(1);
+        RequestStatus status = trySendMore(List::add, contact);
+        contact.forEach(this::contact);
+        return status;
+    }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java b/accord-core/src/main/java/accord/coordinate/Recover.java
index 37aab37..59b8339 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -25,14 +25,12 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 
+import accord.coordinate.tracking.*;
 import accord.primitives.*;
 import accord.messages.Commit;
 import com.google.common.base.Preconditions;
 
 import accord.api.Result;
-import accord.coordinate.tracking.FastPathTracker;
-import accord.coordinate.tracking.QuorumTracker;
-import accord.topology.Shard;
 import accord.topology.Topologies;
 import accord.messages.Callback;
 import accord.local.Node;
@@ -49,8 +47,9 @@ import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.Promise;
 
 import static accord.coordinate.Propose.Invalidate.proposeInvalidate;
+import static accord.coordinate.tracking.RequestStatus.Failed;
+import static accord.coordinate.tracking.RequestStatus.Success;
 import static accord.messages.BeginRecovery.RecoverOk.maxAcceptedOrLater;
-import static accord.messages.Commit.Invalidate.commitInvalidate;
 
 // TODO: rename to Recover (verb); rename Recover message to not clash
 public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throwable>
@@ -65,7 +64,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
         AwaitCommit(Node node, TxnId txnId, RoutingKeys someKeys)
         {
             Topology topology = node.topology().globalForEpoch(txnId.epoch).forKeys(someKeys);
-            this.tracker = new QuorumTracker(topology);
+            this.tracker = new QuorumTracker(new Topologies.Single(node.topology().sorter(), topology));
             node.send(topology.nodes(), to -> new WaitOnCommit(to, topology, txnId, someKeys), this);
         }
 
@@ -74,7 +73,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
         {
             if (isDone()) return;
 
-            if (tracker.success(from))
+            if (tracker.recordSuccess(from) == Success)
                 trySuccess(null);
         }
 
@@ -83,7 +82,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
         {
             if (isDone()) return;
 
-            if (tracker.failure(from))
+            if (tracker.recordFailure(from) == Failed)
                 tryFailure(new Timeout(txnId, route.homeKey));
         }
 
@@ -113,43 +112,17 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
         return future;
     }
 
-    // TODO: not sure it makes sense to extend FastPathTracker, as intent here is a bit different
-    static class ShardTracker extends FastPathTracker.FastPathShardTracker
-    {
-        int responsesFromElectorate;
-        public ShardTracker(Shard shard)
-        {
-            super(shard);
-        }
-
-        @Override
-        public boolean includeInFastPath(Node.Id node, boolean withFastPathTimestamp)
-        {
-            if (!shard.fastPathElectorate.contains(node))
-                return false;
-
-            ++responsesFromElectorate;
-            return withFastPathTimestamp;
-        }
-
-        @Override
-        public boolean hasMetFastPathCriteria()
-        {
-            int fastPathRejections = responsesFromElectorate - fastPathAccepts;
-            return fastPathRejections <= shard.fastPathElectorate.size() - shard.fastPathQuorumSize;
-        }
-    }
-
-    final Node node;
-    final Ballot ballot;
-    final TxnId txnId;
-    final Txn txn;
-    final Route route;
-    final BiConsumer<Outcome, Throwable> callback;
+    private final Node node;
+    private final Ballot ballot;
+    private final TxnId txnId;
+    private final Txn txn;
+    private final Route route;
+    private final BiConsumer<Outcome, Throwable> callback;
     private boolean isDone;
 
-    final List<RecoverOk> recoverOks = new ArrayList<>();
-    final FastPathTracker<ShardTracker> tracker;
+    private final List<RecoverOk> recoverOks = new ArrayList<>();
+    private final RecoveryTracker tracker;
+    private boolean isBallotPromised;
 
     private Recover(Node node, Ballot ballot, TxnId txnId, Txn txn, Route route, BiConsumer<Outcome, Throwable> callback, Topologies topologies)
     {
@@ -160,7 +133,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
         this.route = route;
         this.callback = callback;
         assert topologies.oldestEpoch() == topologies.currentEpoch() && topologies.currentEpoch() == txnId.epoch;
-        this.tracker = new FastPathTracker<>(topologies, ShardTracker[]::new, ShardTracker::new);
+        this.tracker = new RecoveryTracker(topologies);
     }
 
     @Override
@@ -203,7 +176,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
     @Override
     public synchronized void onSuccess(Id from, RecoverReply reply)
     {
-        if (isDone || tracker.hasReachedQuorum())
+        if (isDone || isBallotPromised)
             return;
 
         if (!reply.isOk())
@@ -215,14 +188,15 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
         RecoverOk ok = (RecoverOk) reply;
         recoverOks.add(ok);
         boolean fastPath = ok.executeAt.compareTo(txnId) == 0;
-        tracker.recordSuccess(from, fastPath);
-
-        if (tracker.hasReachedQuorum())
+        if (tracker.recordSuccess(from, fastPath) == Success)
             recover();
     }
 
     private void recover()
     {
+        Preconditions.checkState(!isBallotPromised);
+        isBallotPromised = true;
+
         // first look for the most recent Accept; if present, go straight to proposing it again
         RecoverOk acceptOrCommit = maxAcceptedOrLater(recoverOks);
         if (acceptOrCommit != null)
@@ -280,21 +254,12 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
             }
         }
 
-        if (!tracker.hasMetFastPathCriteria())
+        if (tracker.rejectsFastPath() || recoverOks.stream().anyMatch(ok -> ok.rejectsFastPath))
         {
             invalidate();
             return;
         }
 
-        for (RecoverOk ok : recoverOks)
-        {
-            if (ok.rejectsFastPath)
-            {
-                invalidate();
-                return;
-            }
-        }
-
         // should all be PreAccept
         Deps deps = mergeDeps();
         Deps earlierAcceptedNoWitness = Deps.merge(recoverOks, ok -> ok.earlierAcceptedNoWitness);
@@ -356,7 +321,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
         if (isDone)
             return;
 
-        if (tracker.failure(from))
+        if (tracker.recordFailure(from) == Failed)
             accept(null, new Timeout(txnId, route.homeKey));
     }
 
diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithHomeKey.java b/accord-core/src/main/java/accord/coordinate/RecoverWithHomeKey.java
index 28672e3..fe735e3 100644
--- a/accord-core/src/main/java/accord/coordinate/RecoverWithHomeKey.java
+++ b/accord-core/src/main/java/accord/coordinate/RecoverWithHomeKey.java
@@ -4,7 +4,6 @@ import java.util.function.BiConsumer;
 
 import accord.api.RoutingKey;
 import accord.local.Node;
-import accord.local.Node.Id;
 import accord.messages.CheckStatus.CheckStatusOk;
 import accord.messages.CheckStatus.IncludeInfo;
 import accord.primitives.Route;
@@ -41,13 +40,13 @@ public class RecoverWithHomeKey extends CheckShards implements BiConsumer<Object
     }
 
     @Override
-    protected boolean isSufficient(Id from, CheckStatusOk ok)
+    protected boolean isSufficient(CheckStatusOk ok)
     {
         return ok.route != null;
     }
 
     @Override
-    protected void onDone(Done done, Throwable fail)
+    protected void onDone(Success success, Throwable fail)
     {
         if (fail != null)
         {
@@ -55,19 +54,16 @@ public class RecoverWithHomeKey extends CheckShards implements BiConsumer<Object
         }
         else if (merged == null || !(merged.route instanceof Route))
         {
-            switch (done)
+            switch (success)
             {
                 default: throw new IllegalStateException();
-                case Exhausted:
-                    callback.accept(null, new Timeout(txnId, homeKey));
-                    return;
                 case Success:
                     // home shard must know full Route. Our success criteria is that the response contained a route,
                     // so reaching here without a response or one without a full Route is a bug.
                     callback.accept(null, new IllegalStateException());
                     return;
-                case ReachedQuorum:
-                    Invalidate.invalidate(node, txnId, contactKeys, homeKey, callback);
+                case Quorum:
+                    Invalidate.invalidateIfNotAccepted(node, txnId, contactKeys, homeKey, callback);
             }
         }
         else
diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
index 911fb8a..b60bbaa 100644
--- a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
@@ -1,6 +1,5 @@
 package accord.coordinate;
 
-import java.util.Set;
 import java.util.function.BiConsumer;
 
 import accord.local.Status.Known;
@@ -59,13 +58,26 @@ public class RecoverWithRoute extends CheckShards
     }
 
     @Override
-    protected void contact(Set<Id> nodes)
+    public void contact(Id to)
     {
-        node.send(nodes, to -> new CheckStatus(to, tracker.topologies(), txnId, route, IncludeInfo.All), this);
+        node.send(to, new CheckStatus(to, topologies(), txnId, route, IncludeInfo.All), this);
     }
 
     @Override
     protected boolean isSufficient(Id from, CheckStatusOk ok)
+    {
+        KeyRanges rangesForNode = topologies().forEpoch(txnId.epoch).rangesForNode(from);
+        PartialRoute route = this.route.slice(rangesForNode);
+        return isSufficient(route, ok);
+    }
+
+    @Override
+    protected boolean isSufficient(CheckStatusOk ok)
+    {
+        return isSufficient(route, merged); // NOTE(review): tests the 'merged' field against the full route; the 'ok' argument is unused - confirm this is intended
+    }
+
+    protected boolean isSufficient(AbstractRoute route, CheckStatusOk ok)
     {
         CheckStatusOkFull full = (CheckStatusOkFull)ok;
         Known sufficientTo = full.sufficientFor(route);
@@ -75,14 +87,12 @@ public class RecoverWithRoute extends CheckShards
         if (sufficientTo == Known.Invalidation)
             return true;
 
-        KeyRanges rangesForNode = tracker.topologies().forEpoch(txnId.epoch).rangesForNode(from);
-        PartialRoute route = this.route.slice(rangesForNode);
         Preconditions.checkState(full.partialTxn.covers(route));
         return true;
     }
 
     @Override
-    protected void onDone(Done done, Throwable failure)
+    protected void onDone(Success success, Throwable failure)
     {
         if (failure != null)
         {
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AbstractQuorumTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AbstractQuorumTracker.java
deleted file mode 100644
index ab8d73b..0000000
--- a/accord-core/src/main/java/accord/coordinate/tracking/AbstractQuorumTracker.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.coordinate.tracking;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.function.IntFunction;
-
-import accord.local.Node;
-import accord.topology.Shard;
-import accord.topology.Topologies;
-
-public class AbstractQuorumTracker<T extends AbstractQuorumTracker.QuorumShardTracker> extends AbstractResponseTracker<T>
-{
-    public static class QuorumShardTracker extends ShardTracker
-    {
-        private final Set<Node.Id> inflight;
-        private int success = 0;
-        private int failures = 0;
-        public QuorumShardTracker(Shard shard)
-        {
-            super(shard);
-            this.inflight = new HashSet<>(shard.nodes);
-        }
-
-        protected boolean oneSuccess(Node.Id id)
-        {
-            if (!inflight.remove(id))
-                return false;
-            success++;
-            return true;
-        }
-
-        // return true iff hasReachedQuorum()
-        public boolean success(Node.Id id)
-        {
-            oneSuccess(id);
-            return hasReachedQuorum();
-        }
-
-        // return true iff hasFailed()
-        public boolean failure(Node.Id id)
-        {
-            if (!inflight.remove(id))
-                return false;
-            failures++;
-            return hasFailed();
-        }
-
-        public boolean hasFailed()
-        {
-            return failures > shard.maxFailures;
-        }
-
-        public boolean hasFailures()
-        {
-            return failures > 0;
-        }
-
-        public boolean hasReachedQuorum()
-        {
-            return success >= shard.slowPathQuorumSize;
-        }
-
-        public boolean hasInFlight()
-        {
-            return !inflight.isEmpty();
-        }
-    }
-
-    public AbstractQuorumTracker(Topologies topologies, IntFunction<T[]> arrayFactory, Function<Shard, T> trackerFactory)
-    {
-        super(topologies, arrayFactory, trackerFactory);
-    }
-
-    // return true iff hasFailed()
-    public boolean failure(Node.Id node)
-    {
-        return anyForNode(node, QuorumShardTracker::failure);
-    }
-
-    public boolean hasReachedQuorum()
-    {
-        return all(QuorumShardTracker::hasReachedQuorum);
-    }
-
-    public boolean hasFailed()
-    {
-        return any(QuorumShardTracker::hasFailed);
-    }
-
-    public boolean hasFailures()
-    {
-        return any(QuorumShardTracker::hasFailed);
-    }
-
-    public boolean hasInFlight()
-    {
-        return any(QuorumShardTracker::hasInFlight);
-    }
-
-}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AbstractResponseTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AbstractResponseTracker.java
deleted file mode 100644
index 9793c87..0000000
--- a/accord-core/src/main/java/accord/coordinate/tracking/AbstractResponseTracker.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.coordinate.tracking;
-
-import accord.local.Node;
-import accord.topology.Shard;
-import accord.topology.Topologies;
-import accord.utils.IndexedIntFunction;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import java.util.*;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.BiPredicate;
-import java.util.function.Function;
-import java.util.function.IntFunction;
-import java.util.function.Predicate;
-
-public abstract class AbstractResponseTracker<T extends AbstractResponseTracker.ShardTracker>
-{
-    private static final int[] SINGLETON_OFFSETS = new int[0];
-    private final Topologies topologies;
-    private final T[] trackers;
-    private final int[] offsets;
-
-    public static class ShardTracker
-    {
-        public final Shard shard;
-
-        public ShardTracker(Shard shard)
-        {
-            this.shard = shard;
-        }
-    }
-
-    public AbstractResponseTracker(Topologies topologies, IntFunction<T[]> arrayFactory, Function<Shard, T> trackerFactory)
-    {
-        Preconditions.checkArgument(topologies.totalShards() > 0);
-        this.topologies = topologies;
-        this.trackers = arrayFactory.apply(topologies.totalShards());
-        if (topologies.size() > 1)
-        {
-            this.offsets = new int[topologies.size() - 1];
-            int offset = topologies.get(0).size();
-            for (int i=1, mi=topologies.size(); i<mi; i++)
-            {
-                this.offsets[i - 1] = offset;
-                offset += topologies.get(i).size();
-            }
-        }
-        else
-        {
-            this.offsets = SINGLETON_OFFSETS;
-        }
-
-        this.topologies.forEach((i, topology) -> {
-            int offset = topologyOffset(i);
-            topology.forEach((j, shard) -> trackers[offset + j] = trackerFactory.apply(shard));
-        });
-    }
-
-    protected int topologyOffset(int topologyIdx)
-    {
-        return topologyIdx > 0 ? offsets[topologyIdx - 1] : 0;
-    }
-
-    protected int topologyLength(int topologyIdx)
-    {
-        if (topologyIdx > offsets.length)
-            throw new IndexOutOfBoundsException();
-
-        int endIdx = topologyIdx == offsets.length ? trackers.length : topologyOffset(topologyIdx + 1);
-        return endIdx - topologyOffset(topologyIdx);
-    }
-
-    public Topologies topologies()
-    {
-        return topologies;
-    }
-
-    protected void forEachTrackerForNode(Node.Id node, BiConsumer<? super T, Node.Id> consumer)
-    {
-        this.topologies.forEach((i, topology) -> {
-            int offset = topologyOffset(i);
-            topology.forEachOn(node, (j, shard) -> consumer.accept(trackers[offset + j], node));
-        });
-    }
-
-    // does not abort early, to ensure all trackers are updated (introduce Lazy variant if necessary)
-    protected boolean anyForNode(Node.Id node, BiPredicate<? super T, Node.Id> consumer)
-    {
-        return matchingTrackersForNode(node, consumer, Integer.MAX_VALUE) > 0;
-    }
-
-    // does not abort early, to ensure all trackers are updated (introduce Lazy variant if necessary)
-    protected boolean allForNode(Node.Id node, BiPredicate<T, Node.Id> consumer)
-    {
-        return nonMatchingTrackersForNode(node, consumer, Integer.MAX_VALUE) == 0;
-    }
-
-    protected int nonMatchingTrackersForNode(Node.Id node, BiPredicate<? super T, Node.Id> consumer, int limit)
-    {
-        return foldlForNode(node, (shardIndex, shard, v) -> consumer.test(trackers[shardIndex], node) ? v : v + 1, 0, limit);
-    }
-
-    protected int matchingTrackersForNode(Node.Id node, BiPredicate<? super T, Node.Id> consumer, int limit)
-    {
-        return foldlForNode(node, (shardIndex, shard, v) -> consumer.test(trackers[shardIndex], node) ? v + 1 : v, 0, limit);
-    }
-
-    protected int matchingTrackersForNode(Node.Id node, Predicate<? super T> consumer)
-    {
-        return foldlForNode(node, (shardIndex, shard, v) -> consumer.test(trackers[shardIndex]) ? v + 1 : v, 0, Integer.MAX_VALUE);
-    }
-
-    protected int foldlForNode(Node.Id node, IndexedIntFunction<Shard> function, int initialValue, int terminalValue)
-    {
-        for (int i = 0 ; i < topologies.size() && initialValue != terminalValue ; ++i)
-        {
-            initialValue = topologies.get(i).foldlIntOn(node, function, topologyOffset(i), initialValue, terminalValue);
-        }
-        return initialValue;
-    }
-
-    protected boolean all(Predicate<T> predicate)
-    {
-        for (T tracker : trackers)
-            if (!predicate.test(tracker))
-                return false;
-        return true;
-    }
-
-    protected boolean any(Predicate<T> predicate)
-    {
-        for (T tracker : trackers)
-            if (predicate.test(tracker))
-                return true;
-        return false;
-    }
-
-    protected <V> V foldl(BiFunction<T, V, V> function, V accumulator)
-    {
-        for (T tracker : trackers)
-            accumulator = function.apply(tracker, accumulator);
-        return accumulator;
-    }
-
-    public Set<Node.Id> nodes()
-    {
-        return topologies.nodes();
-    }
-
-    @VisibleForTesting
-    public T unsafeGet(int topologyIdx, int shardIdx)
-    {
-        if (shardIdx >= topologyLength(topologyIdx))
-            throw new IndexOutOfBoundsException();
-        return trackers[topologyOffset(topologyIdx) + shardIdx];
-    }
-
-    int trackerCount()
-    {
-        return trackers.length;
-    }
-
-    public T unsafeGet(int i)
-    {
-        Preconditions.checkArgument(offsets.length == 0);
-        return unsafeGet(0, i);
-    }
-}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
new file mode 100644
index 0000000..7717b9b
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate.tracking;
+
+import accord.local.Node.Id;
+import accord.topology.Shard;
+import accord.topology.Topologies;
+import accord.topology.Topology;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.util.*;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.Predicate;
+
+/**
+ * Holds one ShardTracker per shard of every topology in a {@link Topologies} and reduces per-shard outcomes to a RequestStatus.
+ */
+public abstract class AbstractTracker<ST extends ShardTracker, P>
+{
+    /**
+     * Represents the logical result of the function passed to recordResponse, applied
+     * to a ShardTracker, encapsulating also any modification it should make to the
+     * AbstractTracker containing the shard.
+     */
+    public interface ShardOutcome<T extends AbstractTracker<?, ?>>
+    {
+        ShardOutcomes apply(T tracker);
+    }
+
+    public enum ShardOutcomes implements ShardOutcome<AbstractTracker<?, ?>>
+    {
+        Fail, SendMore, Success, NoChange; // declaration order matters: min() prefers the earlier-declared constant
+
+        static ShardOutcomes min(ShardOutcomes a, ShardOutcomes b) { return a.compareTo(b) <= 0 ? a : b; }
+
+        @Override
+        public ShardOutcomes apply(AbstractTracker<?, ?> tracker)
+        {
+            if (this == Success) // a shard newly succeeded, so one fewer shard is awaited
+                --tracker.waitingOnShards;
+            return this;
+        }
+    }
+
+    final Topologies topologies;
+    protected final ST[] trackers; // topologyCount * maxShardsPerEpoch slots; topologies with fewer shards leave their trailing slots null
+    protected final int maxShardsPerEpoch; // stride between consecutive topologies within 'trackers'
+    protected int waitingOnShards; // starts at the total shard count; decremented when a Success outcome is applied
+
+    AbstractTracker(Topologies topologies, IntFunction<ST[]> arrayFactory, Function<Shard, ST> trackerFactory)
+    {
+        Preconditions.checkArgument(topologies.totalShards() > 0);
+        int topologyCount = topologies.size();
+        int maxShardsPerEpoch = topologies.get(0).size();
+        int shardCount = maxShardsPerEpoch;
+        for (int i = 1 ; i < topologyCount ; ++i)
+        {
+            int size = topologies.get(i).size();
+            maxShardsPerEpoch = Math.max(maxShardsPerEpoch, size);
+            shardCount += size;
+        }
+        this.topologies = topologies;
+        this.trackers = arrayFactory.apply(topologyCount * maxShardsPerEpoch); // fixed stride per topology, so some slots may stay null
+        for (int i = 0 ; i < topologyCount ; ++i)
+        {
+            Topology topology = topologies.get(i);
+            int size = topology.size();
+            for (int j = 0; j < size; ++j)
+                trackers[i * maxShardsPerEpoch + j] = trackerFactory.apply(topology.get(j));
+        }
+        this.maxShardsPerEpoch = maxShardsPerEpoch;
+        this.waitingOnShards = shardCount;
+    }
+
+    protected int topologyOffset(int topologyIdx)
+    {
+        return topologyIdx * maxShardsPerEpoch();
+    }
+
+    public Topologies topologies()
+    {
+        return topologies;
+    }
+
+    protected RequestStatus trySendMore() { throw new UnsupportedOperationException(); }
+
+    <T extends AbstractTracker<ST, P>>
+    RequestStatus recordResponse(T self, Id node, BiFunction<? super ST, P, ? extends ShardOutcome<? super T>> function, P param)
+    {
+        return recordResponse(self, node, function, param, topologies.size());
+    }
+
+    // Applies function to this tracker's entry for each shard that Topology.mapReduceOn selects for
+    // node, across the first topologyLimit topologies, and maps the minimum outcome to a RequestStatus.
+    <T extends AbstractTracker<ST, P>>
+    RequestStatus recordResponse(T self, Id node, BiFunction<? super ST, P, ? extends ShardOutcome<? super T>> function, P param, int topologyLimit)
+    {
+        Preconditions.checkState(self == this); // we just accept self as parameter for type safety
+        ShardOutcomes minShardStatus = ShardOutcomes.NoChange;
+        int maxShards = maxShardsPerEpoch();
+        for (int i = 0; i < topologyLimit && minShardStatus != ShardOutcomes.Fail; ++i) // stop once any shard has reported Fail
+            minShardStatus = topologies.get(i).mapReduceOn(node, i * maxShards, AbstractTracker::apply, self, function, param, ShardOutcomes::min, minShardStatus);
+
+        switch (minShardStatus)
+        {
+            default: throw new AssertionError();
+            case SendMore:
+                return trySendMore();
+            case Success:
+                if (waitingOnShards == 0) // otherwise deliberately falls through to NoChange: shards are still outstanding
+                    return RequestStatus.Success;
+            case NoChange:
+                return RequestStatus.NoChange;
+            case Fail:
+                return RequestStatus.Failed;
+        }
+    }
+
+    static <ST extends ShardTracker, P, T extends AbstractTracker<ST, P>>
+    ShardOutcomes apply(int trackerIndex, T tracker, BiFunction<? super ST, P, ? extends ShardOutcome<? super T>> function, P param)
+    {
+        return function.apply(tracker.trackers[trackerIndex], param).apply(tracker);
+    }
+
+    // Tests each populated tracker; slots left null for topologies with fewer than
+    // maxShardsPerEpoch shards are skipped rather than passed to the predicate.
+    public boolean any(Predicate<ST> test)
+    {
+        for (ST tracker : trackers)
+            if (tracker != null && test.test(tracker))
+                return true;
+        return false;
+    }
+
+    // Holds when no populated tracker fails the test; null padding slots are ignored,
+    // as in any(), so method-reference predicates cannot hit a NullPointerException.
+    public boolean all(Predicate<ST> test)
+    {
+        for (ST tracker : trackers)
+            if (tracker != null && !test.test(tracker))
+                return false;
+        return true;
+    }
+
+    public boolean hasFailed()
+    {
+        return any(ShardTracker::hasFailed);
+    }
+
+    public boolean hasInFlight()
+    {
+        return any(ShardTracker::hasInFlight);
+    }
+
+    public boolean hasReachedQuorum()
+    {
+        return all(ShardTracker::hasReachedQuorum);
+    }
+
+    public Set<Id> nodes()
+    {
+        return topologies.nodes();
+    }
+
+    @VisibleForTesting
+    public ST unsafeGet(int topologyIdx, int shardIdx)
+    {
+        if (shardIdx < 0 || shardIdx >= topologies.get(topologyIdx).size()) // bound by this topology's own shard count, not the padded stride
+            throw new IndexOutOfBoundsException();
+        return trackers[topologyOffset(topologyIdx) + shardIdx];
+    }
+
+    protected int maxShardsPerEpoch()
+    {
+        return maxShardsPerEpoch;
+    }
+
+    public ST unsafeGet(int i)
+    {
+        Preconditions.checkArgument(topologies.size() == 1);
+        return unsafeGet(0, i);
+    }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
index 568ddf7..389a285 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
@@ -18,47 +18,155 @@
 
 package accord.coordinate.tracking;
 
-import java.util.function.Function;
-import java.util.function.IntFunction;
-
 import accord.local.Node;
 import accord.topology.Shard;
 import accord.topology.Topologies;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.function.BiFunction;
+
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
 
-public class FastPathTracker<T extends FastPathTracker.FastPathShardTracker> extends AbstractQuorumTracker<T>
+public class FastPathTracker extends AbstractTracker<FastPathTracker.FastPathShardTracker, Node.Id>
 {
-    public abstract static class FastPathShardTracker extends QuorumTracker.QuorumShardTracker
+    private static final ShardOutcome<FastPathTracker> NewFastPathSuccess = tracker -> {
+        --tracker.waitingOnShards;
+        --tracker.waitingOnFastPathSuccess;
+        return ShardOutcomes.Success;
+    };
+
+    public static class FastPathShardTracker extends ShardTracker
     {
-        protected int fastPathAccepts = 0;
+        protected int fastPathAccepts, accepts;
+        protected int fastPathFailures, failures;
 
         public FastPathShardTracker(Shard shard)
         {
             super(shard);
         }
 
-        public abstract boolean includeInFastPath(Node.Id node, boolean withFastPathTimestamp);
+        // returns Success ONLY once the fast path has been rejected and a slow path quorum has been reached
+        public ShardOutcome<? super FastPathTracker> onQuorumSuccess(Node.Id node)
+        {
+            ++accepts;
+            if (!shard.fastPathElectorate.contains(node))
+                return quorumIfRejectsFastPath();
+
+            ++fastPathFailures;
+            if (isNewFastPathReject() && hasReachedQuorum())
+                return Success;
+
+            if (isNewSlowPathSuccess() && hasRejectedFastPath())
+                return Success;
+
+            return NoChange;
+        }
+
+        public ShardOutcome<? super FastPathTracker> onMaybeFastPathSuccess(Node.Id node)
+        {
+            ++accepts;
+            if (shard.fastPathElectorate.contains(node))
+            {
+                ++fastPathAccepts;
+                if (isNewFastPathSuccess())
+                    return NewFastPathSuccess;
+            }
+            return quorumIfRejectsFastPath();
+        }
+
+        public ShardOutcome<? super FastPathTracker> onFailure(@Nonnull Node.Id from)
+        {
+            if (++failures > shard.maxFailures)
+                return Fail;
+
+            if (shard.fastPathElectorate.contains(from)) {
+                ++fastPathFailures;
+
+                if (isNewFastPathReject() && accepts >= shard.slowPathQuorumSize)
+                    return Success;
+            }
+
+            return NoChange;
+        }
+
+        private ShardOutcome<? super FastPathTracker> quorumIfRejectsFastPath()
+        {
+            return isNewSlowPathSuccess() && hasRejectedFastPath() ? Success : NoChange;
+        }
+
+        private boolean isNewSlowPathSuccess()
+        {
+            return accepts == shard.slowPathQuorumSize;
+        }
 
-        public void onSuccess(Node.Id node, boolean withFastPathTimestamp)
+        private boolean isNewFastPathReject()
         {
-            if (oneSuccess(node) && includeInFastPath(node, withFastPathTimestamp))
-                fastPathAccepts++;
+            return fastPathFailures == 1 + shard.fastPathElectorate.size() - shard.fastPathQuorumSize;
         }
 
-        public abstract boolean hasMetFastPathCriteria();
+        private boolean isNewFastPathSuccess()
+        {
+            return fastPathAccepts == shard.fastPathQuorumSize;
+        }
+
+        @VisibleForTesting
+        public boolean hasMetFastPathCriteria()
+        {
+            return fastPathAccepts >= shard.fastPathQuorumSize;
+        }
+
+        @VisibleForTesting
+        public boolean hasRejectedFastPath()
+        {
+            return shard.rejectsFastPath(fastPathFailures);
+        }
+
+        boolean hasInFlight()
+        {
+            return accepts + failures < shard.rf();
+        }
+
+        boolean hasReachedQuorum()
+        {
+            return accepts >= shard.slowPathQuorumSize;
+        }
+
+        boolean hasFailed()
+        {
+            return failures > shard.maxFailures;
+        }
+    }
+
+    int waitingOnFastPathSuccess; // if we reach zero, we have determined the fast path outcome of every shard
+    public FastPathTracker(Topologies topologies)
+    {
+        super(topologies, FastPathShardTracker[]::new, FastPathShardTracker::new);
+        this.waitingOnFastPathSuccess = super.waitingOnShards;
+    }
+
+    public RequestStatus recordSuccess(Node.Id from, boolean withFastPathTimestamp)
+    {
+        if (withFastPathTimestamp)
+            return recordResponse(from, FastPathShardTracker::onMaybeFastPathSuccess);
+
+        return recordResponse(from, FastPathShardTracker::onQuorumSuccess);
     }
 
-    public FastPathTracker(Topologies topologies, IntFunction<T[]> arrayFactory, Function<Shard, T> trackerFactory)
+    public RequestStatus recordFailure(Node.Id from)
     {
-        super(topologies, arrayFactory, trackerFactory);
+        return recordResponse(from, FastPathShardTracker::onFailure);
     }
 
-    public void recordSuccess(Node.Id node, boolean withFastPathTimestamp)
+    protected RequestStatus recordResponse(Node.Id node, BiFunction<? super FastPathShardTracker, Node.Id, ? extends ShardOutcome<? super FastPathTracker>> function)
     {
-        forEachTrackerForNode(node, (tracker, n) -> tracker.onSuccess(n, withFastPathTimestamp));
+        return recordResponse(this, node, function, node);
     }
 
-    public boolean hasMetFastPathCriteria()
+    public boolean hasFastPathAccepted()
     {
-        return all(FastPathShardTracker::hasMetFastPathCriteria);
+        return waitingOnFastPathSuccess == 0;
     }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
index 428c340..ba06181 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
@@ -18,27 +18,74 @@
 
 package accord.coordinate.tracking;
 
-import accord.coordinate.tracking.AbstractQuorumTracker.QuorumShardTracker;
 import accord.local.Node;
+import accord.topology.Shard;
 import accord.topology.Topologies;
-import accord.topology.Topologies.Single;
-import accord.topology.Topology;
 
-public class QuorumTracker extends AbstractQuorumTracker<QuorumShardTracker>
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
+
+public class QuorumTracker extends AbstractTracker<QuorumTracker.QuorumShardTracker, Object>
 {
+    public static class QuorumShardTracker extends ShardTracker
+    {
+        protected int successes;
+        protected int failures;
+
+        public QuorumShardTracker(Shard shard)
+        {
+            super(shard);
+        }
+
+        public ShardOutcomes onSuccess(Object ignore)
+        {
+            return ++successes == shard.slowPathQuorumSize ? Success : NoChange;
+        }
+
+        // return Fail iff hasFailed(), otherwise NoChange
+        public ShardOutcomes onFailure(Object ignore)
+        {
+            return ++failures > shard.maxFailures ? Fail : NoChange;
+        }
+
+        public boolean hasReachedQuorum()
+        {
+            return successes >= shard.slowPathQuorumSize;
+        }
+
+        boolean hasInFlight()
+        {
+            return successes + failures < shard.rf();
+        }
+
+        boolean hasFailures()
+        {
+            return failures > 0;
+        }
+
+        boolean hasFailed()
+        {
+            return failures > shard.maxFailures;
+        }
+    }
+
     public QuorumTracker(Topologies topologies)
     {
         super(topologies, QuorumShardTracker[]::new, QuorumShardTracker::new);
     }
 
-    public QuorumTracker(Topology topology)
+    public RequestStatus recordSuccess(Node.Id node)
+    {
+        return recordResponse(this, node, QuorumShardTracker::onSuccess, null);
+    }
+
+    // record a failure response from the given node, returning the resulting RequestStatus
+    public RequestStatus recordFailure(Node.Id node)
     {
-        super(new Single(topology, false), QuorumShardTracker[]::new, QuorumShardTracker::new);
+        return recordResponse(this, node, QuorumShardTracker::onFailure, null);
     }
 
-    // return true iff hasReachedQuorum()
-    public boolean success(Node.Id node)
+    public boolean hasFailures()
     {
-        return allForNode(node, QuorumShardTracker::success) && hasReachedQuorum();
+        return any(QuorumShardTracker::hasFailures);
     }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
index 509da5e..316a7a0 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
@@ -18,232 +18,281 @@
 
 package accord.coordinate.tracking;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.function.IntFunction;
+import java.util.*;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 
+import accord.topology.Shard;
+import accord.topology.ShardSelection;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
-import accord.coordinate.tracking.ReadTracker.ReadShardTracker;
 import accord.local.Node.Id;
-import accord.topology.Shard;
 import accord.topology.Topologies;
 
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
 import static com.google.common.collect.Sets.newHashSetWithExpectedSize;
 
-public class ReadTracker<T extends ReadShardTracker> extends AbstractResponseTracker<T>
+public class ReadTracker extends AbstractTracker<ReadTracker.ReadShardTracker, Boolean>
 {
+    private static final ShardOutcome<ReadTracker> DataSuccess = tracker -> {
+        --tracker.waitingOnShards;
+        --tracker.waitingOnData;
+        return ShardOutcomes.Success;
+    };
+
     public static class ReadShardTracker extends ShardTracker
     {
-        private boolean hasData = false;
+        protected boolean hasData = false;
+        protected int quorum; // if !hasData, a slowPathQuorum will trigger success
         protected int inflight;
-        private int contacted;
-        private int slow;
+        protected int contacted;
+        protected int slow;
 
         public ReadShardTracker(Shard shard)
         {
             super(shard);
         }
 
-        public void recordInflightRead(Id node)
+        public ShardOutcome<? super ReadTracker> recordInFlightRead(boolean ignore)
         {
             ++contacted;
             ++inflight;
+            return NoChange;
         }
 
-        // TODO: this is clunky, restructure the tracker to handle this more cleanly
-        // record a node as contacted, even though it isn't
-        public void recordContacted(Id node)
+        public ShardOutcome<? super ReadTracker> recordSlowResponse(boolean ignore)
         {
-            ++contacted;
+            Preconditions.checkState(!hasFailed());
+            ++slow;
+
+            if (shouldRead() && canRead())
+                return SendMore;
+
+            return NoChange;
         }
 
-        public void recordSlowRead(Id node)
+        /**
+         * Have received a requested data payload with desired contents
+         */
+        public ShardOutcome<? super ReadTracker> recordReadSuccess(boolean isSlow)
         {
-            ++slow;
+            Preconditions.checkState(inflight > 0);
+            boolean hadSucceeded = hasSucceeded();
+            --inflight;
+            if (isSlow) --slow;
+            hasData = true;
+            return hadSucceeded ? NoChange : DataSuccess;
         }
 
-        public void unrecordSlowRead(Id node)
+        public ShardOutcome<? super ReadTracker> recordQuorumReadSuccess(boolean isSlow)
         {
-            --slow;
+            Preconditions.checkState(inflight > 0);
+            boolean hadSucceeded = hasSucceeded();
+            --inflight;
+            ++quorum;
+            if (isSlow) --slow;
+
+            if (hadSucceeded)
+                return NoChange;
+
+            if (quorum == shard.slowPathQuorumSize)
+                return Success;
+
+            return ensureProgressOrFail();
         }
 
-        public boolean recordReadSuccess(Id node)
+        public ShardOutcomes recordReadFailure(boolean isSlow)
         {
-            Preconditions.checkArgument(shard.nodes.contains(node));
             Preconditions.checkState(inflight > 0);
             --inflight;
-            hasData = true;
-            return true;
+            if (isSlow) --slow;
+
+            return ensureProgressOrFail();
         }
 
-        public boolean shouldRead()
+        private ShardOutcomes ensureProgressOrFail()
         {
-            return !hasData && inflight == slow;
+            if (!shouldRead())
+                return NoChange;
+
+            if (canRead())
+                return SendMore;
+
+            return hasInFlight() ? NoChange : Fail;
         }
 
-        public boolean recordReadFailure(Id node)
+        public boolean hasReachedQuorum()
         {
-            Preconditions.checkState(inflight > 0);
-            --inflight;
-            return true;
+            return quorum >= shard.slowPathQuorumSize;
         }
 
-        public boolean hasCompletedRead()
+        public boolean hasSucceeded()
         {
-            return hasData;
+            return hasData() || hasReachedQuorum();
+        }
+
+        @Override
+        boolean hasInFlight()
+        {
+            return inflight > 0;
+        }
+
+        public boolean shouldRead()
+        {
+            return !hasSucceeded() && inflight == slow;
+        }
+
+        public boolean canRead()
+        {
+            return contacted < shard.rf();
         }
 
         public boolean hasFailed()
         {
-            return !hasData && inflight == 0 && contacted == shard.nodes.size();
+            return !hasData && inflight == 0 && contacted == shard.nodes.size() && !hasReachedQuorum();
+        }
+
+        public boolean hasData()
+        {
+            return hasData;
         }
     }
 
     // TODO: abstract the candidate selection process so the implementation may prioritise based on distance/health etc
-    private final List<Id> candidates;
-    private final Set<Id> inflight;
+    // TODO: faster Id sets and arrays using primitive ints when unambiguous
+    final Set<Id> inflight;
+    final List<Id> candidates;
     private Set<Id> slow;
+    protected int waitingOnData;
 
-    public static ReadTracker<ReadShardTracker> create(Topologies topologies)
-    {
-        return new ReadTracker<>(topologies, ReadShardTracker[]::new, ReadShardTracker::new);
-    }
-
-    public ReadTracker(Topologies topologies, IntFunction<T[]> arrayFactory, Function<Shard, T> trackerFactory)
+    public ReadTracker(Topologies topologies)
     {
-        super(topologies, arrayFactory, trackerFactory);
-        candidates = new ArrayList<>(topologies.nodes());
-        inflight = newHashSetWithExpectedSize(trackerCount());
+        super(topologies, ReadShardTracker[]::new, ReadShardTracker::new);
+        this.candidates = new ArrayList<>(topologies.nodes()); // TODO: copyOfNodesAsList to avoid unnecessary copies
+        this.inflight = newHashSetWithExpectedSize(maxShardsPerEpoch());
+        this.waitingOnData = waitingOnShards;
     }
 
     @VisibleForTesting
-    void recordInflightRead(Id node)
+    protected void recordInFlightRead(Id node)
     {
         if (!inflight.add(node))
             throw new IllegalStateException(node + " already in flight");
 
-        forEachTrackerForNode(node, ReadShardTracker::recordInflightRead);
-    }
-
-    @VisibleForTesting
-    private void recordContacted(Id node)
-    {
-        forEachTrackerForNode(node, ReadShardTracker::recordContacted);
-    }
-
-    public void recordSlowRead(Id node)
-    {
-        if (slow == null)
-            slow = newHashSetWithExpectedSize(trackerCount());
-
-        if (slow.add(node))
-        {
-            forEachTrackerForNode(node, ReadShardTracker::recordSlowRead);
-        }
+        recordResponse(this, node, ReadShardTracker::recordInFlightRead, false);
     }
 
-    protected void recordResponse(Id node)
+    private boolean receiveResponseIsSlow(Id node)
     {
         if (!inflight.remove(node))
             throw new IllegalStateException("Nothing in flight for " + node);
 
-        if (slow != null && slow.remove(node))
-            forEachTrackerForNode(node, ReadShardTracker::unrecordSlowRead);
+        return slow != null && slow.remove(node);
     }
 
-    public boolean recordReadSuccess(Id node)
+    /**
+     * Mark the in-flight request to the given node as slow, so that its shards may ask for further replicas to be contacted
+     */
+    protected RequestStatus recordSlowResponse(Id from)
     {
-        recordResponse(node);
-        return anyForNode(node, ReadShardTracker::recordReadSuccess);
-    }
+        if (!inflight.contains(from))
+            throw new IllegalStateException();
 
-    public boolean recordReadFailure(Id node)
-    {
-        recordResponse(node);
-        return anyForNode(node, ReadShardTracker::recordReadFailure);
-    }
+        if (slow == null)
+            slow = newHashSetWithExpectedSize(maxShardsPerEpoch());
 
-    public boolean hasCompletedRead()
-    {
-        return all(ReadShardTracker::hasCompletedRead);
+        if (!slow.add(from)) // we can mark slow responses due to ReadCoordinator.TryAlternative OR onSlowResponse
+            return RequestStatus.NoChange;
+
+        return recordResponse(this, from, ReadShardTracker::recordSlowResponse, true);
     }
 
-    public boolean hasInFlight()
+    /**
+     * Record a response that immediately satisfies the criteria for the shards the node participates in
+     */
+    protected RequestStatus recordReadSuccess(Id from)
     {
-        return !inflight.isEmpty();
+        return recordResponse(from, ReadShardTracker::recordReadSuccess);
     }
 
-    public boolean hasFailed()
+    /**
+     * Record a response that contributes to a potential quorum decision (i.e. accept once we have such a quorum)
+     */
+    protected RequestStatus recordQuorumReadSuccess(Id from)
     {
-        return any(ReadShardTracker::hasFailed);
+        return recordResponse(from, ReadShardTracker::recordQuorumReadSuccess);
     }
 
-    private int intersectionSize(Id node, Set<ReadShardTracker> target)
+    /**
+     * Record a failure response
+     */
+    protected RequestStatus recordReadFailure(Id from)
     {
-        return matchingTrackersForNode(node, target::contains);
+        return recordResponse(from, ReadShardTracker::recordReadFailure);
     }
 
-    private int compareIntersections(Id left, Id right, Set<ReadShardTracker> target)
+    protected RequestStatus recordResponse(Id from, BiFunction<? super ReadShardTracker, Boolean, ? extends ShardOutcome<? super ReadTracker>> function)
     {
-        return Integer.compare(intersectionSize(left, target), intersectionSize(right, target));
+        boolean isSlow = receiveResponseIsSlow(from);
+        return recordResponse(this, from, function, isSlow);
     }
 
-    /**
-     * Return the smallest set of nodes needed to satisfy required reads.
-     *
-     * Returns null if the read cannot be completed.
-     *
-     * TODO: prioritisation of nodes should be implementation-defined
-     */
-    public Set<Id> computeMinimalReadSetAndMarkInflight()
+    public <T1> RequestStatus trySendMore(BiConsumer<T1, Id> contact, T1 with)
     {
-        Set<ReadShardTracker> toRead = foldl((tracker, accumulate) -> {
-            if (!tracker.shouldRead())
-                return accumulate;
+        ShardSelection toRead;
+        {
+            ShardSelection tmp = null;
+            for (int i = 0 ; i < trackers.length ; ++i)
+            {
+                ReadShardTracker tracker = trackers[i];
+                if (tracker == null || !tracker.shouldRead() || !tracker.canRead())
+                    continue;
 
-            if (accumulate == null)
-                accumulate = new LinkedHashSet<>(); // determinism
+                if (tmp == null)
+                    tmp = new ShardSelection(); // determinism
 
-            accumulate.add(tracker);
-            return accumulate;
-        }, null);
+                tmp.set(i);
+            }
+            toRead = tmp;
+        }
 
-        if (toRead == null)
-            return Collections.emptySet();
+        Preconditions.checkState(toRead != null, "We were asked to read more, but found no shards in need of reading more");
 
-        assert !toRead.isEmpty();
-        Set<Id> nodes = new HashSet<>();
-        while (!toRead.isEmpty())
+        // TODO: maybe for each additional candidate do one linear compare run to find better secondary match
+        //       OR at least discount candidates that do not contribute additional knowledge beyond those additional
+        //       candidates already contacted, since implementations are likely to sort primarily by health
+        candidates.sort((a, b) -> topologies().compare(a, b, toRead));
+        int i = candidates.size() - 1;
+        while (i >= 0)
         {
-            if (candidates.isEmpty())
-            {
-                if (!nodes.isEmpty())
-                    nodes.forEach(this::recordContacted);
-                return null;
-            }
+            Id candidate = candidates.get(i);
+            topologies().forEach((ti, topology) -> {
+                int offset = topologyOffset(ti);
+                topology.forEachOn(candidate, (si, s) -> toRead.clear(offset + si));
+            });
 
-            // TODO: Topology needs concept of locality/distance
-            candidates.sort((a, b) -> compareIntersections(a, b, toRead));
+            if (toRead.isEmpty())
+                break;
 
-            int i = candidates.size() - 1;
-            Id node = candidates.get(i);
-            nodes.add(node);
-            candidates.remove(i);
-            forEachTrackerForNode(node, (tracker, ignore) -> toRead.remove(tracker));
+            --i;
         }
 
-        // must recordInFlightRead after loop, as we might return null if the reads are insufficient to make progress
-        // but in this case we need the tracker to
-        nodes.forEach(this::recordInflightRead);
+        if (!toRead.isEmpty())
+            return RequestStatus.NoChange;
 
-        return nodes;
+        for (int j = candidates.size() - 1; j >= i; --j)
+        {
+            Id candidate = candidates.get(j);
+            recordInFlightRead(candidate);
+            contact.accept(with, candidate);
+            candidates.remove(j);
+        }
+        return RequestStatus.NoChange;
     }
 
+    public boolean hasData()
+    {
+        return all(ReadShardTracker::hasData);
+    }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/RecoveryTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/RecoveryTracker.java
new file mode 100644
index 0000000..10c8e28
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/RecoveryTracker.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate.tracking;
+
+import accord.coordinate.tracking.QuorumTracker.QuorumShardTracker;
+import accord.local.Node;
+import accord.topology.Shard;
+import accord.topology.Topologies;
+
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
+
+public class RecoveryTracker extends AbstractTracker<RecoveryTracker.RecoveryShardTracker, Node.Id>
+{
+    public static class RecoveryShardTracker extends QuorumShardTracker
+    {
+        protected int fastPathRejects = 0;
+
+        RecoveryShardTracker(Shard shard)
+        {
+            super(shard);
+        }
+
+        ShardOutcomes onSuccessRejectFastPath(Node.Id from)
+        {
+            if (shard.fastPathElectorate.contains(from))
+                ++fastPathRejects;
+            return onSuccess(from);
+        }
+
+        boolean rejectsFastPath()
+        {
+            return fastPathRejects > shard.fastPathElectorate.size() - shard.fastPathQuorumSize;
+        }
+    }
+
+    public RecoveryTracker(Topologies topologies)
+    {
+        super(topologies, RecoveryShardTracker[]::new, RecoveryShardTracker::new);
+    }
+
+    public RequestStatus recordSuccess(Node.Id node, boolean withFastPathTimestamp)
+    {
+        if (withFastPathTimestamp)
+            return recordResponse(this, node, RecoveryShardTracker::onSuccess, node);
+
+        return recordResponse(this, node, RecoveryShardTracker::onSuccessRejectFastPath, node);
+    }
+
+    // record a failure response from the given node, returning the resulting RequestStatus
+    public RequestStatus recordFailure(Node.Id from)
+    {
+        return recordResponse(this, from, RecoveryShardTracker::onFailure, from);
+    }
+
+    public boolean rejectsFastPath()
+    {
+        return any(RecoveryShardTracker::rejectsFastPath);
+    }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/RequestStatus.java b/accord-core/src/main/java/accord/coordinate/tracking/RequestStatus.java
new file mode 100644
index 0000000..a0117f3
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/RequestStatus.java
@@ -0,0 +1,8 @@
+package accord.coordinate.tracking;
+
+public enum RequestStatus
+{
+    Failed,
+    NoChange,
+    Success
+}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/ShardTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/ShardTracker.java
new file mode 100644
index 0000000..ee628be
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/ShardTracker.java
@@ -0,0 +1,17 @@
+package accord.coordinate.tracking;
+
+import accord.topology.Shard;
+
+public abstract class ShardTracker
+{
+    public final Shard shard;
+
+    public ShardTracker(Shard shard)
+    {
+        this.shard = shard;
+    }
+
+    abstract boolean hasFailed();
+    abstract boolean hasReachedQuorum();
+    abstract boolean hasInFlight();
+}
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index dd3eaa6..c942a0b 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -85,6 +85,7 @@ public class InMemoryCommandStore
             return commands.get(txnId);
         }
 
+        // TODO (soon): mimic caching to test C* behaviour
         public Command ifLoaded(TxnId txnId)
         {
             return commands.get(txnId);
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java b/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java
index 0e82b4e..d14867b 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java
@@ -105,6 +105,7 @@ public class InMemoryCommandsForKey extends CommandsForKey
         }
     }
 
+    // TODO (now): add validation that anything inserted into *committedBy* has everything prior in its dependencies
     private final Key key;
     private final InMemoryCommandTimeseries<TxnIdWithExecuteAt> uncommitted = new InMemoryCommandTimeseries<>(cmd -> new TxnIdWithExecuteAt(cmd.txnId(), cmd.executeAt()));
     private final InMemoryCommandTimeseries<TxnId> committedById = new InMemoryCommandTimeseries<>(Command::txnId);
diff --git a/accord-core/src/main/java/accord/impl/SizeOfIntersectionSorter.java b/accord-core/src/main/java/accord/impl/SizeOfIntersectionSorter.java
new file mode 100644
index 0000000..39bac13
--- /dev/null
+++ b/accord-core/src/main/java/accord/impl/SizeOfIntersectionSorter.java
@@ -0,0 +1,51 @@
+package accord.impl;
+
+import accord.api.TopologySorter;
+import accord.local.Node;
+import accord.topology.ShardSelection;
+import accord.topology.Topologies;
+import accord.topology.Topology;
+
+public class SizeOfIntersectionSorter implements TopologySorter
+{
+    public static final TopologySorter.Supplier SUPPLIER = new Supplier() {
+        @Override
+        public TopologySorter get(Topology topology)
+        {
+            return new SizeOfIntersectionSorter(new Topologies.Single(this, topology));
+        }
+
+        @Override
+        public TopologySorter get(Topologies topologies)
+        {
+            return new SizeOfIntersectionSorter(topologies);
+        }
+    };
+
+    final Topologies topologies;
+    SizeOfIntersectionSorter(Topologies topologies)
+    {
+        this.topologies = topologies;
+    }
+
+    @Override
+    public int compare(Node.Id node1, Node.Id node2, ShardSelection shards)
+    {
+        int maxShardsPerEpoch = topologies.maxShardsPerEpoch();
+        int count1 = 0, count2 = 0;
+        for (int i = 0, mi = topologies.size() ; i < mi ; ++i)
+        {
+            Topology topology = topologies.get(i);
+            count1 += count(node1, shards, i * maxShardsPerEpoch, topology);
+            count2 += count(node2, shards, i * maxShardsPerEpoch, topology);
+        }
+
+        // sort more intersections later
+        return count1 - count2;
+    }
+
+    private static int count(Node.Id node, ShardSelection shards, int offset, Topology topology)
+    {
+        return topology.foldlIntOn(node, (i, shard, v) -> shard.get(i) ? v + 1 : v, shards, offset, 0, 0);
+    }
+}
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 20c990e..3f86efa 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -125,13 +125,13 @@ public class Node implements ConfigurationService.Listener, NodeTimeService
     private final Map<TxnId, Future<? extends Outcome>> coordinating = new ConcurrentHashMap<>();
 
     public Node(Id id, MessageSink messageSink, ConfigurationService configService, LongSupplier nowSupplier,
-                Supplier<DataStore> dataSupplier, Agent agent, Random random, Scheduler scheduler,
+                Supplier<DataStore> dataSupplier, Agent agent, Random random, Scheduler scheduler, TopologySorter.Supplier topologySorter,
                 Function<Node, ProgressLog.Factory> progressLogFactory, CommandStores.Factory factory)
     {
         this.id = id;
         this.messageSink = messageSink;
         this.configService = configService;
-        this.topology = new TopologyManager(id);
+        this.topology = new TopologyManager(topologySorter, id);
         this.nowSupplier = nowSupplier;
         Topology topology = configService.currentTopology();
         this.now = new AtomicReference<>(new Timestamp(topology.epoch(), nowSupplier.getAsLong(), 0, id));
diff --git a/accord-core/src/main/java/accord/local/SaveStatus.java b/accord-core/src/main/java/accord/local/SaveStatus.java
index 5c2d6f4..87d4b0a 100644
--- a/accord-core/src/main/java/accord/local/SaveStatus.java
+++ b/accord-core/src/main/java/accord/local/SaveStatus.java
@@ -22,7 +22,6 @@ import accord.local.Status.ExecutionStatus;
 import accord.local.Status.Known;
 import accord.local.Status.Phase;
 
-import static accord.local.Status.ExecutionStatus.*;
 import static accord.local.Status.Known.*;
 
 /**
diff --git a/accord-core/src/main/java/accord/local/SyncCommandStores.java b/accord-core/src/main/java/accord/local/SyncCommandStores.java
index fb07b70..4e2d0fd 100644
--- a/accord-core/src/main/java/accord/local/SyncCommandStores.java
+++ b/accord-core/src/main/java/accord/local/SyncCommandStores.java
@@ -10,6 +10,7 @@ import accord.utils.MapReduceConsume;
 import java.util.function.Function;
 import java.util.stream.IntStream;
 
+// TODO (soon): introduce new CommandStores that mimics asynchrony by integrating with Cluster scheduling for List workload
 public class SyncCommandStores extends CommandStores<SyncCommandStores.SyncCommandStore>
 {
     public interface SafeSyncCommandStore extends SafeCommandStore
diff --git a/accord-core/src/main/java/accord/messages/BeginInvalidation.java b/accord-core/src/main/java/accord/messages/BeginInvalidation.java
index 97261ae..479110b 100644
--- a/accord-core/src/main/java/accord/messages/BeginInvalidation.java
+++ b/accord-core/src/main/java/accord/messages/BeginInvalidation.java
@@ -40,7 +40,7 @@ public class BeginInvalidation extends AbstractEpochRequest<BeginInvalidation.In
         if (!command.preacceptInvalidate(ballot))
             return new InvalidateNack(command.promised(), command.homeKey());
 
-        return new InvalidateOk(command.status(), command.route(), command.homeKey());
+        return new InvalidateOk(command.saveStatus(), command.route(), command.homeKey());
     }
 
     @Override
@@ -83,11 +83,11 @@ public class BeginInvalidation extends AbstractEpochRequest<BeginInvalidation.In
 
     public static class InvalidateOk implements InvalidateReply
     {
-        public final Status status;
+        public final SaveStatus status;
         public final @Nullable AbstractRoute route;
         public final @Nullable RoutingKey homeKey;
 
-        public InvalidateOk(Status status, @Nullable AbstractRoute route, @Nullable RoutingKey homeKey)
+        public InvalidateOk(SaveStatus status, @Nullable AbstractRoute route, @Nullable RoutingKey homeKey)
         {
             this.status = status;
             this.route = route;
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index 71028f6..621e8e4 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -194,6 +194,7 @@ public class ReadData extends AbstractEpochRequest<ReadData.ReadNack> implements
         }
         else if (failure != null)
         {
+            // TODO (soon): test
             node.reply(replyTo, replyContext, ReadNack.Error);
             data = null;
             node.agent().onUncaughtException(failure); // TODO: probably a better way to handle this, as might not be uncaught
diff --git a/accord-core/src/main/java/accord/topology/Shard.java b/accord-core/src/main/java/accord/topology/Shard.java
index d4c9c91..8a729dc 100644
--- a/accord-core/src/main/java/accord/topology/Shard.java
+++ b/accord-core/src/main/java/accord/topology/Shard.java
@@ -82,6 +82,11 @@ public class Shard
         return (f + electorate)/2 + 1;
     }
 
+    public boolean rejectsFastPath(int rejectCount) // true when rejectCount exceeds (fast-path electorate size - fast-path quorum size); presumably rejectCount counts electorate members only - confirm with callers
+    {
+        return rejectCount > fastPathElectorate.size() - fastPathQuorumSize; // strict comparison: exactly (size - quorum) rejections do not yet reject
+    }
+
     static int slowPathQuorumSize(int replicas)
     {
         return replicas - maxToleratedFailures(replicas);
diff --git a/accord-core/src/main/java/accord/topology/ShardSelection.java b/accord-core/src/main/java/accord/topology/ShardSelection.java
new file mode 100644
index 0000000..41f1cb9
--- /dev/null
+++ b/accord-core/src/main/java/accord/topology/ShardSelection.java
@@ -0,0 +1,7 @@
+package accord.topology;
+
+import java.util.BitSet;
+
+public class ShardSelection extends BitSet // a BitSet given its own type name; adds no members. SizeOfIntersectionSorter.count tests bit (offset + shard index) to decide whether a shard is selected
+{
+}
diff --git a/accord-core/src/main/java/accord/topology/Topologies.java b/accord-core/src/main/java/accord/topology/Topologies.java
index ea77a30..6de8ddb 100644
--- a/accord-core/src/main/java/accord/topology/Topologies.java
+++ b/accord-core/src/main/java/accord/topology/Topologies.java
@@ -18,18 +18,19 @@
 
 package accord.topology;
 
+import accord.api.TopologySorter;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.primitives.KeyRanges;
 import accord.utils.IndexedConsumer;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 
 import java.util.*;
-import java.util.function.Consumer;
 
 // TODO: we can probably most efficiently create a new synthetic Topology that applies for a range of epochs
 //       and permit Topology to implement it, so that
-public interface Topologies
+public interface Topologies extends TopologySorter
 {
     Topology current();
 
@@ -42,8 +43,6 @@ public interface Topologies
         return current().epoch;
     }
 
-    boolean fastPathPermitted();
-
     // topologies are stored in reverse epoch order, with the highest epoch at idx 0
     Topology get(int i);
 
@@ -57,26 +56,18 @@ public interface Topologies
 
     Set<Node.Id> copyOfNodes();
 
+    int estimateUniqueNodes();
+
     KeyRanges computeRangesForNode(Id node);
 
+    int maxShardsPerEpoch();
+
     default void forEach(IndexedConsumer<Topology> consumer)
     {
         for (int i=0, mi=size(); i<mi; i++)
             consumer.accept(i, get(i));
     }
 
-    default void forEachShard(Consumer<Shard> consumer)
-    {
-        for (int i=0, mi=size(); i<mi; i++)
-        {
-            Topology topology = get(i);
-            for (int j=0, mj=topology.size(); j<mj; j++)
-            {
-                consumer.accept(topology.get(j));
-            }
-        }
-    }
-
     static boolean equals(Topologies t, Object o)
     {
         if (o == t)
@@ -122,13 +113,19 @@ public interface Topologies
 
     class Single implements Topologies
     {
+        private final TopologySorter sorter;
         private final Topology topology;
-        private final boolean fastPathPermitted;
 
-        public Single(Topology topology, boolean fastPathPermitted)
+        public Single(TopologySorter.Supplier sorter, Topology topology) // builds the sorter for this instance from a supplier
         {
             this.topology = topology;
-            this.fastPathPermitted = fastPathPermitted;
+            this.sorter = sorter.get(this); // `this` is passed out during construction; topology has already been assigned on the line above
+        }
+
+        public Single(TopologySorter sorter, Topology topology) // stores an already-built sorter as-is
+        {
+            this.topology = topology;
+            this.sorter = sorter;
         }
 
         @Override
@@ -151,12 +148,6 @@ public interface Topologies
             return currentEpoch();
         }
 
-        @Override
-        public boolean fastPathPermitted()
-        {
-            return fastPathPermitted;
-        }
-
         @Override
         public Topology get(int i)
         {
@@ -195,12 +186,24 @@ public interface Topologies
             return new HashSet<>(nodes());
         }
 
+        @Override
+        public int estimateUniqueNodes()
+        {
+            return topology.nodes().size();
+        }
+
         @Override
         public KeyRanges computeRangesForNode(Id node)
         {
             return topology.rangesForNode(node);
         }
 
+        @Override
+        public int maxShardsPerEpoch()
+        {
+            return topology.size();
+        }
+
         @Override
         public boolean equals(Object obj)
         {
@@ -218,20 +221,33 @@ public interface Topologies
         {
             return Topologies.toString(this);
         }
+
+        @Override
+        public int compare(Id node1, Id node2, ShardSelection shards)
+        {
+            return sorter.compare(node1, node2, shards);
+        }
     }
 
     class Multi implements Topologies
     {
+        private final TopologySorter sorter;
         private final List<Topology> topologies;
+        private final int maxShardsPerEpoch;
 
-        public Multi(int initialCapacity)
+        public Multi(TopologySorter.Supplier sorter, int initialCapacity) // creates an empty collection; topologies are supplied afterwards through add()
         {
             this.topologies = new ArrayList<>(initialCapacity);
+            this.sorter = sorter.get(this); // `this` is handed to the supplier before any topology has been added
+            int maxShardsPerEpoch = 0;
+            for (int i = 0 ; i < topologies.size() ; ++i) // NOTE(review): topologies was created empty just above, so this loop never iterates
+                maxShardsPerEpoch = Math.max(maxShardsPerEpoch, topologies.get(i).size());
+            this.maxShardsPerEpoch = maxShardsPerEpoch; // NOTE(review): therefore always 0 here, and the field is final so it stays 0
         }
 
-        public Multi(Topology... topologies)
+        public Multi(TopologySorter.Supplier sorter, Topology... topologies)
         {
-            this(topologies.length);
+            this(sorter, topologies.length);
             for (Topology topology : topologies)
                 add(topology);
         }
@@ -257,15 +273,6 @@ public interface Topologies
             return get(size() - 1).epoch;
         }
 
-        @Override
-        public boolean fastPathPermitted()
-        {
-            // TODO (soon): this is overly restrictive: we can still take the fast-path during topology movements,
-            //              just not for transactions started across the initiation of a topology movement (i.e.
-            //              where the epoch changes while the transaction is being pre-accepted)
-            return false;
-        }
-
         @Override
         public Topology get(int i)
         {
@@ -298,10 +305,18 @@ public interface Topologies
             return false;
         }
 
+        @Override
+        public int estimateUniqueNodes()
+        {
+            // just guess at one additional node per epoch, and at most twice as many nodes
+            int estSize = get(0).nodes().size();
+            return Math.min(estSize * 2, estSize + size() - 1);
+        }
+
         @Override
         public Set<Node.Id> nodes()
         {
-            Set<Node.Id> result = new HashSet<>();
+            Set<Node.Id> result = Sets.newHashSetWithExpectedSize(estimateUniqueNodes());
             for (int i=0,mi=size(); i<mi; i++)
                 result.addAll(get(i).nodes());
             return result;
@@ -322,6 +337,22 @@ public interface Topologies
             return ranges;
         }
 
+        /**
+         * Returns the largest number of shards held by any single epoch's {@link Topology} in this collection.
+         *
+         * The value is derived from the current contents of {@code topologies} on every call: the list is
+         * empty when the constructor runs and is only populated afterwards via {@link #add(Topology)}, so a
+         * value captured at construction time would always be zero.
+         */
+        @Override
+        public int maxShardsPerEpoch()
+        {
+            int max = 0;
+            for (int i = 0, mi = topologies.size() ; i < mi ; ++i)
+                max = Math.max(max, topologies.get(i).size());
+            return max;
+        }
+
         public void add(Topology topology)
         {
             Preconditions.checkArgument(topologies.isEmpty() || topology.epoch == topologies.get(topologies.size() - 1).epoch - 1);
@@ -345,5 +366,11 @@ public interface Topologies
         {
             return Topologies.toString(this);
         }
+
+        @Override
+        public int compare(Id node1, Id node2, ShardSelection shards)
+        {
+            return sorter.compare(node1, node2, shards);
+        }
     }
 }
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
index a9411ae..e2ad58f 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -19,20 +19,16 @@
 package accord.topology;
 
 import java.util.*;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.stream.IntStream;
 
 import accord.api.RoutingKey;
 import accord.local.Node.Id;
-import accord.api.Key;
 import accord.primitives.AbstractKeys;
 import accord.primitives.KeyRange;
 import accord.primitives.KeyRanges;
-import accord.primitives.Keys;
-import accord.utils.IndexedConsumer;
-import accord.utils.IndexedBiFunction;
-import accord.utils.IndexedIntFunction;
-import accord.utils.IndexedPredicate;
+import accord.utils.*;
 
 import static accord.utils.SortedArrays.exponentialSearch;
 
@@ -44,6 +40,12 @@ public class Topology
     final KeyRanges ranges;
     final Map<Id, NodeInfo> nodeLookup;
     final KeyRanges subsetOfRanges;
+    /**
+     * This array is used to permit cheaper sharing of Topology objects between requests, as we need only specify
+     * the indexes within the parent Topology that we contain. This also permits us to perform efficient merges with
+     * {@code NodeInfo.supersetIndexes} to find the shards that intersect a given node without recomputing the NodeInfo.
+     * TODO: do not recompute nodeLookup
+     */
     final int[] supersetIndexes;
 
     static class NodeInfo
@@ -227,6 +229,7 @@ public class Topology
         int[] newSubset = new int[Math.min(select.size(), subsetOfRanges.size())];
         for (int i = 0 ; i < select.size() ; )
         {
+            // TODO: use SortedArrays.findNextIntersection
             // find the range containing the key at i
             subsetIndex = subsetOfRanges.rangeIndexForKey(subsetIndex, subsetOfRanges.size(), select.get(i));
             if (subsetIndex < 0 || subsetIndex >= subsetOfRanges.size())
@@ -272,42 +275,46 @@ public class Topology
         return accumulator;
     }
 
-    /**
-     * @param on the node to limit our selection to
-     * @param select may be a superSet of the keys owned by {@code on} but not of this {@code Topology}
-     */
-    public void forEachOn(Id on, Keys select, IndexedConsumer<Shard> consumer)
+    public void forEachOn(Id on, IndexedConsumer<Shard> consumer)
     {
         NodeInfo info = nodeLookup.get(on);
-        for (int i = 0, j = 0, k = 0 ; i < select.size() && j < supersetIndexes.length && k < info.supersetIndexes.length ;)
+        if (info == null)
+            return;
+        int[] a = supersetIndexes, b = info.supersetIndexes;
+        int ai = 0, bi = 0;
+        while (ai < a.length && bi < b.length)
         {
-            Key key = select.get(i);
-            Shard shard = shards[supersetIndexes[j]];
-            int c = supersetIndexes[j] - info.supersetIndexes[k];
-            if (c < 0) ++j;
-            else if (c > 0) ++k;
+            if (a[ai] == b[bi])
+            {
+                consumer.accept(ai, shards[a[ai]]);
+                ++ai; ++bi;
+            }
+            else if (a[ai] < b[bi])
+            {
+                ai = exponentialSearch(a, ai + 1, a.length, b[bi]);
+                if (ai < 0) ai = -1 -ai;
+            }
             else
             {
-                int rcmp = shard.range.compareKey(key);
-                if (rcmp < 0) ++i;
-                else if (rcmp == 0) { consumer.accept(j, shard); i++; j++; k++; }
-                else { j++; k++; }
+                bi = exponentialSearch(b, bi + 1, b.length, a[ai]);
+                if (bi < 0) bi = -1 -bi;
             }
         }
     }
 
-    public void forEachOn(Id on, IndexedConsumer<Shard> consumer)
+    public <P1, P2, P3, O> O mapReduceOn(Id on, int offset, IndexedTriFunction<? super P1, ? super P2, ? super P3, ? extends O> function, P1 p1, P2 p2, P3 p3, BiFunction<? super O, ? super O, ? extends O> reduce, O initialValue)
     {
         NodeInfo info = nodeLookup.get(on);
         if (info == null)
-            return;
+            return initialValue;
         int[] a = supersetIndexes, b = info.supersetIndexes;
         int ai = 0, bi = 0;
         while (ai < a.length && bi < b.length)
         {
             if (a[ai] == b[bi])
             {
-                consumer.accept(ai, shards[a[ai]]);
+                O next = function.apply(offset + ai, p1, p2, p3);
+                initialValue = reduce.apply(initialValue, next);
                 ++ai; ++bi;
             }
             else if (a[ai] < b[bi])
@@ -321,6 +328,7 @@ public class Topology
                 if (bi < 0) bi = -1 -bi;
             }
         }
+        return initialValue;
     }
 
     public int matchesOn(Id on, IndexedPredicate<Shard> consumer)
@@ -354,7 +362,7 @@ public class Topology
         return count;
     }
 
-    public int foldlIntOn(Id on, IndexedIntFunction<Shard> consumer, int offset, int initialValue, int terminalValue)
+    public <P> int foldlIntOn(Id on, IndexedIntFunction<P> consumer, P param, int offset, int initialValue, int terminalValue)
     {
         // TODO: this can be done by divide-and-conquer splitting of the lists and recursion, which should be more efficient
         NodeInfo info = nodeLookup.get(on);
@@ -366,7 +374,7 @@ public class Topology
         {
             if (a[ai] == b[bi])
             {
-                initialValue = consumer.apply(offset + ai, shards[a[ai]], initialValue);
+                initialValue = consumer.apply(offset + ai, param, initialValue);
                 if (terminalValue == initialValue)
                     return terminalValue;
                 ++ai; ++bi;
@@ -448,19 +456,6 @@ public class Topology
         return ranges;
     }
 
-    // TODO: use SortedArrays impls
-    private static boolean intersects(int[] is, int[] js)
-    {
-        for (int i = 0, j = 0 ; i < is.length && j < js.length ;)
-        {
-            int c = is[i] - js[j];
-            if (c < 0) ++i;
-            else if (c > 0) ++j;
-            else return true;
-        }
-        return false;
-    }
-
     public Shard[] unsafeGetShards()
     {
         return shards;
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 9ffce88..f33cc08 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -20,6 +20,7 @@ package accord.topology;
 
 import accord.api.ConfigurationService;
 import accord.api.RoutingKey;
+import accord.api.TopologySorter;
 import accord.coordinate.tracking.QuorumTracker;
 import accord.local.Node.Id;
 import accord.messages.EpochRequest;
@@ -37,7 +38,8 @@ import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 
 import java.util.*;
-import java.util.function.LongConsumer;
+
+import static accord.coordinate.tracking.RequestStatus.Success;
 
 /**
  * Manages topology state changes and update bookkeeping
@@ -63,12 +65,12 @@ public class TopologyManager implements ConfigurationService.Listener
         private boolean syncComplete = false;
         private boolean prevSynced;
 
-        EpochState(Id node, Topology global, boolean prevSynced)
+        EpochState(Id node, Topology global, TopologySorter sorter, boolean prevSynced)
         {
             this.global = global;
             this.local = global.forNode(node).trim();
             Preconditions.checkArgument(!global().isSubset());
-            this.syncTracker = new QuorumTracker(new Single(global(), false));
+            this.syncTracker = new QuorumTracker(new Single(sorter, global()));
             this.prevSynced = prevSynced;
         }
 
@@ -79,7 +81,7 @@ public class TopologyManager implements ConfigurationService.Listener
 
         public void recordSyncComplete(Id node)
         {
-            syncComplete = syncTracker.success(node);
+            syncComplete = syncTracker.recordSuccess(node) == Success;
         }
 
         Topology global()
@@ -114,7 +116,7 @@ public class TopologyManager implements ConfigurationService.Listener
             Boolean result = global().foldl(keys, (i, shard, acc) -> {
                 if (acc == Boolean.FALSE)
                     return acc;
-                return Boolean.valueOf(syncTracker.unsafeGet(i).hasReachedQuorum());
+                return syncTracker.unsafeGet(i).hasReachedQuorum();
             }, Boolean.TRUE);
             return result == Boolean.TRUE;
         }
@@ -125,7 +127,7 @@ public class TopologyManager implements ConfigurationService.Listener
         }
     }
 
-    private class Epochs
+    private static class Epochs
     {
         private final long currentEpoch;
         private final EpochState[] epochs;
@@ -220,11 +222,13 @@ public class TopologyManager implements ConfigurationService.Listener
         }
     }
 
+    private final TopologySorter.Supplier sorter;
     private final Id node;
     private volatile Epochs epochs;
 
-    public TopologyManager(Id node)
+    public TopologyManager(TopologySorter.Supplier sorter, Id node)
     {
+        this.sorter = sorter;
         this.node = node;
         this.epochs = new Epochs(new EpochState[0]);
     }
@@ -247,7 +251,7 @@ public class TopologyManager implements ConfigurationService.Listener
         System.arraycopy(current.epochs, 0, nextEpochs, 1, current.epochs.length);
 
         boolean prevSynced = current.epochs.length == 0 || current.epochs[0].syncComplete();
-        nextEpochs[0] = new EpochState(node, topology, prevSynced);
+        nextEpochs[0] = new EpochState(node, topology, sorter.get(topology), prevSynced);
 
         List<AsyncPromise<Void>> futureEpochFutures = new ArrayList<>(current.futureEpochFutures);
         AsyncPromise<Void> toComplete = !futureEpochFutures.isEmpty() ? futureEpochFutures.remove(0) : null;
@@ -267,6 +271,11 @@ public class TopologyManager implements ConfigurationService.Listener
         epochs.syncComplete(node, epoch);
     }
 
+    public TopologySorter.Supplier sorter()
+    {
+        return sorter;
+    }
+
     public Topology current()
     {
         return epochs.current();
@@ -292,7 +301,7 @@ public class TopologyManager implements ConfigurationService.Listener
 
         EpochState maxEpochState = snapshot.get(maxEpoch);
         if (minEpoch == maxEpoch && !snapshot.requiresHistoricalTopologiesFor(keys, maxEpoch))
-            return new Single(maxEpochState.global.forKeys(keys), true);
+            return new Single(sorter, maxEpochState.global.forKeys(keys));
 
         int start = (int)(snapshot.currentEpoch - maxEpoch);
         int limit = (int)(Math.min(1 + snapshot.currentEpoch - minEpoch, snapshot.epochs.length));
@@ -315,7 +324,7 @@ public class TopologyManager implements ConfigurationService.Listener
                 epochState.global.visitNodeForKeysOnceOrMore(keys, (i1, i2) -> true, nodes::add);
         }
 
-        Topologies.Multi topologies = new Topologies.Multi(count);
+        Topologies.Multi topologies = new Topologies.Multi(sorter, count);
         for (int i = start; i < limit ; ++i)
         {
             EpochState epochState = snapshot.epochs[i];
@@ -338,14 +347,14 @@ public class TopologyManager implements ConfigurationService.Listener
         Epochs snapshot = epochs;
 
         if (minEpoch == maxEpoch)
-            return new Single(snapshot.get(minEpoch).global.forKeys(keys), true);
+            return new Single(sorter, snapshot.get(minEpoch).global.forKeys(keys));
 
         Set<Id> nodes = new LinkedHashSet<>();
         int count = (int)(1 + maxEpoch - minEpoch);
         for (int i = count - 1 ; i >= 0 ; --i)
             snapshot.get(minEpoch + i).global().visitNodeForKeysOnceOrMore(keys, (i1, i2) -> true, nodes::add);
 
-        Topologies.Multi topologies = new Topologies.Multi(count);
+        Topologies.Multi topologies = new Topologies.Multi(sorter, count);
         for (int i = count - 1 ; i >= 0 ; --i)
             topologies.add(snapshot.get(minEpoch + i).global.forKeys(keys, nodes));
 
@@ -355,7 +364,7 @@ public class TopologyManager implements ConfigurationService.Listener
     public Topologies forEpoch(AbstractKeys<?, ?> keys, long epoch)
     {
         EpochState state = epochs.get(epoch);
-        return new Single(state.global.forKeys(keys), true);
+        return new Single(sorter, state.global.forKeys(keys));
     }
 
     public Shard forEpochIfKnown(RoutingKey key, long epoch)
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java b/accord-core/src/main/java/accord/utils/IndexedFunction.java
similarity index 50%
copy from accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
copy to accord-core/src/main/java/accord/utils/IndexedFunction.java
index 428c340..e4d662c 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
+++ b/accord-core/src/main/java/accord/utils/IndexedFunction.java
@@ -16,29 +16,9 @@
  * limitations under the License.
  */
 
-package accord.coordinate.tracking;
+package accord.utils;
 
-import accord.coordinate.tracking.AbstractQuorumTracker.QuorumShardTracker;
-import accord.local.Node;
-import accord.topology.Topologies;
-import accord.topology.Topologies.Single;
-import accord.topology.Topology;
-
-public class QuorumTracker extends AbstractQuorumTracker<QuorumShardTracker>
+public interface IndexedFunction<T, R> // a function of an int index and one argument of type T, producing an R
 {
-    public QuorumTracker(Topologies topologies)
-    {
-        super(topologies, QuorumShardTracker[]::new, QuorumShardTracker::new);
-    }
-
-    public QuorumTracker(Topology topology)
-    {
-        super(new Single(topology, false), QuorumShardTracker[]::new, QuorumShardTracker::new);
-    }
-
-    // return true iff hasReachedQuorum()
-    public boolean success(Node.Id node)
-    {
-        return allForNode(node, QuorumShardTracker::success) && hasReachedQuorum();
-    }
+    R apply(int i, T t); // i is the index, t the accompanying argument
 }
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java b/accord-core/src/main/java/accord/utils/IndexedTriFunction.java
similarity index 50%
copy from accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
copy to accord-core/src/main/java/accord/utils/IndexedTriFunction.java
index 428c340..3c722d5 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
+++ b/accord-core/src/main/java/accord/utils/IndexedTriFunction.java
@@ -16,29 +16,9 @@
  * limitations under the License.
  */
 
-package accord.coordinate.tracking;
+package accord.utils;
 
-import accord.coordinate.tracking.AbstractQuorumTracker.QuorumShardTracker;
-import accord.local.Node;
-import accord.topology.Topologies;
-import accord.topology.Topologies.Single;
-import accord.topology.Topology;
-
-public class QuorumTracker extends AbstractQuorumTracker<QuorumShardTracker>
+public interface IndexedTriFunction<I1, I2, I3, O> // a function of an int index and three arguments, producing an O
 {
-    public QuorumTracker(Topologies topologies)
-    {
-        super(topologies, QuorumShardTracker[]::new, QuorumShardTracker::new);
-    }
-
-    public QuorumTracker(Topology topology)
-    {
-        super(new Single(topology, false), QuorumShardTracker[]::new, QuorumShardTracker::new);
-    }
-
-    // return true iff hasReachedQuorum()
-    public boolean success(Node.Id node)
-    {
-        return allForNode(node, QuorumShardTracker::success) && hasReachedQuorum();
-    }
+    O apply(int i0, I1 i1, I2 i2, I3 i3); // Topology.mapReduceOn invokes this with i0 = offset + matching shard position, passing its p1..p3 through unchanged
 }
diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java
index 34c9ba3..94ba547 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -18,6 +18,8 @@
 
 package accord;
 
+import accord.api.TopologySorter;
+import accord.impl.SizeOfIntersectionSorter;
 import accord.primitives.KeyRange;
 import accord.local.Node;
 import accord.impl.mock.MockStore;
@@ -109,6 +111,6 @@ public class Utils
 
     public static Topologies topologies(Topology... topologies)
     {
-        return new Topologies.Multi(topologies);
+        return new Topologies.Multi(SizeOfIntersectionSorter.SUPPLIER, topologies);
     }
 }
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java
index 1fa99ab..cfe5641 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -268,7 +268,7 @@ public class BurnTest
     public static void main(String[] args) throws Exception
     {
 //        Long overrideSeed = null;
-        Long overrideSeed = 1727794989115080196L;
+        Long overrideSeed = -5707319446834273528L;
         do
         {
             run(overrideSeed != null ? overrideSeed : ThreadLocalRandom.current().nextLong());
diff --git a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
index 9fabfee..95e4656 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
@@ -44,7 +44,7 @@ public class CoordinateTest
             Node node = cluster.get(1);
             Assertions.assertNotNull(node);
 
-            TxnId txnId = new TxnId(1, 100, 0, node.id());
+            TxnId txnId = node.nextTxnId();
             Keys keys = keys(10);
             Txn txn = writeTxn(keys);
             Route route = keys.toRoute(keys.get(0).toRoutingKey());
@@ -71,7 +71,8 @@ public class CoordinateTest
 
     private TxnId coordinate(Node node, long clock, Keys keys) throws Throwable
     {
-        TxnId txnId = new TxnId(1, clock, 0, node.id());
+        TxnId txnId = node.nextTxnId();
+        txnId = new TxnId(txnId.epoch, txnId.real + clock, 0, txnId.node);
         Txn txn = writeTxn(keys);
         Result result = Coordinate.coordinate(node, txnId, txn, node.computeRoute(txnId, txn.keys())).get();
         Assertions.assertEquals(MockStore.RESULT, result);
@@ -134,7 +135,7 @@ public class CoordinateTest
             Node node = cluster.get(1);
             Assertions.assertNotNull(node);
 
-            TxnId txnId = new TxnId(1, 100, 0, node.id());
+            TxnId txnId = node.nextTxnId();
             Keys oneKey = keys(10);
             Keys twoKeys = keys(10, 20);
             Txn txn = new Txn.InMemory(oneKey, MockStore.read(oneKey), MockStore.QUERY, MockStore.update(twoKeys));
diff --git a/accord-core/src/test/java/accord/coordinate/PreAcceptTrackerTest.java b/accord-core/src/test/java/accord/coordinate/PreAcceptTrackerTest.java
index b6b9b32..d175008 100644
--- a/accord-core/src/test/java/accord/coordinate/PreAcceptTrackerTest.java
+++ b/accord-core/src/test/java/accord/coordinate/PreAcceptTrackerTest.java
@@ -41,14 +41,14 @@ public class PreAcceptTrackerTest
         [1, 2, 3] [2, 3, 4] [3, 4, 5] [4, 5, 1] [5, 1, 2]
          */
 
-    private static void assertResponseState(FastPathTracker<?> responses,
+    private static void assertResponseState(FastPathTracker responses,
                                             boolean quorumReached,
                                             boolean fastPathAccepted,
                                             boolean failed,
                                             boolean hasOutstandingResponses)
     {
         Assertions.assertEquals(quorumReached, responses.hasReachedQuorum());
-        Assertions.assertEquals(fastPathAccepted, responses.hasMetFastPathCriteria());
+        Assertions.assertEquals(fastPathAccepted, responses.hasFastPathAccepted());
         Assertions.assertEquals(failed, responses.hasFailed());
         Assertions.assertEquals(hasOutstandingResponses, responses.hasInFlight());
     }
@@ -57,7 +57,7 @@ public class PreAcceptTrackerTest
     void singleShard()
     {
         Topology subTopology = topology(topology.get(0));
-        FastPathTracker responses = new FastPathTracker<>(topologies(subTopology), Coordinate.ShardTracker[]::new, Coordinate.ShardTracker::new);
+        FastPathTracker responses = new FastPathTracker(topologies(subTopology));
 
         responses.recordSuccess(ids[0], false);
         assertResponseState(responses, false, false, false, true);
@@ -73,7 +73,7 @@ public class PreAcceptTrackerTest
     void singleShardFastPath()
     {
         Topology subTopology = topology(topology.get(0));
-        FastPathTracker responses = new FastPathTracker<>(topologies(subTopology), Coordinate.ShardTracker[]::new, Coordinate.ShardTracker::new);
+        FastPathTracker responses = new FastPathTracker(topologies(subTopology));
 
         responses.recordSuccess(ids[0], true);
         assertResponseState(responses, false, false, false, true);
@@ -92,7 +92,7 @@ public class PreAcceptTrackerTest
     void unexpectedResponsesAreIgnored()
     {
         Topology subTopology = topology(topology.get(0));
-        FastPathTracker responses = new FastPathTracker<>(topologies(subTopology), Coordinate.ShardTracker[]::new, Coordinate.ShardTracker::new);
+        FastPathTracker responses = new FastPathTracker(topologies(subTopology));
 
         responses.recordSuccess(ids[0], false);
         assertResponseState(responses, false, false, false, true);
@@ -109,15 +109,15 @@ public class PreAcceptTrackerTest
     void failure()
     {
         Topology subTopology = topology(topology.get(0));
-        FastPathTracker<?> responses = new FastPathTracker<>(topologies(subTopology), Coordinate.ShardTracker[]::new, Coordinate.ShardTracker::new);
+        FastPathTracker responses = new FastPathTracker(topologies(subTopology));
 
         responses.recordSuccess(ids[0], true);
         assertResponseState(responses, false, false, false, true);
 
-        responses.failure(ids[1]);
+        responses.recordFailure(ids[1]);
         assertResponseState(responses, false, false, false, true);
 
-        responses.failure(ids[2]);
+        responses.recordFailure(ids[2]);
         assertResponseState(responses, false, false, true, false);
     }
 
@@ -125,7 +125,7 @@ public class PreAcceptTrackerTest
     void multiShard()
     {
         Topology subTopology = new Topology(1, new Shard[]{topology.get(0), topology.get(1), topology.get(2)});
-        FastPathTracker<Coordinate.ShardTracker> responses = new FastPathTracker<>(topologies(subTopology), Coordinate.ShardTracker[]::new, Coordinate.ShardTracker::new);
+        FastPathTracker responses = new FastPathTracker(topologies(subTopology));
         /*
         (000, 100](100, 200](200, 300]
         [1, 2, 3] [2, 3, 4] [3, 4, 5]
diff --git a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
index bd14031..3298489 100644
--- a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
+++ b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
@@ -104,63 +104,4 @@ public class TopologyChangeTest
             });
         }
     }
-
-    @Test
-    void fastPathSkippedUntilSync() throws Throwable
-    {
-        Keys keys = keys(150);
-        KeyRange range = range(100, 200);
-        Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
-        Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)));
-        try (MockCluster cluster = MockCluster.builder().nodes(3)
-                                                        .messageSink(RecordingMessageSink::new)
-                                                        .topology(topology1).build())
-        {
-            Node node1 = cluster.get(1);
-            RecordingMessageSink messageSink = (RecordingMessageSink) node1.messageSink();
-            messageSink.clearHistory();
-            TxnId txnId1 = coordinate(node1, keys);
-            node1.commandStores().forEach(empty(), keys, 1, 1, commands -> {
-                Command command = commands.command(txnId1);
-                Assertions.assertTrue(command.partialDeps().isEmpty());
-            }).awaitUninterruptibly();
-
-            // check there was no accept phase
-            Assertions.assertFalse(new ArrayList<>(messageSink.requests).stream().anyMatch(env -> env.payload instanceof Accept));
-
-            cluster.configServices(1, 2, 3).forEach(config -> config.reportTopology(topology2));
-            messageSink.clearHistory();
-
-            // post epoch change, there _should_ be accepts, but with the original timestamp
-            TxnId txnId2 = coordinate(node1, keys);
-            Set<Node.Id> accepts = new ArrayList<>(messageSink.requests).stream()
-                    .filter(env -> env.payload instanceof Accept).map(env -> {
-                        Accept accept = (Accept) env.payload;
-                        Assertions.assertEquals(txnId2, accept.txnId);
-                        return env.to;
-            }).collect(Collectors.toSet());
-            Assertions.assertEquals(idSet(1, 2, 3), accepts);
-
-            node1.commandStores().forEach(empty(), keys, 2, 2, commands -> {
-                Command command = commands.command(txnId2);
-                Assertions.assertTrue(command.hasBeen(Status.Committed));
-                Assertions.assertTrue(command.partialDeps().contains(txnId1));
-                Assertions.assertEquals(txnId2, command.executeAt());
-            }).awaitUninterruptibly();
-
-            EpochSync.sync(cluster, 1);
-
-            // post sync, fast path should be working again, and there should be no accept phase
-            messageSink.clearHistory();
-            TxnId txnId3 = coordinate(node1, keys);
-            Assertions.assertFalse(new ArrayList<>(messageSink.requests).stream().anyMatch(env -> env.payload instanceof Accept));
-            node1.commandStores().forEach(empty(), keys, 2, 2, commands -> {
-                Command command = commands.command(txnId3);
-                Assertions.assertTrue(command.hasBeen(Status.Committed));
-                Assertions.assertTrue(command.partialDeps().contains(txnId1));
-                Assertions.assertTrue(command.partialDeps().contains(txnId2));
-                Assertions.assertEquals(txnId3, command.executeAt());
-            }).awaitUninterruptibly();
-        }
-    }
 }
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/FastPathTrackerReconciler.java b/accord-core/src/test/java/accord/coordinate/tracking/FastPathTrackerReconciler.java
new file mode 100644
index 0000000..905dd6d
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/tracking/FastPathTrackerReconciler.java
@@ -0,0 +1,94 @@
+package accord.coordinate.tracking;
+
+import accord.coordinate.tracking.FastPathTracker.FastPathShardTracker;
+import accord.coordinate.tracking.QuorumTracker.QuorumShardTracker;
+import accord.local.Node;
+import accord.topology.Topologies;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.ArrayList;
+import java.util.Random;
+
+/**
+ * Feeds a {@link FastPathTracker} randomly chosen responses and checks that each
+ * {@link RequestStatus} it reports is consistent with the state of its shard trackers.
+ */
+public class FastPathTrackerReconciler extends TrackerReconciler<FastPathShardTracker, FastPathTracker, FastPathTrackerReconciler.Rsp>
+{
+    /** Kinds of response that can be replayed against the tracker. */
+    enum Rsp { FAST, SLOW, FAIL }
+
+    FastPathTrackerReconciler(Random random, Topologies topologies)
+    {
+        this(random, new FastPathTracker(topologies));
+    }
+
+    private FastPathTrackerReconciler(Random random, FastPathTracker tracker)
+    {
+        // the in-flight list starts out as a mutable copy of the tracker's nodes
+        super(random, Rsp.class, tracker, new ArrayList<>(tracker.nodes()));
+    }
+
+    /**
+     * Removes {@code from} from the in-flight list and records its response on the tracker:
+     * FAST and SLOW record a success with the flag set to true and false respectively, FAIL a failure.
+     */
+    @Override
+    RequestStatus invoke(Rsp event, FastPathTracker tracker, Node.Id from)
+    {
+        switch (event)
+        {
+            case FAST:
+                inflight.remove(from);
+                return tracker.recordSuccess(from, true);
+            case SLOW:
+                inflight.remove(from);
+                return tracker.recordSuccess(from, false);
+            case FAIL:
+                inflight.remove(from);
+                return tracker.recordFailure(from);
+            default:
+                throw new AssertionError();
+        }
+    }
+
+    /** Asserts that the shard trackers' state matches the status the tracker just returned. */
+    @Override
+    void validate(RequestStatus status)
+    {
+        switch (status)
+        {
+            case Failed:
+                Assertions.assertTrue(anyShardFailed());
+                Assertions.assertFalse(allShardsReachedQuorum());
+                break;
+
+            case Success:
+                Assertions.assertTrue(allShardsReachedQuorum());
+                Assertions.assertTrue(allShardsDecidedFastPath());
+                Assertions.assertFalse(anyShardFailed());
+                break;
+
+            case NoChange:
+                Assertions.assertFalse(allShardsDecidedFastPath() && allShardsReachedQuorum());
+                Assertions.assertFalse(anyShardFailed());
+                break;
+        }
+    }
+
+    private boolean anyShardFailed()
+    {
+        return tracker.any(ShardTracker::hasFailed);
+    }
+
+    private boolean allShardsReachedQuorum()
+    {
+        return tracker.all(FastPathShardTracker::hasReachedQuorum);
+    }
+
+    // true once every shard has either rejected the fast path or met its fast-path criteria
+    private boolean allShardsDecidedFastPath()
+    {
+        return tracker.all(shard -> shard.hasRejectedFastPath() || shard.hasMetFastPathCriteria());
+    }
+}
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerReconciler.java b/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerReconciler.java
new file mode 100644
index 0000000..e94b385
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerReconciler.java
@@ -0,0 +1,80 @@
+package accord.coordinate.tracking;
+
+import accord.coordinate.tracking.QuorumTracker.QuorumShardTracker;
+import accord.local.Node;
+import accord.topology.Topologies;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.ArrayList;
+import java.util.Random;
+
+/**
+ * Feeds a {@link QuorumTracker} randomly chosen responses and checks that each
+ * {@link RequestStatus} it reports is consistent with the state of its shard trackers.
+ */
+public class QuorumTrackerReconciler extends TrackerReconciler<QuorumShardTracker, QuorumTracker, QuorumTrackerReconciler.Rsp>
+{
+    /** Kinds of response that can be replayed against the tracker. */
+    enum Rsp { QUORUM, FAIL }
+
+    QuorumTrackerReconciler(Random random, Topologies topologies)
+    {
+        this(random, new QuorumTracker(topologies));
+    }
+
+    private QuorumTrackerReconciler(Random random, QuorumTracker tracker)
+    {
+        // the in-flight list starts out as a mutable copy of the tracker's nodes
+        super(random, Rsp.class, tracker, new ArrayList<>(tracker.nodes()));
+    }
+
+    /** Removes {@code from} from the in-flight list and records a success (QUORUM) or failure (FAIL) for it. */
+    @Override
+    RequestStatus invoke(Rsp event, QuorumTracker tracker, Node.Id from)
+    {
+        switch (event)
+        {
+            case QUORUM:
+                inflight.remove(from);
+                return tracker.recordSuccess(from);
+            case FAIL:
+                inflight.remove(from);
+                return tracker.recordFailure(from);
+            default:
+                throw new AssertionError();
+        }
+    }
+
+    /** Asserts that the shard trackers' state matches the status the tracker just returned. */
+    @Override
+    void validate(RequestStatus status)
+    {
+        switch (status)
+        {
+            case Failed:
+                Assertions.assertTrue(anyShardFailed());
+                Assertions.assertFalse(allShardsReachedQuorum());
+                break;
+
+            case Success:
+                Assertions.assertTrue(allShardsReachedQuorum());
+                Assertions.assertFalse(anyShardFailed());
+                break;
+
+            case NoChange:
+                Assertions.assertFalse(allShardsReachedQuorum());
+                Assertions.assertFalse(anyShardFailed());
+                break;
+        }
+    }
+
+    private boolean anyShardFailed()
+    {
+        return tracker.any(ShardTracker::hasFailed);
+    }
+
+    private boolean allShardsReachedQuorum()
+    {
+        return tracker.all(QuorumShardTracker::hasReachedQuorum);
+    }
+}
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerTest.java b/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerTest.java
index 9014b92..6aab3a9 100644
--- a/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerTest.java
+++ b/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerTest.java
@@ -58,13 +58,13 @@ public class QuorumTrackerTest
         Topology subTopology = topology(topology.get(0));
         QuorumTracker responses = new QuorumTracker(topologies(subTopology));
 
-        responses.success(ids[0]);
+        responses.recordSuccess(ids[0]);
         assertResponseState(responses, false, false, true);
 
-        responses.success(ids[1]);
+        responses.recordSuccess(ids[1]);
         assertResponseState(responses, true, false, true);
 
-        responses.success(ids[2]);
+        responses.recordSuccess(ids[2]);
         assertResponseState(responses, true, false, false);
     }
 
@@ -77,14 +77,14 @@ public class QuorumTrackerTest
         Topology subTopology = topology(topology.get(0));
         QuorumTracker responses = new QuorumTracker(topologies(subTopology));
 
-        responses.success(ids[0]);
+        responses.recordSuccess(ids[0]);
         assertResponseState(responses, false, false, true);
 
-        responses.success(ids[1]);
+        responses.recordSuccess(ids[1]);
         assertResponseState(responses, true, false, true);
 
         Assertions.assertFalse(subTopology.get(0).nodes.contains(ids[4]));
-        responses.success(ids[4]);
+        responses.recordSuccess(ids[4]);
         assertResponseState(responses, true, false, true);
     }
 
@@ -94,13 +94,13 @@ public class QuorumTrackerTest
         Topology subTopology = topology(topology.get(0));
         QuorumTracker responses = new QuorumTracker(topologies(subTopology));
 
-        responses.success(ids[0]);
+        responses.recordSuccess(ids[0]);
         assertResponseState(responses, false, false, true);
 
-        responses.failure(ids[1]);
+        responses.recordFailure(ids[1]);
         assertResponseState(responses, false, false, true);
 
-        responses.failure(ids[2]);
+        responses.recordFailure(ids[2]);
         assertResponseState(responses, false, true, false);
     }
 
@@ -118,11 +118,11 @@ public class QuorumTrackerTest
         Assertions.assertSame(subTopology.get(1), responses.unsafeGet(1).shard);
         Assertions.assertSame(subTopology.get(2), responses.unsafeGet(2).shard);
 
-        responses.success(ids[1]);
+        responses.recordSuccess(ids[1]);
         assertResponseState(responses, false, false, true);
-        responses.success(ids[2]);
+        responses.recordSuccess(ids[2]);
         assertResponseState(responses, false, false, true);
-        responses.success(ids[3]);
+        responses.recordSuccess(ids[3]);
         assertResponseState(responses, true, false, true);
     }
 
@@ -135,14 +135,14 @@ public class QuorumTrackerTest
 
         QuorumTracker responses = new QuorumTracker(topologies(topology2, topology1));
 
-        responses.success(id(1));
+        responses.recordSuccess(id(1));
         assertResponseState(responses, false, false, true);
-        responses.success(id(2));
+        responses.recordSuccess(id(2));
         assertResponseState(responses, false, false, true);
 
-        responses.success(id(4));
+        responses.recordSuccess(id(4));
         assertResponseState(responses, false, false, true);
-        responses.success(id(5));
+        responses.recordSuccess(id(5));
         assertResponseState(responses, true, false, true);
     }
 
@@ -155,14 +155,14 @@ public class QuorumTrackerTest
 
         QuorumTracker responses = new QuorumTracker(topologies(topology2, topology1));
 
-        responses.success(id(1));
+        responses.recordSuccess(id(1));
         assertResponseState(responses, false, false, true);
-        responses.success(id(2));
+        responses.recordSuccess(id(2));
         assertResponseState(responses, false, false, true);
 
-        responses.failure(id(4));
+        responses.recordFailure(id(4));
         assertResponseState(responses, false, false, true);
-        responses.failure(id(5));
+        responses.recordFailure(id(5));
         assertResponseState(responses, false, true, true);
     }
 }
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerReconciler.java b/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerReconciler.java
new file mode 100644
index 0000000..72bfb71
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerReconciler.java
@@ -0,0 +1,115 @@
+package accord.coordinate.tracking;
+
+import accord.coordinate.tracking.ReadTracker.ReadShardTracker;
+import accord.local.Node;
+import accord.topology.Shard;
+import accord.topology.Topologies;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Stream;
+
+/**
+ * Feeds a {@link ReadTracker} randomly chosen responses and checks that each
+ * {@link RequestStatus} it reports is consistent with the state of its shard trackers.
+ */
+public class ReadTrackerReconciler extends TrackerReconciler<ReadShardTracker, ReadTracker, ReadTrackerReconciler.Rsp>
+{
+    /** Kinds of response that can be replayed against the tracker. */
+    enum Rsp { DATA, QUORUM, SLOW, FAIL }
+
+    /** A {@link ReadTracker} whose no-argument {@code trySendMore()} hands {@code List::add} and {@link #inflight} to the two-argument overload. */
+    static class InFlightCapturingReadTracker extends ReadTracker
+    {
+        final List<Node.Id> inflight = new ArrayList<>();
+
+        public InFlightCapturingReadTracker(Topologies topologies)
+        {
+            super(topologies);
+        }
+
+        @Override
+        protected RequestStatus trySendMore()
+        {
+            return super.trySendMore(List::add, inflight);
+        }
+    }
+
+    ReadTrackerReconciler(Random random, Topologies topologies)
+    {
+        this(random, new InFlightCapturingReadTracker(topologies));
+    }
+
+    private ReadTrackerReconciler(Random random, InFlightCapturingReadTracker tracker)
+    {
+        // the reconciler's in-flight list is the very list the capturing tracker appends to
+        super(random, Rsp.class, tracker, tracker.inflight);
+    }
+
+    /** Calls {@code trySendMore()} once on the tracker before running the shared event loop. */
+    @Override
+    void test()
+    {
+        tracker.trySendMore();
+        super.test();
+    }
+
+    /** Records the response on the tracker; every event except SLOW first removes {@code from} from the in-flight list. */
+    @Override
+    RequestStatus invoke(Rsp event, ReadTracker tracker, Node.Id from)
+    {
+        switch (event)
+        {
+            case DATA:
+                inflight.remove(from);
+                return tracker.recordReadSuccess(from);
+            case QUORUM:
+                inflight.remove(from);
+                return tracker.recordQuorumReadSuccess(from);
+            case FAIL:
+                inflight.remove(from);
+                return tracker.recordReadFailure(from);
+            case SLOW:
+                // SLOW does not remove the node from the in-flight list
+                return tracker.recordSlowResponse(from);
+            default:
+                throw new AssertionError();
+        }
+    }
+
+    /** Asserts that the shard trackers' state matches the status the tracker just returned. */
+    @Override
+    void validate(RequestStatus status)
+    {
+        switch (status)
+        {
+            case Failed:
+                Assertions.assertTrue(anyShardFailed());
+                Assertions.assertFalse(allShardsSucceeded());
+                break;
+
+            case Success:
+                Assertions.assertTrue(allShardsSucceeded());
+                Assertions.assertFalse(anyShardFailed());
+                break;
+
+            case NoChange:
+                Assertions.assertFalse(allShardsSucceeded());
+                Assertions.assertFalse(anyShardFailed());
+                break;
+        }
+    }
+
+    private boolean anyShardFailed()
+    {
+        return tracker.any(ReadShardTracker::hasFailed);
+    }
+
+    private boolean allShardsSucceeded()
+    {
+        return tracker.all(ReadShardTracker::hasSucceeded);
+    }
+}
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java b/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
index a65a64b..96a9a68 100644
--- a/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
+++ b/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
@@ -19,22 +19,24 @@
 package accord.coordinate.tracking;
 
 import accord.impl.TopologyUtils;
-import accord.local.Node;
+import accord.local.Node.Id;
 import accord.primitives.KeyRanges;
 import accord.topology.Shard;
+import accord.topology.Topologies;
 import accord.topology.Topology;
 import com.google.common.collect.Sets;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 import static accord.Utils.*;
 import static accord.utils.Utils.toArray;
 
 public class ReadTrackerTest
 {
-    private static final Node.Id[] ids = toArray(ids(5), Node.Id[]::new);
+    private static final Id[] ids = toArray(ids(5), Id[]::new);
     private static final KeyRanges ranges = TopologyUtils.initialRanges(5, 500);
     private static final Topology topology = TopologyUtils.initialTopology(ids, ranges, 3);
         /*
@@ -42,11 +44,39 @@ public class ReadTrackerTest
         [1, 2, 3] [2, 3, 4] [3, 4, 5] [4, 5, 1] [5, 1, 2]
          */
 
+    static class AutoReadTracker extends ReadTracker // test double: the no-argument trySendMore() forwards to the two-argument overload
+    {
+        public AutoReadTracker(Topologies topologies)
+        {
+            super(topologies);
+        }
+
+        @Override
+        public RequestStatus trySendMore()
+        {
+            return super.trySendMore(Set::add, inflight); // `inflight` is not declared in this class; presumably inherited from ReadTracker - TODO confirm
+        }
+    }
+
+    static class TestReadTracker extends ReadTracker // test double: the no-argument trySendMore() does nothing
+    {
+        public TestReadTracker(Topologies topologies)
+        {
+            super(topologies);
+        }
+
+        @Override
+        public RequestStatus trySendMore()
+        {
+            return RequestStatus.NoChange; // always reports NoChange without delegating to super
+        }
+    }
+
     private static void assertResponseState(ReadTracker responses,
                                             boolean complete,
                                             boolean failed)
     {
-        Assertions.assertEquals(complete, responses.hasCompletedRead());
+        Assertions.assertEquals(complete, responses.hasData());
         Assertions.assertEquals(failed, responses.hasFailed());
     }
 
@@ -54,9 +84,9 @@ public class ReadTrackerTest
     void singleShard()
     {
         Topology subTopology = topology(topology.get(0));
-        ReadTracker tracker = ReadTracker.create(topologies(subTopology));
+        ReadTracker tracker = new ReadTracker(topologies(subTopology));
 
-        tracker.recordInflightRead(ids[0]);
+        tracker.recordInFlightRead(ids[0]);
         assertResponseState(tracker, false, false);
 
         tracker.recordReadSuccess(ids[0]);
@@ -67,15 +97,15 @@ public class ReadTrackerTest
     void singleShardRetry()
     {
         Topology subTopology = topology(topology.get(0));
-        ReadTracker tracker = ReadTracker.create(topologies(subTopology));
+        ReadTracker tracker = new AutoReadTracker(topologies(subTopology));
 
-        tracker.recordInflightRead(ids[0]);
+        tracker.recordInFlightRead(ids[0]);
         assertResponseState(tracker, false, false);
 
         tracker.recordReadFailure(ids[0]);
         assertResponseState(tracker, false, false);
 
-        tracker.recordInflightRead(ids[1]);
+        tracker.recordInFlightRead(ids[1]);
         assertResponseState(tracker, false, false);
 
         tracker.recordReadSuccess(ids[1]);
@@ -86,17 +116,17 @@ public class ReadTrackerTest
     void singleShardFailure()
     {
         Topology subTopology = topology(topology.get(0));
-        ReadTracker tracker = ReadTracker.create(topologies(subTopology));
+        ReadTracker tracker = new TestReadTracker(topologies(subTopology));
 
-        tracker.recordInflightRead(ids[0]);
+        tracker.recordInFlightRead(ids[0]);
         tracker.recordReadFailure(ids[0]);
         assertResponseState(tracker, false, false);
 
-        tracker.recordInflightRead(ids[1]);
+        tracker.recordInFlightRead(ids[1]);
         tracker.recordReadFailure(ids[1]);
         assertResponseState(tracker, false, false);
 
-        tracker.recordInflightRead(ids[2]);
+        tracker.recordInFlightRead(ids[2]);
         tracker.recordReadFailure(ids[2]);
         assertResponseState(tracker, false, true);
     }
@@ -105,13 +135,13 @@ public class ReadTrackerTest
     void multiShardSuccess()
     {
         Topology subTopology = new Topology(1, new Shard[]{topology.get(0), topology.get(1), topology.get(2)});
-        ReadTracker responses = ReadTracker.create(topologies(subTopology));
+        ReadTracker responses = new AutoReadTracker(topologies(subTopology));
         /*
         (000, 100](100, 200](200, 300]
         [1, 2, 3] [2, 3, 4] [3, 4, 5]
          */
 
-        responses.recordInflightRead(ids[2]);
+        responses.recordInFlightRead(ids[2]);
         responses.recordReadSuccess(ids[2]);
         assertResponseState(responses, true, false);
     }
@@ -120,30 +150,39 @@ public class ReadTrackerTest
     void multiShardRetryAndReadSet()
     {
         Topology subTopology = new Topology(1, new Shard[]{topology.get(0), topology.get(1), topology.get(2)});
-        ReadTracker responses = ReadTracker.create(topologies(subTopology));
+        ReadTracker responses = new TestReadTracker(topologies(subTopology));
         /*
         (000, 100](100, 200](200, 300]
         [1, 2, 3] [2, 3, 4] [3, 4, 5]
          */
 
-        Assertions.assertEquals(Sets.newHashSet(ids[2]), responses.computeMinimalReadSetAndMarkInflight());
+        assertContacts(Sets.newHashSet(ids[2]), responses);
 
         assertResponseState(responses, false, false);
 
         responses.recordReadFailure(ids[2]);
         assertResponseState(responses, false, false);
 
-        Assertions.assertEquals(Sets.newHashSet(ids[1], ids[3]), responses.computeMinimalReadSetAndMarkInflight());
+        assertContacts(Sets.newHashSet(ids[1], ids[3]), responses);
         assertResponseState(responses, false, false);
 
         responses.recordReadFailure(ids[1]);
-        Assertions.assertEquals(Collections.emptySet(), responses.computeMinimalReadSetAndMarkInflight());
+        assertContacts(Sets.newHashSet(ids[0]), responses);
 
         responses.recordReadSuccess(ids[3]);
         assertResponseState(responses, false, false);
-        Assertions.assertEquals(Collections.emptySet(), responses.computeMinimalReadSetAndMarkInflight());
+        // asking for more contacts at this point is expected to be rejected with IllegalStateException
+        Assertions.assertThrows(IllegalStateException.class, () -> responses.trySendMore((i,j)->{}, null));
 
         responses.recordReadSuccess(ids[0]);
         assertResponseState(responses, true, false);
     }
+
+    // asserts that the next two-argument trySendMore call reports exactly the expected nodes
+    private static void assertContacts(Set<Id> expect, ReadTracker tracker)
+    {
+        Set<Id> actual = new HashSet<>();
+        tracker.trySendMore(Set::add, actual);
+        Assertions.assertEquals(expect, actual);
+    }
 }
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/RecoveryTrackerReconciler.java b/accord-core/src/test/java/accord/coordinate/tracking/RecoveryTrackerReconciler.java
new file mode 100644
index 0000000..314c63f
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/tracking/RecoveryTrackerReconciler.java
@@ -0,0 +1,87 @@
+package accord.coordinate.tracking;
+
+import accord.coordinate.tracking.RecoveryTracker.RecoveryShardTracker;
+import accord.local.Node;
+import accord.topology.Topologies;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.ArrayList;
+import java.util.Random;
+
+// TODO: check fast path accounting
+/**
+ * Feeds a {@link RecoveryTracker} randomly chosen responses and checks that each
+ * {@link RequestStatus} it reports is consistent with the state of its shard trackers.
+ */
+public class RecoveryTrackerReconciler extends TrackerReconciler<RecoveryShardTracker, RecoveryTracker, RecoveryTrackerReconciler.Rsp>
+{
+    /** Kinds of response that can be replayed against the tracker. */
+    enum Rsp { FAST, SLOW, FAIL }
+
+    RecoveryTrackerReconciler(Random random, Topologies topologies)
+    {
+        this(random, new RecoveryTracker(topologies));
+    }
+
+    private RecoveryTrackerReconciler(Random random, RecoveryTracker tracker)
+    {
+        // the in-flight list starts out as a mutable copy of the tracker's nodes
+        super(random, Rsp.class, tracker, new ArrayList<>(tracker.nodes()));
+    }
+
+    /**
+     * Removes {@code from} from the in-flight list and records its response on the tracker:
+     * FAST and SLOW record a success with the flag set to true and false respectively, FAIL a failure.
+     */
+    @Override
+    RequestStatus invoke(Rsp event, RecoveryTracker tracker, Node.Id from)
+    {
+        switch (event)
+        {
+            case FAST:
+                inflight.remove(from);
+                return tracker.recordSuccess(from, true);
+            case SLOW:
+                inflight.remove(from);
+                return tracker.recordSuccess(from, false);
+            case FAIL:
+                inflight.remove(from);
+                return tracker.recordFailure(from);
+            default:
+                throw new AssertionError();
+        }
+    }
+
+    /** Asserts that the shard trackers' state matches the status the tracker just returned. */
+    @Override
+    void validate(RequestStatus status)
+    {
+        switch (status)
+        {
+            case Failed:
+                Assertions.assertTrue(anyShardFailed());
+                Assertions.assertFalse(allShardsReachedQuorum());
+                break;
+
+            case Success:
+                Assertions.assertTrue(allShardsReachedQuorum());
+                Assertions.assertFalse(anyShardFailed());
+                break;
+
+            case NoChange:
+                Assertions.assertFalse(allShardsReachedQuorum());
+                Assertions.assertFalse(anyShardFailed());
+                break;
+        }
+    }
+
+    private boolean anyShardFailed()
+    {
+        return tracker.any(ShardTracker::hasFailed);
+    }
+
+    private boolean allShardsReachedQuorum()
+    {
+        return tracker.all(RecoveryShardTracker::hasReachedQuorum);
+    }
+}
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
new file mode 100644
index 0000000..acbb3b3
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
@@ -0,0 +1,147 @@
+package accord.coordinate.tracking;
+
+import accord.burn.TopologyUpdates;
+import accord.impl.IntHashKey;
+import accord.impl.SizeOfIntersectionSorter;
+import accord.impl.TopologyFactory;
+import accord.local.Node.Id;
+import accord.topology.Topologies;
+import accord.topology.Topology;
+import accord.topology.TopologyRandomizer;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.*;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Base class for randomised tracker tests: repeatedly delivers a random event from a random
+ * in-flight node to the tracker and validates each reported status, stopping at the first
+ * status other than {@link RequestStatus#NoChange}.
+ *
+ * @param <ST> per-shard tracker type
+ * @param <T>  tracker under test
+ * @param <E>  enum of events that can be delivered
+ */
+public abstract class TrackerReconciler<ST extends ShardTracker, T extends AbstractTracker<ST, ?>, E extends Enum<E>>
+{
+    final Random random;
+    final E[] events;
+    // one map per entry of tracker.trackers, counting deliveries of each event
+    final EnumMap<E, Integer>[] counts;
+    final T tracker;
+    final List<Id> inflight;
+
+    protected TrackerReconciler(Random random, Class<E> events, T tracker, List<Id> inflight)
+    {
+        this.random = random;
+        this.events = events.getEnumConstants();
+        this.tracker = tracker;
+        this.inflight = inflight;
+        this.counts = new EnumMap[tracker.trackers.length];
+        for (int shard = 0 ; shard < counts.length ; ++shard)
+        {
+            EnumMap<E, Integer> zeroed = new EnumMap<>(events);
+            for (E event : this.events)
+                zeroed.put(event, 0);
+            counts[shard] = zeroed;
+        }
+    }
+
+    Topologies topologies()
+    {
+        return tracker.topologies;
+    }
+
+    /**
+     * Delivers random events from random in-flight nodes until the tracker reports a status
+     * other than NoChange, validating every status along the way.
+     */
+    void test()
+    {
+        RequestStatus status;
+        do
+        {
+            Assertions.assertFalse(inflight.isEmpty());
+            E event = events[random.nextInt(events.length)];
+            Id sender = inflight.get(random.nextInt(inflight.size()));
+            status = invoke(event, tracker, sender);
+
+            Topologies all = topologies();
+            for (int i = 0 ; i < all.size() ; ++i)
+            {
+                all.get(i).forEachOn(sender, (si, s) -> {
+                    counts[si].merge(event, 1, Integer::sum);
+                });
+            }
+
+            validate(status);
+        }
+        while (status == RequestStatus.NoChange);
+    }
+
+    abstract RequestStatus invoke(E event, T tracker, Id from);
+    abstract void validate(RequestStatus status);
+
+    /**
+     * Prints the seed, then builds one reconciler per generated {@link Topologies}; all of them
+     * share a single {@link Random} created from that seed.
+     */
+    protected static <ST extends ShardTracker, T extends AbstractTracker<ST, ?>, E extends Enum<E>>
+    List<TrackerReconciler<ST, T, E>> generate(long seed, BiFunction<Random, Topologies, ? extends TrackerReconciler<ST, T, E>> constructor)
+    {
+        System.out.println("seed: " + seed);
+        Random random = new Random(seed);
+        List<TrackerReconciler<ST, T, E>> reconcilers = new ArrayList<>();
+        topologies(random).forEachOrdered(topologies -> reconcilers.add(constructor.apply(random, topologies)));
+        return reconcilers;
+    }
+
+    // TODO: generalise and parameterise topology generation a bit more
+    // TODO: select a subset of the generated topologies to correctly simulate topology consumption logic
+    private static Stream<Topologies> topologies(Random random)
+    {
+        // keep the order of these random draws: it determines what a given seed generates
+        int rf = 2 + random.nextInt(3);
+        int rangeCount = 4 + random.nextInt(12);
+        TopologyFactory factory = new TopologyFactory(rf, IntHashKey.ranges(rangeCount));
+        int replicaGroups = 1 + random.nextInt(factory.shardRanges.length - 1);
+        List<Id> nodes = cluster(factory.rf * replicaGroups);
+        Topology initial = factory.toTopology(nodes);
+        int count = 1 + random.nextInt(3);
+
+        List<Topologies> result = new ArrayList<>();
+        result.add(new Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, initial));
+        if (count == 1)
+            return result.stream();
+
+        // each newly generated topology is pushed onto the head of the deque
+        Deque<Topology> history = new ArrayDeque<>();
+        history.add(initial);
+        TopologyUpdates topologyUpdates = new TopologyUpdates();
+        TopologyRandomizer randomizer = new TopologyRandomizer(() -> random, initial, topologyUpdates, (id, top) -> {});
+        for (int remaining = count - 1 ; remaining > 0 ; --remaining)
+        {
+            Topology next;
+            do
+            {
+                next = randomizer.updateTopology();
+            }
+            while (next == null);
+
+            history.addFirst(next);
+            result.add(new Topologies.Multi(SizeOfIntersectionSorter.SUPPLIER, history.toArray(new Topology[0])));
+        }
+        return result.stream();
+    }
+
+    private static List<Id> cluster(int count)
+    {
+        List<Id> ids = new ArrayList<>();
+        int id = 0;
+        while (id < count)
+            ids.add(new Id(++id));
+        return ids;
+    }
+}
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconcilerTest.java b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconcilerTest.java
new file mode 100644
index 0000000..25ed28a
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconcilerTest.java
@@ -0,0 +1,49 @@
+package accord.coordinate.tracking;
+
+import accord.topology.Topologies;
+import org.junit.jupiter.api.Test;
+
+import java.util.Random;
+import java.util.function.BiFunction;
+
+public class TrackerReconcilerTest
+{
+    @Test
+    public void testReadTracker()
+    {
+        test(10000, ReadTrackerReconciler::new);
+    }
+
+    @Test
+    public void testQuorumTracker()
+    {
+        test(10000, QuorumTrackerReconciler::new);
+    }
+    
+    @Test
+    public void testFastPathTracker()
+    {
+        test(10000, FastPathTrackerReconciler::new);
+    }
+
+    @Test
+    public void testRecoveryTracker()
+    {
+        test(10000, RecoveryTrackerReconciler::new);
+    }
+
+    static <ST extends ShardTracker, T extends AbstractTracker<ST, ?>, E extends Enum<E>>
+    void test(int count, BiFunction<Random, Topologies, ? extends TrackerReconciler<ST, T, E>> constructor)
+    {
+        long seed = System.currentTimeMillis();
+        while (--count >= 0)
+            test(seed++, constructor);
+    }
+
+    static <ST extends ShardTracker, T extends AbstractTracker<ST, ?>, E extends Enum<E>>
+    void test(long seed, BiFunction<Random, Topologies, ? extends TrackerReconciler<ST, T, E>> constructor)
+    {
+        for (TrackerReconciler<?, ?, ?> test : TrackerReconciler.generate(seed, constructor))
+            test.test();
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/TopologyFactory.java b/accord-core/src/test/java/accord/impl/TopologyFactory.java
index 3dd450d..d52728b 100644
--- a/accord-core/src/test/java/accord/impl/TopologyFactory.java
+++ b/accord-core/src/test/java/accord/impl/TopologyFactory.java
@@ -32,17 +32,17 @@ public class TopologyFactory
 {
     public final int rf;
     // TODO: convert to KeyRanges
-    final KeyRange[] ranges;
+    public final KeyRange[] shardRanges;
 
-    public TopologyFactory(int rf, KeyRange... ranges)
+    public TopologyFactory(int rf, KeyRange... shardRanges)
     {
         this.rf = rf;
-        this.ranges = ranges;
+        this.shardRanges = shardRanges;
     }
 
     public Topology toTopology(Node.Id[] cluster)
     {
-        return TopologyUtils.initialTopology(cluster, KeyRanges.ofSortedAndDeoverlapped(ranges), rf);
+        return TopologyUtils.initialTopology(cluster, KeyRanges.ofSortedAndDeoverlapped(shardRanges), rf);
     }
 
     public Topology toTopology(List<Node.Id> cluster)
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index a1f215e..1ea9210 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -40,6 +40,7 @@ import accord.burn.BurnTestConfigurationService;
 import accord.burn.TopologyUpdates;
 import accord.impl.SimpleProgressLog;
 import accord.impl.InMemoryCommandStores;
+import accord.impl.SizeOfIntersectionSorter;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.Scheduler;
@@ -221,7 +222,8 @@ public class Cluster implements Scheduler
                 BurnTestConfigurationService configService = new BurnTestConfigurationService(node, messageSink, randomSupplier, topology, lookup::get, topologyUpdates);
                 lookup.put(node, new Node(node, messageSink, configService,
                                           nowSupplier.get(), () -> new ListStore(node), new ListAgent(30L, onFailure),
-                                          randomSupplier.get(), sinks, SimpleProgressLog::new, InMemoryCommandStores.Synchronized::new));
+                                          randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER,
+                                          SimpleProgressLog::new, InMemoryCommandStores.Synchronized::new));
             }
 
             List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index 1d1c443..c33bf58 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -61,14 +61,14 @@ public class ListRequest implements Request
         }
 
         @Override
-        protected Action check(Id from, CheckStatusOk ok)
+        protected Action checkSufficient(Id from, CheckStatusOk ok)
         {
             ++count;
-            return ok.saveStatus.hasBeen(PreApplied) ? Action.Accept : Action.Reject;
+            return ok.saveStatus.hasBeen(PreApplied) ? Action.Approve : Action.Reject;
         }
 
         @Override
-        protected void onDone(Done done, Throwable failure)
+        protected void onDone(CheckShards.Success done, Throwable failure)
         {
             if (failure != null) callback.accept(null, failure);
             else if (merged.saveStatus.hasBeen(Status.Invalidated)) callback.accept(Outcome.Invalidated, null);
@@ -77,7 +77,7 @@ public class ListRequest implements Request
         }
 
         @Override
-        protected boolean isSufficient(Id from, CheckStatusOk ok)
+        protected boolean isSufficient(CheckStatusOk ok)
         {
             throw new UnsupportedOperationException();
         }
diff --git a/accord-core/src/test/java/accord/impl/mock/EpochSync.java b/accord-core/src/test/java/accord/impl/mock/EpochSync.java
index 276ff3b..ef05c0f 100644
--- a/accord-core/src/test/java/accord/impl/mock/EpochSync.java
+++ b/accord-core/src/test/java/accord/impl/mock/EpochSync.java
@@ -100,14 +100,14 @@ public class EpochSync implements Runnable
 
         public CommandSync(Node node, AbstractRoute route, SyncCommitted message, Topology topology)
         {
-            this.tracker = new QuorumTracker(new Single(topology.forKeys(route), false));
+            this.tracker = new QuorumTracker(new Single(node.topology().sorter(), topology.forKeys(route)));
             node.send(tracker.nodes(), message, this);
         }
 
         @Override
         public synchronized void onSuccess(Node.Id from, SimpleReply reply)
         {
-            tracker.success(from);
+            tracker.recordSuccess(from);
             if (tracker.hasReachedQuorum())
                 trySuccess(null);
         }
@@ -115,7 +115,7 @@ public class EpochSync implements Runnable
         @Override
         public synchronized void onFailure(Node.Id from, Throwable failure)
         {
-            tracker.failure(from);
+            tracker.recordFailure(from);
             if (tracker.hasFailed())
                 tryFailure(failure);
         }
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 544723d..996cd66 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -21,10 +21,7 @@ package accord.impl.mock;
 import accord.NetworkFilter;
 import accord.api.MessageSink;
 import accord.coordinate.Timeout;
-import accord.impl.InMemoryCommandStores;
-import accord.impl.TopologyUtils;
-import accord.local.CommandStores;
-import accord.impl.SimpleProgressLog;
+import accord.impl.*;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.primitives.KeyRanges;
@@ -34,7 +31,6 @@ import accord.primitives.TxnId;
 import accord.messages.Callback;
 import accord.messages.Reply;
 import accord.messages.Request;
-import accord.impl.TestAgent;
 import accord.topology.Topology;
 import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
@@ -111,6 +107,7 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node>
                         new TestAgent(),
                         new Random(random.nextLong()),
                         new ThreadPoolScheduler(),
+                        SizeOfIntersectionSorter.SUPPLIER,
                         SimpleProgressLog::new,
                         InMemoryCommandStores.SingleThread::new);
     }
diff --git a/accord-core/src/test/java/accord/local/CommandTest.java b/accord-core/src/test/java/accord/local/CommandTest.java
index cc3efe8..0b55caf 100644
--- a/accord-core/src/test/java/accord/local/CommandTest.java
+++ b/accord-core/src/test/java/accord/local/CommandTest.java
@@ -146,7 +146,7 @@ public class CommandTest
     {
         return new Node(id, null, new MockConfigurationService(null, (epoch, service) -> { }, storeSupport.local.get()),
                         new MockCluster.Clock(100), () -> storeSupport.data, new TestAgent(), new Random(), null,
-                        ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
+                        SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
     }
 
     @Test
diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
index 4ade147..0955bf7 100644
--- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
+++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
@@ -68,6 +68,7 @@ public class PreAcceptTest
                         new TestAgent(),
                         new Random(),
                         scheduler,
+                        SizeOfIntersectionSorter.SUPPLIER,
                         SimpleProgressLog::new,
                         InMemoryCommandStores.Synchronized::new);
     }
diff --git a/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java b/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java
index 3c5e168..acbfa4e 100644
--- a/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java
+++ b/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java
@@ -18,6 +18,7 @@
 
 package accord.messages;
 
+import accord.api.TopologySorter;
 import accord.primitives.KeyRange;
 import accord.primitives.Route;
 import accord.topology.Topologies;
@@ -43,7 +44,7 @@ public class TxnRequestScopeTest
         Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
         Topology topology2 = topology(2, shard(range, idList(3, 4, 5), idSet(4, 5)));
 
-        Topologies.Multi topologies = new Topologies.Multi();
+        Topologies.Multi topologies = new Topologies.Multi((TopologySorter.StaticSorter)(a, b, s)->0);
         topologies.add(topology2);
         topologies.add(topology1);
 
@@ -74,7 +75,7 @@ public class TxnRequestScopeTest
                                       shard(range1, idList(4, 5, 6), idSet(4, 5)),
                                       shard(range2, idList(1, 2, 3), idSet(1, 2)) );
 
-        Topologies.Multi topologies = new Topologies.Multi();
+        Topologies.Multi topologies = new Topologies.Multi((TopologySorter.StaticSorter)(a,b,s)->0);
         topologies.add(topology2);
         topologies.add(topology1);
 
diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
index cb68d8c..58890e1 100644
--- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
@@ -18,6 +18,8 @@
 
 package accord.topology;
 
+import accord.api.TopologySorter;
+import accord.impl.SizeOfIntersectionSorter;
 import accord.local.Node;
 import accord.primitives.KeyRange;
 import accord.primitives.Keys;
@@ -27,6 +29,7 @@ import org.junit.jupiter.api.Test;
 import static accord.Utils.*;
 import static accord.impl.IntKey.keys;
 import static accord.impl.IntKey.range;
+import static accord.impl.SizeOfIntersectionSorter.SUPPLIER;
 
 public class TopologyManagerTest
 {
@@ -39,7 +42,7 @@ public class TopologyManagerTest
         Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
         Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)));
 
-        TopologyManager service = new TopologyManager(ID);
+        TopologyManager service = new TopologyManager(SUPPLIER, ID);
 
         Assertions.assertSame(Topology.EMPTY, service.current());
         service.onTopologyUpdate(topology1);
@@ -64,7 +67,7 @@ public class TopologyManagerTest
                                       shard(range(100, 200), idList(1, 2, 3), idSet(3, 4)),
                                       shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)));
 
-        TopologyManager service = new TopologyManager(ID);
+        TopologyManager service = new TopologyManager(SUPPLIER, ID);
         service.onTopologyUpdate(topology1);
         service.onTopologyUpdate(topology2);
 
@@ -95,7 +98,7 @@ public class TopologyManagerTest
         Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)));
         Topology topology3 = topology(3, shard(range, idList(1, 2, 3), idSet(1, 2)));
 
-        TopologyManager service = new TopologyManager(ID);
+        TopologyManager service = new TopologyManager(SUPPLIER, ID);
         service.onTopologyUpdate(topology1);
         service.onTopologyUpdate(topology2);
         service.onTopologyUpdate(topology3);
@@ -128,7 +131,7 @@ public class TopologyManagerTest
         Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
         Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)));
 
-        TopologyManager service = new TopologyManager(ID);
+        TopologyManager service = new TopologyManager(SUPPLIER, ID);
         service.onTopologyUpdate(topology1);
 
         // sync epoch 2
@@ -147,7 +150,7 @@ public class TopologyManagerTest
         Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
         Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)));
 
-        TopologyManager service = new TopologyManager(ID);
+        TopologyManager service = new TopologyManager(SUPPLIER, ID);
 
         Assertions.assertSame(Topology.EMPTY, service.current());
         service.onTopologyUpdate(topology1);
@@ -179,7 +182,7 @@ public class TopologyManagerTest
                                       shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)),
                                       shard(range(200, 300), idList(4, 5, 6), idSet(5, 6)));
 
-        TopologyManager service = new TopologyManager(ID);
+        TopologyManager service = new TopologyManager(SUPPLIER, ID);
         service.onTopologyUpdate(topology1);
         service.onTopologyUpdate(topology2);
 
@@ -193,6 +196,5 @@ public class TopologyManagerTest
         Topologies actual = service.withUnsyncedEpochs(keys(150, 250), 2, 2);
         Assertions.assertEquals(topologies(topology2, topology(1, shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)))),
                                 actual);
-        Assertions.assertFalse(actual.fastPathPermitted());
     }
 }
diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
index 2b6fd84..4be9914 100644
--- a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
+++ b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
@@ -27,28 +27,31 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.function.Supplier;
+import java.util.function.*;
 
+// TODO: add change replication factor
 public class TopologyRandomizer
 {
     private static final Logger logger = LoggerFactory.getLogger(TopologyRandomizer.class);
     private final Random random;
-    private final Function<Node.Id, Node> lookup;
+    private final BiConsumer<Node.Id, Topology> notifier;
     private final List<Topology> epochs = new ArrayList<>();
     private final Map<Node.Id, KeyRanges> previouslyReplicated = new HashMap<>();
     private final TopologyUpdates topologyUpdates;
 
     public TopologyRandomizer(Supplier<Random> randomSupplier, Topology initialTopology, TopologyUpdates topologyUpdates, Function<Node.Id, Node> lookup)
+    {
+        this(randomSupplier, initialTopology, topologyUpdates, (id, topology) -> topologyUpdates.notify(lookup.apply(id), topology.nodes(), topology));
+    }
+    public TopologyRandomizer(Supplier<Random> randomSupplier, Topology initialTopology, TopologyUpdates topologyUpdates, BiConsumer<Node.Id, Topology> notifier)
     {
         this.random = randomSupplier.get();
         this.topologyUpdates = topologyUpdates;
-        this.lookup = lookup;
         this.epochs.add(Topology.EMPTY);
         this.epochs.add(initialTopology);
         for (Node.Id node : initialTopology.nodes())
             previouslyReplicated.put(node, initialTopology.rangesForNode(node));
+        this.notifier = notifier;
     }
 
     private enum UpdateType
@@ -209,13 +212,17 @@ public class TopologyRandomizer
         return false;
     }
 
-    public synchronized void maybeUpdateTopology()
-    {
+    public synchronized void maybeUpdateTopology() {
         // if we don't limit the number of pending topology changes in flight,
         // the topology randomizer will keep the burn test busy indefinitely
         if (topologyUpdates.pendingTopologies() > 5 || random.nextInt(200) != 0)
             return;
 
+        updateTopology();
+    }
+
+    public synchronized Topology updateTopology()
+    {
         Topology current = epochs.get(epochs.size() - 1);
         Shard[] shards = current.unsafeGetShards().clone();
         int mutations = random.nextInt(current.size());
@@ -231,7 +238,7 @@ public class TopologyRandomizer
         //  In the meantime, the logic needed to support acquiring ranges that we previously replicated is pretty
         //  convoluted without the ability to jettison epochs.
         if (reassignsRanges(current, shards, previouslyReplicated))
-            return;
+            return null;
 
         Map<Node.Id, KeyRanges> nextAdditions = getAdditions(current, nextTopology);
         for (Map.Entry<Node.Id, KeyRanges> entry : nextAdditions.entrySet())
@@ -248,7 +255,7 @@ public class TopologyRandomizer
         List<Node.Id> nodes = new ArrayList<>(nextTopology.nodes());
 
         int originatorIdx = random.nextInt(nodes.size());
-        Node originator = lookup.apply(nodes.remove(originatorIdx));
-        topologyUpdates.notify(originator, nextTopology.nodes(), nextTopology);
+        notifier.accept(nodes.remove(originatorIdx), nextTopology);
+        return nextTopology;
     }
 }
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 97a765d..64b0ec0 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -32,6 +32,7 @@ import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
 import accord.coordinate.Timeout;
+import accord.impl.SizeOfIntersectionSorter;
 import accord.local.CommandStores;
 import accord.impl.SimpleProgressLog;
 import accord.impl.InMemoryCommandStores;
@@ -296,7 +297,8 @@ public class Cluster implements Scheduler
                 MessageSink messageSink = sinks.create(node, randomSupplier.get());
                 lookup.put(node, new Node(node, messageSink, new SimpleConfigService(topology),
                                           nowSupplier.get(), MaelstromStore::new, MaelstromAgent.INSTANCE,
-                                          randomSupplier.get(), sinks, SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new));
+                                          randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER,
+                                          SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new));
             }
 
             List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 77531c3..d861bf7 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -34,6 +34,7 @@ import java.util.function.Supplier;
 import accord.coordinate.Timeout;
 import accord.impl.SimpleProgressLog;
 import accord.impl.InMemoryCommandStores;
+import accord.impl.SizeOfIntersectionSorter;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.api.Scheduler;
@@ -160,7 +161,7 @@ public class Main
             sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err);
             on = new Node(init.self, sink, new SimpleConfigService(topology), System::currentTimeMillis,
                           MaelstromStore::new, MaelstromAgent.INSTANCE, new Random(), scheduler,
-                          SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new);
+                          SizeOfIntersectionSorter.SUPPLIER, SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new);
             err.println("Initialized node " + init.self);
             err.flush();
             sink.send(packet.src, new Body(Type.init_ok, Body.SENTINEL_MSG_ID, init.msg_id));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org