You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2022/11/22 10:08:29 UTC
[cassandra-accord] branch trunk updated: Refactor response tracking to improve efficiency and clarity; introduce dedicated property tests; re-activate fast-path during range movements
This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 05b3376 Refactor response tracking to improve efficiency and clarity; introduce dedicated property tests; re-activate fast-path during range movements
05b3376 is described below
commit 05b33767b538d41620910a417d27ed78ce4f8d9c
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Fri Nov 4 11:35:27 2022 +0000
Refactor response tracking to improve efficiency and clarity; introduce dedicated property tests; re-activate fast-path during range movements
patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-18056
---
.../src/main/java/accord/api/ProgressLog.java | 2 +-
.../src/main/java/accord/api/TopologySorter.java | 26 ++
.../java/accord/coordinate/AnyReadCoordinator.java | 115 --------
.../src/main/java/accord/coordinate/CheckOn.java | 41 ++-
.../main/java/accord/coordinate/CheckShards.java | 22 +-
.../main/java/accord/coordinate/CollectDeps.java | 7 +-
.../main/java/accord/coordinate/Coordinate.java | 192 +++----------
.../src/main/java/accord/coordinate/Execute.java | 38 +--
.../QuorumTracker.java => Exhausted.java} | 31 +--
.../src/main/java/accord/coordinate/FetchData.java | 9 +-
.../main/java/accord/coordinate/FindHomeKey.java | 4 +-
.../src/main/java/accord/coordinate/FindRoute.java | 7 +-
.../java/accord/coordinate/InformHomeOfTxn.java | 9 +-
.../main/java/accord/coordinate/Invalidate.java | 71 +++--
.../main/java/accord/coordinate/MaybeRecover.java | 11 +-
.../src/main/java/accord/coordinate/Persist.java | 3 +-
.../src/main/java/accord/coordinate/Propose.java | 16 +-
.../accord/coordinate/QuorumReadCoordinator.java | 213 ---------------
.../java/accord/coordinate/ReadCoordinator.java | 187 +++++++++++++
.../src/main/java/accord/coordinate/Recover.java | 81 ++----
.../java/accord/coordinate/RecoverWithHomeKey.java | 14 +-
.../java/accord/coordinate/RecoverWithRoute.java | 22 +-
.../coordinate/tracking/AbstractQuorumTracker.java | 119 --------
.../tracking/AbstractResponseTracker.java | 189 -------------
.../coordinate/tracking/AbstractTracker.java | 203 ++++++++++++++
.../coordinate/tracking/FastPathTracker.java | 142 ++++++++--
.../accord/coordinate/tracking/QuorumTracker.java | 65 ++++-
.../accord/coordinate/tracking/ReadTracker.java | 303 ++++++++++++---------
.../coordinate/tracking/RecoveryTracker.java | 75 +++++
.../accord/coordinate/tracking/RequestStatus.java | 8 +
.../accord/coordinate/tracking/ShardTracker.java | 17 ++
.../java/accord/impl/InMemoryCommandStore.java | 1 +
.../java/accord/impl/InMemoryCommandsForKey.java | 1 +
.../java/accord/impl/SizeOfIntersectionSorter.java | 51 ++++
accord-core/src/main/java/accord/local/Node.java | 4 +-
.../src/main/java/accord/local/SaveStatus.java | 1 -
.../main/java/accord/local/SyncCommandStores.java | 1 +
.../java/accord/messages/BeginInvalidation.java | 6 +-
.../src/main/java/accord/messages/ReadData.java | 1 +
.../src/main/java/accord/topology/Shard.java | 5 +
.../main/java/accord/topology/ShardSelection.java | 7 +
.../src/main/java/accord/topology/Topologies.java | 103 ++++---
.../src/main/java/accord/topology/Topology.java | 73 +++--
.../main/java/accord/topology/TopologyManager.java | 35 ++-
.../IndexedFunction.java} | 26 +-
.../IndexedTriFunction.java} | 26 +-
accord-core/src/test/java/accord/Utils.java | 4 +-
.../src/test/java/accord/burn/BurnTest.java | 2 +-
.../java/accord/coordinate/CoordinateTest.java | 7 +-
.../accord/coordinate/PreAcceptTrackerTest.java | 18 +-
.../java/accord/coordinate/TopologyChangeTest.java | 59 ----
.../tracking/FastPathTrackerReconciler.java | 59 ++++
.../tracking/QuorumTrackerReconciler.java | 56 ++++
.../coordinate/tracking/QuorumTrackerTest.java | 40 +--
.../coordinate/tracking/ReadTrackerReconciler.java | 84 ++++++
.../coordinate/tracking/ReadTrackerTest.java | 84 ++++--
.../tracking/RecoveryTrackerReconciler.java | 58 ++++
.../coordinate/tracking/TrackerReconciler.java | 116 ++++++++
.../coordinate/tracking/TrackerReconcilerTest.java | 49 ++++
.../src/test/java/accord/impl/TopologyFactory.java | 8 +-
.../src/test/java/accord/impl/basic/Cluster.java | 4 +-
.../test/java/accord/impl/list/ListRequest.java | 8 +-
.../src/test/java/accord/impl/mock/EpochSync.java | 6 +-
.../test/java/accord/impl/mock/MockCluster.java | 7 +-
.../src/test/java/accord/local/CommandTest.java | 2 +-
.../test/java/accord/messages/PreAcceptTest.java | 1 +
.../java/accord/messages/TxnRequestScopeTest.java | 5 +-
.../java/accord/topology/TopologyManagerTest.java | 16 +-
.../java/accord/topology/TopologyRandomizer.java | 27 +-
.../src/main/java/accord/maelstrom/Cluster.java | 4 +-
.../src/main/java/accord/maelstrom/Main.java | 3 +-
71 files changed, 1872 insertions(+), 1438 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/ProgressLog.java b/accord-core/src/main/java/accord/api/ProgressLog.java
index cf535d1..47dfb6a 100644
--- a/accord-core/src/main/java/accord/api/ProgressLog.java
+++ b/accord-core/src/main/java/accord/api/ProgressLog.java
@@ -46,7 +46,7 @@ import accord.primitives.*;
*
* - Non-home shards may also be informed of transactions that are blocking the progress of other transactions.
* If the {@code waitingOn} transaction that is blocking progress is uncommitted it is required that the progress
- * log invoke {@link CheckOn#checkOnUncommitted} for the transaction if no {@link #committed} is witnessed.
+ * log invoke {@link accord.coordinate.FetchData#fetch} for the transaction if no {@link #committed} is witnessed.
*
* - Members of the home shard will be informed of a transaction to monitor by the invocation of {@link #preaccepted} or
* {@link #accepted}. If this is not followed closely by {@link #committed}, {@link accord.coordinate.MaybeRecover} should
diff --git a/accord-core/src/main/java/accord/api/TopologySorter.java b/accord-core/src/main/java/accord/api/TopologySorter.java
new file mode 100644
index 0000000..c3587b2
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/TopologySorter.java
@@ -0,0 +1,26 @@
+package accord.api;
+
+import accord.local.Node;
+import accord.topology.ShardSelection;
+import accord.topology.Topologies;
+import accord.topology.Topology;
+
+public interface TopologySorter
+{
+ interface Supplier
+ {
+ TopologySorter get(Topology topologies);
+ TopologySorter get(Topologies topologies);
+ }
+
+ interface StaticSorter extends Supplier, TopologySorter
+ {
+ default TopologySorter get(Topology topologies) { return this; }
+ default TopologySorter get(Topologies topologies) { return this; }
+ }
+
+ /**
+ * Compare two nodes that occur in some topologies, so that the one that is *least* preferable sorts first
+ */
+ int compare(Node.Id node1, Node.Id node2, ShardSelection shards);
+}
diff --git a/accord-core/src/main/java/accord/coordinate/AnyReadCoordinator.java b/accord-core/src/main/java/accord/coordinate/AnyReadCoordinator.java
deleted file mode 100644
index 0a4bfa3..0000000
--- a/accord-core/src/main/java/accord/coordinate/AnyReadCoordinator.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package accord.coordinate;
-
-import java.util.Set;
-
-import accord.coordinate.tracking.ReadTracker;
-import accord.coordinate.tracking.ReadTracker.ReadShardTracker;
-import accord.local.Node;
-import accord.local.Node.Id;
-import accord.messages.Callback;
-import accord.primitives.TxnId;
-import accord.topology.Topologies;
-
-abstract class AnyReadCoordinator<Reply> implements Callback<Reply>
-{
- enum Action { Abort, TryAlternative, Accept, Success }
-
- final Node node;
- final TxnId txnId;
- final ReadTracker<ReadShardTracker> tracker;
- private boolean isDone;
- private Throwable failure;
-
- AnyReadCoordinator(Node node, Topologies topologies, TxnId txnId)
- {
- this.node = node;
- this.txnId = txnId;
- this.tracker = new ReadTracker<>(topologies, ReadShardTracker[]::new, ReadShardTracker::new);
- }
-
- void start()
- {
- start(tracker.computeMinimalReadSetAndMarkInflight());
- }
-
- void start(Set<Id> nodes)
- {
- contact(nodes);
- }
-
- abstract void contact(Set<Id> nodes);
- abstract Action process(Id from, Reply reply);
- abstract void onSuccess();
- abstract void onFailure(Throwable t);
-
- @Override
- public void onSuccess(Id from, Reply reply)
- {
- if (isDone)
- return;
-
- switch (process(from, reply))
- {
- default: throw new IllegalStateException();
- case Abort:
- isDone = true;
- break;
-
- case TryAlternative:
- onSlowResponse(from);
- break;
-
- case Accept:
- if (!(tracker.recordReadSuccess(from) && tracker.hasCompletedRead()))
- break;
-
- case Success:
- isDone = true;
- onSuccess();
- if (failure != null)
- node.agent().onHandledException(failure); // TODO: introduce dedicated Agent method for this case
- }
- }
-
- @Override
- public void onSlowResponse(Id from)
- {
- tracker.recordSlowRead(from);
- Set<Id> readFrom = tracker.computeMinimalReadSetAndMarkInflight();
- if (readFrom != null)
- contact(readFrom);
- }
-
- @Override
- public void onFailure(Id from, Throwable failure)
- {
- if (isDone)
- return;
-
- if (tracker.recordReadFailure(from))
- {
- if (this.failure == null) this.failure = failure;
- else this.failure.addSuppressed(failure);
-
- Set<Id> readFrom = tracker.computeMinimalReadSetAndMarkInflight();
- if (readFrom != null)
- {
- contact(readFrom);
- }
- else if (tracker.hasFailed())
- {
- isDone = true;
- onFailure(this.failure);
- }
- }
- }
-
- @Override
- public void onCallbackFailure(Id from, Throwable failure)
- {
- isDone = true;
- if (this.failure != null)
- failure.addSuppressed(this.failure);
- onFailure(failure);
- }
-}
diff --git a/accord-core/src/main/java/accord/coordinate/CheckOn.java b/accord-core/src/main/java/accord/coordinate/CheckOn.java
index 81e6580..a20b7ff 100644
--- a/accord-core/src/main/java/accord/coordinate/CheckOn.java
+++ b/accord-core/src/main/java/accord/coordinate/CheckOn.java
@@ -76,14 +76,9 @@ public class CheckOn extends CheckShards
// TODO: many callers only need to consult precisely executeAt.epoch remotely
public static CheckOn checkOn(Known sufficientStatus, Node node, TxnId txnId, AbstractRoute route, long srcEpoch, long untilLocalEpoch, BiConsumer<? super CheckStatusOkFull, Throwable> callback)
{
- CheckOn checkOnCommitted = new CheckOn(node, sufficientStatus, txnId, route, srcEpoch, untilLocalEpoch, callback);
- checkOnCommitted.start();
- return checkOnCommitted;
- }
-
- public static CheckOn checkOnUncommitted(Node node, TxnId txnId, AbstractRoute route, long untilLocalEpoch, BiConsumer<? super CheckStatusOkFull, Throwable> callback)
- {
- return checkOn(ExecutionOrder, node, txnId, route, txnId.epoch, untilLocalEpoch, callback);
+ CheckOn checkOn = new CheckOn(node, sufficientStatus, txnId, route, srcEpoch, untilLocalEpoch, callback);
+ checkOn.start();
+ return checkOn;
}
protected AbstractRoute route()
@@ -94,23 +89,39 @@ public class CheckOn extends CheckShards
@Override
protected boolean isSufficient(Id from, CheckStatusOk ok)
{
- return ((CheckStatusOkFull)ok).sufficientFor(route()).compareTo(sufficient) >= 0;
+ KeyRanges rangesForNode = topologies().computeRangesForNode(from);
+ PartialRoute scope = this.route.slice(rangesForNode);
+ return isSufficient(scope, ok);
}
@Override
- protected void onDone(Done done, Throwable failure)
+ protected boolean isSufficient(CheckStatusOk ok)
{
+ return isSufficient(route, ok);
+ }
+
+ protected boolean isSufficient(AbstractRoute scope, CheckStatusOk ok)
+ {
+ return ((CheckStatusOkFull)ok).sufficientFor(scope).compareTo(sufficient) >= 0;
+ }
+
+ @Override
+ protected void onDone(Success success, Throwable failure)
+ {
+ Preconditions.checkState((success == null) != (failure == null));
if (failure != null)
{
callback.accept(null, failure);
}
- else if (merged == null || merged.saveStatus == NotWitnessed)
- {
- callback.accept(CheckStatusOkFull.NOT_WITNESSED, null);
- }
else
{
- new OnDone().start();
+ if (success == Success.Success)
+ Preconditions.checkState(isSufficient(merged));
+
+ if (merged.saveStatus == NotWitnessed)
+ callback.accept(CheckStatusOkFull.NOT_WITNESSED, null);
+ else
+ new OnDone().start();
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/CheckShards.java b/accord-core/src/main/java/accord/coordinate/CheckShards.java
index a642b9b..913db02 100644
--- a/accord-core/src/main/java/accord/coordinate/CheckShards.java
+++ b/accord-core/src/main/java/accord/coordinate/CheckShards.java
@@ -1,7 +1,5 @@
package accord.coordinate;
-import java.util.Set;
-
import accord.local.Node;
import accord.local.Node.Id;
import accord.messages.CheckStatus;
@@ -16,7 +14,7 @@ import accord.topology.Topologies;
* A result of null indicates the transaction is globally persistent
* A result of CheckStatusOk indicates the maximum status found for the transaction, which may be used to assess progress
*/
-public abstract class CheckShards extends QuorumReadCoordinator<CheckStatusReply>
+public abstract class CheckShards extends ReadCoordinator<CheckStatusReply>
{
final RoutingKeys contactKeys;
@@ -44,24 +42,22 @@ public abstract class CheckShards extends QuorumReadCoordinator<CheckStatusReply
}
@Override
- protected void contact(Set<Id> nodes)
+ protected void contact(Id id)
{
- node.send(nodes, new CheckStatus(txnId, contactKeys, txnId.epoch, untilRemoteEpoch, includeInfo), this);
+ node.send(id, new CheckStatus(txnId, contactKeys, txnId.epoch, untilRemoteEpoch, includeInfo), this);
}
- protected abstract boolean isSufficient(Id from, CheckStatusOk ok);
+ protected boolean isSufficient(Id from, CheckStatusOk ok) { return isSufficient(ok); }
+ protected abstract boolean isSufficient(CheckStatusOk ok);
- protected Action check(Id from, CheckStatusOk ok)
+ protected Action checkSufficient(Id from, CheckStatusOk ok)
{
if (isSufficient(from, ok))
- return Action.Accept;
+ return Action.Approve;
- return Action.AcceptQuorum;
+ return Action.ApproveIfQuorum;
}
- @Override
- protected abstract void onDone(Done done, Throwable failure);
-
@Override
protected Action process(Id from, CheckStatusReply reply)
{
@@ -72,7 +68,7 @@ public abstract class CheckShards extends QuorumReadCoordinator<CheckStatusReply
if (merged == null) merged = ok;
else merged = merged.merge(ok);
- return check(from, ok);
+ return checkSufficient(from, ok);
}
else
{
diff --git a/accord-core/src/main/java/accord/coordinate/CollectDeps.java b/accord-core/src/main/java/accord/coordinate/CollectDeps.java
index de55cef..5b615bd 100644
--- a/accord-core/src/main/java/accord/coordinate/CollectDeps.java
+++ b/accord-core/src/main/java/accord/coordinate/CollectDeps.java
@@ -17,6 +17,9 @@ import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.topology.Topologies;
+import static accord.coordinate.tracking.RequestStatus.Failed;
+import static accord.coordinate.tracking.RequestStatus.Success;
+
class CollectDeps implements Callback<GetDepsOk>
{
final Node node;
@@ -57,14 +60,14 @@ class CollectDeps implements Callback<GetDepsOk>
return;
oks.add(ok);
- if (tracker.success(from))
+ if (tracker.recordSuccess(from) == Success)
onQuorum();
}
@Override
public void onFailure(Id from, Throwable failure)
{
- if (tracker.failure(from))
+ if (tracker.recordFailure(from) == Failed)
{
isDone = true;
callback.accept(null, new Timeout(txnId, route.homeKey));
diff --git a/accord-core/src/main/java/accord/coordinate/Coordinate.java b/accord-core/src/main/java/accord/coordinate/Coordinate.java
index a126dc9..588f72c 100644
--- a/accord-core/src/main/java/accord/coordinate/Coordinate.java
+++ b/accord-core/src/main/java/accord/coordinate/Coordinate.java
@@ -18,16 +18,13 @@
package accord.coordinate;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.function.BiConsumer;
import accord.api.Result;
import accord.coordinate.tracking.FastPathTracker;
+import accord.coordinate.tracking.RequestStatus;
import accord.primitives.Route;
-import accord.topology.Shard;
import accord.topology.Topologies;
import accord.primitives.Ballot;
import accord.messages.Callback;
@@ -40,7 +37,7 @@ import accord.messages.PreAccept.PreAcceptOk;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.messages.PreAccept.PreAcceptReply;
-import com.google.common.collect.Sets;
+import com.google.common.base.Preconditions;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
import org.apache.cassandra.utils.concurrent.Future;
@@ -51,110 +48,19 @@ import static accord.messages.Commit.Invalidate.commitInvalidate;
/**
* Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
* If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO: dedicated burn test to validate outcomes
*/
public class Coordinate extends AsyncFuture<Result> implements Callback<PreAcceptReply>, BiConsumer<Result, Throwable>
{
- static class ShardTracker extends FastPathTracker.FastPathShardTracker
- {
- public ShardTracker(Shard shard)
- {
- super(shard);
- }
-
- @Override
- public boolean includeInFastPath(Node.Id node, boolean withFastPathTimestamp)
- {
- return withFastPathTimestamp && shard.fastPathElectorate.contains(node);
- }
-
- @Override
- public boolean hasMetFastPathCriteria()
- {
- return fastPathAccepts >= shard.fastPathQuorumSize;
- }
- }
-
- static class PreacceptTracker extends FastPathTracker<ShardTracker>
- {
- volatile long supersedingEpoch = -1;
- private final boolean fastPathPermitted;
- private final Set<Id> successes = new HashSet<>();
- private Set<Id> failures;
-
- public PreacceptTracker(Topologies topologies, boolean fastPathPermitted)
- {
- super(topologies, Coordinate.ShardTracker[]::new, Coordinate.ShardTracker::new);
- this.fastPathPermitted = fastPathPermitted;
- }
-
- public PreacceptTracker(Topologies topologies)
- {
- this(topologies, topologies.fastPathPermitted());
- }
-
- @Override
- public boolean failure(Id node)
- {
- if (failures == null)
- failures = new HashSet<>();
- failures.add(node);
- return super.failure(node);
- }
-
- @Override
- public void recordSuccess(Id node, boolean withFastPathTimestamp)
- {
- successes.add(node);
- super.recordSuccess(node, withFastPathTimestamp);
- }
-
- public void recordSuccess(Id node)
- {
- recordSuccess(node, false);
- }
-
- public synchronized boolean recordSupersedingEpoch(long epoch)
- {
- if (epoch <= supersedingEpoch)
- return false;
- supersedingEpoch = epoch;
- return true;
- }
-
- public boolean hasSupersedingEpoch()
- {
- return supersedingEpoch > 0;
- }
-
- public PreacceptTracker withUpdatedTopologies(Topologies topologies)
- {
- PreacceptTracker tracker = new PreacceptTracker(topologies, false);
- successes.forEach(tracker::recordSuccess);
- if (failures != null)
- failures.forEach(tracker::failure);
- return tracker;
- }
-
- @Override
- public boolean hasMetFastPathCriteria()
- {
- return fastPathPermitted && super.hasMetFastPathCriteria();
- }
-
- boolean shouldSlowPathAccept()
- {
- return (!fastPathPermitted || !hasInFlight()) && hasReachedQuorum();
- }
- }
-
final Node node;
final TxnId txnId;
final Txn txn;
final Route route;
- private PreacceptTracker tracker;
- private final List<PreAcceptOk> preAcceptOks = new ArrayList<>();
+ private FastPathTracker tracker;
private boolean preAcceptIsDone;
+ private final List<PreAcceptOk> successes;
private Coordinate(Node node, TxnId txnId, Txn txn, Route route)
{
@@ -163,7 +69,8 @@ public class Coordinate extends AsyncFuture<Result> implements Callback<PreAccep
this.txn = txn;
this.route = route;
Topologies topologies = node.topology().withUnsyncedEpochs(route, txnId, txnId);
- this.tracker = new PreacceptTracker(topologies);
+ this.tracker = new FastPathTracker(topologies);
+ this.successes = new ArrayList<>(tracker.topologies().estimateUniqueNodes());
}
private void start()
@@ -186,15 +93,18 @@ public class Coordinate extends AsyncFuture<Result> implements Callback<PreAccep
if (preAcceptIsDone)
return;
- if (tracker.failure(from))
+ switch (tracker.recordFailure(from))
{
- preAcceptIsDone = true;
- tryFailure(new Timeout(txnId, route.homeKey));
+ default: throw new AssertionError();
+ case NoChange:
+ break;
+ case Failed:
+ preAcceptIsDone = true;
+ tryFailure(new Timeout(txnId, route.homeKey));
+ break;
+ case Success:
+ onPreAccepted();
}
-
- // if no other responses are expected and the slow quorum has been satisfied, proceed
- if (tracker.shouldSlowPathAccept())
- onPreAccepted();
}
@Override
@@ -203,29 +113,6 @@ public class Coordinate extends AsyncFuture<Result> implements Callback<PreAccep
tryFailure(failure);
}
- // TODO (soon): I don't think we need to preaccept in later epochs? the Sync logic should take care of it for us,
- // since either we haven't synced between a majority and the earlier epochs are still involved for
- // later preaccepts, or they have been sync'd and the earlier transactions are known to the later epochs
- // (also, we aren't doing this on recovery, and everything works...)
- private synchronized void onEpochUpdate()
- {
- if (!tracker.hasSupersedingEpoch())
- return;
- Topologies newTopologies = node.topology().withUnsyncedEpochs(txn.keys(), txnId.epoch, tracker.supersedingEpoch);
- if (newTopologies.currentEpoch() < tracker.supersedingEpoch)
- return;
- Set<Id> previousNodes = tracker.nodes();
- tracker = tracker.withUpdatedTopologies(newTopologies);
-
- // send messages to new nodes
- Set<Id> needMessages = Sets.difference(tracker.nodes(), previousNodes);
- if (!needMessages.isEmpty())
- node.send(needMessages, to -> new PreAccept(to, newTopologies, txnId, txn, route), this);
-
- if (tracker.shouldSlowPathAccept())
- onPreAccepted();
- }
-
public synchronized void onSuccess(Id from, PreAcceptReply reply)
{
if (preAcceptIsDone)
@@ -239,40 +126,32 @@ public class Coordinate extends AsyncFuture<Result> implements Callback<PreAccep
}
PreAcceptOk ok = (PreAcceptOk) reply;
- preAcceptOks.add(ok);
+ successes.add(ok);
boolean fastPath = ok.witnessedAt.compareTo(txnId) == 0;
- tracker.recordSuccess(from, fastPath);
-
- // TODO: we should only update epoch if we need to in order to reach quorum
- if (!fastPath && ok.witnessedAt.epoch > txnId.epoch && tracker.recordSupersedingEpoch(ok.witnessedAt.epoch))
- {
- node.configService().fetchTopologyForEpoch(ok.witnessedAt.epoch);
- node.topology().awaitEpoch(ok.witnessedAt.epoch).addListener(this::onEpochUpdate);
- }
-
- if (!tracker.hasSupersedingEpoch() && (tracker.hasMetFastPathCriteria() || tracker.shouldSlowPathAccept()))
- onPreAccepted(); // note, can already have invoked onPreAccepted in onEpochUpdate
+ // TODO: update formalisation (and proof), as we do not seek additional pre-accepts from later epochs.
+ // instead we rely on accept to do our work: a quorum of accept in the later epoch
+ // and its effect on preaccepted timestamps and the deps it returns create our sync point.
+ if (tracker.recordSuccess(from, fastPath) == RequestStatus.Success)
+ onPreAccepted();
}
- private void onPreAccepted()
+ private synchronized void onPreAccepted()
{
- if (preAcceptIsDone)
- return;
-
+ Preconditions.checkState(!preAcceptIsDone);
preAcceptIsDone = true;
- if (tracker.hasMetFastPathCriteria())
+
+ if (tracker.hasFastPathAccepted())
{
- preAcceptIsDone = true;
- Deps deps = Deps.merge(preAcceptOks, ok -> ok.witnessedAt.equals(txnId) ? ok.deps : null);
+ Deps deps = Deps.merge(successes, ok -> ok.witnessedAt.equals(txnId) ? ok.deps : null);
Execute.execute(node, txnId, txn, route, txnId, deps, this);
}
else
{
- Deps deps = Deps.merge(preAcceptOks, ok -> ok.deps);
+ Deps deps = Deps.merge(successes, ok -> ok.deps);
Timestamp executeAt; {
Timestamp accumulate = Timestamp.NONE;
- for (PreAcceptOk preAcceptOk : preAcceptOks)
+ for (PreAcceptOk preAcceptOk : successes)
accumulate = Timestamp.max(accumulate, preAcceptOk.witnessedAt);
executeAt = accumulate;
}
@@ -296,7 +175,12 @@ public class Coordinate extends AsyncFuture<Result> implements Callback<PreAccep
}
else
{
- node.withEpoch(executeAt.epoch, () -> Propose.propose(node, tracker.topologies(), Ballot.ZERO, txnId, txn, route, executeAt, deps, this));
+ node.withEpoch(executeAt.epoch, () -> {
+ Topologies topologies = tracker.topologies();
+ if (executeAt.epoch > txnId.epoch)
+ topologies = node.topology().withUnsyncedEpochs(txn, txnId.epoch, executeAt.epoch);
+ Propose.propose(node, topologies, Ballot.ZERO, txnId, txn, route, executeAt, deps, this);
+ });
}
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java b/accord-core/src/main/java/accord/coordinate/Execute.java
index 9d4103e..3bac876 100644
--- a/accord-core/src/main/java/accord/coordinate/Execute.java
+++ b/accord-core/src/main/java/accord/coordinate/Execute.java
@@ -34,10 +34,10 @@ import accord.local.Node.Id;
import accord.messages.Commit;
import accord.topology.Topology;
-import static accord.coordinate.AnyReadCoordinator.Action.Accept;
+import static accord.coordinate.ReadCoordinator.Action.Approve;
import static accord.messages.Commit.Kind.Maximal;
-class Execute extends AnyReadCoordinator<ReadReply>
+class Execute extends ReadCoordinator<ReadReply>
{
final Txn txn;
final Keys readScope;
@@ -78,26 +78,26 @@ class Execute extends AnyReadCoordinator<ReadReply>
}
@Override
- void start(Set<Id> readSet)
+ protected void start(Set<Id> readSet)
{
Commit.commitMinimalAndRead(node, applyTo, txnId, txn, route, readScope, executeAt, deps, readSet, this);
}
@Override
- void contact(Set<Id> nodes)
+ public void contact(Id to)
{
- node.send(nodes, to -> new ReadData(to, tracker.topologies(), txnId, readScope, executeAt), this);
+ node.send(to, new ReadData(to, topologies(), txnId, readScope, executeAt), this);
}
@Override
- Action process(Id from, ReadReply reply)
+ protected Action process(Id from, ReadReply reply)
{
if (reply.isOk())
{
Data next = ((ReadOk) reply).data;
if (next != null)
data = data == null ? next : data.merge(next);
- return Accept;
+ return Approve;
}
ReadNack nack = (ReadNack) reply;
@@ -120,19 +120,21 @@ class Execute extends AnyReadCoordinator<ReadReply>
}
}
- @Override
- void onSuccess()
- {
- Result result = txn.result(txnId, data);
- callback.accept(result, null);
- // avoid re-calculating topologies if it is unchanged
- Topologies sendTo = txnId.epoch == executeAt.epoch ? applyTo : node.topology().preciseEpochs(route, txnId.epoch, executeAt.epoch);
- Persist.persist(node, sendTo, applyTo, txnId, route, txn, executeAt, deps, txn.execute(executeAt, data), result);
- }
@Override
- public void onFailure(Throwable failure)
+ protected void onDone(Success success, Throwable failure)
{
- callback.accept(null, failure);
+ if (failure == null)
+ {
+ Result result = txn.result(txnId, data);
+ callback.accept(result, null);
+ // avoid re-calculating topologies if it is unchanged
+ Topologies sendTo = txnId.epoch == executeAt.epoch ? applyTo : node.topology().preciseEpochs(route, txnId.epoch, executeAt.epoch);
+ Persist.persist(node, sendTo, applyTo, txnId, route, txn, executeAt, deps, txn.execute(executeAt, data), result);
+ }
+ else
+ {
+ callback.accept(null, failure);
+ }
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java b/accord-core/src/main/java/accord/coordinate/Exhausted.java
similarity index 51%
copy from accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
copy to accord-core/src/main/java/accord/coordinate/Exhausted.java
index 428c340..70bc02d 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/Exhausted.java
@@ -16,29 +16,20 @@
* limitations under the License.
*/
-package accord.coordinate.tracking;
+package accord.coordinate;
-import accord.coordinate.tracking.AbstractQuorumTracker.QuorumShardTracker;
-import accord.local.Node;
-import accord.topology.Topologies;
-import accord.topology.Topologies.Single;
-import accord.topology.Topology;
+import accord.api.RoutingKey;
+import accord.primitives.TxnId;
-public class QuorumTracker extends AbstractQuorumTracker<QuorumShardTracker>
-{
- public QuorumTracker(Topologies topologies)
- {
- super(topologies, QuorumShardTracker[]::new, QuorumShardTracker::new);
- }
-
- public QuorumTracker(Topology topology)
- {
- super(new Single(topology, false), QuorumShardTracker[]::new, QuorumShardTracker::new);
- }
+import javax.annotation.Nullable;
- // return true iff hasReachedQuorum()
- public boolean success(Node.Id node)
+/**
+ * Thrown when a transaction exceeds its specified timeout for obtaining a result for a client
+ */
+public class Exhausted extends CoordinateFailed
+{
+ public Exhausted(TxnId txnId, @Nullable RoutingKey homeKey)
{
- return allForNode(node, QuorumShardTracker::success) && hasReachedQuorum();
+ super(txnId, homeKey);
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/FetchData.java b/accord-core/src/main/java/accord/coordinate/FetchData.java
index 6037d07..8dd066f 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchData.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchData.java
@@ -67,15 +67,8 @@ public class FetchData
return fetchInternal(ranges, phase, node, txnId, route.sliceStrict(ranges), executeAt, untilLocalEpoch, callback);
}
- private static Object fetchInternal(Known target, Node node, TxnId txnId, PartialRoute route, @Nullable Timestamp executeAt, long untilLocalEpoch, BiConsumer<Known, Throwable> callback)
+ private static Object fetchInternal(KeyRanges ranges, Known target, Node node, TxnId txnId, PartialRoute route, @Nullable Timestamp executeAt, long untilLocalEpoch, BiConsumer<Known, Throwable> callback)
{
- KeyRanges ranges = node.topology().localRangesForEpochs(txnId.epoch, untilLocalEpoch);
- return fetchInternal(ranges, target, node, txnId, route, executeAt, untilLocalEpoch, callback);
- }
-
- private static Object fetchInternal(KeyRanges localRanges, Known target, Node node, TxnId txnId, PartialRoute route, @Nullable Timestamp executeAt, long untilLocalEpoch, BiConsumer<Known, Throwable> callback)
- {
- KeyRanges ranges = node.topology().localRangesForEpochs(txnId.epoch, untilLocalEpoch);
PartialRoute fetch = route.sliceStrict(ranges);
long srcEpoch = target == ExecutionOrder || executeAt == null ? txnId.epoch : executeAt.epoch;
return CheckOn.checkOn(target, node, txnId, fetch, srcEpoch, untilLocalEpoch, (ok, fail) -> {
diff --git a/accord-core/src/main/java/accord/coordinate/FindHomeKey.java b/accord-core/src/main/java/accord/coordinate/FindHomeKey.java
index c7075de..0972c8e 100644
--- a/accord-core/src/main/java/accord/coordinate/FindHomeKey.java
+++ b/accord-core/src/main/java/accord/coordinate/FindHomeKey.java
@@ -30,13 +30,13 @@ public class FindHomeKey extends CheckShards
}
@Override
- protected boolean isSufficient(Id from, CheckStatusOk ok)
+ protected boolean isSufficient(CheckStatusOk ok)
{
return ok.homeKey != null;
}
@Override
- protected void onDone(Done done, Throwable failure)
+ protected void onDone(Success success, Throwable failure)
{
if (failure != null) callback.accept(null, failure);
else callback.accept(merged == null ? null : merged.homeKey, null);
diff --git a/accord-core/src/main/java/accord/coordinate/FindRoute.java b/accord-core/src/main/java/accord/coordinate/FindRoute.java
index 1ce1bb6..721110e 100644
--- a/accord-core/src/main/java/accord/coordinate/FindRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/FindRoute.java
@@ -51,15 +51,16 @@ public class FindRoute extends CheckShards
}
@Override
- protected boolean isSufficient(Id from, CheckStatusOk ok)
+ protected boolean isSufficient(CheckStatusOk ok)
{
return ok.route instanceof Route;
}
@Override
- protected void onDone(Done done, Throwable failure)
+ protected void onDone(Success success, Throwable failure)
{
if (failure != null) callback.accept(null, failure);
- else callback.accept(merged == null || merged.route == null ? null : new Result(merged), null);
+ else if (success == Success.Success) callback.accept(new Result(merged), null);
+ else callback.accept(null, null);
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java b/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
index 363e28f..9ab76fc 100644
--- a/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
+++ b/accord-core/src/main/java/accord/coordinate/InformHomeOfTxn.java
@@ -19,7 +19,7 @@
package accord.coordinate;
import accord.api.RoutingKey;
-import accord.coordinate.tracking.AbstractQuorumTracker.QuorumShardTracker;
+import accord.coordinate.tracking.QuorumTracker.QuorumShardTracker;
import accord.local.Node;
import accord.local.Node.Id;
import accord.messages.Callback;
@@ -30,6 +30,9 @@ import accord.primitives.TxnId;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
import org.apache.cassandra.utils.concurrent.Future;
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail;
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Success;
+
public class InformHomeOfTxn extends AsyncFuture<Void> implements Callback<SimpleReply>
{
final TxnId txnId;
@@ -61,7 +64,7 @@ public class InformHomeOfTxn extends AsyncFuture<Void> implements Callback<Simpl
{
default:
case Ok:
- if (tracker.success(from))
+ if (tracker.onSuccess(null) == Success)
trySuccess(null);
break;
@@ -78,7 +81,7 @@ public class InformHomeOfTxn extends AsyncFuture<Void> implements Callback<Simpl
else this.failure.addSuppressed(failure);
// TODO: if we fail and have an incorrect topology, trigger refresh
- if (tracker.failure(from))
+ if (tracker.onFailure(null) == Fail)
tryFailure(this.failure);
}
diff --git a/accord-core/src/main/java/accord/coordinate/Invalidate.java b/accord-core/src/main/java/accord/coordinate/Invalidate.java
index 1aae425..fa056c8 100644
--- a/accord-core/src/main/java/accord/coordinate/Invalidate.java
+++ b/accord-core/src/main/java/accord/coordinate/Invalidate.java
@@ -22,11 +22,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
+import accord.coordinate.tracking.QuorumTracker;
+import accord.local.SaveStatus;
import accord.primitives.*;
import com.google.common.base.Preconditions;
import accord.api.RoutingKey;
-import accord.coordinate.tracking.AbstractQuorumTracker.QuorumShardTracker;
import accord.local.Node;
import accord.local.Node.Id;
import accord.local.Status;
@@ -38,25 +39,28 @@ import accord.messages.Callback;
import accord.topology.Shard;
import static accord.coordinate.Propose.Invalidate.proposeInvalidate;
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail;
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Success;
import static accord.local.PreLoadContext.contextFor;
-import static accord.local.Status.Accepted;
-import static accord.local.Status.PreAccepted;
+import static accord.local.Status.*;
+import static accord.local.Status.Known.Definition;
import static accord.messages.Commit.Invalidate.commitInvalidate;
import static accord.primitives.ProgressToken.INVALIDATED;
public class Invalidate implements Callback<InvalidateReply>
{
- final Node node;
- final Ballot ballot;
- final TxnId txnId;
- final RoutingKeys informKeys;
- final RoutingKey invalidateWithKey;
- final Status recoverIfAtLeast;
- final BiConsumer<Outcome, Throwable> callback;
-
- boolean isDone;
- final List<InvalidateOk> invalidateOks = new ArrayList<>();
- final QuorumShardTracker preacceptTracker;
+ private final Node node;
+ private final Ballot ballot;
+ private final TxnId txnId;
+ private final RoutingKeys informKeys;
+ private final RoutingKey invalidateWithKey;
+ private final Status recoverIfAtLeast;
+ private final BiConsumer<Outcome, Throwable> callback;
+
+ private boolean isDone;
+ private boolean isBallotPromised;
+ private final List<InvalidateOk> invalidateOks = new ArrayList<>();
+ private final QuorumTracker.QuorumShardTracker tracker;
private Invalidate(Node node, Shard shard, Ballot ballot, TxnId txnId, RoutingKeys informKeys, RoutingKey invalidateWithKey, Status recoverIfAtLeast, BiConsumer<Outcome, Throwable> callback)
{
@@ -68,15 +72,17 @@ public class Invalidate implements Callback<InvalidateReply>
this.informKeys = informKeys;
this.invalidateWithKey = invalidateWithKey;
this.recoverIfAtLeast = recoverIfAtLeast;
- this.preacceptTracker = new QuorumShardTracker(shard);
+ this.tracker = new QuorumTracker.QuorumShardTracker(shard);
}
- public static Invalidate invalidateIfNotWitnessed(Node node, TxnId txnId, RoutingKeys informKeys, RoutingKey invalidateWithKey, BiConsumer<Outcome, Throwable> callback)
+ public static Invalidate invalidate(Node node, TxnId txnId, RoutingKeys informKeys, RoutingKey invalidateWithKey, BiConsumer<Outcome, Throwable> callback)
{
return invalidate(node, txnId, informKeys, invalidateWithKey, PreAccepted, callback);
}
- public static Invalidate invalidate(Node node, TxnId txnId, RoutingKeys informKeys, RoutingKey invalidateWithKey, BiConsumer<Outcome, Throwable> callback)
+ // TODO (now): (file separately) this is a bug, as it is possible for a preaccept to still be in-flight.
+ // It's not even safe to do it after an initial Invalidate round unless we ensure that recovery never pre-accepts with txnId (which we should also enforce)
+ public static Invalidate invalidateIfNotAccepted(Node node, TxnId txnId, RoutingKeys informKeys, RoutingKey invalidateWithKey, BiConsumer<Outcome, Throwable> callback)
{
return invalidate(node, txnId, informKeys, invalidateWithKey, Accepted, callback);
}
@@ -93,7 +99,7 @@ public class Invalidate implements Callback<InvalidateReply>
@Override
public synchronized void onSuccess(Id from, InvalidateReply reply)
{
- if (isDone || preacceptTracker.hasReachedQuorum())
+ if (isDone || isBallotPromised)
return;
if (!reply.isOk())
@@ -113,19 +119,22 @@ public class Invalidate implements Callback<InvalidateReply>
InvalidateOk ok = (InvalidateOk) reply;
invalidateOks.add(ok);
- if (preacceptTracker.success(from))
+ if (tracker.onSuccess(from) == Success)
invalidate();
}
private void invalidate()
{
+ Preconditions.checkState(!isBallotPromised);
+ isBallotPromised = true;
// first look to see if it has already been
{
- Status maxStatus = invalidateOks.stream().map(ok -> ok.status).max(Comparable::compareTo).orElseThrow(IllegalStateException::new);
Route route = InvalidateOk.findRoute(invalidateOks);
RoutingKey homeKey = route != null ? route.homeKey : InvalidateOk.findHomeKey(invalidateOks);
+ SaveStatus maxStatus = invalidateOks.stream().map(ok -> ok.status).max(Comparable::compareTo).orElseThrow(IllegalStateException::new);
+ Known maxKnown = invalidateOks.stream().map(ok -> ok.status.known).max(Comparable::compareTo).orElseThrow(IllegalStateException::new);
- switch (maxStatus)
+ switch (maxStatus.status)
{
default: throw new IllegalStateException();
case AcceptedInvalidate:
@@ -133,13 +142,13 @@ public class Invalidate implements Callback<InvalidateReply>
case NotWitnessed:
break;
- case PreAccepted:
+ case PreAccepted:
case Accepted:
// note: we do not attempt to calculate PreAccept outcome here, we rely on the caller to tell us
// what is safe to do. If the caller knows no decision was reached with PreAccept, we can safely
// invalidate if we see PreAccept, and only need to recover if we see Accept
// TODO: if we see Accept, go straight to propose to save some unnecessary work
- if (recoverIfAtLeast.compareTo(maxStatus) > 0)
+ if (recoverIfAtLeast.compareTo(maxStatus.status) > 0)
break;
case Committed:
@@ -151,12 +160,11 @@ public class Invalidate implements Callback<InvalidateReply>
{
RecoverWithRoute.recover(node, ballot, txnId, route, callback);
}
- else if (homeKey != null && homeKey.equals(invalidateWithKey))
- {
- throw new IllegalStateException("Received a reply from a node that must have known the route, but that did not include it");
- }
else if (homeKey != null)
{
+ if (homeKey.equals(invalidateWithKey) && maxKnown.compareTo(Definition) >= 0)
+ throw new IllegalStateException("Received a reply from a node that must have known the route, but that did not include it");
+
RecoverWithHomeKey.recover(node, txnId, homeKey, callback);
}
else
@@ -179,6 +187,11 @@ public class Invalidate implements Callback<InvalidateReply>
// if we have witnessed the transaction, but are able to invalidate, do we want to proceed?
// Probably simplest to do so, but perhaps better for user if we don't.
proposeInvalidate(node, ballot, txnId, invalidateWithKey, (success, fail) -> {
+ /**
+ * We're now inside our *exactly once* callback we registered with proposeInvalidate, and we need to
+ * make sure we honour our own exactly once semantics with {@code callback}.
+ * So we are responsible for all exception handling.
+ */
isDone = true;
if (fail != null)
{
@@ -211,10 +224,10 @@ public class Invalidate implements Callback<InvalidateReply>
@Override
public void onFailure(Id from, Throwable failure)
{
- if (isDone)
+ if (isDone || isBallotPromised)
return;
- if (preacceptTracker.failure(from))
+ if (tracker.onFailure(from) == Fail)
{
isDone = true;
callback.accept(null, new Timeout(txnId, null));
diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
index d360f2d..067b258 100644
--- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
+++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
@@ -62,7 +62,7 @@ public class MaybeRecover extends CheckShards
}
@Override
- protected boolean isSufficient(Id from, CheckStatusOk ok)
+ protected boolean isSufficient(CheckStatusOk ok)
{
return hasMadeProgress(ok);
}
@@ -74,18 +74,15 @@ public class MaybeRecover extends CheckShards
}
@Override
- protected void onDone(Done done, Throwable fail)
+ protected void onDone(Success success, Throwable fail)
{
if (fail != null)
{
callback.accept(null, fail);
}
- else if (merged == null)
- {
- callback.accept(null, new Timeout(txnId, homeKey));
- }
else
{
+ Preconditions.checkState(merged != null);
switch (merged.saveStatus.known)
{
default: throw new AssertionError();
@@ -96,7 +93,7 @@ public class MaybeRecover extends CheckShards
RoutingKeys someKeys = reduceNonNull(RoutingKeys::union, this.contactKeys, merged.route, route);
// for correctness reasons, we have not necessarily preempted the initial pre-accept round and
// may have raced with it, so we must attempt to recover anything we see pre-accepted.
- Invalidate.invalidateIfNotWitnessed(node, txnId, someKeys, homeKey, callback);
+ Invalidate.invalidate(node, txnId, someKeys, homeKey, callback);
break;
}
case ExecutionOrder:
diff --git a/accord-core/src/main/java/accord/coordinate/Persist.java b/accord-core/src/main/java/accord/coordinate/Persist.java
index c8be9f5..c4c427e 100644
--- a/accord-core/src/main/java/accord/coordinate/Persist.java
+++ b/accord-core/src/main/java/accord/coordinate/Persist.java
@@ -40,6 +40,7 @@ import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.primitives.Writes;
+import static accord.coordinate.tracking.RequestStatus.Success;
import static accord.local.Status.Durability.Durable;
import static accord.local.Status.Durability.Universal;
@@ -90,7 +91,7 @@ public class Persist implements Callback<ApplyReply>
case Redundant:
case Applied:
persistedOn.add(from);
- if (tracker.success(from))
+ if (tracker.recordSuccess(from) == Success)
{
if (!isDone)
{
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java b/accord-core/src/main/java/accord/coordinate/Propose.java
index 6126e41..2c851d0 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -24,8 +24,10 @@ import java.util.function.BiConsumer;
import accord.api.Result;
import accord.api.RoutingKey;
-import accord.coordinate.tracking.AbstractQuorumTracker.QuorumShardTracker;
+import accord.coordinate.tracking.AbstractTracker.ShardOutcomes;
import accord.coordinate.tracking.QuorumTracker;
+import accord.coordinate.tracking.QuorumTracker.QuorumShardTracker;
+import accord.coordinate.tracking.RequestStatus;
import accord.messages.Callback;
import accord.primitives.Route;
import accord.topology.Shard;
@@ -40,7 +42,9 @@ import accord.primitives.TxnId;
import accord.messages.Accept;
import accord.messages.Accept.AcceptOk;
import accord.messages.Accept.AcceptReply;
-import com.google.common.base.Preconditions;
+
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail;
+import static accord.coordinate.tracking.RequestStatus.Failed;
class Propose implements Callback<AcceptReply>
{
@@ -102,7 +106,7 @@ class Propose implements Callback<AcceptReply>
case Success:
AcceptOk ok = (AcceptOk) reply;
acceptOks.add(ok);
- if (acceptTracker.success(from))
+ if (acceptTracker.recordSuccess(from) == RequestStatus.Success)
onAccepted();
}
}
@@ -110,7 +114,7 @@ class Propose implements Callback<AcceptReply>
@Override
public void onFailure(Id from, Throwable failure)
{
- if (acceptTracker.failure(from))
+ if (acceptTracker.recordFailure(from) == Failed)
{
isDone = true;
callback.accept(null, new Timeout(txnId, route.homeKey));
@@ -174,7 +178,7 @@ class Propose implements Callback<AcceptReply>
return;
}
- if (acceptTracker.success(from))
+ if (acceptTracker.onSuccess(from) == ShardOutcomes.Success)
{
isDone = true;
callback.accept(null, null);
@@ -184,7 +188,7 @@ class Propose implements Callback<AcceptReply>
@Override
public void onFailure(Id from, Throwable failure)
{
- if (acceptTracker.failure(from))
+ if (acceptTracker.onFailure(from) == Fail)
{
isDone = true;
callback.accept(null, new Timeout(txnId, null));
diff --git a/accord-core/src/main/java/accord/coordinate/QuorumReadCoordinator.java b/accord-core/src/main/java/accord/coordinate/QuorumReadCoordinator.java
deleted file mode 100644
index f084753..0000000
--- a/accord-core/src/main/java/accord/coordinate/QuorumReadCoordinator.java
+++ /dev/null
@@ -1,213 +0,0 @@
-package accord.coordinate;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Preconditions;
-
-import accord.coordinate.tracking.ReadTracker;
-import accord.coordinate.tracking.ReadTracker.ReadShardTracker;
-import accord.local.Node;
-import accord.local.Node.Id;
-import accord.messages.Callback;
-import accord.primitives.TxnId;
-import accord.topology.Shard;
-import accord.topology.Topologies;
-
-// TODO: this class needs cleaning up
-// we should also escalate the number of nodes we contact on each failure to succeed
-abstract class QuorumReadCoordinator<Reply> implements Callback<Reply>
-{
- protected enum Action { Abort, Reject, AcceptQuorum, Accept }
-
- protected enum Done { Exhausted, ReachedQuorum, Success }
-
- static class QuorumReadShardTracker extends ReadShardTracker
- {
- private int responseCount;
- public QuorumReadShardTracker(Shard shard)
- {
- super(shard);
- }
-
- public boolean recordReadResponse(Id node)
- {
- Preconditions.checkArgument(shard.nodes.contains(node));
- ++responseCount;
- --inflight;
- return true;
- }
-
- @Override
- public boolean recordReadSuccess(Id node)
- {
- if (!super.recordReadSuccess(node))
- return false;
-
- ++responseCount;
- return true;
- }
-
- public boolean hasFailed()
- {
- return super.hasFailed() && !hasReachedQuorum();
- }
-
- public boolean hasReachedQuorum()
- {
- return responseCount >= shard.slowPathQuorumSize;
- }
- }
-
- static class Tracker extends ReadTracker<QuorumReadShardTracker>
- {
- public Tracker(Topologies topologies)
- {
- super(topologies, QuorumReadShardTracker[]::new, QuorumReadShardTracker::new);
- }
-
- void recordReadResponse(Id node)
- {
- recordResponse(node);
- forEachTrackerForNode(node, QuorumReadShardTracker::recordReadResponse);
- }
-
- public boolean hasReachedQuorum()
- {
- return all(QuorumReadShardTracker::hasReachedQuorum);
- }
- }
-
- final Node node;
- final TxnId txnId;
- protected final Tracker tracker;
- private boolean isDone;
- private Throwable failure;
- Map<Id, Reply> debug = new HashMap<>();
-
- QuorumReadCoordinator(Node node, Topologies topologies, TxnId txnId)
- {
- this.node = node;
- this.txnId = txnId;
- this.tracker = new Tracker(topologies);
- }
-
- protected void start()
- {
- contact(tracker.computeMinimalReadSetAndMarkInflight());
- }
-
- protected abstract void contact(Set<Id> nodes);
- protected abstract Action process(Id from, Reply reply);
-
- protected abstract void onDone(Done done, Throwable failure);
-
- @Override
- public void onSuccess(Id from, Reply reply)
- {
- debug.put(from, reply);
- if (isDone)
- return;
-
- switch (process(from, reply))
- {
- default: throw new IllegalStateException();
- case Abort:
- isDone = true;
- break;
-
- case Reject:
- tracker.recordReadFailure(from);
- tryOneMore();
- break;
-
- case AcceptQuorum:
- tracker.recordReadResponse(from);
- if (!finishIfQuorum())
- tryOneMore();
- break;
-
- case Accept:
- tracker.recordReadSuccess(from);
- if (!tracker.hasCompletedRead())
- {
- if (!finishIfQuorum())
- finishIfFailure();
- break;
- }
-
- isDone = true;
- onDone(Done.Success, null);
- }
- }
-
- @Override
- public void onSlowResponse(Id from)
- {
- tracker.recordSlowRead(from);
- Set<Id> readFrom = tracker.computeMinimalReadSetAndMarkInflight();
- if (readFrom != null)
- contact(readFrom);
- }
-
- @Override
- public void onFailure(Id from, Throwable failure)
- {
- debug.put(from, null);
- if (isDone)
- return;
-
- if (this.failure == null) this.failure = failure;
- else this.failure.addSuppressed(failure);
-
- if (tracker.recordReadFailure(from))
- tryOneMore();
- }
-
- private void tryOneMore()
- {
- Set<Id> readFrom = tracker.computeMinimalReadSetAndMarkInflight();
- if (readFrom != null) contact(readFrom);
- else finishIfFailure();
- }
-
- private boolean finishIfQuorum()
- {
- if (!tracker.hasReachedQuorum())
- return false;
-
- isDone = true;
- onDone(Done.ReachedQuorum, null);
- return true;
- }
-
- private void finishIfFailure()
- {
- if (tracker.hasFailed())
- {
- isDone = true;
- onDone(null, failure);
- }
- else if (!tracker.hasInFlight())
- {
- isDone = true;
- onDone(Done.Exhausted, null);
- }
- }
-
- @Override
- public void onCallbackFailure(Id from, Throwable failure)
- {
- isDone = true;
- if (this.failure != null)
- failure.addSuppressed(this.failure);
- onDone(null, failure);
- }
-
- protected Set<Id> nodes()
- {
- return tracker.nodes();
- }
-
-}
diff --git a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
new file mode 100644
index 0000000..4154b21
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
@@ -0,0 +1,187 @@
+package accord.coordinate;
+
+import accord.coordinate.tracking.RequestStatus;
+import accord.messages.Callback;
+import com.google.common.base.Preconditions;
+
+import accord.coordinate.tracking.ReadTracker;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.primitives.TxnId;
+import accord.topology.Topologies;
+
+import java.util.*;
+
+import static com.google.common.collect.Sets.newHashSetWithExpectedSize;
+
+abstract class ReadCoordinator<Reply extends accord.messages.Reply> extends ReadTracker implements Callback<Reply>
+{
+ protected enum Action
+ {
+ /**
+ * Immediately fail the coordination
+ */
+ Abort,
+
+ /**
+ * This response is unsuitable for the purposes of this coordination, whether individually or as a quorum.
+ */
+ Reject,
+
+ /**
+ * An intermediate response has been received that suggests a full response may be delayed; another replica
+ * should be contacted for its response. This is currently used when a read is lacking necessary information
+ * (such as a commit) in order to serve the response, and so additional information is sent by the coordinator.
+ */
+ TryAlternative,
+
+ /**
+ * This response is unsuitable by itself, but if a quorum of such responses is received for the shard
+ * we will Success.Quorum
+ */
+ ApproveIfQuorum,
+
+ /**
+ * This response is suitable by itself; if we receive such a response from each shard we will complete
+ * successfully with Success.Success
+ */
+ Approve
+ }
+ protected enum Success { Quorum, Success }
+
+ final Node node;
+ final TxnId txnId;
+ private boolean isDone;
+ private Throwable failure;
+ Map<Id, Object> debug = new HashMap<>();
+
+ ReadCoordinator(Node node, Topologies topologies, TxnId txnId)
+ {
+ super(topologies);
+ this.node = node;
+ this.txnId = txnId;
+ }
+
+ protected abstract Action process(Id from, Reply reply);
+ protected abstract void onDone(Success success, Throwable failure);
+ protected abstract void contact(Id to);
+
+ @Override
+ public void onSuccess(Id from, Reply reply)
+ {
+ if (debug != null) debug.put(from, reply);
+ if (isDone)
+ return;
+
+ switch (process(from, reply))
+ {
+ default: throw new IllegalStateException();
+ case Abort:
+ isDone = true;
+ break;
+
+ case TryAlternative:
+ Preconditions.checkState(!reply.isFinal());
+ onSlowResponse(from);
+ break;
+
+ case Reject:
+ handle(recordReadFailure(from));
+ break;
+
+ case ApproveIfQuorum:
+ handle(recordQuorumReadSuccess(from));
+ break;
+
+ case Approve:
+ handle(recordReadSuccess(from));
+ }
+ }
+
+ public void onSlowResponse(Id from)
+ {
+ handle(recordSlowResponse(from));
+ }
+
+ @Override
+ public void onFailure(Id from, Throwable failure)
+ {
+ if (debug != null)
+ debug.put(from, null);
+
+ if (isDone)
+ return;
+
+ if (this.failure == null) this.failure = failure;
+ else this.failure.addSuppressed(failure);
+
+ handle(recordReadFailure(from));
+ }
+
+ @Override
+ public void onCallbackFailure(Id from, Throwable failure)
+ {
+ if (isDone)
+ {
+ node.agent().onUncaughtException(failure);
+ return;
+ }
+
+ if (this.failure != null)
+ failure.addSuppressed(this.failure);
+ this.failure = failure;
+ finishOnFailure();
+ }
+
+ protected void finishOnFailure()
+ {
+ Preconditions.checkState(!isDone);
+ isDone = true;
+ if (failure == null)
+ failure = new Exhausted(txnId, null);
+ onDone(null, failure);
+ }
+
+ private void handle(RequestStatus result)
+ {
+ switch (result)
+ {
+ default: throw new AssertionError();
+ case NoChange:
+ break;
+ case Success:
+ Preconditions.checkState(!isDone);
+ isDone = true;
+ onDone(waitingOnData == 0 ? Success.Success : Success.Quorum, null);
+ break;
+ case Failed:
+ finishOnFailure();
+ }
+ }
+
+ protected void start(Set<Id> to)
+ {
+ to.forEach(this::contact);
+ }
+
+ public void start()
+ {
+ Set<Id> contact = newHashSetWithExpectedSize(maxShardsPerEpoch());
+ if (trySendMore(Set::add, contact) != RequestStatus.NoChange)
+ throw new IllegalStateException();
+ start(contact);
+ }
+
+ @Override
+ protected RequestStatus trySendMore()
+ {
+ // TODO: due to potential re-entrancy into this method, if the node we are contacting is unavailable
+ // so onFailure is invoked immediately, for the moment we copy nodes to an intermediate list.
+ // would be better to prevent reentrancy either by detecting this inside trySendMore or else queueing
+ // callbacks externally, so two may not be in-flight at once
+ List<Id> contact = new ArrayList<>(1);
+ RequestStatus status = trySendMore(List::add, contact);
+ contact.forEach(this::contact);
+ return status;
+ }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java b/accord-core/src/main/java/accord/coordinate/Recover.java
index 37aab37..59b8339 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -25,14 +25,12 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
+import accord.coordinate.tracking.*;
import accord.primitives.*;
import accord.messages.Commit;
import com.google.common.base.Preconditions;
import accord.api.Result;
-import accord.coordinate.tracking.FastPathTracker;
-import accord.coordinate.tracking.QuorumTracker;
-import accord.topology.Shard;
import accord.topology.Topologies;
import accord.messages.Callback;
import accord.local.Node;
@@ -49,8 +47,9 @@ import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.Promise;
import static accord.coordinate.Propose.Invalidate.proposeInvalidate;
+import static accord.coordinate.tracking.RequestStatus.Failed;
+import static accord.coordinate.tracking.RequestStatus.Success;
import static accord.messages.BeginRecovery.RecoverOk.maxAcceptedOrLater;
-import static accord.messages.Commit.Invalidate.commitInvalidate;
// TODO: rename to Recover (verb); rename Recover message to not clash
public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throwable>
@@ -65,7 +64,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
AwaitCommit(Node node, TxnId txnId, RoutingKeys someKeys)
{
Topology topology = node.topology().globalForEpoch(txnId.epoch).forKeys(someKeys);
- this.tracker = new QuorumTracker(topology);
+ this.tracker = new QuorumTracker(new Topologies.Single(node.topology().sorter(), topology));
node.send(topology.nodes(), to -> new WaitOnCommit(to, topology, txnId, someKeys), this);
}
@@ -74,7 +73,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
{
if (isDone()) return;
- if (tracker.success(from))
+ if (tracker.recordSuccess(from) == Success)
trySuccess(null);
}
@@ -83,7 +82,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
{
if (isDone()) return;
- if (tracker.failure(from))
+ if (tracker.recordFailure(from) == Failed)
tryFailure(new Timeout(txnId, route.homeKey));
}
@@ -113,43 +112,17 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
return future;
}
- // TODO: not sure it makes sense to extend FastPathTracker, as intent here is a bit different
- static class ShardTracker extends FastPathTracker.FastPathShardTracker
- {
- int responsesFromElectorate;
- public ShardTracker(Shard shard)
- {
- super(shard);
- }
-
- @Override
- public boolean includeInFastPath(Node.Id node, boolean withFastPathTimestamp)
- {
- if (!shard.fastPathElectorate.contains(node))
- return false;
-
- ++responsesFromElectorate;
- return withFastPathTimestamp;
- }
-
- @Override
- public boolean hasMetFastPathCriteria()
- {
- int fastPathRejections = responsesFromElectorate - fastPathAccepts;
- return fastPathRejections <= shard.fastPathElectorate.size() - shard.fastPathQuorumSize;
- }
- }
-
- final Node node;
- final Ballot ballot;
- final TxnId txnId;
- final Txn txn;
- final Route route;
- final BiConsumer<Outcome, Throwable> callback;
+ private final Node node;
+ private final Ballot ballot;
+ private final TxnId txnId;
+ private final Txn txn;
+ private final Route route;
+ private final BiConsumer<Outcome, Throwable> callback;
private boolean isDone;
- final List<RecoverOk> recoverOks = new ArrayList<>();
- final FastPathTracker<ShardTracker> tracker;
+ private final List<RecoverOk> recoverOks = new ArrayList<>();
+ private final RecoveryTracker tracker;
+ private boolean isBallotPromised;
private Recover(Node node, Ballot ballot, TxnId txnId, Txn txn, Route route, BiConsumer<Outcome, Throwable> callback, Topologies topologies)
{
@@ -160,7 +133,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
this.route = route;
this.callback = callback;
assert topologies.oldestEpoch() == topologies.currentEpoch() && topologies.currentEpoch() == txnId.epoch;
- this.tracker = new FastPathTracker<>(topologies, ShardTracker[]::new, ShardTracker::new);
+ this.tracker = new RecoveryTracker(topologies);
}
@Override
@@ -203,7 +176,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
@Override
public synchronized void onSuccess(Id from, RecoverReply reply)
{
- if (isDone || tracker.hasReachedQuorum())
+ if (isDone || isBallotPromised)
return;
if (!reply.isOk())
@@ -215,14 +188,15 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
RecoverOk ok = (RecoverOk) reply;
recoverOks.add(ok);
boolean fastPath = ok.executeAt.compareTo(txnId) == 0;
- tracker.recordSuccess(from, fastPath);
-
- if (tracker.hasReachedQuorum())
+ if (tracker.recordSuccess(from, fastPath) == Success)
recover();
}
private void recover()
{
+ Preconditions.checkState(!isBallotPromised);
+ isBallotPromised = true;
+
// first look for the most recent Accept; if present, go straight to proposing it again
RecoverOk acceptOrCommit = maxAcceptedOrLater(recoverOks);
if (acceptOrCommit != null)
@@ -280,21 +254,12 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
}
}
- if (!tracker.hasMetFastPathCriteria())
+ if (tracker.rejectsFastPath() || recoverOks.stream().anyMatch(ok -> ok.rejectsFastPath))
{
invalidate();
return;
}
- for (RecoverOk ok : recoverOks)
- {
- if (ok.rejectsFastPath)
- {
- invalidate();
- return;
- }
- }
-
// should all be PreAccept
Deps deps = mergeDeps();
Deps earlierAcceptedNoWitness = Deps.merge(recoverOks, ok -> ok.earlierAcceptedNoWitness);
@@ -356,7 +321,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw
if (isDone)
return;
- if (tracker.failure(from))
+ if (tracker.recordFailure(from) == Failed)
accept(null, new Timeout(txnId, route.homeKey));
}
diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithHomeKey.java b/accord-core/src/main/java/accord/coordinate/RecoverWithHomeKey.java
index 28672e3..fe735e3 100644
--- a/accord-core/src/main/java/accord/coordinate/RecoverWithHomeKey.java
+++ b/accord-core/src/main/java/accord/coordinate/RecoverWithHomeKey.java
@@ -4,7 +4,6 @@ import java.util.function.BiConsumer;
import accord.api.RoutingKey;
import accord.local.Node;
-import accord.local.Node.Id;
import accord.messages.CheckStatus.CheckStatusOk;
import accord.messages.CheckStatus.IncludeInfo;
import accord.primitives.Route;
@@ -41,13 +40,13 @@ public class RecoverWithHomeKey extends CheckShards implements BiConsumer<Object
}
@Override
- protected boolean isSufficient(Id from, CheckStatusOk ok)
+ protected boolean isSufficient(CheckStatusOk ok)
{
return ok.route != null;
}
@Override
- protected void onDone(Done done, Throwable fail)
+ protected void onDone(Success success, Throwable fail)
{
if (fail != null)
{
@@ -55,19 +54,16 @@ public class RecoverWithHomeKey extends CheckShards implements BiConsumer<Object
}
else if (merged == null || !(merged.route instanceof Route))
{
- switch (done)
+ switch (success)
{
default: throw new IllegalStateException();
- case Exhausted:
- callback.accept(null, new Timeout(txnId, homeKey));
- return;
case Success:
// home shard must know full Route. Our success criteria is that the response contained a route,
// so reaching here without a response or one without a full Route is a bug.
callback.accept(null, new IllegalStateException());
return;
- case ReachedQuorum:
- Invalidate.invalidate(node, txnId, contactKeys, homeKey, callback);
+ case Quorum:
+ Invalidate.invalidateIfNotAccepted(node, txnId, contactKeys, homeKey, callback);
}
}
else
diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
index 911fb8a..b60bbaa 100644
--- a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
@@ -1,6 +1,5 @@
package accord.coordinate;
-import java.util.Set;
import java.util.function.BiConsumer;
import accord.local.Status.Known;
@@ -59,13 +58,26 @@ public class RecoverWithRoute extends CheckShards
}
@Override
- protected void contact(Set<Id> nodes)
+ public void contact(Id to)
{
- node.send(nodes, to -> new CheckStatus(to, tracker.topologies(), txnId, route, IncludeInfo.All), this);
+ node.send(to, new CheckStatus(to, topologies(), txnId, route, IncludeInfo.All), this);
}
@Override
protected boolean isSufficient(Id from, CheckStatusOk ok)
+ {
+ KeyRanges rangesForNode = topologies().forEpoch(txnId.epoch).rangesForNode(from);
+ PartialRoute route = this.route.slice(rangesForNode);
+ return isSufficient(route, ok);
+ }
+
+ @Override
+ protected boolean isSufficient(CheckStatusOk ok)
+ {
+ return isSufficient(route, merged);
+ }
+
+ protected boolean isSufficient(AbstractRoute route, CheckStatusOk ok)
{
CheckStatusOkFull full = (CheckStatusOkFull)ok;
Known sufficientTo = full.sufficientFor(route);
@@ -75,14 +87,12 @@ public class RecoverWithRoute extends CheckShards
if (sufficientTo == Known.Invalidation)
return true;
- KeyRanges rangesForNode = tracker.topologies().forEpoch(txnId.epoch).rangesForNode(from);
- PartialRoute route = this.route.slice(rangesForNode);
Preconditions.checkState(full.partialTxn.covers(route));
return true;
}
@Override
- protected void onDone(Done done, Throwable failure)
+ protected void onDone(Success success, Throwable failure)
{
if (failure != null)
{
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AbstractQuorumTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AbstractQuorumTracker.java
deleted file mode 100644
index ab8d73b..0000000
--- a/accord-core/src/main/java/accord/coordinate/tracking/AbstractQuorumTracker.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.coordinate.tracking;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.function.IntFunction;
-
-import accord.local.Node;
-import accord.topology.Shard;
-import accord.topology.Topologies;
-
-public class AbstractQuorumTracker<T extends AbstractQuorumTracker.QuorumShardTracker> extends AbstractResponseTracker<T>
-{
- public static class QuorumShardTracker extends ShardTracker
- {
- private final Set<Node.Id> inflight;
- private int success = 0;
- private int failures = 0;
- public QuorumShardTracker(Shard shard)
- {
- super(shard);
- this.inflight = new HashSet<>(shard.nodes);
- }
-
- protected boolean oneSuccess(Node.Id id)
- {
- if (!inflight.remove(id))
- return false;
- success++;
- return true;
- }
-
- // return true iff hasReachedQuorum()
- public boolean success(Node.Id id)
- {
- oneSuccess(id);
- return hasReachedQuorum();
- }
-
- // return true iff hasFailed()
- public boolean failure(Node.Id id)
- {
- if (!inflight.remove(id))
- return false;
- failures++;
- return hasFailed();
- }
-
- public boolean hasFailed()
- {
- return failures > shard.maxFailures;
- }
-
- public boolean hasFailures()
- {
- return failures > 0;
- }
-
- public boolean hasReachedQuorum()
- {
- return success >= shard.slowPathQuorumSize;
- }
-
- public boolean hasInFlight()
- {
- return !inflight.isEmpty();
- }
- }
-
- public AbstractQuorumTracker(Topologies topologies, IntFunction<T[]> arrayFactory, Function<Shard, T> trackerFactory)
- {
- super(topologies, arrayFactory, trackerFactory);
- }
-
- // return true iff hasFailed()
- public boolean failure(Node.Id node)
- {
- return anyForNode(node, QuorumShardTracker::failure);
- }
-
- public boolean hasReachedQuorum()
- {
- return all(QuorumShardTracker::hasReachedQuorum);
- }
-
- public boolean hasFailed()
- {
- return any(QuorumShardTracker::hasFailed);
- }
-
- public boolean hasFailures()
- {
- return any(QuorumShardTracker::hasFailed);
- }
-
- public boolean hasInFlight()
- {
- return any(QuorumShardTracker::hasInFlight);
- }
-
-}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AbstractResponseTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AbstractResponseTracker.java
deleted file mode 100644
index 9793c87..0000000
--- a/accord-core/src/main/java/accord/coordinate/tracking/AbstractResponseTracker.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.coordinate.tracking;
-
-import accord.local.Node;
-import accord.topology.Shard;
-import accord.topology.Topologies;
-import accord.utils.IndexedIntFunction;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import java.util.*;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.BiPredicate;
-import java.util.function.Function;
-import java.util.function.IntFunction;
-import java.util.function.Predicate;
-
-public abstract class AbstractResponseTracker<T extends AbstractResponseTracker.ShardTracker>
-{
- private static final int[] SINGLETON_OFFSETS = new int[0];
- private final Topologies topologies;
- private final T[] trackers;
- private final int[] offsets;
-
- public static class ShardTracker
- {
- public final Shard shard;
-
- public ShardTracker(Shard shard)
- {
- this.shard = shard;
- }
- }
-
- public AbstractResponseTracker(Topologies topologies, IntFunction<T[]> arrayFactory, Function<Shard, T> trackerFactory)
- {
- Preconditions.checkArgument(topologies.totalShards() > 0);
- this.topologies = topologies;
- this.trackers = arrayFactory.apply(topologies.totalShards());
- if (topologies.size() > 1)
- {
- this.offsets = new int[topologies.size() - 1];
- int offset = topologies.get(0).size();
- for (int i=1, mi=topologies.size(); i<mi; i++)
- {
- this.offsets[i - 1] = offset;
- offset += topologies.get(i).size();
- }
- }
- else
- {
- this.offsets = SINGLETON_OFFSETS;
- }
-
- this.topologies.forEach((i, topology) -> {
- int offset = topologyOffset(i);
- topology.forEach((j, shard) -> trackers[offset + j] = trackerFactory.apply(shard));
- });
- }
-
- protected int topologyOffset(int topologyIdx)
- {
- return topologyIdx > 0 ? offsets[topologyIdx - 1] : 0;
- }
-
- protected int topologyLength(int topologyIdx)
- {
- if (topologyIdx > offsets.length)
- throw new IndexOutOfBoundsException();
-
- int endIdx = topologyIdx == offsets.length ? trackers.length : topologyOffset(topologyIdx + 1);
- return endIdx - topologyOffset(topologyIdx);
- }
-
- public Topologies topologies()
- {
- return topologies;
- }
-
- protected void forEachTrackerForNode(Node.Id node, BiConsumer<? super T, Node.Id> consumer)
- {
- this.topologies.forEach((i, topology) -> {
- int offset = topologyOffset(i);
- topology.forEachOn(node, (j, shard) -> consumer.accept(trackers[offset + j], node));
- });
- }
-
- // does not abort early, to ensure all trackers are updated (introduce Lazy variant if necessary)
- protected boolean anyForNode(Node.Id node, BiPredicate<? super T, Node.Id> consumer)
- {
- return matchingTrackersForNode(node, consumer, Integer.MAX_VALUE) > 0;
- }
-
- // does not abort early, to ensure all trackers are updated (introduce Lazy variant if necessary)
- protected boolean allForNode(Node.Id node, BiPredicate<T, Node.Id> consumer)
- {
- return nonMatchingTrackersForNode(node, consumer, Integer.MAX_VALUE) == 0;
- }
-
- protected int nonMatchingTrackersForNode(Node.Id node, BiPredicate<? super T, Node.Id> consumer, int limit)
- {
- return foldlForNode(node, (shardIndex, shard, v) -> consumer.test(trackers[shardIndex], node) ? v : v + 1, 0, limit);
- }
-
- protected int matchingTrackersForNode(Node.Id node, BiPredicate<? super T, Node.Id> consumer, int limit)
- {
- return foldlForNode(node, (shardIndex, shard, v) -> consumer.test(trackers[shardIndex], node) ? v + 1 : v, 0, limit);
- }
-
- protected int matchingTrackersForNode(Node.Id node, Predicate<? super T> consumer)
- {
- return foldlForNode(node, (shardIndex, shard, v) -> consumer.test(trackers[shardIndex]) ? v + 1 : v, 0, Integer.MAX_VALUE);
- }
-
- protected int foldlForNode(Node.Id node, IndexedIntFunction<Shard> function, int initialValue, int terminalValue)
- {
- for (int i = 0 ; i < topologies.size() && initialValue != terminalValue ; ++i)
- {
- initialValue = topologies.get(i).foldlIntOn(node, function, topologyOffset(i), initialValue, terminalValue);
- }
- return initialValue;
- }
-
- protected boolean all(Predicate<T> predicate)
- {
- for (T tracker : trackers)
- if (!predicate.test(tracker))
- return false;
- return true;
- }
-
- protected boolean any(Predicate<T> predicate)
- {
- for (T tracker : trackers)
- if (predicate.test(tracker))
- return true;
- return false;
- }
-
- protected <V> V foldl(BiFunction<T, V, V> function, V accumulator)
- {
- for (T tracker : trackers)
- accumulator = function.apply(tracker, accumulator);
- return accumulator;
- }
-
- public Set<Node.Id> nodes()
- {
- return topologies.nodes();
- }
-
- @VisibleForTesting
- public T unsafeGet(int topologyIdx, int shardIdx)
- {
- if (shardIdx >= topologyLength(topologyIdx))
- throw new IndexOutOfBoundsException();
- return trackers[topologyOffset(topologyIdx) + shardIdx];
- }
-
- int trackerCount()
- {
- return trackers.length;
- }
-
- public T unsafeGet(int i)
- {
- Preconditions.checkArgument(offsets.length == 0);
- return unsafeGet(0, i);
- }
-}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
new file mode 100644
index 0000000..7717b9b
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate.tracking;
+
+import accord.local.Node.Id;
+import accord.topology.Shard;
+import accord.topology.Topologies;
+import accord.topology.Topology;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.util.*;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.Predicate;
+
+public abstract class AbstractTracker<ST extends ShardTracker, P>
+{
+ /**
+ * Represents the logical result of a ShardFunction applied to a ShardTracker,
+ * encapsulating also any modification it should make to the AbstractTracker
+ * containing the shard.
+ */
+ public interface ShardOutcome<T extends AbstractTracker<?, ?>>
+ {
+ ShardOutcomes apply(T tracker);
+ }
+
+ public enum ShardOutcomes implements ShardOutcome<AbstractTracker<?, ?>>
+ {
+ Fail, SendMore, Success, NoChange;
+
+ static ShardOutcomes min(ShardOutcomes a, ShardOutcomes b)
+ {
+ return a.compareTo(b) <= 0 ? a : b;
+ }
+
+ @Override
+ public ShardOutcomes apply(AbstractTracker<?, ?> tracker)
+ {
+ if (this == Success)
+ --tracker.waitingOnShards;
+ return this;
+ }
+ }
+
+ final Topologies topologies;
+ protected final ST[] trackers;
+ protected final int maxShardsPerEpoch;
+ protected int waitingOnShards;
+
+ AbstractTracker(Topologies topologies, IntFunction<ST[]> arrayFactory, Function<Shard, ST> trackerFactory)
+ {
+ Preconditions.checkArgument(topologies.totalShards() > 0);
+ int topologyCount = topologies.size();
+ int maxShardsPerEpoch = topologies.get(0).size();
+ int shardCount = maxShardsPerEpoch;
+ for (int i = 1 ; i < topologyCount ; ++i)
+ {
+ int size = topologies.get(i).size();
+ maxShardsPerEpoch = Math.max(maxShardsPerEpoch, size);
+ shardCount += size;
+ }
+ this.topologies = topologies;
+ this.trackers = arrayFactory.apply(topologyCount * maxShardsPerEpoch);
+ for (int i = 0 ; i < topologyCount ; ++i)
+ {
+ Topology topology = topologies.get(i);
+ int size = topology.size();
+ for (int j = 0; j < size; ++j)
+ trackers[i * maxShardsPerEpoch + j] = trackerFactory.apply(topology.get(j));
+ }
+ this.maxShardsPerEpoch = maxShardsPerEpoch;
+ this.waitingOnShards = shardCount;
+ }
+
+ protected int topologyOffset(int topologyIdx)
+ {
+ return topologyIdx * maxShardsPerEpoch();
+ }
+
+ public Topologies topologies()
+ {
+ return topologies;
+ }
+
+ protected RequestStatus trySendMore() { throw new UnsupportedOperationException(); }
+
+ <T extends AbstractTracker<ST, P>>
+ RequestStatus recordResponse(T self, Id node, BiFunction<? super ST, P, ? extends ShardOutcome<? super T>> function, P param)
+ {
+ return recordResponse(self, node, function, param, topologies.size());
+ }
+
+ <T extends AbstractTracker<ST, P>>
+ RequestStatus recordResponse(T self, Id node, BiFunction<? super ST, P, ? extends ShardOutcome<? super T>> function, P param, int topologyLimit)
+ {
+ Preconditions.checkState(self == this); // we just accept self as parameter for type safety
+ ShardOutcomes minShardStatus = ShardOutcomes.NoChange;
+ int maxShards = maxShardsPerEpoch();
+ for (int i = 0; i < topologyLimit && minShardStatus != ShardOutcomes.Fail; ++i)
+ {
+ minShardStatus = topologies.get(i).mapReduceOn(node, i * maxShards, AbstractTracker::apply, self, function, param, ShardOutcomes::min, minShardStatus);
+ }
+
+ switch (minShardStatus)
+ {
+ default: throw new AssertionError();
+ case SendMore:
+ return trySendMore();
+ case Success:
+ if (waitingOnShards == 0)
+ return RequestStatus.Success;
+ case NoChange:
+ return RequestStatus.NoChange;
+ case Fail:
+ return RequestStatus.Failed;
+ }
+ }
+
+ static <ST extends ShardTracker, P, T extends AbstractTracker<ST, P>>
+ ShardOutcomes apply(int trackerIndex, T tracker, BiFunction<? super ST, P, ? extends ShardOutcome<? super T>> function, P param)
+ {
+ return function.apply(tracker.trackers[trackerIndex], param).apply(tracker);
+ }
+
+ public boolean any(Predicate<ST> test)
+ {
+ for (ST tracker : trackers)
+ {
+ if (test.test(tracker))
+ return true;
+ }
+ return false;
+ }
+
+ public boolean all(Predicate<ST> test)
+ {
+ for (ST tracker : trackers)
+ {
+ if (!test.test(tracker))
+ return false;
+ }
+ return true;
+ }
+
+ public boolean hasFailed()
+ {
+ return any(ShardTracker::hasFailed);
+ }
+
+ public boolean hasInFlight()
+ {
+ return any(ShardTracker::hasInFlight);
+ }
+
+ public boolean hasReachedQuorum()
+ {
+ return all(ShardTracker::hasReachedQuorum);
+ }
+
+ public Set<Id> nodes()
+ {
+ return topologies.nodes();
+ }
+
+ @VisibleForTesting
+ public ST unsafeGet(int topologyIdx, int shardIdx)
+ {
+ if (shardIdx >= maxShardsPerEpoch())
+ throw new IndexOutOfBoundsException();
+ return trackers[topologyOffset(topologyIdx) + shardIdx];
+ }
+
+ protected int maxShardsPerEpoch()
+ {
+ return maxShardsPerEpoch;
+ }
+
+ public ST unsafeGet(int i)
+ {
+ Preconditions.checkArgument(topologies.size() == 1);
+ return unsafeGet(0, i);
+ }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
index 568ddf7..389a285 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
@@ -18,47 +18,155 @@
package accord.coordinate.tracking;
-import java.util.function.Function;
-import java.util.function.IntFunction;
-
import accord.local.Node;
import accord.topology.Shard;
import accord.topology.Topologies;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.function.BiFunction;
+
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
-public class FastPathTracker<T extends FastPathTracker.FastPathShardTracker> extends AbstractQuorumTracker<T>
+public class FastPathTracker extends AbstractTracker<FastPathTracker.FastPathShardTracker, Node.Id>
{
- public abstract static class FastPathShardTracker extends QuorumTracker.QuorumShardTracker
+ private static final ShardOutcome<FastPathTracker> NewFastPathSuccess = tracker -> {
+ --tracker.waitingOnShards;
+ --tracker.waitingOnFastPathSuccess;
+ return ShardOutcomes.Success;
+ };
+
+ public static class FastPathShardTracker extends ShardTracker
{
- protected int fastPathAccepts = 0;
+ protected int fastPathAccepts, accepts;
+ protected int fastPathFailures, failures;
public FastPathShardTracker(Shard shard)
{
super(shard);
}
- public abstract boolean includeInFastPath(Node.Id node, boolean withFastPathTimestamp);
+ // return NewQuorumSuccess ONLY once fast path is rejected
+ public ShardOutcome<? super FastPathTracker> onQuorumSuccess(Node.Id node)
+ {
+ ++accepts;
+ if (!shard.fastPathElectorate.contains(node))
+ return quorumIfRejectsFastPath();
+
+ ++fastPathFailures;
+ if (isNewFastPathReject() && hasReachedQuorum())
+ return Success;
+
+ if (isNewSlowPathSuccess() && hasRejectedFastPath())
+ return Success;
+
+ return NoChange;
+ }
+
+ public ShardOutcome<? super FastPathTracker> onMaybeFastPathSuccess(Node.Id node)
+ {
+ ++accepts;
+ if (shard.fastPathElectorate.contains(node))
+ {
+ ++fastPathAccepts;
+ if (isNewFastPathSuccess())
+ return NewFastPathSuccess;
+ }
+ return quorumIfRejectsFastPath();
+ }
+
+ public ShardOutcome<? super FastPathTracker> onFailure(@Nonnull Node.Id from)
+ {
+ if (++failures > shard.maxFailures)
+ return Fail;
+
+ if (shard.fastPathElectorate.contains(from)) {
+ ++fastPathFailures;
+
+ if (isNewFastPathReject() && accepts >= shard.slowPathQuorumSize)
+ return Success;
+ }
+
+ return NoChange;
+ }
+
+ private ShardOutcome<? super FastPathTracker> quorumIfRejectsFastPath()
+ {
+ return isNewSlowPathSuccess() && hasRejectedFastPath() ? Success : NoChange;
+ }
+
+ private boolean isNewSlowPathSuccess()
+ {
+ return accepts == shard.slowPathQuorumSize;
+ }
- public void onSuccess(Node.Id node, boolean withFastPathTimestamp)
+ private boolean isNewFastPathReject()
{
- if (oneSuccess(node) && includeInFastPath(node, withFastPathTimestamp))
- fastPathAccepts++;
+ return fastPathFailures == 1 + shard.fastPathElectorate.size() - shard.fastPathQuorumSize;
}
- public abstract boolean hasMetFastPathCriteria();
+ private boolean isNewFastPathSuccess()
+ {
+ return fastPathAccepts == shard.fastPathQuorumSize;
+ }
+
+ @VisibleForTesting
+ public boolean hasMetFastPathCriteria()
+ {
+ return fastPathAccepts >= shard.fastPathQuorumSize;
+ }
+
+ @VisibleForTesting
+ public boolean hasRejectedFastPath()
+ {
+ return shard.rejectsFastPath(fastPathFailures);
+ }
+
+ boolean hasInFlight()
+ {
+ return accepts + failures < shard.rf();
+ }
+
+ boolean hasReachedQuorum()
+ {
+ return accepts >= shard.slowPathQuorumSize;
+ }
+
+ boolean hasFailed()
+ {
+ return failures > shard.maxFailures;
+ }
+ }
+
+ int waitingOnFastPathSuccess; // if we reach zero, we have determined the fast path outcome of every shard
+ public FastPathTracker(Topologies topologies)
+ {
+ super(topologies, FastPathShardTracker[]::new, FastPathShardTracker::new);
+ this.waitingOnFastPathSuccess = super.waitingOnShards;
+ }
+
+ public RequestStatus recordSuccess(Node.Id from, boolean withFastPathTimestamp)
+ {
+ if (withFastPathTimestamp)
+ return recordResponse(from, FastPathShardTracker::onMaybeFastPathSuccess);
+
+ return recordResponse(from, FastPathShardTracker::onQuorumSuccess);
}
- public FastPathTracker(Topologies topologies, IntFunction<T[]> arrayFactory, Function<Shard, T> trackerFactory)
+ public RequestStatus recordFailure(Node.Id from)
{
- super(topologies, arrayFactory, trackerFactory);
+ return recordResponse(from, FastPathShardTracker::onFailure);
}
- public void recordSuccess(Node.Id node, boolean withFastPathTimestamp)
+ protected RequestStatus recordResponse(Node.Id node, BiFunction<? super FastPathShardTracker, Node.Id, ? extends ShardOutcome<? super FastPathTracker>> function)
{
- forEachTrackerForNode(node, (tracker, n) -> tracker.onSuccess(n, withFastPathTimestamp));
+ return recordResponse(this, node, function, node);
}
- public boolean hasMetFastPathCriteria()
+ public boolean hasFastPathAccepted()
{
- return all(FastPathShardTracker::hasMetFastPathCriteria);
+ return waitingOnFastPathSuccess == 0;
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
index 428c340..ba06181 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
@@ -18,27 +18,74 @@
package accord.coordinate.tracking;
-import accord.coordinate.tracking.AbstractQuorumTracker.QuorumShardTracker;
import accord.local.Node;
+import accord.topology.Shard;
import accord.topology.Topologies;
-import accord.topology.Topologies.Single;
-import accord.topology.Topology;
-public class QuorumTracker extends AbstractQuorumTracker<QuorumShardTracker>
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
+
+public class QuorumTracker extends AbstractTracker<QuorumTracker.QuorumShardTracker, Object>
{
+ public static class QuorumShardTracker extends ShardTracker
+ {
+ protected int successes;
+ protected int failures;
+
+ public QuorumShardTracker(Shard shard)
+ {
+ super(shard);
+ }
+
+ public ShardOutcomes onSuccess(Object ignore)
+ {
+ return ++successes == shard.slowPathQuorumSize ? Success : NoChange;
+ }
+
+ // return true iff hasFailed()
+ public ShardOutcomes onFailure(Object ignore)
+ {
+ return ++failures > shard.maxFailures ? Fail : NoChange;
+ }
+
+ public boolean hasReachedQuorum()
+ {
+ return successes >= shard.slowPathQuorumSize;
+ }
+
+ boolean hasInFlight()
+ {
+ return successes + failures < shard.rf();
+ }
+
+ boolean hasFailures()
+ {
+ return failures > 0;
+ }
+
+ boolean hasFailed()
+ {
+ return failures > shard.maxFailures;
+ }
+ }
+
public QuorumTracker(Topologies topologies)
{
super(topologies, QuorumShardTracker[]::new, QuorumShardTracker::new);
}
- public QuorumTracker(Topology topology)
+ public RequestStatus recordSuccess(Node.Id node)
+ {
+ return recordResponse(this, node, QuorumShardTracker::onSuccess, null);
+ }
+
+ // return true iff hasFailed()
+ public RequestStatus recordFailure(Node.Id node)
{
- super(new Single(topology, false), QuorumShardTracker[]::new, QuorumShardTracker::new);
+ return recordResponse(this, node, QuorumShardTracker::onFailure, null);
}
- // return true iff hasReachedQuorum()
- public boolean success(Node.Id node)
+ public boolean hasFailures()
{
- return allForNode(node, QuorumShardTracker::success) && hasReachedQuorum();
+ return any(QuorumShardTracker::hasFailures);
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
index 509da5e..316a7a0 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
@@ -18,232 +18,281 @@
package accord.coordinate.tracking;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.function.IntFunction;
+import java.util.*;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import accord.topology.Shard;
+import accord.topology.ShardSelection;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import accord.coordinate.tracking.ReadTracker.ReadShardTracker;
import accord.local.Node.Id;
-import accord.topology.Shard;
import accord.topology.Topologies;
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
import static com.google.common.collect.Sets.newHashSetWithExpectedSize;
-public class ReadTracker<T extends ReadShardTracker> extends AbstractResponseTracker<T>
+public class ReadTracker extends AbstractTracker<ReadTracker.ReadShardTracker, Boolean>
{
+ private static final ShardOutcome<ReadTracker> DataSuccess = tracker -> {
+ --tracker.waitingOnShards;
+ --tracker.waitingOnData;
+ return ShardOutcomes.Success;
+ };
+
public static class ReadShardTracker extends ShardTracker
{
- private boolean hasData = false;
+ protected boolean hasData = false;
+ protected int quorum; // if !hasData, a slowPathQuorum will trigger success
protected int inflight;
- private int contacted;
- private int slow;
+ protected int contacted;
+ protected int slow;
public ReadShardTracker(Shard shard)
{
super(shard);
}
- public void recordInflightRead(Id node)
+ public ShardOutcome<? super ReadTracker> recordInFlightRead(boolean ignore)
{
++contacted;
++inflight;
+ return NoChange;
}
- // TODO: this is clunky, restructure the tracker to handle this more cleanly
- // record a node as contacted, even though it isn't
- public void recordContacted(Id node)
+ public ShardOutcome<? super ReadTracker> recordSlowResponse(boolean ignore)
{
- ++contacted;
+ Preconditions.checkState(!hasFailed());
+ ++slow;
+
+ if (shouldRead() && canRead())
+ return SendMore;
+
+ return NoChange;
}
- public void recordSlowRead(Id node)
+ /**
+ * Have received a requested data payload with desired contents
+ */
+ public ShardOutcome<? super ReadTracker> recordReadSuccess(boolean isSlow)
{
- ++slow;
+ Preconditions.checkState(inflight > 0);
+ boolean hadSucceeded = hasSucceeded();
+ --inflight;
+ if (isSlow) --slow;
+ hasData = true;
+ return hadSucceeded ? NoChange : DataSuccess;
}
- public void unrecordSlowRead(Id node)
+ public ShardOutcome<? super ReadTracker> recordQuorumReadSuccess(boolean isSlow)
{
- --slow;
+ Preconditions.checkState(inflight > 0);
+ boolean hadSucceeded = hasSucceeded();
+ --inflight;
+ ++quorum;
+ if (isSlow) --slow;
+
+ if (hadSucceeded)
+ return NoChange;
+
+ if (quorum == shard.slowPathQuorumSize)
+ return Success;
+
+ return ensureProgressOrFail();
}
- public boolean recordReadSuccess(Id node)
+ public ShardOutcomes recordReadFailure(boolean isSlow)
{
- Preconditions.checkArgument(shard.nodes.contains(node));
Preconditions.checkState(inflight > 0);
--inflight;
- hasData = true;
- return true;
+ if (isSlow) --slow;
+
+ return ensureProgressOrFail();
}
- public boolean shouldRead()
+ private ShardOutcomes ensureProgressOrFail()
{
- return !hasData && inflight == slow;
+ if (!shouldRead())
+ return NoChange;
+
+ if (canRead())
+ return SendMore;
+
+ return hasInFlight() ? NoChange : Fail;
}
- public boolean recordReadFailure(Id node)
+ public boolean hasReachedQuorum()
{
- Preconditions.checkState(inflight > 0);
- --inflight;
- return true;
+ return quorum >= shard.slowPathQuorumSize;
}
- public boolean hasCompletedRead()
+ public boolean hasSucceeded()
{
- return hasData;
+ return hasData() || hasReachedQuorum();
+ }
+
+ @Override
+ boolean hasInFlight()
+ {
+ return inflight > 0;
+ }
+
+ public boolean shouldRead()
+ {
+ return !hasSucceeded() && inflight == slow;
+ }
+
+ public boolean canRead()
+ {
+ return contacted < shard.rf();
}
public boolean hasFailed()
{
- return !hasData && inflight == 0 && contacted == shard.nodes.size();
+ return !hasData && inflight == 0 && contacted == shard.nodes.size() && !hasReachedQuorum();
+ }
+
+ public boolean hasData()
+ {
+ return hasData;
}
}
// TODO: abstract the candidate selection process so the implementation may prioritise based on distance/health etc
- private final List<Id> candidates;
- private final Set<Id> inflight;
+ // TODO: faster Id sets and arrays using primitive ints when unambiguous
+ final Set<Id> inflight;
+ final List<Id> candidates;
private Set<Id> slow;
+ protected int waitingOnData;
- public static ReadTracker<ReadShardTracker> create(Topologies topologies)
- {
- return new ReadTracker<>(topologies, ReadShardTracker[]::new, ReadShardTracker::new);
- }
-
- public ReadTracker(Topologies topologies, IntFunction<T[]> arrayFactory, Function<Shard, T> trackerFactory)
+ public ReadTracker(Topologies topologies)
{
- super(topologies, arrayFactory, trackerFactory);
- candidates = new ArrayList<>(topologies.nodes());
- inflight = newHashSetWithExpectedSize(trackerCount());
+ super(topologies, ReadShardTracker[]::new, ReadShardTracker::new);
+ this.candidates = new ArrayList<>(topologies.nodes()); // TODO: copyOfNodesAsList to avoid unnecessary copies
+ this.inflight = newHashSetWithExpectedSize(maxShardsPerEpoch());
+ this.waitingOnData = waitingOnShards;
}
@VisibleForTesting
- void recordInflightRead(Id node)
+ protected void recordInFlightRead(Id node)
{
if (!inflight.add(node))
throw new IllegalStateException(node + " already in flight");
- forEachTrackerForNode(node, ReadShardTracker::recordInflightRead);
- }
-
- @VisibleForTesting
- private void recordContacted(Id node)
- {
- forEachTrackerForNode(node, ReadShardTracker::recordContacted);
- }
-
- public void recordSlowRead(Id node)
- {
- if (slow == null)
- slow = newHashSetWithExpectedSize(trackerCount());
-
- if (slow.add(node))
- {
- forEachTrackerForNode(node, ReadShardTracker::recordSlowRead);
- }
+ recordResponse(this, node, ReadShardTracker::recordInFlightRead, false);
}
- protected void recordResponse(Id node)
+ private boolean receiveResponseIsSlow(Id node)
{
if (!inflight.remove(node))
throw new IllegalStateException("Nothing in flight for " + node);
- if (slow != null && slow.remove(node))
- forEachTrackerForNode(node, ReadShardTracker::unrecordSlowRead);
+ return slow != null && slow.remove(node);
}
- public boolean recordReadSuccess(Id node)
+ /**
+ * Record a response that immediately satisfies the criteria for the shards the node participates in
+ */
+ protected RequestStatus recordSlowResponse(Id from)
{
- recordResponse(node);
- return anyForNode(node, ReadShardTracker::recordReadSuccess);
- }
+ if (!inflight.contains(from))
+ throw new IllegalStateException();
- public boolean recordReadFailure(Id node)
- {
- recordResponse(node);
- return anyForNode(node, ReadShardTracker::recordReadFailure);
- }
+ if (slow == null)
+ slow = newHashSetWithExpectedSize(maxShardsPerEpoch());
- public boolean hasCompletedRead()
- {
- return all(ReadShardTracker::hasCompletedRead);
+ if (!slow.add(from)) // we can mark slow responses due to ReadCoordinator.TryAlternative OR onSlowResponse
+ return RequestStatus.NoChange;
+
+ return recordResponse(this, from, ReadShardTracker::recordSlowResponse, true);
}
- public boolean hasInFlight()
+ /**
+ * Record a response that immediately satisfies the criteria for the shards the node participates in
+ */
+ protected RequestStatus recordReadSuccess(Id from)
{
- return !inflight.isEmpty();
+ return recordResponse(from, ReadShardTracker::recordReadSuccess);
}
- public boolean hasFailed()
+ /**
+ * Record a response that contributes to a potential quorum decision (i.e. accept once we have such a quorum)
+ */
+ protected RequestStatus recordQuorumReadSuccess(Id from)
{
- return any(ReadShardTracker::hasFailed);
+ return recordResponse(from, ReadShardTracker::recordQuorumReadSuccess);
}
- private int intersectionSize(Id node, Set<ReadShardTracker> target)
+ /**
+ * Record a failure response
+ */
+ protected RequestStatus recordReadFailure(Id from)
{
- return matchingTrackersForNode(node, target::contains);
+ return recordResponse(from, ReadShardTracker::recordReadFailure);
}
- private int compareIntersections(Id left, Id right, Set<ReadShardTracker> target)
+ protected RequestStatus recordResponse(Id from, BiFunction<? super ReadShardTracker, Boolean, ? extends ShardOutcome<? super ReadTracker>> function)
{
- return Integer.compare(intersectionSize(left, target), intersectionSize(right, target));
+ boolean isSlow = receiveResponseIsSlow(from);
+ return recordResponse(this, from, function, isSlow);
}
- /**
- * Return the smallest set of nodes needed to satisfy required reads.
- *
- * Returns null if the read cannot be completed.
- *
- * TODO: prioritisation of nodes should be implementation-defined
- */
- public Set<Id> computeMinimalReadSetAndMarkInflight()
+ public <T1> RequestStatus trySendMore(BiConsumer<T1, Id> contact, T1 with)
{
- Set<ReadShardTracker> toRead = foldl((tracker, accumulate) -> {
- if (!tracker.shouldRead())
- return accumulate;
+ ShardSelection toRead;
+ {
+ ShardSelection tmp = null;
+ for (int i = 0 ; i < trackers.length ; ++i)
+ {
+ ReadShardTracker tracker = trackers[i];
+ if (tracker == null || !tracker.shouldRead() || !tracker.canRead())
+ continue;
- if (accumulate == null)
- accumulate = new LinkedHashSet<>(); // determinism
+ if (tmp == null)
+ tmp = new ShardSelection(); // determinism
- accumulate.add(tracker);
- return accumulate;
- }, null);
+ tmp.set(i);
+ }
+ toRead = tmp;
+ }
- if (toRead == null)
- return Collections.emptySet();
+ Preconditions.checkState(toRead != null, "We were asked to read more, but found no shards in need of reading more");
- assert !toRead.isEmpty();
- Set<Id> nodes = new HashSet<>();
- while (!toRead.isEmpty())
+ // TODO: maybe for each additional candidate do one linear compare run to find better secondary match
+ // OR at least discount candidates that do not contribute additional knowledge beyond those additional
+ // candidates already contacted, since implementations are likely to sort primarily by health
+ candidates.sort((a, b) -> topologies().compare(a, b, toRead));
+ int i = candidates.size() - 1;
+ while (i >= 0)
{
- if (candidates.isEmpty())
- {
- if (!nodes.isEmpty())
- nodes.forEach(this::recordContacted);
- return null;
- }
+ Id candidate = candidates.get(i);
+ topologies().forEach((ti, topology) -> {
+ int offset = topologyOffset(ti);
+ topology.forEachOn(candidate, (si, s) -> toRead.clear(offset + si));
+ });
- // TODO: Topology needs concept of locality/distance
- candidates.sort((a, b) -> compareIntersections(a, b, toRead));
+ if (toRead.isEmpty())
+ break;
- int i = candidates.size() - 1;
- Id node = candidates.get(i);
- nodes.add(node);
- candidates.remove(i);
- forEachTrackerForNode(node, (tracker, ignore) -> toRead.remove(tracker));
+ --i;
}
- // must recordInFlightRead after loop, as we might return null if the reads are insufficient to make progress
- // but in this case we need the tracker to
- nodes.forEach(this::recordInflightRead);
+ if (!toRead.isEmpty())
+ return RequestStatus.NoChange;
- return nodes;
+ for (int j = candidates.size() - 1; j >= i; --j)
+ {
+ Id candidate = candidates.get(j);
+ recordInFlightRead(candidate);
+ contact.accept(with, candidate);
+ candidates.remove(j);
+ }
+ return RequestStatus.NoChange;
}
+ public boolean hasData()
+ {
+ return all(ReadShardTracker::hasData);
+ }
}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/RecoveryTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/RecoveryTracker.java
new file mode 100644
index 0000000..10c8e28
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/RecoveryTracker.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate.tracking;
+
+import accord.coordinate.tracking.QuorumTracker.QuorumShardTracker;
+import accord.local.Node;
+import accord.topology.Shard;
+import accord.topology.Topologies;
+
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
+
+public class RecoveryTracker extends AbstractTracker<RecoveryTracker.RecoveryShardTracker, Node.Id>
+{
+ public static class RecoveryShardTracker extends QuorumShardTracker
+ {
+ protected int fastPathRejects = 0;
+
+ RecoveryShardTracker(Shard shard)
+ {
+ super(shard);
+ }
+
+ ShardOutcomes onSuccessRejectFastPath(Node.Id from)
+ {
+ if (shard.fastPathElectorate.contains(from))
+ ++fastPathRejects;
+ return onSuccess(from);
+ }
+
+ boolean rejectsFastPath()
+ {
+ return fastPathRejects > shard.fastPathElectorate.size() - shard.fastPathQuorumSize;
+ }
+ }
+
+ public RecoveryTracker(Topologies topologies)
+ {
+ super(topologies, RecoveryShardTracker[]::new, RecoveryShardTracker::new);
+ }
+
+ public RequestStatus recordSuccess(Node.Id node, boolean withFastPathTimestamp)
+ {
+ if (withFastPathTimestamp)
+ return recordResponse(this, node, RecoveryShardTracker::onSuccess, node);
+
+ return recordResponse(this, node, RecoveryShardTracker::onSuccessRejectFastPath, node);
+ }
+
+ // return true iff hasFailed()
+ public RequestStatus recordFailure(Node.Id from)
+ {
+ return recordResponse(this, from, RecoveryShardTracker::onFailure, from);
+ }
+
+ public boolean rejectsFastPath()
+ {
+ return any(RecoveryShardTracker::rejectsFastPath);
+ }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/RequestStatus.java b/accord-core/src/main/java/accord/coordinate/tracking/RequestStatus.java
new file mode 100644
index 0000000..a0117f3
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/RequestStatus.java
@@ -0,0 +1,8 @@
+package accord.coordinate.tracking;
+
+public enum RequestStatus
+{
+ Failed,
+ NoChange,
+ Success
+}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/ShardTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/ShardTracker.java
new file mode 100644
index 0000000..ee628be
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/ShardTracker.java
@@ -0,0 +1,17 @@
+package accord.coordinate.tracking;
+
+import accord.topology.Shard;
+
+public abstract class ShardTracker
+{
+ public final Shard shard;
+
+ public ShardTracker(Shard shard)
+ {
+ this.shard = shard;
+ }
+
+ abstract boolean hasFailed();
+ abstract boolean hasReachedQuorum();
+ abstract boolean hasInFlight();
+}
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index dd3eaa6..c942a0b 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -85,6 +85,7 @@ public class InMemoryCommandStore
return commands.get(txnId);
}
+ // TODO (soon): mimic caching to test C* behaviour
public Command ifLoaded(TxnId txnId)
{
return commands.get(txnId);
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java b/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java
index 0e82b4e..d14867b 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java
@@ -105,6 +105,7 @@ public class InMemoryCommandsForKey extends CommandsForKey
}
}
+ // TODO (now): add validation that anything inserted into *committedBy* has everything prior in its dependencies
private final Key key;
private final InMemoryCommandTimeseries<TxnIdWithExecuteAt> uncommitted = new InMemoryCommandTimeseries<>(cmd -> new TxnIdWithExecuteAt(cmd.txnId(), cmd.executeAt()));
private final InMemoryCommandTimeseries<TxnId> committedById = new InMemoryCommandTimeseries<>(Command::txnId);
diff --git a/accord-core/src/main/java/accord/impl/SizeOfIntersectionSorter.java b/accord-core/src/main/java/accord/impl/SizeOfIntersectionSorter.java
new file mode 100644
index 0000000..39bac13
--- /dev/null
+++ b/accord-core/src/main/java/accord/impl/SizeOfIntersectionSorter.java
@@ -0,0 +1,51 @@
+package accord.impl;
+
+import accord.api.TopologySorter;
+import accord.local.Node;
+import accord.topology.ShardSelection;
+import accord.topology.Topologies;
+import accord.topology.Topology;
+
+public class SizeOfIntersectionSorter implements TopologySorter
+{
+ public static final TopologySorter.Supplier SUPPLIER = new Supplier() {
+ @Override
+ public TopologySorter get(Topology topology)
+ {
+ return new SizeOfIntersectionSorter(new Topologies.Single(this, topology));
+ }
+
+ @Override
+ public TopologySorter get(Topologies topologies)
+ {
+ return new SizeOfIntersectionSorter(topologies);
+ }
+ };
+
+ final Topologies topologies;
+ SizeOfIntersectionSorter(Topologies topologies)
+ {
+ this.topologies = topologies;
+ }
+
+ @Override
+ public int compare(Node.Id node1, Node.Id node2, ShardSelection shards)
+ {
+ int maxShardsPerEpoch = topologies.maxShardsPerEpoch();
+ int count1 = 0, count2 = 0;
+ for (int i = 0, mi = topologies.size() ; i < mi ; ++i)
+ {
+ Topology topology = topologies.get(i);
+ count1 += count(node1, shards, i * maxShardsPerEpoch, topology);
+ count2 += count(node2, shards, i * maxShardsPerEpoch, topology);
+ }
+
+ // sort more intersections later
+ return count1 - count2;
+ }
+
+ private static int count(Node.Id node, ShardSelection shards, int offset, Topology topology)
+ {
+ return topology.foldlIntOn(node, (i, shard, v) -> shard.get(i) ? v + 1 : v, shards, offset, 0, 0);
+ }
+}
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 20c990e..3f86efa 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -125,13 +125,13 @@ public class Node implements ConfigurationService.Listener, NodeTimeService
private final Map<TxnId, Future<? extends Outcome>> coordinating = new ConcurrentHashMap<>();
public Node(Id id, MessageSink messageSink, ConfigurationService configService, LongSupplier nowSupplier,
- Supplier<DataStore> dataSupplier, Agent agent, Random random, Scheduler scheduler,
+ Supplier<DataStore> dataSupplier, Agent agent, Random random, Scheduler scheduler, TopologySorter.Supplier topologySorter,
Function<Node, ProgressLog.Factory> progressLogFactory, CommandStores.Factory factory)
{
this.id = id;
this.messageSink = messageSink;
this.configService = configService;
- this.topology = new TopologyManager(id);
+ this.topology = new TopologyManager(topologySorter, id);
this.nowSupplier = nowSupplier;
Topology topology = configService.currentTopology();
this.now = new AtomicReference<>(new Timestamp(topology.epoch(), nowSupplier.getAsLong(), 0, id));
diff --git a/accord-core/src/main/java/accord/local/SaveStatus.java b/accord-core/src/main/java/accord/local/SaveStatus.java
index 5c2d6f4..87d4b0a 100644
--- a/accord-core/src/main/java/accord/local/SaveStatus.java
+++ b/accord-core/src/main/java/accord/local/SaveStatus.java
@@ -22,7 +22,6 @@ import accord.local.Status.ExecutionStatus;
import accord.local.Status.Known;
import accord.local.Status.Phase;
-import static accord.local.Status.ExecutionStatus.*;
import static accord.local.Status.Known.*;
/**
diff --git a/accord-core/src/main/java/accord/local/SyncCommandStores.java b/accord-core/src/main/java/accord/local/SyncCommandStores.java
index fb07b70..4e2d0fd 100644
--- a/accord-core/src/main/java/accord/local/SyncCommandStores.java
+++ b/accord-core/src/main/java/accord/local/SyncCommandStores.java
@@ -10,6 +10,7 @@ import accord.utils.MapReduceConsume;
import java.util.function.Function;
import java.util.stream.IntStream;
+// TODO (soon): introduce new CommandStores that mimics asynchrony by integrating with Cluster scheduling for List workload
public class SyncCommandStores extends CommandStores<SyncCommandStores.SyncCommandStore>
{
public interface SafeSyncCommandStore extends SafeCommandStore
diff --git a/accord-core/src/main/java/accord/messages/BeginInvalidation.java b/accord-core/src/main/java/accord/messages/BeginInvalidation.java
index 97261ae..479110b 100644
--- a/accord-core/src/main/java/accord/messages/BeginInvalidation.java
+++ b/accord-core/src/main/java/accord/messages/BeginInvalidation.java
@@ -40,7 +40,7 @@ public class BeginInvalidation extends AbstractEpochRequest<BeginInvalidation.In
if (!command.preacceptInvalidate(ballot))
return new InvalidateNack(command.promised(), command.homeKey());
- return new InvalidateOk(command.status(), command.route(), command.homeKey());
+ return new InvalidateOk(command.saveStatus(), command.route(), command.homeKey());
}
@Override
@@ -83,11 +83,11 @@ public class BeginInvalidation extends AbstractEpochRequest<BeginInvalidation.In
public static class InvalidateOk implements InvalidateReply
{
- public final Status status;
+ public final SaveStatus status;
public final @Nullable AbstractRoute route;
public final @Nullable RoutingKey homeKey;
- public InvalidateOk(Status status, @Nullable AbstractRoute route, @Nullable RoutingKey homeKey)
+ public InvalidateOk(SaveStatus status, @Nullable AbstractRoute route, @Nullable RoutingKey homeKey)
{
this.status = status;
this.route = route;
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index 71028f6..621e8e4 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -194,6 +194,7 @@ public class ReadData extends AbstractEpochRequest<ReadData.ReadNack> implements
}
else if (failure != null)
{
+ // TODO (soon): test
node.reply(replyTo, replyContext, ReadNack.Error);
data = null;
node.agent().onUncaughtException(failure); // TODO: probably a better way to handle this, as might not be uncaught
diff --git a/accord-core/src/main/java/accord/topology/Shard.java b/accord-core/src/main/java/accord/topology/Shard.java
index d4c9c91..8a729dc 100644
--- a/accord-core/src/main/java/accord/topology/Shard.java
+++ b/accord-core/src/main/java/accord/topology/Shard.java
@@ -82,6 +82,11 @@ public class Shard
return (f + electorate)/2 + 1;
}
+ public boolean rejectsFastPath(int rejectCount)
+ {
+ return rejectCount > fastPathElectorate.size() - fastPathQuorumSize;
+ }
+
static int slowPathQuorumSize(int replicas)
{
return replicas - maxToleratedFailures(replicas);
diff --git a/accord-core/src/main/java/accord/topology/ShardSelection.java b/accord-core/src/main/java/accord/topology/ShardSelection.java
new file mode 100644
index 0000000..41f1cb9
--- /dev/null
+++ b/accord-core/src/main/java/accord/topology/ShardSelection.java
@@ -0,0 +1,7 @@
+package accord.topology;
+
+import java.util.BitSet;
+
+public class ShardSelection extends BitSet
+{
+}
diff --git a/accord-core/src/main/java/accord/topology/Topologies.java b/accord-core/src/main/java/accord/topology/Topologies.java
index ea77a30..6de8ddb 100644
--- a/accord-core/src/main/java/accord/topology/Topologies.java
+++ b/accord-core/src/main/java/accord/topology/Topologies.java
@@ -18,18 +18,19 @@
package accord.topology;
+import accord.api.TopologySorter;
import accord.local.Node;
import accord.local.Node.Id;
import accord.primitives.KeyRanges;
import accord.utils.IndexedConsumer;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import java.util.*;
-import java.util.function.Consumer;
// TODO: we can probably most efficiently create a new synthetic Topology that applies for a range of epochs
// and permit Topology to implement it, so that
-public interface Topologies
+public interface Topologies extends TopologySorter
{
Topology current();
@@ -42,8 +43,6 @@ public interface Topologies
return current().epoch;
}
- boolean fastPathPermitted();
-
// topologies are stored in reverse epoch order, with the highest epoch at idx 0
Topology get(int i);
@@ -57,26 +56,18 @@ public interface Topologies
Set<Node.Id> copyOfNodes();
+ int estimateUniqueNodes();
+
KeyRanges computeRangesForNode(Id node);
+ int maxShardsPerEpoch();
+
default void forEach(IndexedConsumer<Topology> consumer)
{
for (int i=0, mi=size(); i<mi; i++)
consumer.accept(i, get(i));
}
- default void forEachShard(Consumer<Shard> consumer)
- {
- for (int i=0, mi=size(); i<mi; i++)
- {
- Topology topology = get(i);
- for (int j=0, mj=topology.size(); j<mj; j++)
- {
- consumer.accept(topology.get(j));
- }
- }
- }
-
static boolean equals(Topologies t, Object o)
{
if (o == t)
@@ -122,13 +113,19 @@ public interface Topologies
class Single implements Topologies
{
+ private final TopologySorter sorter;
private final Topology topology;
- private final boolean fastPathPermitted;
- public Single(Topology topology, boolean fastPathPermitted)
+ public Single(TopologySorter.Supplier sorter, Topology topology)
{
this.topology = topology;
- this.fastPathPermitted = fastPathPermitted;
+ this.sorter = sorter.get(this);
+ }
+
+ public Single(TopologySorter sorter, Topology topology)
+ {
+ this.topology = topology;
+ this.sorter = sorter;
}
@Override
@@ -151,12 +148,6 @@ public interface Topologies
return currentEpoch();
}
- @Override
- public boolean fastPathPermitted()
- {
- return fastPathPermitted;
- }
-
@Override
public Topology get(int i)
{
@@ -195,12 +186,24 @@ public interface Topologies
return new HashSet<>(nodes());
}
+ @Override
+ public int estimateUniqueNodes()
+ {
+ return topology.nodes().size();
+ }
+
@Override
public KeyRanges computeRangesForNode(Id node)
{
return topology.rangesForNode(node);
}
+ @Override
+ public int maxShardsPerEpoch()
+ {
+ return topology.size();
+ }
+
@Override
public boolean equals(Object obj)
{
@@ -218,20 +221,33 @@ public interface Topologies
{
return Topologies.toString(this);
}
+
+ @Override
+ public int compare(Id node1, Id node2, ShardSelection shards)
+ {
+ return sorter.compare(node1, node2, shards);
+ }
}
class Multi implements Topologies
{
+ private final TopologySorter sorter;
private final List<Topology> topologies;
+ private final int maxShardsPerEpoch;
- public Multi(int initialCapacity)
+ public Multi(TopologySorter.Supplier sorter, int initialCapacity)
{
this.topologies = new ArrayList<>(initialCapacity);
+ this.sorter = sorter.get(this);
+ int maxShardsPerEpoch = 0;
+ for (int i = 0 ; i < topologies.size() ; ++i)
+ maxShardsPerEpoch = Math.max(maxShardsPerEpoch, topologies.get(i).size());
+ this.maxShardsPerEpoch = maxShardsPerEpoch;
}
- public Multi(Topology... topologies)
+ public Multi(TopologySorter.Supplier sorter, Topology... topologies)
{
- this(topologies.length);
+ this(sorter, topologies.length);
for (Topology topology : topologies)
add(topology);
}
@@ -257,15 +273,6 @@ public interface Topologies
return get(size() - 1).epoch;
}
- @Override
- public boolean fastPathPermitted()
- {
- // TODO (soon): this is overly restrictive: we can still take the fast-path during topology movements,
- // just not for transactions started across the initiation of a topology movement (i.e.
- // where the epoch changes while the transaction is being pre-accepted)
- return false;
- }
-
@Override
public Topology get(int i)
{
@@ -298,10 +305,18 @@ public interface Topologies
return false;
}
+ @Override
+ public int estimateUniqueNodes()
+ {
+ // just guess at one additional node per epoch, and at most twice as many nodes
+ int estSize = get(0).nodes().size();
+ return Math.min(estSize * 2, estSize + size() - 1);
+ }
+
@Override
public Set<Node.Id> nodes()
{
- Set<Node.Id> result = new HashSet<>();
+ Set<Node.Id> result = Sets.newHashSetWithExpectedSize(estimateUniqueNodes());
for (int i=0,mi=size(); i<mi; i++)
result.addAll(get(i).nodes());
return result;
@@ -322,6 +337,12 @@ public interface Topologies
return ranges;
}
+ @Override
+ public int maxShardsPerEpoch()
+ {
+ return maxShardsPerEpoch;
+ }
+
public void add(Topology topology)
{
Preconditions.checkArgument(topologies.isEmpty() || topology.epoch == topologies.get(topologies.size() - 1).epoch - 1);
@@ -345,5 +366,11 @@ public interface Topologies
{
return Topologies.toString(this);
}
+
+ @Override
+ public int compare(Id node1, Id node2, ShardSelection shards)
+ {
+ return sorter.compare(node1, node2, shards);
+ }
}
}
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
index a9411ae..e2ad58f 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -19,20 +19,16 @@
package accord.topology;
import java.util.*;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import accord.api.RoutingKey;
import accord.local.Node.Id;
-import accord.api.Key;
import accord.primitives.AbstractKeys;
import accord.primitives.KeyRange;
import accord.primitives.KeyRanges;
-import accord.primitives.Keys;
-import accord.utils.IndexedConsumer;
-import accord.utils.IndexedBiFunction;
-import accord.utils.IndexedIntFunction;
-import accord.utils.IndexedPredicate;
+import accord.utils.*;
import static accord.utils.SortedArrays.exponentialSearch;
@@ -44,6 +40,12 @@ public class Topology
final KeyRanges ranges;
final Map<Id, NodeInfo> nodeLookup;
final KeyRanges subsetOfRanges;
+ /**
+ * This array is used to permit cheaper sharing of Topology objects between requests, as we must only specify
+ * the indexes within the parent Topology that we contain. This also permits us to perform efficient merges with
+ * {@code NodeInfo.supersetIndexes} to find the shards that intersect a given node without recomputing the NodeInfo.
+ * TODO: do not recompute nodeLookup
+ */
final int[] supersetIndexes;
static class NodeInfo
@@ -227,6 +229,7 @@ public class Topology
int[] newSubset = new int[Math.min(select.size(), subsetOfRanges.size())];
for (int i = 0 ; i < select.size() ; )
{
+ // TODO: use SortedArrays.findNextIntersection
// find the range containing the key at i
subsetIndex = subsetOfRanges.rangeIndexForKey(subsetIndex, subsetOfRanges.size(), select.get(i));
if (subsetIndex < 0 || subsetIndex >= subsetOfRanges.size())
@@ -272,42 +275,46 @@ public class Topology
return accumulator;
}
- /**
- * @param on the node to limit our selection to
- * @param select may be a superSet of the keys owned by {@code on} but not of this {@code Topology}
- */
- public void forEachOn(Id on, Keys select, IndexedConsumer<Shard> consumer)
+ public void forEachOn(Id on, IndexedConsumer<Shard> consumer)
{
NodeInfo info = nodeLookup.get(on);
- for (int i = 0, j = 0, k = 0 ; i < select.size() && j < supersetIndexes.length && k < info.supersetIndexes.length ;)
+ if (info == null)
+ return;
+ int[] a = supersetIndexes, b = info.supersetIndexes;
+ int ai = 0, bi = 0;
+ while (ai < a.length && bi < b.length)
{
- Key key = select.get(i);
- Shard shard = shards[supersetIndexes[j]];
- int c = supersetIndexes[j] - info.supersetIndexes[k];
- if (c < 0) ++j;
- else if (c > 0) ++k;
+ if (a[ai] == b[bi])
+ {
+ consumer.accept(ai, shards[a[ai]]);
+ ++ai; ++bi;
+ }
+ else if (a[ai] < b[bi])
+ {
+ ai = exponentialSearch(a, ai + 1, a.length, b[bi]);
+ if (ai < 0) ai = -1 -ai;
+ }
else
{
- int rcmp = shard.range.compareKey(key);
- if (rcmp < 0) ++i;
- else if (rcmp == 0) { consumer.accept(j, shard); i++; j++; k++; }
- else { j++; k++; }
+ bi = exponentialSearch(b, bi + 1, b.length, a[ai]);
+ if (bi < 0) bi = -1 -bi;
}
}
}
- public void forEachOn(Id on, IndexedConsumer<Shard> consumer)
+ public <P1, P2, P3, O> O mapReduceOn(Id on, int offset, IndexedTriFunction<? super P1, ? super P2, ? super P3, ? extends O> function, P1 p1, P2 p2, P3 p3, BiFunction<? super O, ? super O, ? extends O> reduce, O initialValue)
{
NodeInfo info = nodeLookup.get(on);
if (info == null)
- return;
+ return initialValue;
int[] a = supersetIndexes, b = info.supersetIndexes;
int ai = 0, bi = 0;
while (ai < a.length && bi < b.length)
{
if (a[ai] == b[bi])
{
- consumer.accept(ai, shards[a[ai]]);
+ O next = function.apply(offset + ai, p1, p2, p3);
+ initialValue = reduce.apply(initialValue, next);
++ai; ++bi;
}
else if (a[ai] < b[bi])
@@ -321,6 +328,7 @@ public class Topology
if (bi < 0) bi = -1 -bi;
}
}
+ return initialValue;
}
public int matchesOn(Id on, IndexedPredicate<Shard> consumer)
@@ -354,7 +362,7 @@ public class Topology
return count;
}
- public int foldlIntOn(Id on, IndexedIntFunction<Shard> consumer, int offset, int initialValue, int terminalValue)
+ public <P> int foldlIntOn(Id on, IndexedIntFunction<P> consumer, P param, int offset, int initialValue, int terminalValue)
{
// TODO: this can be done by divide-and-conquer splitting of the lists and recursion, which should be more efficient
NodeInfo info = nodeLookup.get(on);
@@ -366,7 +374,7 @@ public class Topology
{
if (a[ai] == b[bi])
{
- initialValue = consumer.apply(offset + ai, shards[a[ai]], initialValue);
+ initialValue = consumer.apply(offset + ai, param, initialValue);
if (terminalValue == initialValue)
return terminalValue;
++ai; ++bi;
@@ -448,19 +456,6 @@ public class Topology
return ranges;
}
- // TODO: use SortedArrays impls
- private static boolean intersects(int[] is, int[] js)
- {
- for (int i = 0, j = 0 ; i < is.length && j < js.length ;)
- {
- int c = is[i] - js[j];
- if (c < 0) ++i;
- else if (c > 0) ++j;
- else return true;
- }
- return false;
- }
-
public Shard[] unsafeGetShards()
{
return shards;
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 9ffce88..f33cc08 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -20,6 +20,7 @@ package accord.topology;
import accord.api.ConfigurationService;
import accord.api.RoutingKey;
+import accord.api.TopologySorter;
import accord.coordinate.tracking.QuorumTracker;
import accord.local.Node.Id;
import accord.messages.EpochRequest;
@@ -37,7 +38,8 @@ import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import java.util.*;
-import java.util.function.LongConsumer;
+
+import static accord.coordinate.tracking.RequestStatus.Success;
/**
* Manages topology state changes and update bookkeeping
@@ -63,12 +65,12 @@ public class TopologyManager implements ConfigurationService.Listener
private boolean syncComplete = false;
private boolean prevSynced;
- EpochState(Id node, Topology global, boolean prevSynced)
+ EpochState(Id node, Topology global, TopologySorter sorter, boolean prevSynced)
{
this.global = global;
this.local = global.forNode(node).trim();
Preconditions.checkArgument(!global().isSubset());
- this.syncTracker = new QuorumTracker(new Single(global(), false));
+ this.syncTracker = new QuorumTracker(new Single(sorter, global()));
this.prevSynced = prevSynced;
}
@@ -79,7 +81,7 @@ public class TopologyManager implements ConfigurationService.Listener
public void recordSyncComplete(Id node)
{
- syncComplete = syncTracker.success(node);
+ syncComplete = syncTracker.recordSuccess(node) == Success;
}
Topology global()
@@ -114,7 +116,7 @@ public class TopologyManager implements ConfigurationService.Listener
Boolean result = global().foldl(keys, (i, shard, acc) -> {
if (acc == Boolean.FALSE)
return acc;
- return Boolean.valueOf(syncTracker.unsafeGet(i).hasReachedQuorum());
+ return syncTracker.unsafeGet(i).hasReachedQuorum();
}, Boolean.TRUE);
return result == Boolean.TRUE;
}
@@ -125,7 +127,7 @@ public class TopologyManager implements ConfigurationService.Listener
}
}
- private class Epochs
+ private static class Epochs
{
private final long currentEpoch;
private final EpochState[] epochs;
@@ -220,11 +222,13 @@ public class TopologyManager implements ConfigurationService.Listener
}
}
+ private final TopologySorter.Supplier sorter;
private final Id node;
private volatile Epochs epochs;
- public TopologyManager(Id node)
+ public TopologyManager(TopologySorter.Supplier sorter, Id node)
{
+ this.sorter = sorter;
this.node = node;
this.epochs = new Epochs(new EpochState[0]);
}
@@ -247,7 +251,7 @@ public class TopologyManager implements ConfigurationService.Listener
System.arraycopy(current.epochs, 0, nextEpochs, 1, current.epochs.length);
boolean prevSynced = current.epochs.length == 0 || current.epochs[0].syncComplete();
- nextEpochs[0] = new EpochState(node, topology, prevSynced);
+ nextEpochs[0] = new EpochState(node, topology, sorter.get(topology), prevSynced);
List<AsyncPromise<Void>> futureEpochFutures = new ArrayList<>(current.futureEpochFutures);
AsyncPromise<Void> toComplete = !futureEpochFutures.isEmpty() ? futureEpochFutures.remove(0) : null;
@@ -267,6 +271,11 @@ public class TopologyManager implements ConfigurationService.Listener
epochs.syncComplete(node, epoch);
}
+ public TopologySorter.Supplier sorter()
+ {
+ return sorter;
+ }
+
public Topology current()
{
return epochs.current();
@@ -292,7 +301,7 @@ public class TopologyManager implements ConfigurationService.Listener
EpochState maxEpochState = snapshot.get(maxEpoch);
if (minEpoch == maxEpoch && !snapshot.requiresHistoricalTopologiesFor(keys, maxEpoch))
- return new Single(maxEpochState.global.forKeys(keys), true);
+ return new Single(sorter, maxEpochState.global.forKeys(keys));
int start = (int)(snapshot.currentEpoch - maxEpoch);
int limit = (int)(Math.min(1 + snapshot.currentEpoch - minEpoch, snapshot.epochs.length));
@@ -315,7 +324,7 @@ public class TopologyManager implements ConfigurationService.Listener
epochState.global.visitNodeForKeysOnceOrMore(keys, (i1, i2) -> true, nodes::add);
}
- Topologies.Multi topologies = new Topologies.Multi(count);
+ Topologies.Multi topologies = new Topologies.Multi(sorter, count);
for (int i = start; i < limit ; ++i)
{
EpochState epochState = snapshot.epochs[i];
@@ -338,14 +347,14 @@ public class TopologyManager implements ConfigurationService.Listener
Epochs snapshot = epochs;
if (minEpoch == maxEpoch)
- return new Single(snapshot.get(minEpoch).global.forKeys(keys), true);
+ return new Single(sorter, snapshot.get(minEpoch).global.forKeys(keys));
Set<Id> nodes = new LinkedHashSet<>();
int count = (int)(1 + maxEpoch - minEpoch);
for (int i = count - 1 ; i >= 0 ; --i)
snapshot.get(minEpoch + i).global().visitNodeForKeysOnceOrMore(keys, (i1, i2) -> true, nodes::add);
- Topologies.Multi topologies = new Topologies.Multi(count);
+ Topologies.Multi topologies = new Topologies.Multi(sorter, count);
for (int i = count - 1 ; i >= 0 ; --i)
topologies.add(snapshot.get(minEpoch + i).global.forKeys(keys, nodes));
@@ -355,7 +364,7 @@ public class TopologyManager implements ConfigurationService.Listener
public Topologies forEpoch(AbstractKeys<?, ?> keys, long epoch)
{
EpochState state = epochs.get(epoch);
- return new Single(state.global.forKeys(keys), true);
+ return new Single(sorter, state.global.forKeys(keys));
}
public Shard forEpochIfKnown(RoutingKey key, long epoch)
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java b/accord-core/src/main/java/accord/utils/IndexedFunction.java
similarity index 50%
copy from accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
copy to accord-core/src/main/java/accord/utils/IndexedFunction.java
index 428c340..e4d662c 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
+++ b/accord-core/src/main/java/accord/utils/IndexedFunction.java
@@ -16,29 +16,9 @@
* limitations under the License.
*/
-package accord.coordinate.tracking;
+package accord.utils;
-import accord.coordinate.tracking.AbstractQuorumTracker.QuorumShardTracker;
-import accord.local.Node;
-import accord.topology.Topologies;
-import accord.topology.Topologies.Single;
-import accord.topology.Topology;
-
-public class QuorumTracker extends AbstractQuorumTracker<QuorumShardTracker>
+public interface IndexedFunction<T, R>
{
- public QuorumTracker(Topologies topologies)
- {
- super(topologies, QuorumShardTracker[]::new, QuorumShardTracker::new);
- }
-
- public QuorumTracker(Topology topology)
- {
- super(new Single(topology, false), QuorumShardTracker[]::new, QuorumShardTracker::new);
- }
-
- // return true iff hasReachedQuorum()
- public boolean success(Node.Id node)
- {
- return allForNode(node, QuorumShardTracker::success) && hasReachedQuorum();
- }
+ R apply(int i, T t);
}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java b/accord-core/src/main/java/accord/utils/IndexedTriFunction.java
similarity index 50%
copy from accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
copy to accord-core/src/main/java/accord/utils/IndexedTriFunction.java
index 428c340..3c722d5 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
+++ b/accord-core/src/main/java/accord/utils/IndexedTriFunction.java
@@ -16,29 +16,9 @@
* limitations under the License.
*/
-package accord.coordinate.tracking;
+package accord.utils;
-import accord.coordinate.tracking.AbstractQuorumTracker.QuorumShardTracker;
-import accord.local.Node;
-import accord.topology.Topologies;
-import accord.topology.Topologies.Single;
-import accord.topology.Topology;
-
-public class QuorumTracker extends AbstractQuorumTracker<QuorumShardTracker>
+public interface IndexedTriFunction<I1, I2, I3, O>
{
- public QuorumTracker(Topologies topologies)
- {
- super(topologies, QuorumShardTracker[]::new, QuorumShardTracker::new);
- }
-
- public QuorumTracker(Topology topology)
- {
- super(new Single(topology, false), QuorumShardTracker[]::new, QuorumShardTracker::new);
- }
-
- // return true iff hasReachedQuorum()
- public boolean success(Node.Id node)
- {
- return allForNode(node, QuorumShardTracker::success) && hasReachedQuorum();
- }
+ O apply(int i0, I1 i1, I2 i2, I3 i3);
}
diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java
index 34c9ba3..94ba547 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -18,6 +18,8 @@
package accord;
+import accord.api.TopologySorter;
+import accord.impl.SizeOfIntersectionSorter;
import accord.primitives.KeyRange;
import accord.local.Node;
import accord.impl.mock.MockStore;
@@ -109,6 +111,6 @@ public class Utils
public static Topologies topologies(Topology... topologies)
{
- return new Topologies.Multi(topologies);
+ return new Topologies.Multi(SizeOfIntersectionSorter.SUPPLIER, topologies);
}
}
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java
index 1fa99ab..cfe5641 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -268,7 +268,7 @@ public class BurnTest
public static void main(String[] args) throws Exception
{
// Long overrideSeed = null;
- Long overrideSeed = 1727794989115080196L;
+ Long overrideSeed = -5707319446834273528L;
do
{
run(overrideSeed != null ? overrideSeed : ThreadLocalRandom.current().nextLong());
diff --git a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
index 9fabfee..95e4656 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
@@ -44,7 +44,7 @@ public class CoordinateTest
Node node = cluster.get(1);
Assertions.assertNotNull(node);
- TxnId txnId = new TxnId(1, 100, 0, node.id());
+ TxnId txnId = node.nextTxnId();
Keys keys = keys(10);
Txn txn = writeTxn(keys);
Route route = keys.toRoute(keys.get(0).toRoutingKey());
@@ -71,7 +71,8 @@ public class CoordinateTest
private TxnId coordinate(Node node, long clock, Keys keys) throws Throwable
{
- TxnId txnId = new TxnId(1, clock, 0, node.id());
+ TxnId txnId = node.nextTxnId();
+ txnId = new TxnId(txnId.epoch, txnId.real + clock, 0, txnId.node);
Txn txn = writeTxn(keys);
Result result = Coordinate.coordinate(node, txnId, txn, node.computeRoute(txnId, txn.keys())).get();
Assertions.assertEquals(MockStore.RESULT, result);
@@ -134,7 +135,7 @@ public class CoordinateTest
Node node = cluster.get(1);
Assertions.assertNotNull(node);
- TxnId txnId = new TxnId(1, 100, 0, node.id());
+ TxnId txnId = node.nextTxnId();
Keys oneKey = keys(10);
Keys twoKeys = keys(10, 20);
Txn txn = new Txn.InMemory(oneKey, MockStore.read(oneKey), MockStore.QUERY, MockStore.update(twoKeys));
diff --git a/accord-core/src/test/java/accord/coordinate/PreAcceptTrackerTest.java b/accord-core/src/test/java/accord/coordinate/PreAcceptTrackerTest.java
index b6b9b32..d175008 100644
--- a/accord-core/src/test/java/accord/coordinate/PreAcceptTrackerTest.java
+++ b/accord-core/src/test/java/accord/coordinate/PreAcceptTrackerTest.java
@@ -41,14 +41,14 @@ public class PreAcceptTrackerTest
[1, 2, 3] [2, 3, 4] [3, 4, 5] [4, 5, 1] [5, 1, 2]
*/
- private static void assertResponseState(FastPathTracker<?> responses,
+ private static void assertResponseState(FastPathTracker responses,
boolean quorumReached,
boolean fastPathAccepted,
boolean failed,
boolean hasOutstandingResponses)
{
Assertions.assertEquals(quorumReached, responses.hasReachedQuorum());
- Assertions.assertEquals(fastPathAccepted, responses.hasMetFastPathCriteria());
+ Assertions.assertEquals(fastPathAccepted, responses.hasFastPathAccepted());
Assertions.assertEquals(failed, responses.hasFailed());
Assertions.assertEquals(hasOutstandingResponses, responses.hasInFlight());
}
@@ -57,7 +57,7 @@ public class PreAcceptTrackerTest
void singleShard()
{
Topology subTopology = topology(topology.get(0));
- FastPathTracker responses = new FastPathTracker<>(topologies(subTopology), Coordinate.ShardTracker[]::new, Coordinate.ShardTracker::new);
+ FastPathTracker responses = new FastPathTracker(topologies(subTopology));
responses.recordSuccess(ids[0], false);
assertResponseState(responses, false, false, false, true);
@@ -73,7 +73,7 @@ public class PreAcceptTrackerTest
void singleShardFastPath()
{
Topology subTopology = topology(topology.get(0));
- FastPathTracker responses = new FastPathTracker<>(topologies(subTopology), Coordinate.ShardTracker[]::new, Coordinate.ShardTracker::new);
+ FastPathTracker responses = new FastPathTracker(topologies(subTopology));
responses.recordSuccess(ids[0], true);
assertResponseState(responses, false, false, false, true);
@@ -92,7 +92,7 @@ public class PreAcceptTrackerTest
void unexpectedResponsesAreIgnored()
{
Topology subTopology = topology(topology.get(0));
- FastPathTracker responses = new FastPathTracker<>(topologies(subTopology), Coordinate.ShardTracker[]::new, Coordinate.ShardTracker::new);
+ FastPathTracker responses = new FastPathTracker(topologies(subTopology));
responses.recordSuccess(ids[0], false);
assertResponseState(responses, false, false, false, true);
@@ -109,15 +109,15 @@ public class PreAcceptTrackerTest
void failure()
{
Topology subTopology = topology(topology.get(0));
- FastPathTracker<?> responses = new FastPathTracker<>(topologies(subTopology), Coordinate.ShardTracker[]::new, Coordinate.ShardTracker::new);
+ FastPathTracker responses = new FastPathTracker(topologies(subTopology));
responses.recordSuccess(ids[0], true);
assertResponseState(responses, false, false, false, true);
- responses.failure(ids[1]);
+ responses.recordFailure(ids[1]);
assertResponseState(responses, false, false, false, true);
- responses.failure(ids[2]);
+ responses.recordFailure(ids[2]);
assertResponseState(responses, false, false, true, false);
}
@@ -125,7 +125,7 @@ public class PreAcceptTrackerTest
void multiShard()
{
Topology subTopology = new Topology(1, new Shard[]{topology.get(0), topology.get(1), topology.get(2)});
- FastPathTracker<Coordinate.ShardTracker> responses = new FastPathTracker<>(topologies(subTopology), Coordinate.ShardTracker[]::new, Coordinate.ShardTracker::new);
+ FastPathTracker responses = new FastPathTracker(topologies(subTopology));
/*
(000, 100](100, 200](200, 300]
[1, 2, 3] [2, 3, 4] [3, 4, 5]
diff --git a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
index bd14031..3298489 100644
--- a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
+++ b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java
@@ -104,63 +104,4 @@ public class TopologyChangeTest
});
}
}
-
- @Test
- void fastPathSkippedUntilSync() throws Throwable
- {
- Keys keys = keys(150);
- KeyRange range = range(100, 200);
- Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
- Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)));
- try (MockCluster cluster = MockCluster.builder().nodes(3)
- .messageSink(RecordingMessageSink::new)
- .topology(topology1).build())
- {
- Node node1 = cluster.get(1);
- RecordingMessageSink messageSink = (RecordingMessageSink) node1.messageSink();
- messageSink.clearHistory();
- TxnId txnId1 = coordinate(node1, keys);
- node1.commandStores().forEach(empty(), keys, 1, 1, commands -> {
- Command command = commands.command(txnId1);
- Assertions.assertTrue(command.partialDeps().isEmpty());
- }).awaitUninterruptibly();
-
- // check there was no accept phase
- Assertions.assertFalse(new ArrayList<>(messageSink.requests).stream().anyMatch(env -> env.payload instanceof Accept));
-
- cluster.configServices(1, 2, 3).forEach(config -> config.reportTopology(topology2));
- messageSink.clearHistory();
-
- // post epoch change, there _should_ be accepts, but with the original timestamp
- TxnId txnId2 = coordinate(node1, keys);
- Set<Node.Id> accepts = new ArrayList<>(messageSink.requests).stream()
- .filter(env -> env.payload instanceof Accept).map(env -> {
- Accept accept = (Accept) env.payload;
- Assertions.assertEquals(txnId2, accept.txnId);
- return env.to;
- }).collect(Collectors.toSet());
- Assertions.assertEquals(idSet(1, 2, 3), accepts);
-
- node1.commandStores().forEach(empty(), keys, 2, 2, commands -> {
- Command command = commands.command(txnId2);
- Assertions.assertTrue(command.hasBeen(Status.Committed));
- Assertions.assertTrue(command.partialDeps().contains(txnId1));
- Assertions.assertEquals(txnId2, command.executeAt());
- }).awaitUninterruptibly();
-
- EpochSync.sync(cluster, 1);
-
- // post sync, fast path should be working again, and there should be no accept phase
- messageSink.clearHistory();
- TxnId txnId3 = coordinate(node1, keys);
- Assertions.assertFalse(new ArrayList<>(messageSink.requests).stream().anyMatch(env -> env.payload instanceof Accept));
- node1.commandStores().forEach(empty(), keys, 2, 2, commands -> {
- Command command = commands.command(txnId3);
- Assertions.assertTrue(command.hasBeen(Status.Committed));
- Assertions.assertTrue(command.partialDeps().contains(txnId1));
- Assertions.assertTrue(command.partialDeps().contains(txnId2));
- Assertions.assertEquals(txnId3, command.executeAt());
- }).awaitUninterruptibly();
- }
- }
}
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/FastPathTrackerReconciler.java b/accord-core/src/test/java/accord/coordinate/tracking/FastPathTrackerReconciler.java
new file mode 100644
index 0000000..905dd6d
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/tracking/FastPathTrackerReconciler.java
@@ -0,0 +1,59 @@
+package accord.coordinate.tracking;
+
+import accord.coordinate.tracking.FastPathTracker.FastPathShardTracker;
+import accord.coordinate.tracking.QuorumTracker.QuorumShardTracker;
+import accord.local.Node;
+import accord.topology.Topologies;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.ArrayList;
+import java.util.Random;
+
+public class FastPathTrackerReconciler extends TrackerReconciler<FastPathShardTracker, FastPathTracker, FastPathTrackerReconciler.Rsp>
+{
+ enum Rsp { FAST, SLOW, FAIL }
+
+ FastPathTrackerReconciler(Random random, Topologies topologies)
+ {
+ this(random, new FastPathTracker(topologies));
+ }
+
+ private FastPathTrackerReconciler(Random random, FastPathTracker tracker)
+ {
+ super(random, Rsp.class, tracker, new ArrayList<>(tracker.nodes()));
+ }
+
+ @Override
+ RequestStatus invoke(Rsp event, FastPathTracker tracker, Node.Id from)
+ {
+ switch (event)
+ {
+ default: throw new AssertionError();
+ case FAST: inflight.remove(from); return tracker.recordSuccess(from, true);
+ case SLOW: inflight.remove(from); return tracker.recordSuccess(from, false);
+ case FAIL: inflight.remove(from); return tracker.recordFailure(from);
+ }
+ }
+
+ @Override
+ void validate(RequestStatus status)
+ {
+ switch (status)
+ {
+ case Failed:
+ Assertions.assertTrue(tracker.any(ShardTracker::hasFailed));
+ Assertions.assertFalse(tracker.all(FastPathShardTracker::hasReachedQuorum));
+ break;
+
+ case Success:
+ Assertions.assertTrue(tracker.all(FastPathShardTracker::hasReachedQuorum));
+ Assertions.assertTrue(tracker.all(shard -> shard.hasRejectedFastPath() || shard.hasMetFastPathCriteria()));
+ Assertions.assertFalse(tracker.any(ShardTracker::hasFailed));
+ break;
+
+ case NoChange:
+ Assertions.assertFalse(tracker.all(shard -> shard.hasRejectedFastPath() || shard.hasMetFastPathCriteria()) && tracker.all(FastPathShardTracker::hasReachedQuorum));
+ Assertions.assertFalse(tracker.any(ShardTracker::hasFailed));
+ }
+ }
+}
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerReconciler.java b/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerReconciler.java
new file mode 100644
index 0000000..e94b385
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerReconciler.java
@@ -0,0 +1,56 @@
+package accord.coordinate.tracking;
+
+import accord.coordinate.tracking.QuorumTracker.QuorumShardTracker;
+import accord.local.Node;
+import accord.topology.Topologies;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.ArrayList;
+import java.util.Random;
+
+public class QuorumTrackerReconciler extends TrackerReconciler<QuorumShardTracker, QuorumTracker, QuorumTrackerReconciler.Rsp>
+{
+ enum Rsp { QUORUM, FAIL }
+
+ QuorumTrackerReconciler(Random random, Topologies topologies)
+ {
+ this(random, new QuorumTracker(topologies));
+ }
+
+ private QuorumTrackerReconciler(Random random, QuorumTracker tracker)
+ {
+ super(random, Rsp.class, tracker, new ArrayList<>(tracker.nodes()));
+ }
+
+ @Override
+ RequestStatus invoke(Rsp event, QuorumTracker tracker, Node.Id from)
+ {
+ switch (event)
+ {
+ default: throw new AssertionError();
+ case QUORUM: inflight.remove(from); return tracker.recordSuccess(from);
+ case FAIL: inflight.remove(from); return tracker.recordFailure(from);
+ }
+ }
+
+ @Override
+ void validate(RequestStatus status)
+ {
+ switch (status)
+ {
+ case Failed:
+ Assertions.assertTrue(tracker.any(ShardTracker::hasFailed));
+ Assertions.assertFalse(tracker.all(QuorumShardTracker::hasReachedQuorum));
+ break;
+
+ case Success:
+ Assertions.assertTrue(tracker.all(QuorumShardTracker::hasReachedQuorum));
+ Assertions.assertFalse(tracker.any(ShardTracker::hasFailed));
+ break;
+
+ case NoChange:
+ Assertions.assertFalse(tracker.all(QuorumShardTracker::hasReachedQuorum));
+ Assertions.assertFalse(tracker.any(ShardTracker::hasFailed));
+ }
+ }
+}
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerTest.java b/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerTest.java
index 9014b92..6aab3a9 100644
--- a/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerTest.java
+++ b/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerTest.java
@@ -58,13 +58,13 @@ public class QuorumTrackerTest
Topology subTopology = topology(topology.get(0));
QuorumTracker responses = new QuorumTracker(topologies(subTopology));
- responses.success(ids[0]);
+ responses.recordSuccess(ids[0]);
assertResponseState(responses, false, false, true);
- responses.success(ids[1]);
+ responses.recordSuccess(ids[1]);
assertResponseState(responses, true, false, true);
- responses.success(ids[2]);
+ responses.recordSuccess(ids[2]);
assertResponseState(responses, true, false, false);
}
@@ -77,14 +77,14 @@ public class QuorumTrackerTest
Topology subTopology = topology(topology.get(0));
QuorumTracker responses = new QuorumTracker(topologies(subTopology));
- responses.success(ids[0]);
+ responses.recordSuccess(ids[0]);
assertResponseState(responses, false, false, true);
- responses.success(ids[1]);
+ responses.recordSuccess(ids[1]);
assertResponseState(responses, true, false, true);
Assertions.assertFalse(subTopology.get(0).nodes.contains(ids[4]));
- responses.success(ids[4]);
+ responses.recordSuccess(ids[4]);
assertResponseState(responses, true, false, true);
}
@@ -94,13 +94,13 @@ public class QuorumTrackerTest
Topology subTopology = topology(topology.get(0));
QuorumTracker responses = new QuorumTracker(topologies(subTopology));
- responses.success(ids[0]);
+ responses.recordSuccess(ids[0]);
assertResponseState(responses, false, false, true);
- responses.failure(ids[1]);
+ responses.recordFailure(ids[1]);
assertResponseState(responses, false, false, true);
- responses.failure(ids[2]);
+ responses.recordFailure(ids[2]);
assertResponseState(responses, false, true, false);
}
@@ -118,11 +118,11 @@ public class QuorumTrackerTest
Assertions.assertSame(subTopology.get(1), responses.unsafeGet(1).shard);
Assertions.assertSame(subTopology.get(2), responses.unsafeGet(2).shard);
- responses.success(ids[1]);
+ responses.recordSuccess(ids[1]);
assertResponseState(responses, false, false, true);
- responses.success(ids[2]);
+ responses.recordSuccess(ids[2]);
assertResponseState(responses, false, false, true);
- responses.success(ids[3]);
+ responses.recordSuccess(ids[3]);
assertResponseState(responses, true, false, true);
}
@@ -135,14 +135,14 @@ public class QuorumTrackerTest
QuorumTracker responses = new QuorumTracker(topologies(topology2, topology1));
- responses.success(id(1));
+ responses.recordSuccess(id(1));
assertResponseState(responses, false, false, true);
- responses.success(id(2));
+ responses.recordSuccess(id(2));
assertResponseState(responses, false, false, true);
- responses.success(id(4));
+ responses.recordSuccess(id(4));
assertResponseState(responses, false, false, true);
- responses.success(id(5));
+ responses.recordSuccess(id(5));
assertResponseState(responses, true, false, true);
}
@@ -155,14 +155,14 @@ public class QuorumTrackerTest
QuorumTracker responses = new QuorumTracker(topologies(topology2, topology1));
- responses.success(id(1));
+ responses.recordSuccess(id(1));
assertResponseState(responses, false, false, true);
- responses.success(id(2));
+ responses.recordSuccess(id(2));
assertResponseState(responses, false, false, true);
- responses.failure(id(4));
+ responses.recordFailure(id(4));
assertResponseState(responses, false, false, true);
- responses.failure(id(5));
+ responses.recordFailure(id(5));
assertResponseState(responses, false, true, true);
}
}
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerReconciler.java b/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerReconciler.java
new file mode 100644
index 0000000..72bfb71
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerReconciler.java
@@ -0,0 +1,84 @@
+package accord.coordinate.tracking;
+
+import accord.coordinate.tracking.ReadTracker.ReadShardTracker;
+import accord.local.Node;
+import accord.topology.Shard;
+import accord.topology.Topologies;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Stream;
+
+public class ReadTrackerReconciler extends TrackerReconciler<ReadShardTracker, ReadTracker, ReadTrackerReconciler.Rsp>
+{
+ enum Rsp { DATA, QUORUM, SLOW, FAIL }
+
+ static class InFlightCapturingReadTracker extends ReadTracker
+ {
+ final List<Node.Id> inflight = new ArrayList<>();
+ public InFlightCapturingReadTracker(Topologies topologies)
+ {
+ super(topologies);
+ }
+
+ @Override
+ protected RequestStatus trySendMore()
+ {
+ return super.trySendMore(List::add, inflight);
+ }
+ }
+
+ ReadTrackerReconciler(Random random, Topologies topologies)
+ {
+ this(random, new InFlightCapturingReadTracker(topologies));
+ }
+
+ private ReadTrackerReconciler(Random random, InFlightCapturingReadTracker tracker)
+ {
+ super(random, Rsp.class, tracker, tracker.inflight);
+ }
+
+ @Override
+ void test()
+ {
+ tracker.trySendMore();
+ super.test();
+ }
+
+ @Override
+ RequestStatus invoke(Rsp event, ReadTracker tracker, Node.Id from)
+ {
+ switch (event)
+ {
+ default: throw new AssertionError();
+ case DATA: inflight.remove(from); return tracker.recordReadSuccess(from);
+ case QUORUM: inflight.remove(from); return tracker.recordQuorumReadSuccess(from);
+ case FAIL: inflight.remove(from); return tracker.recordReadFailure(from);
+ case SLOW: return tracker.recordSlowResponse(from);
+ }
+ }
+
+ @Override
+ void validate(RequestStatus status)
+ {
+ switch (status)
+ {
+ case Failed:
+ Assertions.assertTrue(tracker.any(ReadShardTracker::hasFailed));
+ Assertions.assertFalse(tracker.all(ReadShardTracker::hasSucceeded));
+ break;
+
+ case Success:
+ Assertions.assertTrue(tracker.all(ReadShardTracker::hasSucceeded));
+ Assertions.assertFalse(tracker.any(ReadShardTracker::hasFailed));
+ break;
+
+ case NoChange:
+ Assertions.assertFalse(tracker.all(ReadShardTracker::hasSucceeded));
+ Assertions.assertFalse(tracker.any(ReadShardTracker::hasFailed));
+ }
+ }
+}
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java b/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
index a65a64b..96a9a68 100644
--- a/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
+++ b/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
@@ -19,22 +19,24 @@
package accord.coordinate.tracking;
import accord.impl.TopologyUtils;
-import accord.local.Node;
+import accord.local.Node.Id;
import accord.primitives.KeyRanges;
import accord.topology.Shard;
+import accord.topology.Topologies;
import accord.topology.Topology;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import static accord.Utils.*;
import static accord.utils.Utils.toArray;
public class ReadTrackerTest
{
- private static final Node.Id[] ids = toArray(ids(5), Node.Id[]::new);
+ private static final Id[] ids = toArray(ids(5), Id[]::new);
private static final KeyRanges ranges = TopologyUtils.initialRanges(5, 500);
private static final Topology topology = TopologyUtils.initialTopology(ids, ranges, 3);
/*
@@ -42,11 +44,39 @@ public class ReadTrackerTest
[1, 2, 3] [2, 3, 4] [3, 4, 5] [4, 5, 1] [5, 1, 2]
*/
+ static class AutoReadTracker extends ReadTracker
+ {
+ public AutoReadTracker(Topologies topologies)
+ {
+ super(topologies);
+ }
+
+ @Override
+ public RequestStatus trySendMore()
+ {
+ return super.trySendMore(Set::add, inflight);
+ }
+ }
+
+ static class TestReadTracker extends ReadTracker
+ {
+ public TestReadTracker(Topologies topologies)
+ {
+ super(topologies);
+ }
+
+ @Override
+ public RequestStatus trySendMore()
+ {
+ return RequestStatus.NoChange;
+ }
+ }
+
private static void assertResponseState(ReadTracker responses,
boolean complete,
boolean failed)
{
- Assertions.assertEquals(complete, responses.hasCompletedRead());
+ Assertions.assertEquals(complete, responses.hasData());
Assertions.assertEquals(failed, responses.hasFailed());
}
@@ -54,9 +84,9 @@ public class ReadTrackerTest
void singleShard()
{
Topology subTopology = topology(topology.get(0));
- ReadTracker tracker = ReadTracker.create(topologies(subTopology));
+ ReadTracker tracker = new ReadTracker(topologies(subTopology));
- tracker.recordInflightRead(ids[0]);
+ tracker.recordInFlightRead(ids[0]);
assertResponseState(tracker, false, false);
tracker.recordReadSuccess(ids[0]);
@@ -67,15 +97,15 @@ public class ReadTrackerTest
void singleShardRetry()
{
Topology subTopology = topology(topology.get(0));
- ReadTracker tracker = ReadTracker.create(topologies(subTopology));
+ ReadTracker tracker = new AutoReadTracker(topologies(subTopology));
- tracker.recordInflightRead(ids[0]);
+ tracker.recordInFlightRead(ids[0]);
assertResponseState(tracker, false, false);
tracker.recordReadFailure(ids[0]);
assertResponseState(tracker, false, false);
- tracker.recordInflightRead(ids[1]);
+ tracker.recordInFlightRead(ids[1]);
assertResponseState(tracker, false, false);
tracker.recordReadSuccess(ids[1]);
@@ -86,17 +116,17 @@ public class ReadTrackerTest
void singleShardFailure()
{
Topology subTopology = topology(topology.get(0));
- ReadTracker tracker = ReadTracker.create(topologies(subTopology));
+ ReadTracker tracker = new TestReadTracker(topologies(subTopology));
- tracker.recordInflightRead(ids[0]);
+ tracker.recordInFlightRead(ids[0]);
tracker.recordReadFailure(ids[0]);
assertResponseState(tracker, false, false);
- tracker.recordInflightRead(ids[1]);
+ tracker.recordInFlightRead(ids[1]);
tracker.recordReadFailure(ids[1]);
assertResponseState(tracker, false, false);
- tracker.recordInflightRead(ids[2]);
+ tracker.recordInFlightRead(ids[2]);
tracker.recordReadFailure(ids[2]);
assertResponseState(tracker, false, true);
}
@@ -105,13 +135,13 @@ public class ReadTrackerTest
void multiShardSuccess()
{
Topology subTopology = new Topology(1, new Shard[]{topology.get(0), topology.get(1), topology.get(2)});
- ReadTracker responses = ReadTracker.create(topologies(subTopology));
+ ReadTracker responses = new AutoReadTracker(topologies(subTopology));
/*
(000, 100](100, 200](200, 300]
[1, 2, 3] [2, 3, 4] [3, 4, 5]
*/
- responses.recordInflightRead(ids[2]);
+ responses.recordInFlightRead(ids[2]);
responses.recordReadSuccess(ids[2]);
assertResponseState(responses, true, false);
}
@@ -120,30 +150,44 @@ public class ReadTrackerTest
void multiShardRetryAndReadSet()
{
Topology subTopology = new Topology(1, new Shard[]{topology.get(0), topology.get(1), topology.get(2)});
- ReadTracker responses = ReadTracker.create(topologies(subTopology));
+ ReadTracker responses = new TestReadTracker(topologies(subTopology));
/*
(000, 100](100, 200](200, 300]
[1, 2, 3] [2, 3, 4] [3, 4, 5]
*/
- Assertions.assertEquals(Sets.newHashSet(ids[2]), responses.computeMinimalReadSetAndMarkInflight());
+ assertContacts(Sets.newHashSet(ids[2]), responses);
assertResponseState(responses, false, false);
responses.recordReadFailure(ids[2]);
assertResponseState(responses, false, false);
- Assertions.assertEquals(Sets.newHashSet(ids[1], ids[3]), responses.computeMinimalReadSetAndMarkInflight());
+ assertContacts(Sets.newHashSet(ids[1], ids[3]), responses);
assertResponseState(responses, false, false);
responses.recordReadFailure(ids[1]);
- Assertions.assertEquals(Sets.newHashSet(ids[0]), responses.computeMinimalReadSetAndMarkInflight());
+ assertContacts(Sets.newHashSet(ids[0]), responses);
responses.recordReadSuccess(ids[3]);
assertResponseState(responses, false, false);
- Assertions.assertEquals(Collections.emptySet(), responses.computeMinimalReadSetAndMarkInflight());
+ try
+ {
+ responses.trySendMore((i,j)->{}, null);
+ Assertions.fail();
+ }
+ catch (IllegalStateException t)
+ {
+ }
responses.recordReadSuccess(ids[0]);
assertResponseState(responses, true, false);
}
+
+ private static void assertContacts(Set<Id> expect, ReadTracker tracker)
+ {
+ Set<Id> actual = new HashSet<>();
+ tracker.trySendMore(Set::add, actual);
+ Assertions.assertEquals(expect, actual);
+ }
}
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/RecoveryTrackerReconciler.java b/accord-core/src/test/java/accord/coordinate/tracking/RecoveryTrackerReconciler.java
new file mode 100644
index 0000000..314c63f
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/tracking/RecoveryTrackerReconciler.java
@@ -0,0 +1,58 @@
+package accord.coordinate.tracking;
+
+import accord.coordinate.tracking.RecoveryTracker.RecoveryShardTracker;
+import accord.local.Node;
+import accord.topology.Topologies;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.ArrayList;
+import java.util.Random;
+
+// TODO: check fast path accounting
+public class RecoveryTrackerReconciler extends TrackerReconciler<RecoveryShardTracker, RecoveryTracker, RecoveryTrackerReconciler.Rsp>
+{
+ enum Rsp { FAST, SLOW, FAIL }
+
+ RecoveryTrackerReconciler(Random random, Topologies topologies)
+ {
+ this(random, new RecoveryTracker(topologies));
+ }
+
+ private RecoveryTrackerReconciler(Random random, RecoveryTracker tracker)
+ {
+ super(random, Rsp.class, tracker, new ArrayList<>(tracker.nodes()));
+ }
+
+ @Override
+ RequestStatus invoke(Rsp event, RecoveryTracker tracker, Node.Id from)
+ {
+ switch (event)
+ {
+ default: throw new AssertionError();
+ case FAST: inflight.remove(from); return tracker.recordSuccess(from, true);
+ case SLOW: inflight.remove(from); return tracker.recordSuccess(from, false);
+ case FAIL: inflight.remove(from); return tracker.recordFailure(from);
+ }
+ }
+
+ @Override
+ void validate(RequestStatus status)
+ {
+ switch (status)
+ {
+ case Failed:
+ Assertions.assertTrue(tracker.any(ShardTracker::hasFailed));
+ Assertions.assertFalse(tracker.all(RecoveryShardTracker::hasReachedQuorum));
+ break;
+
+ case Success:
+ Assertions.assertTrue(tracker.all(RecoveryShardTracker::hasReachedQuorum));
+ Assertions.assertFalse(tracker.any(ShardTracker::hasFailed));
+ break;
+
+ case NoChange:
+ Assertions.assertFalse(tracker.all(RecoveryShardTracker::hasReachedQuorum));
+ Assertions.assertFalse(tracker.any(ShardTracker::hasFailed));
+ }
+ }
+}
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
new file mode 100644
index 0000000..acbb3b3
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconciler.java
@@ -0,0 +1,116 @@
+package accord.coordinate.tracking;
+
+import accord.burn.TopologyUpdates;
+import accord.impl.IntHashKey;
+import accord.impl.SizeOfIntersectionSorter;
+import accord.impl.TopologyFactory;
+import accord.local.Node.Id;
+import accord.topology.Topologies;
+import accord.topology.Topology;
+import accord.topology.TopologyRandomizer;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.*;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class TrackerReconciler<ST extends ShardTracker, T extends AbstractTracker<ST, ?>, E extends Enum<E>>
+{
+ final Random random;
+ final E[] events;
+ final EnumMap<E, Integer>[] counts;
+ final T tracker;
+ final List<Id> inflight;
+
+ protected TrackerReconciler(Random random, Class<E> events, T tracker, List<Id> inflight)
+ {
+ this.random = random;
+ this.events = events.getEnumConstants();
+ this.tracker = tracker;
+ this.inflight = inflight;
+ this.counts = new EnumMap[tracker.trackers.length];
+ for (int i = 0 ; i < counts.length ; ++i)
+ {
+ counts[i] = new EnumMap<>(events);
+ for (E event : this.events)
+ counts[i].put(event, 0);
+ }
+ }
+
+ Topologies topologies()
+ {
+ return tracker.topologies;
+ }
+
+ void test()
+ {
+ while (true)
+ {
+ Assertions.assertFalse(inflight.isEmpty());
+ E next = events[random.nextInt(events.length)];
+ Id from = inflight.get(random.nextInt(inflight.size()));
+ RequestStatus newStatus = invoke(next, tracker, from);
+ for (int i = 0 ; i < topologies().size() ; ++i)
+ {
+ topologies().get(i).forEachOn(from, (si, s) -> {
+ counts[si].compute(next, (ignore, cur) -> cur + 1);
+ });
+ }
+
+ validate(newStatus);
+ if (newStatus != RequestStatus.NoChange)
+ break;
+ }
+ }
+
+ abstract RequestStatus invoke(E event, T tracker, Id from);
+ abstract void validate(RequestStatus status);
+
+ protected static <ST extends ShardTracker, T extends AbstractTracker<ST, ?>, E extends Enum<E>>
+ List<TrackerReconciler<ST, T, E>> generate(long seed, BiFunction<Random, Topologies, ? extends TrackerReconciler<ST, T, E>> constructor)
+ {
+ System.out.println("seed: " + seed);
+ Random random = new Random(seed);
+ return topologies(random).map(topologies -> constructor.apply(random, topologies))
+ .collect(Collectors.toList());
+ }
+
+ // TODO: generalise and parameterise topology generation a bit more
+ // TODO: select a subset of the generated topologies to correctly simulate topology consumption logic
+ private static Stream<Topologies> topologies(Random random)
+ {
+ TopologyFactory factory = new TopologyFactory(2 + random.nextInt(3), IntHashKey.ranges(4 + random.nextInt(12)));
+ List<Id> nodes = cluster(factory.rf * (1 + random.nextInt(factory.shardRanges.length - 1)));
+ Topology topology = factory.toTopology(nodes);
+ int count = 1 + random.nextInt(3);
+
+ List<Topologies> result = new ArrayList<>();
+ result.add(new Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, topology));
+
+ if (count == 1)
+ return result.stream();
+
+ Deque<Topology> topologies = new ArrayDeque<>();
+ topologies.add(topology);
+ TopologyUpdates topologyUpdates = new TopologyUpdates();
+ TopologyRandomizer configRandomizer = new TopologyRandomizer(() -> random, topology, topologyUpdates, (id, top) -> {});
+ while (--count > 0)
+ {
+ Topology next = configRandomizer.updateTopology();
+ while (next == null)
+ next = configRandomizer.updateTopology();
+ topologies.addFirst(next);
+ result.add(new Topologies.Multi(SizeOfIntersectionSorter.SUPPLIER, topologies.toArray(new Topology[0])));
+ }
+ return result.stream();
+ }
+
+ private static List<Id> cluster(int count)
+ {
+ List<Id> cluster = new ArrayList<>();
+ for (int i = 1 ; i <= count ; ++i)
+ cluster.add(new Id(i));
+ return cluster;
+ }
+}
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconcilerTest.java b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconcilerTest.java
new file mode 100644
index 0000000..25ed28a
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/tracking/TrackerReconcilerTest.java
@@ -0,0 +1,49 @@
+package accord.coordinate.tracking;
+
+import accord.topology.Topologies;
+import org.junit.jupiter.api.Test;
+
+import java.util.Random;
+import java.util.function.BiFunction;
+
+public class TrackerReconcilerTest
+{
+ @Test
+ public void testReadTracker()
+ {
+ test(10000, ReadTrackerReconciler::new);
+ }
+
+ @Test
+ public void testQuorumTracker()
+ {
+ test(10000, QuorumTrackerReconciler::new);
+ }
+
+ @Test
+ public void testFastPathTracker()
+ {
+ test(10000, FastPathTrackerReconciler::new);
+ }
+
+ @Test
+ public void testRecoveryTracker()
+ {
+ test(10000, RecoveryTrackerReconciler::new);
+ }
+
+ static <ST extends ShardTracker, T extends AbstractTracker<ST, ?>, E extends Enum<E>>
+ void test(int count, BiFunction<Random, Topologies, ? extends TrackerReconciler<ST, T, E>> constructor)
+ {
+ long seed = System.currentTimeMillis();
+ while (--count >= 0)
+ test(seed++, constructor);
+ }
+
+ static <ST extends ShardTracker, T extends AbstractTracker<ST, ?>, E extends Enum<E>>
+ void test(long seed, BiFunction<Random, Topologies, ? extends TrackerReconciler<ST, T, E>> constructor)
+ {
+ for (TrackerReconciler<?, ?, ?> test : TrackerReconciler.generate(seed, constructor))
+ test.test();
+ }
+}
diff --git a/accord-core/src/test/java/accord/impl/TopologyFactory.java b/accord-core/src/test/java/accord/impl/TopologyFactory.java
index 3dd450d..d52728b 100644
--- a/accord-core/src/test/java/accord/impl/TopologyFactory.java
+++ b/accord-core/src/test/java/accord/impl/TopologyFactory.java
@@ -32,17 +32,17 @@ public class TopologyFactory
{
public final int rf;
// TODO: convert to KeyRanges
- final KeyRange[] ranges;
+ public final KeyRange[] shardRanges;
- public TopologyFactory(int rf, KeyRange... ranges)
+ public TopologyFactory(int rf, KeyRange... shardRanges)
{
this.rf = rf;
- this.ranges = ranges;
+ this.shardRanges = shardRanges;
}
public Topology toTopology(Node.Id[] cluster)
{
- return TopologyUtils.initialTopology(cluster, KeyRanges.ofSortedAndDeoverlapped(ranges), rf);
+ return TopologyUtils.initialTopology(cluster, KeyRanges.ofSortedAndDeoverlapped(shardRanges), rf);
}
public Topology toTopology(List<Node.Id> cluster)
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index a1f215e..1ea9210 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -40,6 +40,7 @@ import accord.burn.BurnTestConfigurationService;
import accord.burn.TopologyUpdates;
import accord.impl.SimpleProgressLog;
import accord.impl.InMemoryCommandStores;
+import accord.impl.SizeOfIntersectionSorter;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.Scheduler;
@@ -221,7 +222,8 @@ public class Cluster implements Scheduler
BurnTestConfigurationService configService = new BurnTestConfigurationService(node, messageSink, randomSupplier, topology, lookup::get, topologyUpdates);
lookup.put(node, new Node(node, messageSink, configService,
nowSupplier.get(), () -> new ListStore(node), new ListAgent(30L, onFailure),
- randomSupplier.get(), sinks, SimpleProgressLog::new, InMemoryCommandStores.Synchronized::new));
+ randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER,
+ SimpleProgressLog::new, InMemoryCommandStores.Synchronized::new));
}
List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index 1d1c443..c33bf58 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -61,14 +61,14 @@ public class ListRequest implements Request
}
@Override
- protected Action check(Id from, CheckStatusOk ok)
+ protected Action checkSufficient(Id from, CheckStatusOk ok)
{
++count;
- return ok.saveStatus.hasBeen(PreApplied) ? Action.Accept : Action.Reject;
+ return ok.saveStatus.hasBeen(PreApplied) ? Action.Approve : Action.Reject;
}
@Override
- protected void onDone(Done done, Throwable failure)
+ protected void onDone(CheckShards.Success done, Throwable failure)
{
if (failure != null) callback.accept(null, failure);
else if (merged.saveStatus.hasBeen(Status.Invalidated)) callback.accept(Outcome.Invalidated, null);
@@ -77,7 +77,7 @@ public class ListRequest implements Request
}
@Override
- protected boolean isSufficient(Id from, CheckStatusOk ok)
+ protected boolean isSufficient(CheckStatusOk ok)
{
throw new UnsupportedOperationException();
}
diff --git a/accord-core/src/test/java/accord/impl/mock/EpochSync.java b/accord-core/src/test/java/accord/impl/mock/EpochSync.java
index 276ff3b..ef05c0f 100644
--- a/accord-core/src/test/java/accord/impl/mock/EpochSync.java
+++ b/accord-core/src/test/java/accord/impl/mock/EpochSync.java
@@ -100,14 +100,14 @@ public class EpochSync implements Runnable
public CommandSync(Node node, AbstractRoute route, SyncCommitted message, Topology topology)
{
- this.tracker = new QuorumTracker(new Single(topology.forKeys(route), false));
+ this.tracker = new QuorumTracker(new Single(node.topology().sorter(), topology.forKeys(route)));
node.send(tracker.nodes(), message, this);
}
@Override
public synchronized void onSuccess(Node.Id from, SimpleReply reply)
{
- tracker.success(from);
+ tracker.recordSuccess(from);
if (tracker.hasReachedQuorum())
trySuccess(null);
}
@@ -115,7 +115,7 @@ public class EpochSync implements Runnable
@Override
public synchronized void onFailure(Node.Id from, Throwable failure)
{
- tracker.failure(from);
+ tracker.recordFailure(from);
if (tracker.hasFailed())
tryFailure(failure);
}
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 544723d..996cd66 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -21,10 +21,7 @@ package accord.impl.mock;
import accord.NetworkFilter;
import accord.api.MessageSink;
import accord.coordinate.Timeout;
-import accord.impl.InMemoryCommandStores;
-import accord.impl.TopologyUtils;
-import accord.local.CommandStores;
-import accord.impl.SimpleProgressLog;
+import accord.impl.*;
import accord.local.Node;
import accord.local.Node.Id;
import accord.primitives.KeyRanges;
@@ -34,7 +31,6 @@ import accord.primitives.TxnId;
import accord.messages.Callback;
import accord.messages.Reply;
import accord.messages.Request;
-import accord.impl.TestAgent;
import accord.topology.Topology;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
@@ -111,6 +107,7 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node>
new TestAgent(),
new Random(random.nextLong()),
new ThreadPoolScheduler(),
+ SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new,
InMemoryCommandStores.SingleThread::new);
}
diff --git a/accord-core/src/test/java/accord/local/CommandTest.java b/accord-core/src/test/java/accord/local/CommandTest.java
index cc3efe8..0b55caf 100644
--- a/accord-core/src/test/java/accord/local/CommandTest.java
+++ b/accord-core/src/test/java/accord/local/CommandTest.java
@@ -146,7 +146,7 @@ public class CommandTest
{
return new Node(id, null, new MockConfigurationService(null, (epoch, service) -> { }, storeSupport.local.get()),
new MockCluster.Clock(100), () -> storeSupport.data, new TestAgent(), new Random(), null,
- ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
+ SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
}
@Test
diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
index 4ade147..0955bf7 100644
--- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
+++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
@@ -68,6 +68,7 @@ public class PreAcceptTest
new TestAgent(),
new Random(),
scheduler,
+ SizeOfIntersectionSorter.SUPPLIER,
SimpleProgressLog::new,
InMemoryCommandStores.Synchronized::new);
}
diff --git a/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java b/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java
index 3c5e168..acbfa4e 100644
--- a/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java
+++ b/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java
@@ -18,6 +18,7 @@
package accord.messages;
+import accord.api.TopologySorter;
import accord.primitives.KeyRange;
import accord.primitives.Route;
import accord.topology.Topologies;
@@ -43,7 +44,7 @@ public class TxnRequestScopeTest
Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
Topology topology2 = topology(2, shard(range, idList(3, 4, 5), idSet(4, 5)));
- Topologies.Multi topologies = new Topologies.Multi();
+ Topologies.Multi topologies = new Topologies.Multi((TopologySorter.StaticSorter)(a, b, s)->0);
topologies.add(topology2);
topologies.add(topology1);
@@ -74,7 +75,7 @@ public class TxnRequestScopeTest
shard(range1, idList(4, 5, 6), idSet(4, 5)),
shard(range2, idList(1, 2, 3), idSet(1, 2)) );
- Topologies.Multi topologies = new Topologies.Multi();
+ Topologies.Multi topologies = new Topologies.Multi((TopologySorter.StaticSorter)(a,b,s)->0);
topologies.add(topology2);
topologies.add(topology1);
diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
index cb68d8c..58890e1 100644
--- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
@@ -18,6 +18,8 @@
package accord.topology;
+import accord.api.TopologySorter;
+import accord.impl.SizeOfIntersectionSorter;
import accord.local.Node;
import accord.primitives.KeyRange;
import accord.primitives.Keys;
@@ -27,6 +29,7 @@ import org.junit.jupiter.api.Test;
import static accord.Utils.*;
import static accord.impl.IntKey.keys;
import static accord.impl.IntKey.range;
+import static accord.impl.SizeOfIntersectionSorter.SUPPLIER;
public class TopologyManagerTest
{
@@ -39,7 +42,7 @@ public class TopologyManagerTest
Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)));
- TopologyManager service = new TopologyManager(ID);
+ TopologyManager service = new TopologyManager(SUPPLIER, ID);
Assertions.assertSame(Topology.EMPTY, service.current());
service.onTopologyUpdate(topology1);
@@ -64,7 +67,7 @@ public class TopologyManagerTest
shard(range(100, 200), idList(1, 2, 3), idSet(3, 4)),
shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)));
- TopologyManager service = new TopologyManager(ID);
+ TopologyManager service = new TopologyManager(SUPPLIER, ID);
service.onTopologyUpdate(topology1);
service.onTopologyUpdate(topology2);
@@ -95,7 +98,7 @@ public class TopologyManagerTest
Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)));
Topology topology3 = topology(3, shard(range, idList(1, 2, 3), idSet(1, 2)));
- TopologyManager service = new TopologyManager(ID);
+ TopologyManager service = new TopologyManager(SUPPLIER, ID);
service.onTopologyUpdate(topology1);
service.onTopologyUpdate(topology2);
service.onTopologyUpdate(topology3);
@@ -128,7 +131,7 @@ public class TopologyManagerTest
Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)));
- TopologyManager service = new TopologyManager(ID);
+ TopologyManager service = new TopologyManager(SUPPLIER, ID);
service.onTopologyUpdate(topology1);
// sync epoch 2
@@ -147,7 +150,7 @@ public class TopologyManagerTest
Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)));
- TopologyManager service = new TopologyManager(ID);
+ TopologyManager service = new TopologyManager(SUPPLIER, ID);
Assertions.assertSame(Topology.EMPTY, service.current());
service.onTopologyUpdate(topology1);
@@ -179,7 +182,7 @@ public class TopologyManagerTest
shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)),
shard(range(200, 300), idList(4, 5, 6), idSet(5, 6)));
- TopologyManager service = new TopologyManager(ID);
+ TopologyManager service = new TopologyManager(SUPPLIER, ID);
service.onTopologyUpdate(topology1);
service.onTopologyUpdate(topology2);
@@ -193,6 +196,5 @@ public class TopologyManagerTest
Topologies actual = service.withUnsyncedEpochs(keys(150, 250), 2, 2);
Assertions.assertEquals(topologies(topology2, topology(1, shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)))),
actual);
- Assertions.assertFalse(actual.fastPathPermitted());
}
}
diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
index 2b6fd84..4be9914 100644
--- a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
+++ b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
@@ -27,28 +27,31 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.function.Supplier;
+import java.util.function.*;
+// TODO: add change replication factor
public class TopologyRandomizer
{
private static final Logger logger = LoggerFactory.getLogger(TopologyRandomizer.class);
private final Random random;
- private final Function<Node.Id, Node> lookup;
+ private final BiConsumer<Node.Id, Topology> notifier;
private final List<Topology> epochs = new ArrayList<>();
private final Map<Node.Id, KeyRanges> previouslyReplicated = new HashMap<>();
private final TopologyUpdates topologyUpdates;
public TopologyRandomizer(Supplier<Random> randomSupplier, Topology initialTopology, TopologyUpdates topologyUpdates, Function<Node.Id, Node> lookup)
+ {
+ this(randomSupplier, initialTopology, topologyUpdates, (id, topology) -> topologyUpdates.notify(lookup.apply(id), topology.nodes(), topology));
+ }
+ public TopologyRandomizer(Supplier<Random> randomSupplier, Topology initialTopology, TopologyUpdates topologyUpdates, BiConsumer<Node.Id, Topology> notifier)
{
this.random = randomSupplier.get();
this.topologyUpdates = topologyUpdates;
- this.lookup = lookup;
this.epochs.add(Topology.EMPTY);
this.epochs.add(initialTopology);
for (Node.Id node : initialTopology.nodes())
previouslyReplicated.put(node, initialTopology.rangesForNode(node));
+ this.notifier = notifier;
}
private enum UpdateType
@@ -209,13 +212,17 @@ public class TopologyRandomizer
return false;
}
- public synchronized void maybeUpdateTopology()
- {
+ public synchronized void maybeUpdateTopology() {
// if we don't limit the number of pending topology changes in flight,
// the topology randomizer will keep the burn test busy indefinitely
if (topologyUpdates.pendingTopologies() > 5 || random.nextInt(200) != 0)
return;
+ updateTopology();
+ }
+
+ public synchronized Topology updateTopology()
+ {
Topology current = epochs.get(epochs.size() - 1);
Shard[] shards = current.unsafeGetShards().clone();
int mutations = random.nextInt(current.size());
@@ -231,7 +238,7 @@ public class TopologyRandomizer
// In the meantime, the logic needed to support acquiring ranges that we previously replicated is pretty
// convoluted without the ability to jettison epochs.
if (reassignsRanges(current, shards, previouslyReplicated))
- return;
+ return null;
Map<Node.Id, KeyRanges> nextAdditions = getAdditions(current, nextTopology);
for (Map.Entry<Node.Id, KeyRanges> entry : nextAdditions.entrySet())
@@ -248,7 +255,7 @@ public class TopologyRandomizer
List<Node.Id> nodes = new ArrayList<>(nextTopology.nodes());
int originatorIdx = random.nextInt(nodes.size());
- Node originator = lookup.apply(nodes.remove(originatorIdx));
- topologyUpdates.notify(originator, nextTopology.nodes(), nextTopology);
+ notifier.accept(nodes.remove(originatorIdx), nextTopology);
+ return nextTopology;
}
}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 97a765d..64b0ec0 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -32,6 +32,7 @@ import java.util.function.LongSupplier;
import java.util.function.Supplier;
import accord.coordinate.Timeout;
+import accord.impl.SizeOfIntersectionSorter;
import accord.local.CommandStores;
import accord.impl.SimpleProgressLog;
import accord.impl.InMemoryCommandStores;
@@ -296,7 +297,8 @@ public class Cluster implements Scheduler
MessageSink messageSink = sinks.create(node, randomSupplier.get());
lookup.put(node, new Node(node, messageSink, new SimpleConfigService(topology),
nowSupplier.get(), MaelstromStore::new, MaelstromAgent.INSTANCE,
- randomSupplier.get(), sinks, SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new));
+ randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER,
+ SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new));
}
List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 77531c3..d861bf7 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -34,6 +34,7 @@ import java.util.function.Supplier;
import accord.coordinate.Timeout;
import accord.impl.SimpleProgressLog;
import accord.impl.InMemoryCommandStores;
+import accord.impl.SizeOfIntersectionSorter;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.Scheduler;
@@ -160,7 +161,7 @@ public class Main
sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err);
on = new Node(init.self, sink, new SimpleConfigService(topology), System::currentTimeMillis,
MaelstromStore::new, MaelstromAgent.INSTANCE, new Random(), scheduler,
- SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new);
+ SizeOfIntersectionSorter.SUPPLIER, SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new);
err.println("Initialized node " + init.self);
err.flush();
sink.send(packet.src, new Body(Type.init_ok, Body.SENTINEL_MSG_ID, init.msg_id));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org