You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2021/11/01 19:29:19 UTC
[cassandra-accord] branch trunk updated: Instance refactor
This is an automated email from the ASF dual-hosted git repository.
bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new aec91ad Instance refactor
aec91ad is described below
commit aec91ad7e2ca439d857d928658346d93c342f51f
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Tue Oct 26 04:26:16 2021 -0500
Instance refactor
* add topology decoupled command shard
* add range splitting
* add locally sharded command metadata classes
* rework topology changes to include local topology
* make shard collections immutable
* add multi-shard response tracker
Co-authored-by: Blake Eggleston <be...@apple.com>
Co-authored-by: Benedict Elliott Smith <be...@apple.com>
---
accord-core/src/main/java/accord/api/KeyRange.java | 104 ++++-
accord-core/src/main/java/accord/api/Read.java | 4 +-
accord-core/src/main/java/accord/api/Write.java | 3 +-
.../main/java/accord/coordinate/AcceptPhase.java | 27 +-
.../src/main/java/accord/coordinate/Agree.java | 78 ++--
.../src/main/java/accord/coordinate/Execute.java | 83 ++--
.../src/main/java/accord/coordinate/Preempted.java | 2 +-
.../src/main/java/accord/coordinate/Recover.java | 87 ++--
.../src/main/java/accord/coordinate/Timeout.java | 2 +-
.../coordinate/tracking/AbstractQuorumTracker.java | 83 ++++
.../tracking/AbstractResponseTracker.java | 80 ++++
.../coordinate/tracking/FastPathTracker.java | 46 +++
.../accord/coordinate/tracking/QuorumTracker.java | 17 +
.../accord/coordinate/tracking/ReadTracker.java | 146 +++++++
.../src/main/java/accord/local/Command.java | 22 +-
.../src/main/java/accord/local/CommandStore.java | 449 +++++++++++++++++++++
.../src/main/java/accord/local/CommandStores.java | 154 +++++++
.../src/main/java/accord/local/CommandsForKey.java | 10 +-
.../src/main/java/accord/local/Instance.java | 58 ---
accord-core/src/main/java/accord/local/Node.java | 41 +-
.../main/java/accord/messages/BeginRecovery.java | 2 -
.../src/main/java/accord/messages/PreAccept.java | 8 +-
.../src/main/java/accord/messages/ReadData.java | 20 +-
.../main/java/accord/messages/WaitOnCommit.java | 9 +-
.../src/main/java/accord/topology/KeyRanges.java | 148 ++++++-
.../src/main/java/accord/topology/Shard.java | 14 +-
.../src/main/java/accord/topology/Topology.java | 99 ++++-
.../src/main/java/accord/txn/Dependencies.java | 11 +-
accord-core/src/main/java/accord/txn/Txn.java | 48 ++-
accord-core/src/main/java/accord/txn/Writes.java | 6 +-
.../main/java/accord/utils/IndexedPredicate.java | 6 +
accord-core/src/test/java/accord/Utils.java | 14 +
.../src/test/java/accord/burn/BurnTest.java | 1 -
.../java/accord/coordinate/CoordinateTest.java | 56 ++-
.../accord/coordinate/PreacceptTrackerTest.java | 135 +++++++
.../test/java/accord/coordinate/RecoverTest.java | 86 ++--
.../coordinate/tracking/QuorumTrackerTest.java | 107 +++++
.../coordinate/tracking/ReadTrackerTest.java | 131 ++++++
.../src/test/java/accord/impl/IntHashKey.java | 30 ++
accord-core/src/test/java/accord/impl/IntKey.java | 35 ++
.../src/test/java/accord/impl/TopologyFactory.java | 26 +-
.../src/test/java/accord/impl/TopologyUtils.java | 57 +++
.../test/java/accord/impl/TopologyUtilsTest.java | 17 +
.../src/test/java/accord/impl/basic/Cluster.java | 43 +-
.../src/test/java/accord/impl/basic/NodeSink.java | 2 +-
.../src/test/java/accord/impl/list/ListRead.java | 18 +-
.../src/test/java/accord/impl/list/ListWrite.java | 14 +-
.../test/java/accord/impl/mock/MockCluster.java | 40 +-
.../test/java/accord/local/CommandStoreTest.java | 92 +++++
.../test/java/accord/messages/PreAcceptTest.java | 110 ++---
.../test/java/accord/topology/TopologyTest.java | 25 +-
.../src/test/java/accord/utils/KeyRangeTest.java | 159 +++++++-
.../src/test/java/accord/utils/KeyRangesTest.java | 52 +++
.../src/main/java/accord/maelstrom/Cluster.java | 40 +-
.../main/java/accord/maelstrom/MaelstromKey.java | 33 ++
.../main/java/accord/maelstrom/MaelstromRead.java | 18 +-
.../main/java/accord/maelstrom/MaelstromWrite.java | 14 +-
.../src/main/java/accord/maelstrom/Main.java | 66 +--
58 files changed, 2717 insertions(+), 571 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/KeyRange.java b/accord-core/src/main/java/accord/api/KeyRange.java
index 9e6b850..fb11c1e 100644
--- a/accord-core/src/main/java/accord/api/KeyRange.java
+++ b/accord-core/src/main/java/accord/api/KeyRange.java
@@ -1,5 +1,6 @@
package accord.api;
+import accord.topology.KeyRanges;
import accord.txn.Keys;
import com.google.common.base.Preconditions;
@@ -39,6 +40,12 @@ public abstract class KeyRange<K extends Key<K>>
{
return true;
}
+
+ @Override
+ public KeyRange<K> tryMerge(KeyRange<K> that)
+ {
+ return KeyRange.tryMergeExclusiveInclusive(this, that);
+ }
}
public static abstract class StartInclusive<K extends Key<K>> extends KeyRange<K>
@@ -69,6 +76,34 @@ public abstract class KeyRange<K extends Key<K>>
{
return false;
}
+
+ @Override
+ public KeyRange<K> tryMerge(KeyRange<K> that)
+ {
+ return KeyRange.tryMergeExclusiveInclusive(this, that);
+ }
+ }
+
+ private static <K extends Key<K>> KeyRange<K> tryMergeExclusiveInclusive(KeyRange<K> left, KeyRange<K> right)
+ {
+ if (left.getClass() != right.getClass())
+ return null;
+
+ Preconditions.checkArgument(left instanceof EndInclusive || left instanceof StartInclusive);
+
+ int cmp = left.compareIntersecting(right);
+
+ if (cmp == 0)
+ return left.subRange(left.start.compareTo(right.start) < 0 ? left.start : right.start,
+ left.end.compareTo(right.end) > 0 ? left.end : right.end);
+
+ if (cmp > 0 && right.end.equals(left.start))
+ return left.subRange(right.start, left.end);
+
+ if (cmp < 0 && left.end.equals(right.start))
+ return left.subRange(left.start, right.end);
+
+ return null;
}
private final K start;
@@ -95,6 +130,20 @@ public abstract class KeyRange<K extends Key<K>>
public abstract boolean endInclusive();
+ /**
+ * Returns a new range covering this and the given range if the ranges intersect or touch (that is, no key can
+ * exist between their touching ends); otherwise returns null.
+ */
+ public abstract KeyRange<K> tryMerge(KeyRange<K> that);
+
+ public abstract KeyRange<K> subRange(K start, K end);
+
+ /**
+ * Split this range into roughly equally sized subranges
+ * @param count the number of subranges to create
+ */
+ public abstract KeyRanges split(int count);
+
@Override
public boolean equals(Object o)
{
@@ -127,6 +176,44 @@ public abstract class KeyRange<K extends Key<K>>
return compareKey(key) == 0;
}
+
+ /**
+ * Returns a positive integer if the provided range lies entirely before this range (it ends at or before our start),
+ * a negative integer if it lies entirely after this range (it starts at or after our end), or zero if they intersect
+ */
+ public int compareIntersecting(KeyRange<K> that)
+ {
+ if (this.start.compareTo(that.end) >= 0) // that ends at or before this range starts
+ return 1;
+ if (this.end.compareTo(that.start) <= 0) // that starts at or after this range ends
+ return -1;
+ return 0;
+ }
+
+ public boolean fullyContains(KeyRange<K> that)
+ {
+ return that.start.compareTo(this.start) >= 0 && that.end.compareTo(this.end) <= 0;
+ }
+
+ public boolean intersects(Keys keys)
+ {
+ return lowKeyIndex(keys) >= 0;
+ }
+
+ /**
+ * Returns a range covering the overlapping parts of this and the provided range, returns
+ * null if the ranges do not overlap
+ */
+ public KeyRange<K> intersection(KeyRange<K> that)
+ {
+ if (this.compareIntersecting(that) != 0)
+ return null;
+
+ K start = this.start.compareTo(that.start) > 0 ? this.start : that.start;
+ K end = this.end.compareTo(that.end) < 0 ? this.end : that.end;
+ return subRange(start, end);
+ }
+
/**
* returns the index of the first key larger than what's covered by this range
*/
@@ -144,20 +231,25 @@ public abstract class KeyRange<K extends Key<K>>
}
/**
- * returns the index of the lowest key contained in this range
+ * returns the index of the lowest key contained in this range. If the keys object contains no intersecting
+ * keys, <code>(-(<i>insertion point</i>) - 1)</code> is returned. Where <i>insertion point</i> is where an
+ * intersecting key would be inserted into the keys array
* @param keys
*/
- public int lowKeyIndex(Keys keys)
+ public int lowKeyIndex(Keys keys, int lowerBound, int upperBound)
{
if (keys.isEmpty()) return -1;
- int i = keys.search(0, keys.size(), this,
+ int i = keys.search(lowerBound, upperBound, this,
(k, r) -> ((KeyRange) r).compareKey((Key) k) < 0 ? -1 : 1);
- if (i < 0) i = -1 - i;
+ int minIdx = -1 - i;
- if (i == 0 && !containsKey((K) keys.get(0))) i = -1;
+ return (minIdx < keys.size() && containsKey((K) keys.get(minIdx))) ? minIdx : i;
+ }
- return i;
+ public int lowKeyIndex(Keys keys)
+ {
+ return lowKeyIndex(keys, 0, keys.size());
}
}
diff --git a/accord-core/src/main/java/accord/api/Read.java b/accord-core/src/main/java/accord/api/Read.java
index 2149418..fa2780a 100644
--- a/accord-core/src/main/java/accord/api/Read.java
+++ b/accord-core/src/main/java/accord/api/Read.java
@@ -1,5 +1,7 @@
package accord.api;
+import accord.topology.KeyRanges;
+
/**
* A read to be performed on potentially multiple shards, the inputs of which may be fed to a {@link Query}
*
@@ -7,5 +9,5 @@ package accord.api;
*/
public interface Read
{
- Data read(KeyRange range, Store store);
+ Data read(KeyRanges ranges, Store store);
}
diff --git a/accord-core/src/main/java/accord/api/Write.java b/accord-core/src/main/java/accord/api/Write.java
index eba020c..1479c86 100644
--- a/accord-core/src/main/java/accord/api/Write.java
+++ b/accord-core/src/main/java/accord/api/Write.java
@@ -1,5 +1,6 @@
package accord.api;
+import accord.topology.KeyRanges;
import accord.txn.Timestamp;
/**
@@ -9,5 +10,5 @@ import accord.txn.Timestamp;
*/
public interface Write
{
- void apply(KeyRange range, Timestamp executeAt, Store store);
+ void apply(KeyRanges range, Timestamp executeAt, Store store);
}
diff --git a/accord-core/src/main/java/accord/coordinate/AcceptPhase.java b/accord-core/src/main/java/accord/coordinate/AcceptPhase.java
index eb79af7..29f9331 100644
--- a/accord-core/src/main/java/accord/coordinate/AcceptPhase.java
+++ b/accord-core/src/main/java/accord/coordinate/AcceptPhase.java
@@ -4,7 +4,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import accord.messages.Preempted;
+import accord.coordinate.tracking.QuorumTracker;
import accord.txn.Ballot;
import accord.messages.Callback;
import accord.local.Node;
@@ -24,13 +24,11 @@ class AcceptPhase extends CompletableFuture<Agreed>
final Ballot ballot;
final TxnId txnId;
final Txn txn;
- final Shards shards;
+ final Shards shards; // TODO: remove, hide in participants
private List<AcceptOk> acceptOks;
private Timestamp proposed;
- private int[] accepts;
- private int[] failures;
- private int acceptQuorums;
+ private QuorumTracker acceptTracker;
AcceptPhase(Node node, Ballot ballot, TxnId txnId, Txn txn, Shards shards)
{
@@ -45,9 +43,8 @@ class AcceptPhase extends CompletableFuture<Agreed>
{
this.proposed = executeAt;
this.acceptOks = new ArrayList<>();
- this.accepts = new int[shards.size()];
- this.failures = new int[shards.size()];
- node.send(shards, new Accept(ballot, txnId, txn, executeAt, deps), new Callback<AcceptReply>()
+ this.acceptTracker = new QuorumTracker(shards);
+ node.send(acceptTracker.nodes(), new Accept(ballot, txnId, txn, executeAt, deps), new Callback<AcceptReply>()
{
@Override
public void onSuccess(Id from, AcceptReply response)
@@ -58,10 +55,9 @@ class AcceptPhase extends CompletableFuture<Agreed>
@Override
public void onFailure(Id from, Throwable throwable)
{
- shards.forEachOn(from, (i, shard) -> {
- if (++failures[i] >= shard.slowPathQuorumSize)
- completeExceptionally(new accord.messages.Timeout());
- });
+ acceptTracker.recordFailure(from);
+ if (acceptTracker.hasFailed())
+ completeExceptionally(new Timeout());
}
});
}
@@ -79,12 +75,9 @@ class AcceptPhase extends CompletableFuture<Agreed>
AcceptOk ok = (AcceptOk) reply;
acceptOks.add(ok);
- shards.forEachOn(from, txn.keys(), (i, shard) -> {
- if (++accepts[i] == shard.slowPathQuorumSize)
- ++acceptQuorums;
- });
+ acceptTracker.recordSuccess(from);
- if (acceptQuorums == shards.size())
+ if (acceptTracker.hasReachedQuorum())
onAccepted();
}
diff --git a/accord-core/src/main/java/accord/coordinate/Agree.java b/accord-core/src/main/java/accord/coordinate/Agree.java
index 0d726b8..b12f118 100644
--- a/accord-core/src/main/java/accord/coordinate/Agree.java
+++ b/accord-core/src/main/java/accord/coordinate/Agree.java
@@ -4,8 +4,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;
-import accord.messages.Preempted;
-import accord.messages.Timeout;
+import accord.coordinate.tracking.FastPathTracker;
+import accord.topology.Shard;
import accord.txn.Ballot;
import accord.messages.Callback;
import accord.local.Node;
@@ -25,19 +25,32 @@ import accord.messages.PreAccept.PreAcceptReply;
*/
class Agree extends AcceptPhase implements Callback<PreAcceptReply>
{
+ static class ShardTracker extends FastPathTracker.FastPathShardTracker // tracks PreAccept replies for one shard
+ {
+ public ShardTracker(Shard shard)
+ {
+ super(shard);
+ }
+ // a reply is a fast-path vote only if it came from the fast-path electorate and witnessed the txn at its txnId
+ @Override
+ public boolean includeInFastPath(Node.Id node, boolean withFastPathTimestamp)
+ {
+ return withFastPathTimestamp && shard.fastPathElectorate.contains(node);
+ }
+ // this shard's fast path is met once at least fastPathQuorumSize such votes have been counted
+ @Override
+ public boolean hasMetFastPathCriteria()
+ {
+ return fastPathAccepts >= shard.fastPathQuorumSize;
+ }
+ }
+
final Keys keys;
public enum PreacceptOutcome { COMMIT, ACCEPT }
- // TODO: handle reconfigurations
- private int[] preAccepts;
- private int[] fastPathPreAccepts;
- private int[] failures;
- private int[] responsesOutstanding;
+ private final FastPathTracker<ShardTracker> tracker;
- private int preAccepted;
- private int fastPathAccepted;
- private int noOutstandingResponses;
private PreacceptOutcome preacceptOutcome;
private final List<PreAcceptOk> preAcceptOks = new ArrayList<>();
@@ -48,22 +61,9 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply>
{
super(node, Ballot.ZERO, txnId, txn, node.cluster().forKeys(txn.keys()));
this.keys = txn.keys();
- this.failures = new int[shards.size()];
- this.preAccepts = new int[shards.size()];
- this.fastPathPreAccepts = new int[shards.size()];
- this.responsesOutstanding = new int[shards.size()];
- shards.forEach((i, shard) -> {
- this.responsesOutstanding[i] = shard.nodes.size();
- });
-
+ tracker = new FastPathTracker<>(shards, ShardTracker[]::new, ShardTracker::new);
- node.send(shards, new PreAccept(txnId, txn), this);
- }
-
- private void messageReceived(int shard)
- {
- if (--responsesOutstanding[shard] == 0)
- noOutstandingResponses++;
+ node.send(tracker.nodes(), new PreAccept(txnId, txn), this);
}
@Override
@@ -78,11 +78,9 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply>
if (isDone() || isPreAccepted())
return;
- shards.forEachOn(from, (i, shard) -> {
- messageReceived(i);
- if (++failures[i] >= shard.slowPathQuorumSize)
- completeExceptionally(new Timeout());
- });
+ tracker.recordFailure(from);
+ if (tracker.hasFailed())
+ completeExceptionally(new Timeout());
// if no other responses are expected and the slow quorum has been satisfied, proceed
if (shouldSlowPathAccept())
@@ -105,22 +103,15 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply>
preAcceptOks.add(ok);
boolean fastPath = ok.witnessedAt.compareTo(txnId) == 0;
- shards.forEachOn(from, (i, shard) -> {
- messageReceived(i);
- if (fastPath && shard.fastPathElectorate.contains(from) && ++fastPathPreAccepts[i] == shard.fastPathQuorumSize)
- ++fastPathAccepted;
+ tracker.recordSuccess(from, fastPath);
- if (++preAccepts[i] == shard.slowPathQuorumSize)
- ++preAccepted;
- });
-
- if (isFastPathAccepted() || shouldSlowPathAccept())
+ if (tracker.hasMetFastPathCriteria() || shouldSlowPathAccept())
onPreAccepted();
}
private void onPreAccepted()
{
- if (isFastPathAccepted())
+ if (tracker.hasMetFastPathCriteria())
{
preacceptOutcome = PreacceptOutcome.COMMIT;
Dependencies deps = new Dependencies();
@@ -150,14 +141,9 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply>
}
}
- private boolean isFastPathAccepted()
- {
- return fastPathAccepted == shards.size();
- }
-
private boolean shouldSlowPathAccept()
{
- return noOutstandingResponses == shards.size() && preAccepted == shards.size();
+ return !tracker.hasInFlight() && tracker.hasReachedQuorum();
}
private boolean isPreAccepted()
diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java b/accord-core/src/main/java/accord/coordinate/Execute.java
index 502e2e0..1ca01e0 100644
--- a/accord-core/src/main/java/accord/coordinate/Execute.java
+++ b/accord-core/src/main/java/accord/coordinate/Execute.java
@@ -1,10 +1,12 @@
package accord.coordinate;
+import java.util.Collection;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import accord.api.Data;
-import accord.messages.Preempted;
+import accord.coordinate.tracking.ReadTracker;
import accord.api.Result;
import accord.messages.Callback;
import accord.local.Node;
@@ -12,7 +14,6 @@ import accord.txn.Dependencies;
import accord.messages.Apply;
import accord.messages.ReadData.ReadReply;
import accord.messages.ReadData.ReadWaiting;
-import accord.topology.Shard;
import accord.topology.Shards;
import accord.local.Node.Id;
import accord.txn.Timestamp;
@@ -22,6 +23,7 @@ import accord.txn.Keys;
import accord.messages.Commit;
import accord.messages.ReadData;
import accord.messages.ReadData.ReadOk;
+import com.google.common.base.Preconditions;
class Execute extends CompletableFuture<Result> implements Callback<ReadReply>
{
@@ -32,12 +34,9 @@ class Execute extends CompletableFuture<Result> implements Callback<ReadReply>
final Shards shards;
final Keys keys;
final Dependencies deps;
- final int[] attempts;
- final int[] inFlight;
- final boolean[] hasData;
+ final ReadTracker tracker;
private Data data;
final int replicaIndex;
- int count = 0;
private Execute(Node node, Agreed agreed)
{
@@ -48,9 +47,7 @@ class Execute extends CompletableFuture<Result> implements Callback<ReadReply>
this.deps = agreed.deps;
this.executeAt = agreed.executeAt;
this.shards = agreed.shards;
- this.attempts = new int[shards.size()];
- this.inFlight = new int[shards.size()];
- this.hasData = new boolean[shards.size()];
+ this.tracker = new ReadTracker(shards);
this.replicaIndex = node.random().nextInt(shards.get(0).nodes.size());
// TODO: perhaps compose these different behaviours differently?
@@ -62,25 +59,21 @@ class Execute extends CompletableFuture<Result> implements Callback<ReadReply>
}
else
{
- // TODO: we're sending duplicate commits
- shards.forEach((i, shard) -> {
- for (int n = 0 ; n < shard.nodes.size() ; ++n)
+ Set<Id> readSet = tracker.computeMinimalReadSetAndMarkInflight();
+ for (Node.Id to : tracker.nodes())
+ {
+ boolean read = readSet.contains(to);
+ Commit send = new Commit(txnId, txn, executeAt, agreed.deps, read);
+ if (read)
{
- Id to = shard.nodes.get(n);
- // TODO: Topology needs concept of locality/distance
- boolean read = n == replicaIndex % shard.nodes.size();
- Commit send = new Commit(txnId, txn, executeAt, agreed.deps, read);
- if (read)
- {
- node.send(to, send, this);
- shards.forEachOn(to, (j, s) -> ++inFlight[j]);
- }
- else
- {
- node.send(to, send);
- }
+ node.send(to, send, this);
}
- });
+ else
+ {
+ node.send(to, send);
+ }
+
+ }
}
}
@@ -108,13 +101,9 @@ class Execute extends CompletableFuture<Result> implements Callback<ReadReply>
data = data == null ? ((ReadOk) reply).data
: data.merge(((ReadOk) reply).data);
- shards.forEachOn(from, (i, shard) -> {
- --inFlight[i];
- if (!hasData[i]) ++count;
- hasData[i] = true;
- });
+ tracker.recordReadSuccess(from);
- if (count == shards.size())
+ if (tracker.hasCompletedRead())
{
Result result = txn.result(data);
node.send(shards, new Apply(txnId, txn, executeAt, deps, txn.execute(executeAt, data), result));
@@ -127,28 +116,18 @@ class Execute extends CompletableFuture<Result> implements Callback<ReadReply>
{
// try again with another random node
// TODO: API hooks
- if (!(throwable instanceof accord.messages.Timeout))
+ if (!(throwable instanceof Timeout))
throwable.printStackTrace();
- shards.forEachOn(from, (i, shard) -> {
- // TODO: less naive selection of replica to consult
- if (--inFlight[i] == 0 && !hasData[i])
- read(i);
- if (inFlight[i] == 0 && !hasData[i])
- completeExceptionally(throwable);
- });
- }
-
- private void read(int shardIndex)
- {
- Shard shard = shards.get(shardIndex);
- if (attempts[shardIndex] == shard.nodes.size())
- return;
-
- int nodeIndex = (replicaIndex + attempts[shardIndex]++) % shard.nodes.size();
- Node.Id to = shard.nodes.get(nodeIndex);
- shards.forEachOn(to, (i, s) -> ++inFlight[i]);
- node.send(to, new ReadData(txnId, txn), this);
+ tracker.recordReadFailure(from);
+ Set<Id> readFrom = tracker.computeMinimalReadSetAndMarkInflight();
+ if (readFrom == null)
+ {
+ Preconditions.checkState(tracker.hasFailed());
+ completeExceptionally(throwable);
+ }
+ else
+ node.send(readFrom, new ReadData(txnId, txn), this);
}
static CompletionStage<Result> execute(Node instance, Agreed agreed)
diff --git a/accord-core/src/main/java/accord/coordinate/Preempted.java b/accord-core/src/main/java/accord/coordinate/Preempted.java
index ad96801..0157376 100644
--- a/accord-core/src/main/java/accord/coordinate/Preempted.java
+++ b/accord-core/src/main/java/accord/coordinate/Preempted.java
@@ -1,4 +1,4 @@
-package accord.messages;
+package accord.coordinate;
/**
* Thrown when a coordinator is preempted by another recovery
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java b/accord-core/src/main/java/accord/coordinate/Recover.java
index 816c5a3..2f261ab 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -4,7 +4,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import accord.messages.Preempted;
+import accord.coordinate.tracking.FastPathTracker;
+import accord.coordinate.tracking.QuorumTracker;
+import accord.topology.Shard;
import accord.txn.Ballot;
import accord.messages.Callback;
import accord.local.Node;
@@ -27,14 +29,11 @@ class Recover extends AcceptPhase implements Callback<RecoverReply>
{
class RetryAfterCommits implements Callback<WaitOnCommitOk>
{
- final int[] failures;
- final int[] commits;
- int commitQuorums;
+ final QuorumTracker retryTracker;
RetryAfterCommits(Dependencies waitOn)
{
- commits = new int[waitOn.size()];
- failures = new int[waitOn.size()];
+ retryTracker = new QuorumTracker(shards);
for (Map.Entry<TxnId, Txn> e : waitOn)
node.send(shards, new WaitOnCommit(e.getKey(), e.getValue().keys()), this);
}
@@ -44,15 +43,12 @@ class Recover extends AcceptPhase implements Callback<RecoverReply>
{
synchronized (Recover.this)
{
- if (isDone() || commitQuorums == commits.length)
+ if (isDone() || retryTracker.hasReachedQuorum())
return;
- shards.forEachOn(from, (i, shard) -> {
- if (++commits[i] == shard.slowPathQuorumSize)
- ++commitQuorums;
- });
+ retryTracker.recordSuccess(from);
- if (commitQuorums == commits.length)
+ if (retryTracker.hasReachedQuorum())
{
new Recover(node, ballot, txnId, txn, shards).handle((success, failure) -> {
if (success != null) complete(success);
@@ -71,20 +67,42 @@ class Recover extends AcceptPhase implements Callback<RecoverReply>
if (isDone())
return;
- shards.forEachOn(from, (i, shard) -> {
- if (++failures[i] >= shard.slowPathQuorumSize)
- completeExceptionally(new accord.messages.Timeout());
- });
+ retryTracker.recordFailure(from);
+ if (retryTracker.hasFailed())
+ completeExceptionally(new Timeout());
}
}
}
+ // TODO: not sure it makes sense to extend FastPathTracker, as intent here is a bit different
+ static class ShardTracker extends FastPathTracker.FastPathShardTracker // tracks BeginRecovery replies for one shard
+ {
+ int responsesFromElectorate; // successful replies received from fast-path electorate members
+ public ShardTracker(Shard shard)
+ {
+ super(shard);
+ }
+ // counts every successful electorate reply; it is a fast-path accept only if executeAt still equals txnId
+ @Override
+ public boolean includeInFastPath(Node.Id node, boolean withFastPathTimestamp)
+ {
+ if (!shard.fastPathElectorate.contains(node))
+ return false;
+
+ ++responsesFromElectorate;
+ return withFastPathTimestamp;
+ }
+ // true while the electorate members that have not rejected the fast path could still form a fastPathQuorumSize quorum
+ @Override
+ public boolean hasMetFastPathCriteria()
+ {
+ int fastPathRejections = responsesFromElectorate - fastPathAccepts; // electorate replies without the fast-path timestamp
+ return fastPathRejections <= shard.fastPathElectorate.size() - shard.fastPathQuorumSize;
+ }
+ }
+
final List<RecoverOk> recoverOks = new ArrayList<>();
- int[] failure;
- int[] recovery;
- int[] recoveryWithFastPath;
- int recoveryWithFastPathQuorums = 0;
- int recoveryQuorums = 0;
+ final FastPathTracker<ShardTracker> tracker;
public Recover(Node node, Ballot ballot, TxnId txnId, Txn txn)
{
@@ -94,16 +112,14 @@ class Recover extends AcceptPhase implements Callback<RecoverReply>
private Recover(Node node, Ballot ballot, TxnId txnId, Txn txn, Shards shards)
{
super(node, ballot, txnId, txn, shards);
- this.failure = new int[this.shards.size()];
- this.recovery = new int[this.shards.size()];
- this.recoveryWithFastPath = new int[this.shards.size()];
- node.send(this.shards, new BeginRecovery(txnId, txn, ballot), this);
+ tracker = new FastPathTracker<>(shards, ShardTracker[]::new, ShardTracker::new);
+ node.send(tracker.nodes(), new BeginRecovery(txnId, txn, ballot), this);
}
@Override
public synchronized void onSuccess(Id from, RecoverReply response)
{
- if (isDone() || recoveryQuorums == shards.size())
+ if (isDone() || tracker.hasReachedQuorum())
return;
if (!response.isOK())
@@ -115,15 +131,9 @@ class Recover extends AcceptPhase implements Callback<RecoverReply>
RecoverOk ok = (RecoverOk) response;
recoverOks.add(ok);
boolean fastPath = ok.executeAt.compareTo(txnId) == 0;
- shards.forEachOn(from, (i, shard) -> {
- if (fastPath && ++recoveryWithFastPath[i] == shard.recoveryFastPathSize)
- ++recoveryWithFastPathQuorums;
-
- if (++recovery[i] == shard.slowPathQuorumSize)
- ++recoveryQuorums;
- });
+ tracker.recordSuccess(from, fastPath);
- if (recoveryQuorums == shards.size())
+ if (tracker.hasReachedQuorum())
recover();
}
@@ -173,7 +183,7 @@ class Recover extends AcceptPhase implements Callback<RecoverReply>
}
Timestamp executeAt;
- if (rejectsFastPath || recoveryWithFastPathQuorums < shards.size())
+ if (rejectsFastPath || !tracker.hasMetFastPathCriteria())
{
executeAt = maxExecuteAt;
}
@@ -197,9 +207,8 @@ class Recover extends AcceptPhase implements Callback<RecoverReply>
if (isDone())
return;
- shards.forEachOn(from, (i, shard) -> {
- if (++failure[i] >= shard.slowPathQuorumSize)
- completeExceptionally(new accord.messages.Timeout());
- });
+ tracker.recordFailure(from);
+ if (tracker.hasFailed())
+ completeExceptionally(new Timeout());
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/Timeout.java b/accord-core/src/main/java/accord/coordinate/Timeout.java
index d405fa7..c706ff2 100644
--- a/accord-core/src/main/java/accord/coordinate/Timeout.java
+++ b/accord-core/src/main/java/accord/coordinate/Timeout.java
@@ -1,4 +1,4 @@
-package accord.messages;
+package accord.coordinate;
/**
* Thrown when a transaction exceeds its specified timeout for obtaining a result for a client
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AbstractQuorumTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AbstractQuorumTracker.java
new file mode 100644
index 0000000..b48e1a3
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/AbstractQuorumTracker.java
@@ -0,0 +1,113 @@
+package accord.coordinate.tracking;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+
+import accord.local.Node;
+import accord.topology.Shard;
+import accord.topology.Shards;
+
+/**
+ * Tracks the replies from the replicas of a set of shards, where each shard must independently collect
+ * {@code slowPathQuorumSize} successful replies. The per-shard tracker type and its factory are supplied
+ * by subclasses.
+ */
+public class AbstractQuorumTracker<T extends AbstractQuorumTracker.QuorumShardTracker> extends AbstractResponseTracker<T>
+{
+    /**
+     * Per-shard state: the replicas that have not replied yet, and the number of successes and failures.
+     * Each replica is counted at most once; any later reply from the same replica is ignored.
+     */
+    static class QuorumShardTracker extends ShardTracker
+    {
+        private final Set<Node.Id> inflight;
+        private int success = 0;
+        private int failures = 0;
+
+        public QuorumShardTracker(Shard shard)
+        {
+            super(shard);
+            this.inflight = new HashSet<>(shard.nodes);
+        }
+
+        /**
+         * Records a successful reply from {@code id}.
+         * @return true if the reply was counted; false if {@code id} is not in flight (already replied, or not a replica)
+         */
+        public boolean onSuccess(Node.Id id)
+        {
+            if (!inflight.remove(id))
+                return false;
+            success++;
+            return true;
+        }
+
+        /**
+         * Records a failed reply from {@code id}.
+         * @return true if the failure was counted; false if {@code id} is not in flight (already replied, or not a replica)
+         */
+        boolean onFailure(Node.Id id)
+        {
+            if (!inflight.remove(id))
+                return false;
+            failures++;
+            return true;
+        }
+
+        /**
+         * True once the quorum is unreachable: even if every replica still in flight succeeded, there would
+         * be fewer than {@code slowPathQuorumSize} successes. Comparing {@code failures} directly against
+         * {@code slowPathQuorumSize} is only equivalent for an odd replica count with a simple-majority quorum;
+         * otherwise (e.g. 4 replicas, a quorum of 3, 2 successes and 2 failures) the shard would end with nothing
+         * in flight, no quorum and no failure, so callers waiting for either outcome would never be signalled.
+         */
+        boolean hasFailed()
+        {
+            return success + inflight.size() < shard.slowPathQuorumSize;
+        }
+
+        /** True once at least {@code slowPathQuorumSize} replicas have replied successfully. */
+        boolean hasReachedQuorum()
+        {
+            return success >= shard.slowPathQuorumSize;
+        }
+
+        /** True while some replica has neither succeeded nor failed. */
+        boolean hasInFlight()
+        {
+            return !inflight.isEmpty();
+        }
+    }
+
+    public AbstractQuorumTracker(Shards shards, IntFunction<T[]> arrayFactory, Function<Shard, T> trackerFactory)
+    {
+        super(shards, arrayFactory, trackerFactory);
+    }
+
+    // TODO: refactor to return true if this call caused the state change to failed
+    /** Records a failed reply from {@code node} with each shard tracker that forEachTrackerForNode selects for it. */
+    public void recordFailure(Node.Id node)
+    {
+        forEachTrackerForNode(node, QuorumShardTracker::onFailure);
+    }
+
+    /** True once every shard has collected its slow-path quorum. */
+    public boolean hasReachedQuorum()
+    {
+        return all(QuorumShardTracker::hasReachedQuorum);
+    }
+
+    /** True once any shard can no longer reach its quorum. */
+    public boolean hasFailed()
+    {
+        return any(QuorumShardTracker::hasFailed);
+    }
+
+    /** True while any shard still has replicas that have not replied. */
+    public boolean hasInFlight()
+    {
+        return any(QuorumShardTracker::hasInFlight);
+    }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AbstractResponseTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AbstractResponseTracker.java
new file mode 100644
index 0000000..6989e9f
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/AbstractResponseTracker.java
@@ -0,0 +1,102 @@
+package accord.coordinate.tracking;
+
+import accord.local.Node;
+import accord.topology.Shard;
+import accord.topology.Shards;
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.*;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.Predicate;
+
+/**
+ * Base class for coordinator reply trackers. Holds one {@link ShardTracker} per shard of the given
+ * {@link Shards}, at the same index as the shard, and offers helpers to update or query them.
+ */
+abstract class AbstractResponseTracker<T extends AbstractResponseTracker.ShardTracker>
+{
+    private final Shards shards;
+    private final T[] trackers;
+
+    /** Per-shard tracking state; subclasses add the counters they need. */
+    static class ShardTracker
+    {
+        public final Shard shard;
+
+        public ShardTracker(Shard shard)
+        {
+            this.shard = shard;
+        }
+    }
+
+    /**
+     * @param shards the shards whose replies are tracked
+     * @param arrayFactory allocates the tracker array, since a generic {@code T[]} cannot be created directly
+     * @param trackerFactory creates the tracker for a single shard
+     */
+    public AbstractResponseTracker(Shards shards, IntFunction<T[]> arrayFactory, Function<Shard, T> trackerFactory)
+    {
+        this.shards = shards;
+        this.trackers = arrayFactory.apply(shards.size());
+        shards.forEach((i, shard) -> trackers[i] = trackerFactory.apply(shard));
+    }
+
+    /**
+     * Passes the tracker of every shard {@code Shards.forEachOn} visits for {@code node} (presumably the shards
+     * that replicate it), together with {@code node}, to {@code consumer}.
+     */
+    void forEachTrackerForNode(Node.Id node, BiConsumer<T, Node.Id> consumer)
+    {
+        shards.forEachOn(node, (i, shard) -> consumer.accept(trackers[i], node));
+    }
+
+    /**
+     * Tests {@code predicate} against the tracker of every shard {@code Shards.matchesOn} visits for {@code node}.
+     * @return the result of {@code Shards.matchesOn}; presumably the number of matching trackers (TODO confirm)
+     */
+    int matchingTrackersForNode(Node.Id node, Predicate<T> predicate)
+    {
+        return shards.matchesOn(node, (i, shard) -> predicate.test(trackers[i]));
+    }
+
+    /** True if {@code predicate} holds for every shard's tracker (vacuously true when there are no shards). */
+    boolean all(Predicate<T> predicate)
+    {
+        for (T tracker : trackers)
+            if (!predicate.test(tracker))
+                return false;
+        return true;
+    }
+
+    /** True if {@code predicate} holds for at least one shard's tracker. */
+    boolean any(Predicate<T> predicate)
+    {
+        for (T tracker : trackers)
+            if (predicate.test(tracker))
+                return true;
+        return false;
+    }
+
+    /** Folds {@code function} over the trackers in shard order, starting from {@code start}. */
+    <V> V accumulate(BiFunction<T, V, V> function, V start)
+    {
+        for (T tracker : trackers)
+            start = function.apply(tracker, start);
+        return start;
+    }
+
+    /** All nodes of the tracked shards, as reported by {@code Shards.nodes()}; callers send requests to these. */
+    public Set<Node.Id> nodes()
+    {
+        return shards.nodes();
+    }
+
+    @VisibleForTesting
+    public T unsafeGet(int i)
+    {
+        return trackers[i];
+    }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
new file mode 100644
index 0000000..b10ae83
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
@@ -0,0 +1,46 @@
+package accord.coordinate.tracking;
+
+import java.util.function.Function;
+import java.util.function.IntFunction;
+
+import accord.local.Node;
+import accord.topology.Shard;
+import accord.topology.Shards;
+
+public class FastPathTracker<T extends FastPathTracker.FastPathShardTracker> extends AbstractQuorumTracker<T>
+{
+ public abstract static class FastPathShardTracker extends QuorumTracker.QuorumShardTracker
+ {
+ protected int fastPathAccepts = 0;
+
+ public FastPathShardTracker(Shard shard)
+ {
+ super(shard);
+ }
+
+ public abstract boolean includeInFastPath(Node.Id node, boolean withFastPathTimestamp);
+
+ public void onSuccess(Node.Id node, boolean withFastPathTimestamp)
+ {
+ if (onSuccess(node) && includeInFastPath(node, withFastPathTimestamp))
+ fastPathAccepts++;
+ }
+
+ public abstract boolean hasMetFastPathCriteria();
+ }
+
+ public FastPathTracker(Shards shards, IntFunction<T[]> arrayFactory, Function<Shard, T> trackerFactory)
+ {
+ super(shards, arrayFactory, trackerFactory);
+ }
+
+ public void recordSuccess(Node.Id node, boolean withFastPathTimestamp)
+ {
+ forEachTrackerForNode(node, (tracker, n) -> tracker.onSuccess(n, withFastPathTimestamp));
+ }
+
+ public boolean hasMetFastPathCriteria()
+ {
+ return all(FastPathShardTracker::hasMetFastPathCriteria);
+ }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
new file mode 100644
index 0000000..b36b1ef
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/QuorumTracker.java
@@ -0,0 +1,17 @@
+package accord.coordinate.tracking;
+
+import accord.local.Node;
+import accord.topology.Shards;
+
+public class QuorumTracker extends AbstractQuorumTracker<QuorumTracker.QuorumShardTracker>
+{
+ public QuorumTracker(Shards shards)
+ {
+ super(shards, QuorumShardTracker[]::new, QuorumShardTracker::new);
+ }
+
+ public void recordSuccess(Node.Id node)
+ {
+ forEachTrackerForNode(node, QuorumShardTracker::onSuccess);
+ }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
new file mode 100644
index 0000000..d24f5b7
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
@@ -0,0 +1,146 @@
+package accord.coordinate.tracking;
+
+import accord.local.Node.Id;
+import accord.topology.Shard;
+import accord.topology.Shards;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.util.*;
+
+public class ReadTracker extends AbstractResponseTracker<ReadTracker.ReadShardTracker>
+{
+ static class ReadShardTracker extends AbstractResponseTracker.ShardTracker
+ {
+ private final Set<Id> inflight = new HashSet<>();
+ private boolean hasData = false;
+ private int contacted;
+
+ public ReadShardTracker(Shard shard)
+ {
+ super(shard);
+ }
+
+ public void recordInflightRead(Id node)
+ {
+ if (!inflight.add(node))
+ throw new IllegalStateException();
+ ++contacted;
+ }
+
+ public void recordReadSuccess(Id node)
+ {
+ Preconditions.checkArgument(shard.nodes.contains(node));
+ inflight.remove(node);
+ hasData = true;
+ }
+
+ public boolean shouldRead()
+ {
+ return !hasData && inflight.isEmpty();
+ }
+
+ public void recordReadFailure(Id node)
+ {
+ inflight.remove(node);
+ }
+
+ public boolean hasCompletedRead()
+ {
+ return hasData;
+ }
+
+ public boolean hasFailed()
+ {
+ return !hasData && inflight.isEmpty() && contacted == shard.nodes.size();
+ }
+ }
+
+ private final List<Id> candidates;
+
+ public ReadTracker(Shards shards)
+ {
+ super(shards, ReadShardTracker[]::new, ReadShardTracker::new);
+ candidates = new ArrayList<>(shards.nodes());
+ }
+
+ @VisibleForTesting
+ void recordInflightRead(Id node)
+ {
+ forEachTrackerForNode(node, ReadShardTracker::recordInflightRead);
+ }
+
+ public void recordReadSuccess(Id node)
+ {
+ forEachTrackerForNode(node, ReadShardTracker::recordReadSuccess);
+ }
+
+ public void recordReadFailure(Id node)
+ {
+ forEachTrackerForNode(node, ReadShardTracker::recordReadFailure);
+ }
+
+ public boolean hasCompletedRead()
+ {
+ return all(ReadShardTracker::hasCompletedRead);
+ }
+
+ public boolean hasFailed()
+ {
+ return any(ReadShardTracker::hasFailed);
+ }
+
+ private int intersectionSize(Id node, Set<ReadShardTracker> target)
+ {
+ return matchingTrackersForNode(node, target::contains);
+ }
+
+ private int compareIntersections(Id left, Id right, Set<ReadShardTracker> target)
+ {
+ return Integer.compare(intersectionSize(left, target), intersectionSize(right, target));
+ }
+
+ /**
+ * Return the smallest set of nodes needed to satisfy required reads.
+ *
+ * Returns null if the read cannot be completed.
+ */
+ public Set<Id> computeMinimalReadSetAndMarkInflight()
+ {
+ Set<ReadShardTracker> toRead = accumulate((tracker, accumulate) -> {
+ if (!tracker.shouldRead())
+ return accumulate;
+
+ if (accumulate == null)
+ accumulate = new HashSet<>();
+
+ accumulate.add(tracker);
+ return accumulate;
+ }, null);
+
+ if (toRead == null)
+ return Collections.emptySet();
+
+ assert !toRead.isEmpty();
+ Set<Id> nodes = new HashSet<>();
+ while (!toRead.isEmpty())
+ {
+ if (candidates.isEmpty())
+ return null;
+
+ // TODO: Topology needs concept of locality/distance
+ candidates.sort((a, b) -> compareIntersections(a, b, toRead));
+
+ int i = candidates.size() - 1;
+ Id node = candidates.get(i);
+ nodes.add(node);
+ recordInflightRead(node);
+ candidates.remove(i);
+ forEachTrackerForNode(node, (tracker, ignore) -> toRead.remove(tracker));
+ }
+
+ return nodes;
+ }
+
+}
diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java
index e8d6891..9eca55c 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -22,7 +22,7 @@ import static accord.local.Status.ReadyToExecute;
public class Command implements Listener, Consumer<Listener>
{
- public final Instance instance;
+ public final CommandStore commandStore;
private final TxnId txnId;
private Txn txn;
private Ballot promised = Ballot.ZERO, accepted = Ballot.ZERO;
@@ -38,9 +38,9 @@ public class Command implements Listener, Consumer<Listener>
private final Listeners listeners = new Listeners();
- public Command(Instance instance, TxnId id)
+ public Command(CommandStore commandStore, TxnId id)
{
- this.instance = instance;
+ this.commandStore = commandStore;
this.txnId = id;
}
@@ -109,17 +109,17 @@ public class Command implements Listener, Consumer<Listener>
if (hasBeen(PreAccepted))
return true;
- Timestamp max = txn.maxConflict(instance);
+ Timestamp max = txn.maxConflict(commandStore);
// unlike in the Accord paper, we partition shards within a node, so that to ensure a total order we must either:
// - use a global logical clock to issue new timestamps; or
// - assign each shard _and_ process a unique id, and use both as components of the timestamp
- Timestamp witnessed = txnId.compareTo(max) > 0 ? txnId : instance.node().uniqueNow(max);
+ Timestamp witnessed = txnId.compareTo(max) > 0 ? txnId : commandStore.uniqueNow(max);
this.txn = txn;
this.executeAt = witnessed;
this.status = PreAccepted;
- txn.register(instance, this);
+ txn.register(commandStore, this);
listeners.forEach(this);
return true;
}
@@ -149,7 +149,7 @@ public class Command implements Listener, Consumer<Listener>
if (executeAt.equals(this.executeAt))
return false;
- instance.node().agent().onInconsistentTimestamp(this, this.executeAt, executeAt);
+ commandStore.agent().onInconsistentTimestamp(this, this.executeAt, executeAt);
}
witness(txn);
@@ -159,9 +159,9 @@ public class Command implements Listener, Consumer<Listener>
this.waitingOnCommit = new TreeMap<>();
this.waitingOnApply = new TreeMap<>();
- for (TxnId id : savedDeps().on(instance.shard))
+ for (TxnId id : savedDeps().on(commandStore))
{
- Command command = instance.command(id);
+ Command command = commandStore.command(id);
switch (command.status)
{
default:
@@ -204,7 +204,7 @@ public class Command implements Listener, Consumer<Listener>
else if (!hasBeen(Committed))
commit(txn, deps, executeAt);
else if (!executeAt.equals(this.executeAt))
- instance.node().agent().onInconsistentTimestamp(this, this.executeAt, executeAt);
+ commandStore.agent().onInconsistentTimestamp(this, this.executeAt, executeAt);
this.executeAt = executeAt;
this.writes = writes;
@@ -281,7 +281,7 @@ public class Command implements Listener, Consumer<Listener>
listeners.forEach(this);
break;
case Executed:
- writes.apply(instance);
+ writes.apply(commandStore);
status = Applied;
listeners.forEach(this);
}
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java
new file mode 100644
index 0000000..f064e5e
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -0,0 +1,449 @@
+package accord.local;
+
+import accord.api.Agent;
+import accord.api.Key;
+import accord.api.KeyRange;
+import accord.api.Store;
+import accord.topology.KeyRanges;
+import accord.topology.Shard;
+import accord.topology.Shards;
+import accord.topology.Topology;
+import accord.txn.Keys;
+import accord.txn.Timestamp;
+import accord.txn.TxnId;
+import com.google.common.base.Preconditions;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Single threaded internal shard of accord transaction metadata
+ */
+public abstract class CommandStore
+{
+ public interface Factory
+ {
+ CommandStore create(int index, Node.Id nodeId, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store);
+ Factory SYNCHRONIZED = Synchronized::new;
+ Factory SINGLE_THREAD = SingleThread::new;
+ Factory SINGLE_THREAD_DEBUG = SingleThreadDebug::new;
+ }
+
+ private final int index;
+ private final Node.Id nodeId;
+ private final Function<Timestamp, Timestamp> uniqueNow;
+ private final Agent agent;
+ private final Store store;
+
+ /**
+ * maps ranges handled by this command store to their current shards by index
+ */
+ static class RangeMapping
+ {
+ private static final RangeMapping EMPTY = new RangeMapping(KeyRanges.EMPTY, new Shard[0], Shards.EMPTY);
+ final KeyRanges ranges;
+ final Shard[] shards;
+ final Topology topology;
+
+ public RangeMapping(KeyRanges ranges, Shard[] shards, Topology topology)
+ {
+ Preconditions.checkArgument(ranges.size() == shards.length);
+ this.ranges = ranges;
+ this.shards = shards;
+ this.topology = topology;
+ }
+
+ private static class Builder
+ {
+ private final Topology localTopology;
+ private final List<KeyRange> ranges;
+ private final List<Shard> shards;
+
+ public Builder(int minSize, Topology localTopology)
+ {
+ this.localTopology = localTopology;
+ this.ranges = new ArrayList<>(minSize);
+ this.shards = new ArrayList<>(minSize);
+ }
+
+ public void addMapping(KeyRange range, Shard shard)
+ {
+ Preconditions.checkArgument(shard.range.fullyContains(range));
+ ranges.add(range);
+ shards.add(shard);
+ }
+
+ public RangeMapping build()
+ {
+ return new RangeMapping(new KeyRanges(ranges), shards.toArray(Shard[]::new), localTopology);
+ }
+ }
+ }
+
+ public CommandStore(int index, Node.Id nodeId, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
+ {
+ this.index = index;
+ this.nodeId = nodeId;
+ this.uniqueNow = uniqueNow;
+ this.agent = agent;
+ this.store = store;
+ }
+
+ private volatile RangeMapping rangeMap = RangeMapping.EMPTY;
+
+
+ private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+ private final NavigableMap<Key, CommandsForKey> commandsForKey = new TreeMap<>();
+
+ public Command command(TxnId txnId)
+ {
+ return commands.computeIfAbsent(txnId, id -> new Command(this, id));
+ }
+
+ public boolean hasCommand(TxnId txnId)
+ {
+ return commands.containsKey(txnId);
+ }
+
+ public CommandsForKey commandsForKey(Key key)
+ {
+ return commandsForKey.computeIfAbsent(key, ignore -> new CommandsForKey());
+ }
+
+ public boolean hasCommandsForKey(Key key)
+ {
+ return commandsForKey.containsKey(key);
+ }
+
+ public Store store()
+ {
+ return store;
+ }
+
+ public Timestamp uniqueNow(Timestamp atLeast)
+ {
+ return uniqueNow.apply(atLeast);
+ }
+
+ public Agent agent()
+ {
+ return agent;
+ }
+
+ public Node.Id nodeId()
+ {
+ return nodeId;
+ }
+
+ public KeyRanges ranges()
+ {
+ // TODO: check thread safety of callers
+ return rangeMap.ranges;
+ }
+
+ public Set<Node.Id> nodesFor(Command command)
+ {
+ RangeMapping mapping = rangeMap;
+ Keys keys = command.txn().keys;
+
+ Set<Node.Id> result = new HashSet<>();
+ int lowerBound = 0;
+ for (int i=0; i<mapping.ranges.size(); i++)
+ {
+ KeyRange range = mapping.ranges.get(i);
+ int lowKeyIdx = range.lowKeyIndex(keys, lowerBound, keys.size());
+
+ if (lowKeyIdx < -keys.size())
+ break;
+
+ if (lowKeyIdx < 0)
+ {
+ // all remaining keys are greater than this range, so go to the next one
+ lowerBound = -1 - lowKeyIdx;
+ continue;
+ }
+
+ // otherwise this range intersects with the txn, so add it's shard's endpoings
+ // TODO: filter pending nodes for reads
+ result.addAll(mapping.shards[i].nodes);
+ lowerBound = lowKeyIdx;
+ }
+
+ return result;
+ }
+
+ static RangeMapping mapRanges(KeyRanges mergedRanges, Topology localTopology)
+ {
+ RangeMapping.Builder builder = new RangeMapping.Builder(mergedRanges.size(), localTopology);
+ int shardIdx = 0;
+ for (int rangeIdx=0; rangeIdx<mergedRanges.size(); rangeIdx++)
+ {
+ KeyRange mergedRange = mergedRanges.get(rangeIdx);
+ while (shardIdx < localTopology.size())
+ {
+ Shard shard = localTopology.get(shardIdx);
+
+ int cmp = shard.range.compareIntersecting(mergedRange);
+ if (cmp > 0)
+ throw new IllegalStateException("mapped shards should always be intersecting or greater than the current shard");
+
+ if (cmp < 0)
+ {
+ shardIdx++;
+ continue;
+ }
+
+ if (shard.range.fullyContains(mergedRange))
+ {
+ builder.addMapping(mergedRange, shard);
+ break;
+ }
+ else
+ {
+ KeyRange intersection = mergedRange.intersection(shard.range);
+ Preconditions.checkState(intersection.start().equals(mergedRange.start()));
+ builder.addMapping(intersection, shard);
+ mergedRange = mergedRange.subRange(intersection.end(), mergedRange.end());
+ shardIdx++;
+ }
+ }
+ }
+ return builder.build();
+ }
+
+ void updateTopology(Topology topology, KeyRanges added, KeyRanges removed)
+ {
+ KeyRanges newRanges = rangeMap.ranges.difference(removed).union(added).mergeTouching();
+ rangeMap = mapRanges(newRanges, topology);
+
+ for (KeyRange range : removed)
+ {
+ NavigableMap<Key, CommandsForKey> subMap = commandsForKey.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive());
+ Iterator<Key> keyIterator = subMap.keySet().iterator();
+ while (keyIterator.hasNext())
+ {
+ Key key = keyIterator.next();
+ CommandsForKey forKey = commandsForKey.get(key);
+ if (forKey != null)
+ {
+ for (Command command : forKey)
+ if (command.txn() != null && !rangeMap.ranges.intersects(command.txn().keys))
+ commands.remove(command.txnId());
+ }
+ keyIterator.remove();
+ }
+ }
+ }
+
+ public int index()
+ {
+ return index;
+ }
+
+ public boolean intersects(Keys keys)
+ {
+ return rangeMap.ranges.intersects(keys);
+ }
+
+ public static void onEach(Collection<CommandStore> stores, Consumer<? super CommandStore> consumer)
+ {
+ for (CommandStore store : stores)
+ store.process(consumer);
+ }
+
+ <R> void processInternal(Function<? super CommandStore, R> function, CompletableFuture<R> future)
+ {
+ try
+ {
+ future.complete(function.apply(this));
+ }
+ catch (Throwable e)
+ {
+ future.completeExceptionally(e);
+ }
+ }
+
+ void processInternal(Consumer<? super CommandStore> consumer, CompletableFuture<Void> future)
+ {
+ try
+ {
+ consumer.accept(this);
+ future.complete(null);
+ }
+ catch (Throwable e)
+ {
+ future.completeExceptionally(e);
+ }
+ }
+
+ public abstract <R> CompletionStage<R> process(Function<? super CommandStore, R> function);
+
+ public abstract CompletionStage<Void> process(Consumer<? super CommandStore> consumer);
+
+ public abstract void shutdown();
+
+ public static class Synchronized extends CommandStore
+ {
+ public Synchronized(int index, Node.Id nodeId, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
+ {
+ super(index, nodeId, uniqueNow, agent, store);
+ }
+
+ @Override
+ public synchronized <R> CompletionStage<R> process(Function<? super CommandStore, R> func)
+ {
+ CompletableFuture<R> future = new CompletableFuture<>();
+ processInternal(func, future);
+ return future;
+ }
+
+ @Override
+ public synchronized CompletionStage<Void> process(Consumer<? super CommandStore> consumer)
+ {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ processInternal(consumer, future);
+ return future;
+ }
+
+ @Override
+ public void shutdown() {}
+ }
+
+ public static class SingleThread extends CommandStore
+ {
+ private final ExecutorService executor;
+
+ private class FunctionWrapper<R> extends CompletableFuture<R> implements Runnable
+ {
+ private final Function<? super CommandStore, R> function;
+
+ public FunctionWrapper(Function<? super CommandStore, R> function)
+ {
+ this.function = function;
+ }
+
+ @Override
+ public void run()
+ {
+ processInternal(function, this);
+ }
+ }
+
+ private class ConsumerWrapper extends CompletableFuture<Void> implements Runnable
+ {
+ private final Consumer<? super CommandStore> consumer;
+
+ public ConsumerWrapper(Consumer<? super CommandStore> consumer)
+ {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void run()
+ {
+ processInternal(consumer, this);
+ }
+ }
+
+ public SingleThread(int index, Node.Id nodeId, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
+ {
+ super(index, nodeId, uniqueNow, agent, store);
+ executor = Executors.newSingleThreadExecutor(r -> {
+ Thread thread = new Thread(r);
+ thread.setName(CommandStore.class.getSimpleName() + '[' + nodeId + ':' + index + ']');
+ return thread;
+ });
+ }
+
+ @Override
+ public <R> CompletionStage<R> process(Function<? super CommandStore, R> function)
+ {
+ FunctionWrapper<R> future = new FunctionWrapper<>(function);
+ executor.execute(future);
+ return future;
+ }
+
+ @Override
+ public CompletionStage<Void> process(Consumer<? super CommandStore> consumer)
+ {
+ ConsumerWrapper future = new ConsumerWrapper(consumer);
+ executor.execute(future);
+ return future;
+ }
+
+ @Override
+ public void shutdown()
+ {
+ executor.shutdown();
+ }
+ }
+
+ public static class SingleThreadDebug extends SingleThread
+ {
+ private final AtomicReference<Thread> expectedThread = new AtomicReference<>();
+
+ public SingleThreadDebug(int index, Node.Id nodeId, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store)
+ {
+ super(index, nodeId, uniqueNow, agent, store);
+ }
+
+ private void assertThread()
+ {
+ Thread current = Thread.currentThread();
+ Thread expected;
+ while (true)
+ {
+ expected = expectedThread.get();
+ if (expected != null)
+ break;
+ expectedThread.compareAndSet(null, Thread.currentThread());
+ }
+ Preconditions.checkState(expected == current);
+ }
+
+ @Override
+ public Command command(TxnId txnId)
+ {
+ assertThread();
+ return super.command(txnId);
+ }
+
+ @Override
+ public boolean hasCommand(TxnId txnId)
+ {
+ assertThread();
+ return super.hasCommand(txnId);
+ }
+
+ @Override
+ public CommandsForKey commandsForKey(Key key)
+ {
+ assertThread();
+ return super.commandsForKey(key);
+ }
+
+ @Override
+ public boolean hasCommandsForKey(Key key)
+ {
+ assertThread();
+ return super.hasCommandsForKey(key);
+ }
+
+ @Override
+ <R> void processInternal(Function<? super CommandStore, R> function, CompletableFuture<R> future)
+ {
+ assertThread();
+ super.processInternal(function, future);
+ }
+
+ @Override
+ void processInternal(Consumer<? super CommandStore> consumer, CompletableFuture<Void> future)
+ {
+ assertThread();
+ super.processInternal(consumer, future);
+ }
+ }
+}
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java
new file mode 100644
index 0000000..e28f6ba
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -0,0 +1,154 @@
+package accord.local;
+
+import accord.api.Agent;
+import accord.api.KeyRange;
+import accord.api.Store;
+import accord.topology.KeyRanges;
+import accord.topology.Shards;
+import accord.topology.Topology;
+import accord.txn.Keys;
+import accord.txn.Timestamp;
+import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * Manages the single threaded metadata shards
+ */
+public class CommandStores
+{
+ private Topology localTopology = Shards.EMPTY;
+ private final CommandStore[] commandStores;
+
+ public CommandStores(int num, Node.Id nodeId, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store, CommandStore.Factory shardFactory)
+ {
+ this.commandStores = new CommandStore[num];
+ for (int i=0; i<num; i++)
+ commandStores[i] = shardFactory.create(i, nodeId, uniqueNow, agent, store);
+ }
+
+ public synchronized void shutdown()
+ {
+ for (CommandStore commandStore : commandStores)
+ commandStore.shutdown();
+ }
+
+ public Stream<CommandStore> stream()
+ {
+ return StreamSupport.stream(new ShardSpliterator(), false);
+ }
+
+ public Stream<CommandStore> forKeys(Keys keys)
+ {
+ // TODO: filter shards before sending to their thread?
+ return stream().filter(commandShard -> commandShard.intersects(keys));
+ }
+
+ static List<KeyRanges> shardRanges(KeyRanges ranges, int shards)
+ {
+ List<List<KeyRange>> sharded = new ArrayList<>(shards);
+ for (int i=0; i<shards; i++)
+ sharded.add(new ArrayList<>(ranges.size()));
+
+ for (KeyRange range : ranges)
+ {
+ KeyRanges split = range.split(shards);
+ Preconditions.checkState(split.size() <= shards);
+ for (int i=0; i<split.size(); i++)
+ sharded.get(i).add(split.get(i));
+ }
+
+ List<KeyRanges> result = new ArrayList<>(shards);
+ for (int i=0; i<shards; i++)
+ {
+ result.add(new KeyRanges(sharded.get(i).toArray(KeyRange[]::new)));
+ }
+
+ return result;
+ }
+
+ public synchronized void updateTopology(Topology newTopology)
+ {
+ KeyRanges removed = localTopology.ranges().difference(newTopology.ranges());
+ KeyRanges added = newTopology.ranges().difference(localTopology.ranges());
+ List<KeyRanges> sharded = shardRanges(added, commandStores.length);
+ stream().forEach(commands -> commands.updateTopology(newTopology, sharded.get(commands.index()), removed));
+ localTopology = newTopology;
+ }
+
+ private class ShardSpliterator implements Spliterator<CommandStore>
+ {
+ int i = 0;
+
+ @Override
+ public boolean tryAdvance(Consumer<? super CommandStore> action)
+ {
+ if (i < commandStores.length)
+ {
+ CommandStore shard = commandStores[i++];
+ try
+ {
+ shard.process(action).toCompletableFuture().get();
+ }
+ catch (InterruptedException | ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ }
+ return i < commandStores.length;
+ }
+
+ @Override
+ public void forEachRemaining(Consumer<? super CommandStore> action)
+ {
+ if (i >= commandStores.length)
+ return;
+
+ CompletableFuture<Void>[] futures = new CompletableFuture[commandStores.length - i];
+ for (; i< commandStores.length; i++)
+ futures[i] = commandStores[i].process(action).toCompletableFuture();
+
+ try
+ {
+ for (CompletableFuture<Void> future : futures)
+ future.get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ Throwable cause = e.getCause();
+ throw new RuntimeException(cause != null ? cause : e);
+ }
+ }
+
+ @Override
+ public Spliterator<CommandStore> trySplit()
+ {
+ return null;
+ }
+
+ @Override
+ public long estimateSize()
+ {
+ return commandStores.length;
+ }
+
+ @Override
+ public int characteristics()
+ {
+ return Spliterator.SIZED | Spliterator.NONNULL | Spliterator.DISTINCT | Spliterator.IMMUTABLE;
+ }
+ }
+}
diff --git a/accord-core/src/main/java/accord/local/CommandsForKey.java b/accord-core/src/main/java/accord/local/CommandsForKey.java
index 36802e9..8ee847b 100644
--- a/accord-core/src/main/java/accord/local/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/CommandsForKey.java
@@ -1,12 +1,14 @@
package accord.local;
+import java.util.Iterator;
import java.util.NavigableMap;
import java.util.TreeMap;
import accord.txn.Timestamp;
import accord.txn.TxnId;
+import com.google.common.collect.Iterators;
-public class CommandsForKey implements Listener
+public class CommandsForKey implements Listener, Iterable<Command>
{
// TODO: efficiency
public final NavigableMap<Timestamp, Command> uncommitted = new TreeMap<>();
@@ -43,4 +45,10 @@ public class CommandsForKey implements Listener
uncommitted.put(command.txnId(), command);
command.addListener(this);
}
+
+ @Override
+ public Iterator<Command> iterator()
+ {
+ return Iterators.concat(uncommitted.values().iterator(), committedByExecuteAt.values().iterator());
+ }
}
diff --git a/accord-core/src/main/java/accord/local/Instance.java b/accord-core/src/main/java/accord/local/Instance.java
deleted file mode 100644
index c842b1e..0000000
--- a/accord-core/src/main/java/accord/local/Instance.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package accord.local;
-
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-import accord.api.Key;
-import accord.api.Store;
-import accord.topology.Shard;
-import accord.txn.TxnId;
-
-/**
- * node-local accord metadata per shard
- */
-public class Instance
-{
- public final Shard shard;
- private final Node node;
- private final Store store;
- private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
- private final NavigableMap<Key, CommandsForKey> commandsForKey = new TreeMap<>();
-
- public Instance(Shard shard, Node node, Store store)
- {
- this.shard = shard;
- this.node = node;
- this.store = store;
- }
-
- public Command command(TxnId txnId)
- {
- return commands.computeIfAbsent(txnId, id -> new Command(this, id));
- }
-
- public boolean hasCommand(TxnId txnId)
- {
- return commands.containsKey(txnId);
- }
-
- public CommandsForKey commandsForKey(Key key)
- {
- return commandsForKey.computeIfAbsent(key, ignore -> new CommandsForKey());
- }
-
- public boolean hasCommandsForKey(Key key)
- {
- return commandsForKey.containsKey(key);
- }
-
- public Store store()
- {
- return store;
- }
-
- public Node node()
- {
- return node;
- }
-}
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 7724186..05c9cb0 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -70,10 +70,14 @@ public class Node
}
}
+ public static int numCommandShards()
+ {
+ return 8; // TODO: make configurable
+ }
+
+ private final CommandStores commandStores;
private final Id id;
private final Topology cluster;
- private final Shards local;
- private final Instance[] instances;
private final MessageSink messageSink;
private final Random random;
@@ -87,20 +91,24 @@ public class Node
private final Map<TxnId, CompletionStage<Result>> coordinating = new ConcurrentHashMap<>();
private final Set<TxnId> pendingRecovery = Collections.newSetFromMap(new ConcurrentHashMap<>());
- public Node(Id id, Topology cluster, Shards local, MessageSink messageSink, Random random, LongSupplier nowSupplier, Supplier<Store> dataSupplier, Agent agent, Scheduler scheduler)
+ public Node(Id id, Topology cluster, MessageSink messageSink, Random random, LongSupplier nowSupplier,
+ Supplier<Store> dataSupplier, Agent agent, Scheduler scheduler, CommandStore.Factory commandStoreFactory)
{
this.id = id;
this.cluster = cluster;
this.random = random;
this.agent = agent;
this.now = new AtomicReference<>(new Timestamp(nowSupplier.getAsLong(), 0, id));
- this.local = local;
this.messageSink = messageSink;
- this.instances = new Instance[local.size()];
this.nowSupplier = nowSupplier;
this.scheduler = scheduler;
- for (int i = 0 ; i < instances.length ; ++i)
- instances[i] = new Instance(local.get(i), this, dataSupplier.get());
+ this.commandStores = new CommandStores(numCommandShards(), id, this::uniqueNow, agent, dataSupplier.get(), commandStoreFactory);
+ this.commandStores.updateTopology(cluster.forNode(id));
+ }
+
+ public void shutdown()
+ {
+ commandStores.shutdown();
}
public Timestamp uniqueNow()
@@ -136,13 +144,12 @@ public class Node
return cluster;
}
- public Stream<Instance> local(Keys keys)
+ public Stream<CommandStore> local(Keys keys)
{
- // TODO: efficiency
- return Stream.of(local.select(keys, instances, Instance[]::new));
+ return commandStores.forKeys(keys);
}
- public Optional<Instance> local(Key key)
+ public Optional<CommandStore> local(Key key)
{
return local(Keys.of(key)).reduce((i1, i2) -> {
throw new IllegalStateException("more than one instance encountered for key");
@@ -190,6 +197,18 @@ public class Node
});
}
+ public <T> void send(Collection<Id> to, Request send)
+ {
+ for (Id dst: to)
+ send(dst, send);
+ }
+
+ public <T> void send(Collection<Id> to, Request send, Callback<T> callback)
+ {
+ for (Id dst: to)
+ send(dst, send, callback);
+ }
+
// send to a specific node
public <T> void send(Id to, Request send, Callback<T> callback)
{
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java
index 2c3aeb2..785968c 100644
--- a/accord-core/src/main/java/accord/messages/BeginRecovery.java
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -1,7 +1,5 @@
package accord.messages;
-import accord.messages.Reply;
-import accord.messages.Request;
import accord.api.Result;
import accord.txn.Writes;
import accord.txn.Ballot;
diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java b/accord-core/src/main/java/accord/messages/PreAccept.java
index 0ae2e76..d14c6fd 100644
--- a/accord-core/src/main/java/accord/messages/PreAccept.java
+++ b/accord-core/src/main/java/accord/messages/PreAccept.java
@@ -4,11 +4,9 @@ import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
-import accord.local.Instance;
+import accord.local.CommandStore;
import accord.local.Node;
import accord.local.Node.Id;
-import accord.messages.Reply;
-import accord.messages.Request;
import accord.txn.Timestamp;
import accord.local.Command;
import accord.txn.Dependencies;
@@ -113,10 +111,10 @@ public class PreAccept implements Request
}
}
- static Dependencies calculateDeps(Instance instance, TxnId txnId, Txn txn, Timestamp executeAt)
+ static Dependencies calculateDeps(CommandStore commandStore, TxnId txnId, Txn txn, Timestamp executeAt)
{
NavigableMap<TxnId, Txn> deps = new TreeMap<>();
- txn.conflictsMayExecuteBefore(instance, executeAt).forEach(conflict -> {
+ txn.conflictsMayExecuteBefore(commandStore, executeAt).forEach(conflict -> {
if (conflict.txnId().equals(txnId))
return;
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index 45f9f30..8ad2937 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -5,15 +5,9 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import accord.local.Instance;
-import accord.local.Node;
+import accord.local.*;
import accord.local.Node.Id;
import accord.api.Data;
-import accord.messages.Reply;
-import accord.messages.Request;
-import accord.local.Command;
-import accord.local.Listener;
-import accord.local.Status;
import accord.txn.Txn;
import accord.txn.TxnId;
import accord.txn.Timestamp;
@@ -31,7 +25,7 @@ public class ReadData implements Request
Data data;
boolean isObsolete; // TODO: respond with the Executed result we have stored?
- Set<Instance> waitingOn;
+ Set<CommandStore> waitingOn;
Scheduled waitingOnReporter;
LocalRead(TxnId txnId, Node node, Id replyToNode, long replyToMessage)
@@ -56,7 +50,7 @@ public class ReadData implements Request
@Override
public void run()
{
- Iterator<Instance> i = waitingOn.iterator();
+ Iterator<CommandStore> i = waitingOn.iterator();
Command blockedBy = null;
while (i.hasNext() && null == (blockedBy = i.next().command(txnId).blockedBy()));
if (blockedBy == null) return;
@@ -94,7 +88,7 @@ public class ReadData implements Request
Data next = command.txn().read(command);
data = data == null ? next : data.merge(next);
- waitingOn.remove(command.instance);
+ waitingOn.remove(command.commandStore);
if (waitingOn.isEmpty())
{
waitingOnReporter.cancel();
@@ -108,7 +102,8 @@ public class ReadData implements Request
{
isObsolete = true;
waitingOnReporter.cancel();
- node.send(command.instance.shard, new Apply(command.txnId(), command.txn(), command.executeAt(), command.savedDeps(), command.writes(), command.result()));
+ // FIXME: this may result in redundant messages being sent when a shard is split across several command shards
+ node.send(command.commandStore.nodesFor(command), new Apply(command.txnId(), command.txn(), command.executeAt(), command.savedDeps(), command.writes(), command.result()));
node.reply(replyToNode, replyToMessage, new ReadNack());
}
}
@@ -117,7 +112,8 @@ public class ReadData implements Request
{
// TODO: simple hash set supporting concurrent modification, or else avoid concurrent modification
waitingOn = txn.local(node).collect(Collectors.toCollection(() -> new DeterministicIdentitySet<>()));
- waitingOn.forEach(instance -> {
+ // FIXME: fix/check thread safety
+ CommandStore.onEach(waitingOn, instance -> {
Command command = instance.command(txnId);
command.witness(txn);
switch (command.status())
diff --git a/accord-core/src/main/java/accord/messages/WaitOnCommit.java b/accord-core/src/main/java/accord/messages/WaitOnCommit.java
index 2007215..e825bc8 100644
--- a/accord-core/src/main/java/accord/messages/WaitOnCommit.java
+++ b/accord-core/src/main/java/accord/messages/WaitOnCommit.java
@@ -3,13 +3,8 @@ package accord.messages;
import java.util.List;
import java.util.stream.Collectors;
-import accord.local.Instance;
-import accord.local.Node;
+import accord.local.*;
import accord.local.Node.Id;
-import accord.messages.Reply;
-import accord.messages.Request;
-import accord.local.Command;
-import accord.local.Listener;
import accord.txn.TxnId;
import accord.txn.Keys;
@@ -60,7 +55,7 @@ public class WaitOnCommit implements Request
synchronized void setup(TxnId txnId, Keys keys)
{
- List<Instance> instances = node.local(keys).collect(Collectors.toList());
+ List<CommandStore> instances = node.local(keys).collect(Collectors.toList());
waitingOn = instances.size();
instances.forEach(instance -> {
Command command = instance.command(txnId);
diff --git a/accord-core/src/main/java/accord/topology/KeyRanges.java b/accord-core/src/main/java/accord/topology/KeyRanges.java
index 90ed527..3702276 100644
--- a/accord-core/src/main/java/accord/topology/KeyRanges.java
+++ b/accord-core/src/main/java/accord/topology/KeyRanges.java
@@ -2,26 +2,57 @@ package accord.topology;
import accord.api.Key;
import accord.api.KeyRange;
+import accord.txn.Keys;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
-import java.util.Arrays;
+import java.util.*;
-public class KeyRanges
+public class KeyRanges implements Iterable<KeyRange>
{
public static final KeyRanges EMPTY = new KeyRanges(new KeyRange[0]);
+ // TODO: fix raw parameterized use
private final KeyRange[] ranges;
+ /**
+ * Wraps {@code ranges} directly; the array is not copied, so callers should not mutate it afterwards.
+ * NOTE(review): rangeIndexForKey (binary search), difference and union appear to assume the ranges
+ * are sorted and non-overlapping; that is not validated here.
+ *
+ * @throws NullPointerException if {@code ranges} is null
+ */
public KeyRanges(KeyRange[] ranges)
{
+ Preconditions.checkNotNull(ranges);
this.ranges = ranges;
}
+ /**
+ * Copies {@code ranges} into a new array and delegates to the array constructor.
+ */
+ public KeyRanges(List<KeyRange> ranges)
+ {
+ this(ranges.toArray(KeyRange[]::new));
+ }
+
+ /** Renders the ranges via {@link Arrays#toString(Object[])}. */
@Override
public String toString()
{
return Arrays.toString(ranges);
}
+    /**
+     * Two instances are equal when they are of the same concrete class and hold
+     * element-wise equal range arrays.
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (o == this)
+            return true;
+        if (o == null || o.getClass() != getClass())
+            return false;
+
+        KeyRanges other = (KeyRanges) o;
+        return Arrays.equals(this.ranges, other.ranges);
+    }
+
+    /** Hash consistent with {@link #equals(Object)}: derived from the range array contents. */
+    @Override
+    public int hashCode()
+    {
+        return Arrays.hashCode(this.ranges);
+    }
+
+    /** Iterates the ranges in array order; the iterator does not support removal. */
+    @Override
+    public Iterator<KeyRange> iterator()
+    {
+        return Iterators.forArray(this.ranges);
+    }
+
public int rangeIndexForKey(int lowerBound, int upperBound, Key key)
{
return Arrays.binarySearch(ranges, lowerBound, upperBound, key,
@@ -38,11 +69,122 @@ public class KeyRanges
return ranges.length;
}
+    /** Returns the range at position {@code i}; out-of-range positions throw ArrayIndexOutOfBoundsException. */
+    public KeyRange get(int i)
+    {
+        KeyRange range = ranges[i];
+        return range;
+    }
+
+    /** Returns true when this collection holds no ranges. */
+    public boolean isEmpty()
+    {
+        return 0 == size();
+    }
+
+ /**
+ * Returns a new KeyRanges holding {@code ranges[indexes[0]], ranges[indexes[1]], ...},
+ * in the order given by {@code indexes}.
+ */
public KeyRanges select(int[] indexes)
{
KeyRange[] selection = new KeyRange[indexes.length];
for (int i=0; i<indexes.length; i++)
- selection[i] = ranges[i];
+ // index through indexes[i] so the requested ranges (not simply the first N) are selected
+ selection[i] = ranges[indexes[i]];
return new KeyRanges(selection);
}
+
+    /**
+     * Returns true if {@link KeyRange#intersects(Keys)} holds for at least one range of this collection.
+     */
+    public boolean intersects(Keys keys)
+    {
+        for (KeyRange range : ranges)
+        {
+            if (range.intersects(keys))
+                return true;
+        }
+        return false;
+    }
+
+    /**
+     * Subtracts the given set of key ranges from this one. Returns the parts of this collection's
+     * ranges that are not covered by any range of {@code that}. A partially covered range is trimmed,
+     * and a range that strictly contains a range of {@code that} is split in two.
+     *
+     * <p>Both collections are consumed in a single forward merge pass ({@code thatIdx} never moves
+     * backwards), so both are assumed to be sorted by start and internally non-overlapping.
+     * This is not checked.
+     *
+     * @param that the ranges to remove; may be empty
+     * @return a new KeyRanges holding the remaining, possibly trimmed or split, ranges in order
+     */
+    public KeyRanges difference(KeyRanges that)
+    {
+        List<KeyRange> result = new ArrayList<>(this.size() + that.size());
+        int thatIdx = 0;
+
+        for (int thisIdx=0; thisIdx<this.size(); thisIdx++)
+        {
+            KeyRange thisRange = this.ranges[thisIdx];
+            while (thatIdx < that.size())
+            {
+                KeyRange thatRange = that.ranges[thatIdx];
+
+                int cmp = thisRange.compareIntersecting(thatRange);
+                if (cmp > 0)
+                {
+                    // presumably thatRange lies wholly before thisRange: it cannot affect this or any later range
+                    thatIdx++;
+                    continue;
+                }
+                // presumably thatRange lies wholly after thisRange: nothing more to subtract from it
+                if (cmp < 0) break;
+
+                // the two ranges intersect
+                int scmp = thisRange.start().compareTo(thatRange.start());
+                int ecmp = thisRange.end().compareTo(thatRange.end());
+
+                // keep the part of thisRange that precedes thatRange
+                if (scmp < 0)
+                    result.add(thisRange.subRange(thisRange.start(), thatRange.start()));
+
+                if (ecmp <= 0)
+                {
+                    // thatRange covers the rest of thisRange; thatIdx is kept because thatRange
+                    // may also intersect our next range
+                    thisRange = null;
+                    break;
+                }
+                else
+                {
+                    // continue with the part of thisRange beyond thatRange
+                    thisRange = thisRange.subRange(thatRange.end(), thisRange.end());
+                    thatIdx++;
+                }
+            }
+            if (thisRange != null)
+                result.add(thisRange);
+        }
+        return new KeyRanges(result.toArray(KeyRange[]::new));
+    }
+
+    /**
+     * Returns the union of this collection and {@code that}, sorted by range start. Ranges are
+     * concatenated, not coalesced: adjacent ranges remain separate entries.
+     *
+     * @param that ranges to add; none may intersect a range of this collection or each other
+     * @return a new KeyRanges containing every range of both inputs
+     * @throws IllegalArgumentException if any two of the combined ranges intersect
+     */
+    public KeyRanges union(KeyRanges that)
+    {
+        KeyRange[] combined = new KeyRange[this.ranges.length + that.ranges.length];
+        System.arraycopy(this.ranges, 0, combined, 0, this.ranges.length);
+        System.arraycopy(that.ranges, 0, combined, this.ranges.length, that.ranges.length);
+        Arrays.sort(combined, Comparator.comparing(KeyRange::start));
+
+        // once sorted by start, any intersecting pair implies an intersecting neighbour pair,
+        // so checking neighbours is sufficient
+        for (int i=1; i<combined.length; i++)
+            Preconditions.checkArgument(combined[i].compareIntersecting(combined[i - 1]) != 0,
+                                        "Cannot union intersecting ranges %s and %s", combined[i - 1], combined[i]);
+
+        return new KeyRanges(combined);
+    }
+
+    /**
+     * Returns the union of this collection and the single {@code range}.
+     *
+     * @throws IllegalArgumentException if {@code range} intersects a range of this collection
+     */
+    public KeyRanges union(KeyRange range)
+    {
+        return union(new KeyRanges(new KeyRange[]{range}));
+    }
+
+    /**
+     * Scans left to right and collapses each run of neighbouring ranges that
+     * {@link KeyRange#tryMerge} can combine. Returns {@code this} when empty, otherwise a new instance.
+     */
+    public KeyRanges mergeTouching()
+    {
+        if (ranges.length == 0)
+            return this;
+
+        List<KeyRange> merged = new ArrayList<>(ranges.length);
+        KeyRange pending = ranges[0];
+        for (int i = 1; i < ranges.length; i++)
+        {
+            KeyRange next = ranges[i];
+            KeyRange combined = pending.tryMerge(next);
+            if (combined == null)
+            {
+                // not mergeable: flush the accumulated range and start a new run
+                merged.add(pending);
+                pending = next;
+                continue;
+            }
+            pending = combined;
+        }
+        merged.add(pending);
+        return new KeyRanges(merged.toArray(KeyRange[]::new));
+    }
+
}
diff --git a/accord-core/src/main/java/accord/topology/Shard.java b/accord-core/src/main/java/accord/topology/Shard.java
index fcd7ee2..99357ce 100644
--- a/accord-core/src/main/java/accord/topology/Shard.java
+++ b/accord-core/src/main/java/accord/topology/Shard.java
@@ -8,11 +8,15 @@ import accord.local.Node.Id;
import accord.api.Key;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
public class Shard
{
public final KeyRange range;
+ // TODO: use BTreeSet to combine these two (or introduce version that operates over long values)
public final List<Id> nodes;
+ public final Set<Id> nodeSet;
public final Set<Id> fastPathElectorate;
public final int recoveryFastPathSize;
public final int fastPathQuorumSize;
@@ -21,9 +25,10 @@ public class Shard
public Shard(KeyRange range, List<Id> nodes, Set<Id> fastPathElectorate)
{
this.range = range;
- this.nodes = nodes;
+ this.nodes = ImmutableList.copyOf(nodes);
+ this.nodeSet = ImmutableSet.copyOf(nodes);
int f = maxToleratedFailures(nodes.size());
- this.fastPathElectorate = fastPathElectorate;
+ this.fastPathElectorate = ImmutableSet.copyOf(fastPathElectorate);
int e = fastPathElectorate.size();
this.recoveryFastPathSize = (f+1)/2;
this.slowPathQuorumSize = f + 1;
@@ -43,6 +48,11 @@ public class Shard
return (f + electorate)/2 + 1;
}
+    /** Returns this shard's replication factor: the number of entries in {@link #nodes}. */
+    public int rf()
+    {
+        List<Id> replicas = this.nodes;
+        return replicas.size();
+    }
+
public boolean contains(Key key)
{
return range.containsKey(key);
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
index f568307..5f645f2 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -7,6 +7,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.IntFunction;
import java.util.stream.IntStream;
@@ -15,6 +16,7 @@ import accord.local.Node.Id;
import accord.api.Key;
import accord.txn.Keys;
import accord.utils.IndexedConsumer;
+import accord.utils.IndexedPredicate;
public class Topology extends AbstractCollection<Shard>
{
@@ -22,7 +24,7 @@ public class Topology extends AbstractCollection<Shard>
final KeyRanges ranges;
final Map<Id, Shards.NodeInfo> nodeLookup;
final KeyRanges subsetOfRanges;
- final int[] supersetIndexes;
+ final int[] supersetRangeIndexes;
static class NodeInfo
{
@@ -47,7 +49,7 @@ public class Topology extends AbstractCollection<Shard>
this.ranges = new KeyRanges(Arrays.stream(shards).map(shard -> shard.range).toArray(KeyRange[]::new));
this.shards = shards;
this.subsetOfRanges = ranges;
- this.supersetIndexes = IntStream.range(0, shards.length).toArray();
+ this.supersetRangeIndexes = IntStream.range(0, shards.length).toArray();
this.nodeLookup = new HashMap<>();
Map<Id, List<Integer>> build = new HashMap<>();
for (int i = 0 ; i < shards.length ; ++i)
@@ -69,7 +71,7 @@ public class Topology extends AbstractCollection<Shard>
this.ranges = ranges;
this.nodeLookup = nodeLookup;
this.subsetOfRanges = subsetOfRanges;
- this.supersetIndexes = supersetIndexes;
+ this.supersetRangeIndexes = supersetIndexes;
}
public Shards forNode(Id node)
@@ -99,7 +101,7 @@ public class Topology extends AbstractCollection<Shard>
subsetIndex = subsetOfRanges.rangeIndexForKey(subsetIndex, subsetOfRanges.size(), select.get(i));
if (subsetIndex < 0 || subsetIndex >= subsetOfRanges.size())
throw new IllegalArgumentException("Range not found for " + select.get(i));
- int supersetIndex = supersetIndexes[subsetIndex];
+ int supersetIndex = supersetRangeIndexes[subsetIndex];
newSubset[count++] = supersetIndex;
Shard shard = shards[supersetIndex];
// find the first key outside this range
@@ -108,6 +110,14 @@ public class Topology extends AbstractCollection<Shard>
if (count != newSubset.length)
newSubset = Arrays.copyOf(newSubset, count);
KeyRanges rangeSubset = ranges.select(newSubset);
+
+ // TODO: more efficient sharing of nodeLookup state
+ Map<Id, NodeInfo> nodeLookup = new HashMap<>();
+ for (Map.Entry<Id, NodeInfo> e : this.nodeLookup.entrySet())
+ {
+ if (intersects(newSubset, e.getValue().supersetIndexes))
+ nodeLookup.put(e.getKey(), e.getValue());
+ }
return new Shards(shards, ranges, nodeLookup, rangeSubset, newSubset);
}
@@ -118,11 +128,11 @@ public class Topology extends AbstractCollection<Shard>
public void forEachOn(Id on, Keys select, IndexedConsumer<Shard> consumer)
{
Shards.NodeInfo info = nodeLookup.get(on);
- for (int i = 0, j = 0, k = 0 ; i < select.size() && j < supersetIndexes.length && k < info.supersetIndexes.length ;)
+ for (int i = 0, j = 0, k = 0 ; i < select.size() && j < supersetRangeIndexes.length && k < info.supersetIndexes.length ;)
{
Key key = select.get(i);
- Shard shard = shards[supersetIndexes[j]];
- int c = supersetIndexes[j] - info.supersetIndexes[k];
+ Shard shard = shards[supersetRangeIndexes[j]];
+ int c = supersetRangeIndexes[j] - info.supersetIndexes[k];
if (c < 0) ++j;
else if (c > 0) ++k;
else
@@ -139,7 +149,9 @@ public class Topology extends AbstractCollection<Shard>
{
// TODO: this can be done by divide-and-conquer splitting of the lists and recursion, which should be more efficient
Shards.NodeInfo info = nodeLookup.get(on);
- int[] a = supersetIndexes, b = info.supersetIndexes;
+ if (info == null)
+ return;
+ int[] a = supersetRangeIndexes, b = info.supersetIndexes;
int ai = 0, bi = 0;
while (ai < a.length && bi < b.length)
{
@@ -161,19 +173,48 @@ public class Topology extends AbstractCollection<Shard>
}
}
+    /**
+     * Counts the shards that are both selected by this topology and replicated on node {@code on},
+     * and for which {@code predicate} returns true.
+     *
+     * @param on        the node whose shards are considered
+     * @param predicate invoked with the shard's position within this topology's selection and the shard
+     * @return the number of matching shards; 0 if {@code on} is not known to this topology
+     */
+    public int matchesOn(Id on, IndexedPredicate<Shard> predicate)
+    {
+        // TODO: this can be done by divide-and-conquer splitting of the lists and recursion, which should be more efficient
+        Shards.NodeInfo info = nodeLookup.get(on);
+        // unknown node: nothing can match (previously an NPE); consistent with the per-node
+        // forEach variant, which also returns early when there is no NodeInfo
+        if (info == null)
+            return 0;
+
+        int count = 0;
+        int[] a = supersetRangeIndexes, b = info.supersetIndexes;
+        int ai = 0, bi = 0;
+        while (ai < a.length && bi < b.length)
+        {
+            if (a[ai] == b[bi])
+            {
+                if (predicate.test(ai, shards[a[ai]]))
+                    ++count;
+                ++ai; ++bi;
+            }
+            else if (a[ai] < b[bi])
+            {
+                // skip ahead in a to the first index >= b[bi]
+                ai = Arrays.binarySearch(a, ai + 1, a.length, b[bi]);
+                if (ai < 0) ai = -1 -ai;
+            }
+            else
+            {
+                // skip ahead in b to the first index >= a[ai]
+                bi = Arrays.binarySearch(b, bi + 1, b.length, a[ai]);
+                if (bi < 0) bi = -1 -bi;
+            }
+        }
+        return count;
+    }
+
+ /**
+ * Invokes {@code consumer} with (position, shard) for each shard selected by this topology, in
+ * selection order; the position is the index into this topology's selection.
+ */
public void forEach(IndexedConsumer<Shard> consumer)
{
- for (int i = 0 ; i < supersetIndexes.length ; ++i)
- consumer.accept(i, shards[supersetIndexes[i]]);
+ for (int i = 0; i < supersetRangeIndexes.length ; ++i)
+ consumer.accept(i, shards[supersetRangeIndexes[i]]);
}
public <T> T[] select(Keys select, T[] indexedByShard, IntFunction<T[]> constructor)
{
List<T> selection = new ArrayList<>();
- for (int i = 0, j = 0 ; i < select.size() && j < supersetIndexes.length ;)
+ for (int i = 0, j = 0 ; i < select.size() && j < supersetRangeIndexes.length ;)
{
Key k = select.get(i);
- Shard shard = shards[supersetIndexes[j]];
+ Shard shard = shards[supersetRangeIndexes[j]];
int c = shard.range.compareKey(k);
if (c < 0) ++i;
@@ -187,7 +228,7 @@ public class Topology extends AbstractCollection<Shard>
@Override
public Iterator<Shard> iterator()
{
- return IntStream.of(supersetIndexes).mapToObj(i -> shards[i]).iterator();
+ return IntStream.of(supersetRangeIndexes).mapToObj(i -> shards[i]).iterator();
}
@Override
@@ -196,8 +237,38 @@ public class Topology extends AbstractCollection<Shard>
return subsetOfRanges.size();
}
+    /**
+     * Returns the largest replication factor among the shards selected by this topology,
+     * or 0 when the selection is empty (rather than leaking an Integer.MIN_VALUE sentinel).
+     */
+    public int maxRf()
+    {
+        // Shard.rf() is a list size and never negative, so 0 is a safe identity for max
+        int rf = 0;
+        for (int i : supersetRangeIndexes)
+            rf = Math.max(rf, shards[i].rf());
+        return rf;
+    }
+
+ /**
+ * Returns the {@code index}-th shard of this topology's selection (an index into the selection,
+ * not into the full shard array).
+ */
public Shard get(int index)
{
- return shards[supersetIndexes[index]];
+ return shards[supersetRangeIndexes[index]];
+ }
+
+    /**
+     * Returns the ids of the nodes in this topology's node lookup.
+     * The result is a read-only view: the underlying key set belongs to {@code nodeLookup}, and
+     * removing from it would silently drop nodes from this topology.
+     */
+    public Set<Id> nodes()
+    {
+        return java.util.Collections.unmodifiableSet(nodeLookup.keySet());
+    }
+
+    /**
+     * Returns the {@code ranges} field, i.e. the ranges this topology was constructed with.
+     * NOTE(review): for a subset topology this is the parent's full range set, whereas size()/get()
+     * operate on the selected subset ({@code subsetOfRanges}); confirm callers expect the full set.
+     */
+    public KeyRanges ranges()
+    {
+        return ranges;
+    }
+
+    /**
+     * Returns true if the two index arrays share at least one value. Uses a linear merge walk,
+     * so both arrays are assumed to be sorted ascending.
+     */
+    private static boolean intersects(int[] is, int[] js)
+    {
+        for (int i = 0, j = 0 ; i < is.length && j < js.length ;)
+        {
+            // Integer.compare rather than subtraction, which can overflow for arbitrary ints
+            int c = Integer.compare(is[i], js[j]);
+            if (c < 0) ++i;
+            else if (c > 0) ++j;
+            else return true;
+        }
+        return false;
}
}
diff --git a/accord-core/src/main/java/accord/txn/Dependencies.java b/accord-core/src/main/java/accord/txn/Dependencies.java
index 0c6fa83..4e9e114 100644
--- a/accord-core/src/main/java/accord/txn/Dependencies.java
+++ b/accord-core/src/main/java/accord/txn/Dependencies.java
@@ -7,7 +7,7 @@ import java.util.Objects;
import java.util.TreeMap;
import accord.local.Command;
-import accord.topology.Shard;
+import accord.local.CommandStore;
import com.google.common.annotations.VisibleForTesting;
// TODO: do not send Txn
@@ -64,13 +64,12 @@ public class Dependencies implements Iterable<Entry<TxnId, Txn>>
return deps.get(txnId);
}
- public Iterable<TxnId> on(Shard shard)
+ /**
+ * Returns the ids of the dependencies whose transaction keys intersect {@code commandStore}
+ * (as decided by CommandStore#intersects), in the iteration order of {@code deps}.
+ * NOTE(review): the receiver of {@code ...::iterator} is evaluated once, so the returned Iterable
+ * wraps a single Stream and can be iterated only once; a second iterator() call throws
+ * IllegalStateException.
+ */
+ public Iterable<TxnId> on(CommandStore commandStore)
{
- // TODO: efficiency
return deps.entrySet()
- .stream()
- .filter(e -> e.getValue().keys().stream().anyMatch(shard::contains))
- .map(Entry::getKey)::iterator;
+ .stream()
+ .filter(e -> commandStore.intersects(e.getValue().keys()))
+ .map(Entry::getKey)::iterator;
}
@Override
diff --git a/accord-core/src/main/java/accord/txn/Txn.java b/accord-core/src/main/java/accord/txn/Txn.java
index 1249d97..e67a6a2 100644
--- a/accord-core/src/main/java/accord/txn/Txn.java
+++ b/accord-core/src/main/java/accord/txn/Txn.java
@@ -4,10 +4,8 @@ import java.util.Comparator;
import java.util.stream.Stream;
import accord.api.*;
-import accord.local.Command;
-import accord.local.CommandsForKey;
-import accord.local.Instance;
-import accord.local.Node;
+import accord.local.*;
+import accord.topology.KeyRanges;
public class Txn
{
@@ -74,32 +72,32 @@ public class Txn
return "read:" + read.toString() + (update != null ? ", update:" + update : "");
}
- public Data read(KeyRange range, Store store)
+ /** Delegates to {@code read.read(range, store)} for the given key ranges. */
+ public Data read(KeyRanges range, Store store)
{
return read.read(range, store);
}
+ /** Reads over the ranges owned by the command's CommandStore, from that store's Store. */
public Data read(Command command)
{
- Instance instance = command.instance;
- return read(instance.shard.range, instance.store());
+ CommandStore commandStore = command.commandStore;
+ return read(commandStore.ranges(), commandStore.store());
}
// TODO: move these somewhere else?
- public Stream<Instance> local(Node node)
+ /** Returns the CommandStores of {@code node} for this txn's keys, as resolved by {@code node.local(keys())}. */
+ public Stream<CommandStore> local(Node node)
{
return node.local(keys());
}
- public Timestamp maxConflict(Instance instance)
+ /**
+ * Returns the largest CommandsForKey#max over all of this txn's keys in {@code commandStore},
+ * or Timestamp.NONE if there is none (delegates to the two-argument overload).
+ */
+ public Timestamp maxConflict(CommandStore commandStore)
{
- return maxConflict(instance, keys());
+ return maxConflict(commandStore, keys());
}
- public Stream<Command> conflictsMayExecuteBefore(Instance instance, Timestamp mayExecuteBefore)
+ public Stream<Command> conflictsMayExecuteBefore(CommandStore commandStore, Timestamp mayExecuteBefore)
{
return keys().stream().flatMap(key -> {
- CommandsForKey forKey = instance.commandsForKey(key);
+ CommandsForKey forKey = commandStore.commandsForKey(key);
return Stream.concat(
forKey.uncommitted.headMap(mayExecuteBefore, false).values().stream(),
// TODO: only return latest of Committed?
@@ -108,48 +106,48 @@ public class Txn
});
}
- public Stream<Command> uncommittedStartedBefore(Instance instance, TxnId startedBefore)
+ /**
+ * Streams, for each key of this txn, the entries of that key's {@code uncommitted} map whose key is
+ * strictly before {@code startedBefore}. A command registered on several of the keys appears once per key.
+ */
+ public Stream<Command> uncommittedStartedBefore(CommandStore commandStore, TxnId startedBefore)
{
return keys().stream().flatMap(key -> {
- CommandsForKey forKey = instance.commandsForKey(key);
+ CommandsForKey forKey = commandStore.commandsForKey(key);
return forKey.uncommitted.headMap(startedBefore, false).values().stream();
});
}
- public Stream<Command> committedStartedBefore(Instance instance, TxnId startedBefore)
+ /**
+ * Streams, for each key of this txn, the entries of that key's {@code committedById} map whose key is
+ * strictly before {@code startedBefore}. A command registered on several of the keys appears once per key.
+ */
+ public Stream<Command> committedStartedBefore(CommandStore commandStore, TxnId startedBefore)
{
return keys().stream().flatMap(key -> {
- CommandsForKey forKey = instance.commandsForKey(key);
+ CommandsForKey forKey = commandStore.commandsForKey(key);
return forKey.committedById.headMap(startedBefore, false).values().stream();
});
}
- public Stream<Command> uncommittedStartedAfter(Instance instance, TxnId startedAfter)
+ /**
+ * Streams, for each key of this txn, the entries of that key's {@code uncommitted} map whose key is
+ * strictly after {@code startedAfter}. A command registered on several of the keys appears once per key.
+ */
+ public Stream<Command> uncommittedStartedAfter(CommandStore commandStore, TxnId startedAfter)
{
return keys().stream().flatMap(key -> {
- CommandsForKey forKey = instance.commandsForKey(key);
+ CommandsForKey forKey = commandStore.commandsForKey(key);
return forKey.uncommitted.tailMap(startedAfter, false).values().stream();
});
}
- public Stream<Command> committedExecutesAfter(Instance instance, TxnId startedAfter)
+ /**
+ * Streams, for each key of this txn, the entries of that key's {@code committedByExecuteAt} map whose
+ * key is strictly after {@code startedAfter} (a TxnId compared against execute-at keys).
+ * A command registered on several of the keys appears once per key.
+ */
+ public Stream<Command> committedExecutesAfter(CommandStore commandStore, TxnId startedAfter)
{
return keys().stream().flatMap(key -> {
- CommandsForKey forKey = instance.commandsForKey(key);
+ CommandsForKey forKey = commandStore.commandsForKey(key);
return forKey.committedByExecuteAt.tailMap(startedAfter, false).values().stream();
});
}
- public void register(Instance instance, Command command)
+ /**
+ * Registers {@code command} with the CommandsForKey of every key of this txn in {@code commandStore}.
+ * The store must be the command's own store; this is checked only when assertions are enabled.
+ */
+ public void register(CommandStore commandStore, Command command)
{
- assert instance == command.instance;
- keys().forEach(key -> instance.commandsForKey(key).register(command));
+ assert commandStore == command.commandStore;
+ keys().forEach(key -> commandStore.commandsForKey(key).register(command));
}
- protected Timestamp maxConflict(Instance instance, Keys keys)
+ protected Timestamp maxConflict(CommandStore commandStore, Keys keys)
{
return keys.stream()
- .map(instance::commandsForKey)
+ .map(commandStore::commandsForKey)
.map(CommandsForKey::max)
.max(Comparator.naturalOrder())
.orElse(Timestamp.NONE);
diff --git a/accord-core/src/main/java/accord/txn/Writes.java b/accord-core/src/main/java/accord/txn/Writes.java
index 4913e77..42d7a88 100644
--- a/accord-core/src/main/java/accord/txn/Writes.java
+++ b/accord-core/src/main/java/accord/txn/Writes.java
@@ -1,7 +1,7 @@
package accord.txn;
import accord.api.Write;
-import accord.local.Instance;
+import accord.local.CommandStore;
public class Writes
{
@@ -16,10 +16,10 @@ public class Writes
this.write = write;
}
- public void apply(Instance instance)
+ /**
+ * Applies the write at {@code executeAt} to {@code commandStore}'s Store, passing the store's ranges;
+ * a no-op when there is no write.
+ */
+ public void apply(CommandStore commandStore)
{
if (write != null)
- write.apply(instance.shard.range, executeAt, instance.store());
+ write.apply(commandStore.ranges(), executeAt, commandStore.store());
}
@Override
diff --git a/accord-core/src/main/java/accord/utils/IndexedPredicate.java b/accord-core/src/main/java/accord/utils/IndexedPredicate.java
new file mode 100644
index 0000000..358f490
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/IndexedPredicate.java
@@ -0,0 +1,6 @@
+package accord.utils;
+
+/**
+ * A predicate over a value together with its integer position; the boolean-returning
+ * counterpart of {@link IndexedConsumer}.
+ *
+ * @param <V> the type of value tested
+ */
+@FunctionalInterface
+public interface IndexedPredicate<V>
+{
+    /**
+     * @param i the position of {@code v} (for example, a shard's index within a topology)
+     * @param v the value to test
+     * @return true if {@code v} matches
+     */
+    boolean test(int i, V v);
+}
diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java
index 3421753..51beff3 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -1,7 +1,11 @@
package accord;
+import accord.api.KeyRange;
import accord.local.Node;
import accord.impl.mock.MockStore;
+import accord.topology.KeyRanges;
+import accord.topology.Shard;
+import accord.topology.Shards;
import accord.txn.Txn;
import accord.txn.Keys;
import com.google.common.base.Preconditions;
@@ -36,6 +40,16 @@ public class Utils
return rlist;
}
+    /** Test helper: builds a KeyRanges over exactly the given ranges (the varargs array is used as-is). */
+    public static KeyRanges ranges(KeyRange... ranges)
+    {
+        KeyRanges result = new KeyRanges(ranges);
+        return result;
+    }
+
+    /** Test helper: builds a Shards instance over exactly the given shards. */
+    public static Shards shards(Shard... shards)
+    {
+        Shards result = new Shards(shards);
+        return result;
+    }
+
public static Txn writeTxn(Keys keys)
{
return new Txn(keys, MockStore.READ, MockStore.QUERY, MockStore.UPDATE);
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java
index 8f3821e..7c8444a 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -234,7 +234,6 @@ public class BurnTest
while (true)
{
long seed = ThreadLocalRandom.current().nextLong();
-// long seed = 5844871443302548687L;
System.out.println("Seed " + seed);
Random random = new Random(seed);
List<Id> clients = generateIds(true, 1 + random.nextInt(4));
diff --git a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
index d819b94..70ec398 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
@@ -2,9 +2,9 @@ package accord.coordinate;
import accord.local.Node;
import accord.impl.mock.MockCluster;
-import accord.impl.IntKey;
import accord.api.Result;
import accord.impl.mock.MockStore;
+import accord.txn.Keys;
import accord.txn.Txn;
import accord.txn.TxnId;
import org.junit.jupiter.api.Assertions;
@@ -12,34 +12,62 @@ import org.junit.jupiter.api.Test;
import static accord.Utils.ids;
import static accord.Utils.writeTxn;
+import static accord.impl.IntKey.keys;
public class CoordinateTest
{
+ /** A single-key write txn on a default mock cluster completes with the mock store's result. */
@Test
void simpleTest() throws Throwable
{
- MockCluster cluster = MockCluster.builder().build();
- Node node = cluster.get(1);
- Assertions.assertNotNull(node);
+ try (MockCluster cluster = MockCluster.builder().build())
+ {
+ Node node = cluster.get(1);
+ Assertions.assertNotNull(node);
- TxnId txnId = new TxnId(100, 0, node.id());
- Txn txn = writeTxn(IntKey.keys(10));
- Result result = Coordinate.execute(node, txnId, txn).toCompletableFuture().get();
- Assertions.assertEquals(MockStore.RESULT, result);
+ TxnId txnId = new TxnId(100, 0, node.id());
+ Txn txn = writeTxn(keys(10));
+ Result result = Coordinate.execute(node, txnId, txn).toCompletableFuture().get();
+ Assertions.assertEquals(MockStore.RESULT, result);
+ }
}
+ /**
+ * With the nodes in ids(5, 7) isolated, a 7-node, 7-way replicated write still completes;
+ * presumably this exercises the slow path (per the test name).
+ */
@Test
void slowPathTest() throws Throwable
{
- MockCluster cluster = MockCluster.builder().nodes(7).replication(7).build();
- cluster.networkFilter.isolate(ids(5, 7));
+ try (MockCluster cluster = MockCluster.builder().nodes(7).replication(7).build())
+ {
+ cluster.networkFilter.isolate(ids(5, 7));
- Node node = cluster.get(1);
- Assertions.assertNotNull(node);
+ Node node = cluster.get(1);
+ Assertions.assertNotNull(node);
- TxnId txnId = new TxnId(100, 0, node.id());
- Txn txn = writeTxn(IntKey.keys(10));
+ TxnId txnId = new TxnId(100, 0, node.id());
+ Txn txn = writeTxn(keys(10));
+ Result result = Coordinate.execute(node, txnId, txn).toCompletableFuture().get();
+ Assertions.assertEquals(MockStore.RESULT, result);
+ }
+ }
+
+ /**
+ * Coordinates a write txn over {@code keys} from {@code node} using the given logical clock,
+ * asserts that it returns the mock store's result, and returns its TxnId.
+ */
+ private TxnId coordinate(Node node, long clock, Keys keys) throws Throwable
+ {
+ TxnId txnId = new TxnId(clock, 0, node.id());
+ Txn txn = writeTxn(keys);
Result result = Coordinate.execute(node, txnId, txn).toCompletableFuture().get();
Assertions.assertEquals(MockStore.RESULT, result);
+ return txnId;
+ }
+
+    /**
+     * Coordinates three multi-key write txns, all sharing key 350, against a 6-node cluster built
+     * with maxKey(600). Each coordinate() call asserts the mock result itself.
+     */
+    @Test
+    void multiKeyTest() throws Throwable
+    {
+        try (MockCluster cluster = MockCluster.builder().nodes(6).maxKey(600).build())
+        {
+            Node node = cluster.get(1);
+            Assertions.assertNotNull(node);
+
+            // the returned TxnIds are not needed: coordinate() performs the assertions
+            coordinate(node, 100, keys(50, 350, 550));
+            coordinate(node, 150, keys(250, 350, 450));
+            // note: a lower clock (125) than the preceding txn (150)
+            coordinate(node, 125, keys(50, 60, 70, 80, 350, 550));
+        }
}
}
diff --git a/accord-core/src/test/java/accord/coordinate/PreacceptTrackerTest.java b/accord-core/src/test/java/accord/coordinate/PreacceptTrackerTest.java
new file mode 100644
index 0000000..d2baf87
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/PreacceptTrackerTest.java
@@ -0,0 +1,135 @@
+package accord.coordinate;
+
+import accord.Utils;
+import accord.coordinate.tracking.FastPathTracker;
+import accord.impl.TopologyUtils;
+import accord.local.Node;
+import accord.topology.KeyRanges;
+import accord.topology.Shard;
+import accord.topology.Shards;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static accord.Utils.shards;
+
+/**
+ * Tests for {@link FastPathTracker} with one {@link Agree.ShardTracker} per shard, over a fixed
+ * topology of 5 nodes and 5 ranges with 3 replicas each.
+ */
+public class PreacceptTrackerTest
+{
+    private static final Node.Id[] ids = Utils.ids(5).toArray(Node.Id[]::new);
+    private static final KeyRanges ranges = TopologyUtils.initialRanges(5, 500);
+    private static final Shards topology = TopologyUtils.initialTopology(ids, ranges, 3);
+    /*
+    Replicas per range; node N in this diagram corresponds to ids[N - 1]:
+    (000, 100](100, 200](200, 300](300, 400](400, 500]
+    [1, 2, 3] [2, 3, 4] [3, 4, 5] [4, 5, 1] [5, 1, 2]
+     */
+
+    /** Asserts the four externally visible flags of {@code responses} in one call. */
+    private static void assertResponseState(FastPathTracker<?> responses,
+                                            boolean quorumReached,
+                                            boolean fastPathAccepted,
+                                            boolean failed,
+                                            boolean hasOutstandingResponses)
+    {
+        Assertions.assertEquals(quorumReached, responses.hasReachedQuorum());
+        Assertions.assertEquals(fastPathAccepted, responses.hasMetFastPathCriteria());
+        Assertions.assertEquals(failed, responses.hasFailed());
+        Assertions.assertEquals(hasOutstandingResponses, responses.hasInFlight());
+    }
+
+    /** Slow-path votes reach quorum after two of three replicas, but never meet the fast path. */
+    @Test
+    void singleShard()
+    {
+        Shards subShards = shards(topology.get(0));
+        FastPathTracker<Agree.ShardTracker> responses = new FastPathTracker<>(subShards, Agree.ShardTracker[]::new, Agree.ShardTracker::new);
+
+        responses.recordSuccess(ids[0], false);
+        assertResponseState(responses, false, false, false, true);
+
+        responses.recordSuccess(ids[1], false);
+        assertResponseState(responses, true, false, false, true);
+
+        responses.recordSuccess(ids[2], false);
+        assertResponseState(responses, true, false, false, false);
+    }
+
+    /** Fast-path votes from every replica of a single shard meet the fast-path criteria. */
+    @Test
+    void singleShardFastPath()
+    {
+        Shards subShards = shards(topology.get(0));
+        FastPathTracker<Agree.ShardTracker> responses = new FastPathTracker<>(subShards, Agree.ShardTracker[]::new, Agree.ShardTracker::new);
+
+        responses.recordSuccess(ids[0], true);
+        assertResponseState(responses, false, false, false, true);
+
+        responses.recordSuccess(ids[1], true);
+        assertResponseState(responses, true, false, false, true);
+
+        responses.recordSuccess(ids[2], true);
+        assertResponseState(responses, true, true, false, false);
+    }
+
+    /**
+     * responses from unexpected endpoints should be ignored
+     */
+    @Test
+    void unexpectedResponsesAreIgnored()
+    {
+        Shards subShards = shards(topology.get(0));
+        FastPathTracker<Agree.ShardTracker> responses = new FastPathTracker<>(subShards, Agree.ShardTracker[]::new, Agree.ShardTracker::new);
+
+        responses.recordSuccess(ids[0], false);
+        assertResponseState(responses, false, false, false, true);
+
+        responses.recordSuccess(ids[1], false);
+        assertResponseState(responses, true, false, false, true);
+
+        Assertions.assertFalse(subShards.get(0).nodes.contains(ids[4]));
+        responses.recordSuccess(ids[4], false);
+        assertResponseState(responses, true, false, false, true);
+    }
+
+    /** Two failures out of three replicas make quorum unreachable, so the tracker reports failure. */
+    @Test
+    void failure()
+    {
+        Shards subShards = shards(topology.get(0));
+        FastPathTracker<Agree.ShardTracker> responses = new FastPathTracker<>(subShards, Agree.ShardTracker[]::new, Agree.ShardTracker::new);
+
+        responses.recordSuccess(ids[0], true);
+        assertResponseState(responses, false, false, false, true);
+
+        responses.recordFailure(ids[1]);
+        assertResponseState(responses, false, false, false, true);
+
+        responses.recordFailure(ids[2]);
+        assertResponseState(responses, false, false, true, false);
+    }
+
+    /** Fast path is reported only once every tracked shard has met its fast-path criteria. */
+    @Test
+    void multiShard()
+    {
+        Shards subShards = new Shards(new Shard[]{topology.get(0), topology.get(1), topology.get(2)});
+        FastPathTracker<Agree.ShardTracker> responses = new FastPathTracker<>(subShards, Agree.ShardTracker[]::new, Agree.ShardTracker::new);
+        /*
+        (000, 100](100, 200](200, 300]
+        [1, 2, 3] [2, 3, 4] [3, 4, 5]
+         */
+
+        Assertions.assertSame(subShards.get(0), responses.unsafeGet(0).shard);
+        Assertions.assertSame(subShards.get(1), responses.unsafeGet(1).shard);
+        Assertions.assertSame(subShards.get(2), responses.unsafeGet(2).shard);
+
+        responses.recordSuccess(ids[1], true);
+        assertResponseState(responses, false, false, false, true);
+
+        responses.recordSuccess(ids[2], true);
+        assertResponseState(responses, false, false, false, true);
+
+        responses.recordSuccess(ids[3], true);
+        // the middle shard will have reached fast path
+        Assertions.assertTrue(responses.unsafeGet(1).hasMetFastPathCriteria());
+        // but since the others haven't, it won't report it as accepted
+        assertResponseState(responses, true, false, false, true);
+
+        responses.recordSuccess(ids[0], true);
+        responses.recordSuccess(ids[4], true);
+        assertResponseState(responses, true, true, false, false);
+    }
+}
diff --git a/accord-core/src/test/java/accord/coordinate/RecoverTest.java b/accord-core/src/test/java/accord/coordinate/RecoverTest.java
index 3f39b51..02795b4 100644
--- a/accord-core/src/test/java/accord/coordinate/RecoverTest.java
+++ b/accord-core/src/test/java/accord/coordinate/RecoverTest.java
@@ -1,11 +1,9 @@
package accord.coordinate;
-import accord.local.Instance;
import accord.local.Node;
import accord.impl.mock.MockCluster;
import accord.impl.IntKey;
import accord.api.Key;
-import accord.messages.Timeout;
import accord.local.*;
import accord.txn.Keys;
import accord.messages.PreAccept;
@@ -13,7 +11,6 @@ import accord.txn.Txn;
import accord.txn.TxnId;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.concurrent.CompletionStage;
@@ -26,17 +23,16 @@ import static accord.Utils.writeTxn;
public class RecoverTest
{
-
- private static Instance getInstance(Node node, Key key)
+ private static CommandStore getCommandShard(Node node, Key key) // NOTE(review): returns a CommandStore; getCommandStore would match the type
{
return node.local(key).orElseThrow();
}
private static Command getCommand(Node node, Key key, TxnId txnId)
{
- Instance instance = getInstance(node, key);
- Assertions.assertTrue(instance.hasCommand(txnId));
- return instance.command(txnId);
+ CommandStore commandStore = getCommandShard(node, key);
+ Assertions.assertTrue(commandStore.hasCommand(txnId)); // the command must already exist on this store
+ return commandStore.command(txnId);
}
private static void assertStatus(Node node, Key key, TxnId txnId, Status status)
@@ -49,8 +45,8 @@ public class RecoverTest
private static void assertMissing(Node node, Key key, TxnId txnId)
{
- Instance instance = getInstance(node, key);
- Assertions.assertFalse(instance.hasCommand(txnId));
+ CommandStore commandStore = getCommandShard(node, key);
+ Assertions.assertFalse(commandStore.hasCommand(txnId)); // the store for key on this node must not know txnId
}
private static void assertTimeout(CompletionStage<?> f)
@@ -75,39 +71,41 @@ public class RecoverTest
void conflictTest() throws Throwable
{
Key key = IntKey.key(10);
- MockCluster cluster = MockCluster.builder().nodes(9).replication(9).build();
- cluster.networkFilter.isolate(ids(7, 9));
- cluster.networkFilter.addFilter(anyId(), isId(ids(5, 6)), notMessageType(PreAccept.class));
-
- TxnId txnId1 = new TxnId(100, 0, id(100));
- Txn txn1 = writeTxn(Keys.of(key));
- assertTimeout(Coordinate.execute(cluster.get(1), txnId1, txn1));
-
- TxnId txnId2 = new TxnId(50, 0, id(101));
- Txn txn2 = writeTxn(Keys.of(key));
- cluster.networkFilter.clear();
- cluster.networkFilter.isolate(ids(1, 7));
- assertTimeout(Coordinate.execute(cluster.get(9), txnId2, txn2));
-
- cluster.nodes(ids(1, 4)).forEach(n -> assertStatus(n, key, txnId1, Status.Accepted));
- cluster.nodes(ids(5, 6)).forEach(n -> assertStatus(n, key, txnId1, Status.PreAccepted));
- cluster.nodes(ids(7, 9)).forEach(n -> assertMissing(n, key, txnId1));
-
- cluster.nodes(ids(1, 7)).forEach(n -> assertMissing(n, key, txnId2));
- cluster.nodes(ids(8, 9)).forEach(n -> assertStatus(n, key, txnId2, Status.PreAccepted));
-
- //
- cluster.networkFilter.clear();
- cluster.networkFilter.isolate(ids(1, 4));
- Coordinate.recover(cluster.get(8), txnId2, txn2).toCompletableFuture().get();
-
- List<Node> nodes = cluster.nodes(ids(5, 9));
- Assertions.assertTrue(txnId2.compareTo(txnId1) < 0);
- nodes.forEach(n -> assertStatus(n, key, txnId2, Status.Applied));
- nodes.forEach(n -> {
- assertStatus(n, key, txnId2, Status.Applied);
- Command command = getCommand(n, key, txnId2);
- Assertions.assertEquals(txnId1, command.executeAt());
- });
+ try (MockCluster cluster = MockCluster.builder().nodes(9).replication(9).build()) // MockCluster.close() shuts every node down
+ {
+ cluster.networkFilter.isolate(ids(7, 9));
+ cluster.networkFilter.addFilter(anyId(), isId(ids(5, 6)), notMessageType(PreAccept.class));
+
+ TxnId txnId1 = new TxnId(100, 0, id(100));
+ Txn txn1 = writeTxn(Keys.of(key));
+ assertTimeout(Coordinate.execute(cluster.get(1), txnId1, txn1)); // coordinated with nodes 7-9 isolated; expected to time out
+
+ TxnId txnId2 = new TxnId(50, 0, id(101)); // deliberately ordered before txnId1 (asserted below)
+ Txn txn2 = writeTxn(Keys.of(key));
+ cluster.networkFilter.clear();
+ cluster.networkFilter.isolate(ids(1, 7));
+ assertTimeout(Coordinate.execute(cluster.get(9), txnId2, txn2));
+
+ cluster.nodes(ids(1, 4)).forEach(n -> assertStatus(n, key, txnId1, Status.Accepted));
+ cluster.nodes(ids(5, 6)).forEach(n -> assertStatus(n, key, txnId1, Status.PreAccepted));
+ cluster.nodes(ids(7, 9)).forEach(n -> assertMissing(n, key, txnId1));
+
+ cluster.nodes(ids(1, 7)).forEach(n -> assertMissing(n, key, txnId2));
+ cluster.nodes(ids(8, 9)).forEach(n -> assertStatus(n, key, txnId2, Status.PreAccepted));
+
+ // recovery phase: lift the filters, isolate nodes 1-4 and recover txnId2 via cluster.get(8)
+ cluster.networkFilter.clear();
+ cluster.networkFilter.isolate(ids(1, 4));
+ Coordinate.recover(cluster.get(8), txnId2, txn2).toCompletableFuture().get();
+
+ List<Node> nodes = cluster.nodes(ids(5, 9));
+ Assertions.assertTrue(txnId2.compareTo(txnId1) < 0);
+ nodes.forEach(n -> assertStatus(n, key, txnId2, Status.Applied)); // NOTE(review): repeated inside the next loop
+ nodes.forEach(n -> {
+ assertStatus(n, key, txnId2, Status.Applied);
+ Command command = getCommand(n, key, txnId2);
+ Assertions.assertEquals(txnId1, command.executeAt()); // recovered txnId2 is expected to execute at txnId1's timestamp
+ });
+ }
}
}
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerTest.java b/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerTest.java
new file mode 100644
index 0000000..832274a
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/tracking/QuorumTrackerTest.java
@@ -0,0 +1,107 @@
+package accord.coordinate.tracking;
+
+import accord.Utils;
+import accord.impl.TopologyUtils;
+import accord.local.Node;
+import accord.topology.KeyRanges;
+import accord.topology.Shard;
+import accord.topology.Shards;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static accord.Utils.shards;
+
+public class QuorumTrackerTest // QuorumTracker over a single shard and over three overlapping shards
+{
+ private static final Node.Id[] ids = Utils.ids(5).toArray(Node.Id[]::new);
+ private static final KeyRanges ranges = TopologyUtils.initialRanges(5, 500);
+ private static final Shards topology = TopologyUtils.initialTopology(ids, ranges, 3);
+ /*
+ (000, 100](100, 200](200, 300](300, 400](400, 500]
+ [1, 2, 3] [2, 3, 4] [3, 4, 5] [4, 5, 1] [5, 1, 2]
+ */
+
+ private static void assertResponseState(QuorumTracker responses, // checks hasReachedQuorum, hasFailed and hasInFlight together
+ boolean quorumReached,
+ boolean failed,
+ boolean hasOutstandingResponses)
+ {
+ Assertions.assertEquals(quorumReached, responses.hasReachedQuorum());
+ Assertions.assertEquals(failed, responses.hasFailed());
+ Assertions.assertEquals(hasOutstandingResponses, responses.hasInFlight());
+ }
+
+ @Test
+ void singleShard()
+ {
+ Shards subShards = shards(topology.get(0));
+ QuorumTracker responses = new QuorumTracker(subShards);
+
+ responses.recordSuccess(ids[0]);
+ assertResponseState(responses, false, false, true);
+
+ responses.recordSuccess(ids[1]);
+ assertResponseState(responses, true, false, true); // two of the shard's three replicas reach quorum
+
+ responses.recordSuccess(ids[2]);
+ assertResponseState(responses, true, false, false);
+ }
+
+ /**
+ * responses from unexpected endpoints should be ignored
+ */
+ @Test
+ void unexpectedResponsesAreIgnored()
+ {
+ Shards subShards = shards(topology.get(0));
+ QuorumTracker responses = new QuorumTracker(subShards);
+
+ responses.recordSuccess(ids[0]);
+ assertResponseState(responses, false, false, true);
+
+ responses.recordSuccess(ids[1]);
+ assertResponseState(responses, true, false, true);
+
+ Assertions.assertFalse(subShards.get(0).nodes.contains(ids[4]));
+ responses.recordSuccess(ids[4]);
+ assertResponseState(responses, true, false, true); // the third replica is still counted as in flight
+ }
+
+ @Test
+ void failure()
+ {
+ Shards subShards = shards(topology.get(0));
+ QuorumTracker responses = new QuorumTracker(subShards);
+
+ responses.recordSuccess(ids[0]);
+ assertResponseState(responses, false, false, true);
+
+ responses.recordFailure(ids[1]);
+ assertResponseState(responses, false, false, true);
+
+ responses.recordFailure(ids[2]);
+ assertResponseState(responses, false, true, false); // a second failure out of three replicas flips hasFailed
+ }
+
+ @Test
+ void multiShard()
+ {
+ Shards subShards = new Shards(new Shard[]{topology.get(0), topology.get(1), topology.get(2)});
+ QuorumTracker responses = new QuorumTracker(subShards);
+ /*
+ (000, 100](100, 200](200, 300]
+ [1, 2, 3] [2, 3, 4] [3, 4, 5]
+ */
+
+ Assertions.assertSame(subShards.get(0), responses.unsafeGet(0).shard);
+ Assertions.assertSame(subShards.get(1), responses.unsafeGet(1).shard);
+ Assertions.assertSame(subShards.get(2), responses.unsafeGet(2).shard);
+
+ responses.recordSuccess(ids[1]);
+ assertResponseState(responses, false, false, true);
+ responses.recordSuccess(ids[2]);
+ assertResponseState(responses, false, false, true);
+ responses.recordSuccess(ids[3]); // assuming ids[i] is node i + 1: nodes 2-4 give every shard at least two replies
+ assertResponseState(responses, true, false, true);
+ }
+}
diff --git a/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java b/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
new file mode 100644
index 0000000..0513dd1
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/tracking/ReadTrackerTest.java
@@ -0,0 +1,131 @@
+package accord.coordinate.tracking;
+
+import accord.Utils;
+import accord.impl.TopologyUtils;
+import accord.local.Node;
+import accord.topology.KeyRanges;
+import accord.topology.Shard;
+import accord.topology.Shards;
+import com.google.common.collect.Sets;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static accord.Utils.shards;
+
+public class ReadTrackerTest // ReadTracker: read completion, retries on other replicas, and minimal read-set selection
+{
+ private static final Node.Id[] ids = Utils.ids(5).toArray(Node.Id[]::new);
+ private static final KeyRanges ranges = TopologyUtils.initialRanges(5, 500);
+ private static final Shards topology = TopologyUtils.initialTopology(ids, ranges, 3);
+ /*
+ (000, 100](100, 200](200, 300](300, 400](400, 500]
+ [1, 2, 3] [2, 3, 4] [3, 4, 5] [4, 5, 1] [5, 1, 2]
+ */
+
+ private static void assertResponseState(ReadTracker responses, // checks hasCompletedRead and hasFailed together
+ boolean complete,
+ boolean failed)
+ {
+ Assertions.assertEquals(complete, responses.hasCompletedRead());
+ Assertions.assertEquals(failed, responses.hasFailed());
+ }
+
+ @Test
+ void singleShard()
+ {
+ Shards subShards = shards(topology.get(0));
+ ReadTracker tracker = new ReadTracker(subShards);
+
+ tracker.recordInflightRead(ids[0]);
+ assertResponseState(tracker, false, false);
+
+ tracker.recordReadSuccess(ids[0]);
+ assertResponseState(tracker, true, false); // one successful replica read completes the shard
+ }
+
+ @Test
+ void singleShardRetry() // a failed read followed by a successful read from another replica still completes
+ {
+ Shards subShards = shards(topology.get(0));
+ ReadTracker tracker = new ReadTracker(subShards);
+
+ tracker.recordInflightRead(ids[0]);
+ assertResponseState(tracker, false, false);
+
+ tracker.recordReadFailure(ids[0]);
+ assertResponseState(tracker, false, false);
+
+ tracker.recordInflightRead(ids[1]);
+ assertResponseState(tracker, false, false);
+
+ tracker.recordReadSuccess(ids[1]);
+ assertResponseState(tracker, true, false);
+ }
+
+ @Test
+ void singleShardFailure() // the tracker reports failure only after all three replicas have failed
+ {
+ Shards subShards = shards(topology.get(0));
+ ReadTracker tracker = new ReadTracker(subShards);
+
+ tracker.recordInflightRead(ids[0]);
+ tracker.recordReadFailure(ids[0]);
+ assertResponseState(tracker, false, false);
+
+ tracker.recordInflightRead(ids[1]);
+ tracker.recordReadFailure(ids[1]);
+ assertResponseState(tracker, false, false);
+
+ tracker.recordInflightRead(ids[2]);
+ tracker.recordReadFailure(ids[2]);
+ assertResponseState(tracker, false, true);
+ }
+
+ @Test
+ void multiShardSuccess()
+ {
+ Shards subShards = new Shards(new Shard[]{topology.get(0), topology.get(1), topology.get(2)});
+ ReadTracker responses = new ReadTracker(subShards);
+ /*
+ (000, 100](100, 200](200, 300]
+ [1, 2, 3] [2, 3, 4] [3, 4, 5]
+ */
+
+ responses.recordInflightRead(ids[2]); // per the diagram node 3 replicates all three shards
+ responses.recordReadSuccess(ids[2]);
+ assertResponseState(responses, true, false);
+ }
+
+ @Test
+ void multiShardRetryAndReadSet()
+ {
+ Shards subShards = new Shards(new Shard[]{topology.get(0), topology.get(1), topology.get(2)});
+ ReadTracker responses = new ReadTracker(subShards);
+ /*
+ (000, 100](100, 200](200, 300]
+ [1, 2, 3] [2, 3, 4] [3, 4, 5]
+ */
+
+ Assertions.assertEquals(Sets.newHashSet(ids[2]), responses.computeMinimalReadSetAndMarkInflight());
+
+ assertResponseState(responses, false, false);
+
+ responses.recordReadFailure(ids[2]);
+ assertResponseState(responses, false, false);
+
+ Assertions.assertEquals(Sets.newHashSet(ids[1], ids[3]), responses.computeMinimalReadSetAndMarkInflight()); // without node 3, nodes 2 and 4 together cover all three shards
+ assertResponseState(responses, false, false);
+
+ responses.recordReadFailure(ids[1]);
+ Assertions.assertEquals(Sets.newHashSet(ids[0]), responses.computeMinimalReadSetAndMarkInflight()); // node 1 is the last replica left for (000, 100]
+
+ responses.recordReadSuccess(ids[3]);
+ assertResponseState(responses, false, false);
+ Assertions.assertEquals(Collections.emptySet(), responses.computeMinimalReadSetAndMarkInflight()); // presumably: every shard already has a read in flight or completed
+
+ responses.recordReadSuccess(ids[0]);
+ assertResponseState(responses, true, false);
+ }
+}
diff --git a/accord-core/src/test/java/accord/impl/IntHashKey.java b/accord-core/src/test/java/accord/impl/IntHashKey.java
index 7d1c696..3a044a6 100644
--- a/accord-core/src/test/java/accord/impl/IntHashKey.java
+++ b/accord-core/src/test/java/accord/impl/IntHashKey.java
@@ -6,6 +6,7 @@ import java.util.zip.CRC32C;
import accord.api.Key;
import accord.api.KeyRange;
+import accord.topology.KeyRanges;
import accord.txn.Keys;
public class IntHashKey implements Key<IntHashKey>
@@ -16,6 +17,35 @@ public class IntHashKey implements Key<IntHashKey>
{
super(start, end);
}
+
+ @Override
+ public KeyRange<IntHashKey> subRange(IntHashKey start, IntHashKey end) // new Range over the given bounds; they are not checked against this range
+ {
+ return new Range(start, end);
+ }
+
+ @Override
+ public KeyRanges split(int count)
+ { // split the hash span into count contiguous pieces; the last piece absorbs the remainder
+ int startHash = start().hash;
+ int endHash = end().hash;
+ long currentSize = (long) endHash - startHash; // long: a hash span can exceed Integer.MAX_VALUE (int subtraction would wrap)
+ if (currentSize < count)
+ return new KeyRanges(new KeyRange[]{this}); // too narrow (or inverted) to split
+ long interval = currentSize / count;
+
+ int last = 0;
+ KeyRange[] ranges = new KeyRange[count];
+ for (int i=0; i<count; i++)
+ {
+ int subStart = i > 0 ? last : startHash;
+ int subEnd = i < count - 1 ? (int) (subStart + interval) : endHash; // cast is safe: subStart + interval < endHash here
+ ranges[i] = new Range(new IntHashKey(Integer.MIN_VALUE, subStart),
+ new IntHashKey(Integer.MIN_VALUE, subEnd));
+ last = subEnd;
+ }
+ return new KeyRanges(ranges);
+ }
}
public final int key;
diff --git a/accord-core/src/test/java/accord/impl/IntKey.java b/accord-core/src/test/java/accord/impl/IntKey.java
index eb57364..a57f995 100644
--- a/accord-core/src/test/java/accord/impl/IntKey.java
+++ b/accord-core/src/test/java/accord/impl/IntKey.java
@@ -3,9 +3,11 @@ package accord.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.function.BiFunction;
import accord.api.Key;
import accord.api.KeyRange;
+import accord.topology.KeyRanges;
import accord.txn.Keys;
public class IntKey implements Key<IntKey>
@@ -16,6 +18,39 @@ public class IntKey implements Key<IntKey>
{
super(start, end);
}
+
+ @Override
+ public KeyRange<IntKey> subRange(IntKey start, IntKey end) // new Range over the given bounds; they are not checked against this range
+ {
+ return new Range(start, end);
+ }
+
+ @Override
+ public KeyRanges split(int count) // delegates to IntKey.splitRange, building each piece with Range::new
+ {
+ return splitRange(this, count, Range::new);
+ }
+ }
+
+ public static KeyRanges splitRange(KeyRange<IntKey> range, int count, BiFunction<IntKey, IntKey, KeyRange<IntKey>> ctor)
+ { // split range into count contiguous pieces built with ctor; the last piece absorbs the remainder
+ int start = range.start().key;
+ int end = range.end().key;
+ long currentSize = (long) end - start; // long: end - start can overflow int for wide ranges
+ if (currentSize < count)
+ return new KeyRanges(new KeyRange[]{range}); // too narrow (or inverted) to split: keep the original range
+ long interval = currentSize / count;
+
+ int nextStart = start;
+ KeyRange[] ranges = new KeyRange[count];
+ for (int i=0; i<count; i++)
+ {
+ int subStart = nextStart;
+ int subEnd = i < count - 1 ? (int) (subStart + interval) : end; // cast is safe: subStart + interval < end here
+ ranges[i] = ctor.apply(key(subStart), key(subEnd));
+ nextStart = subEnd;
+ }
+ return new KeyRanges(ranges);
}
public final int key;
diff --git a/accord-core/src/test/java/accord/impl/TopologyFactory.java b/accord-core/src/test/java/accord/impl/TopologyFactory.java
index ccdde7e..3cdd985 100644
--- a/accord-core/src/test/java/accord/impl/TopologyFactory.java
+++ b/accord-core/src/test/java/accord/impl/TopologyFactory.java
@@ -2,19 +2,17 @@ package accord.impl;
import accord.local.Node;
-import accord.local.Node.Id;
import accord.api.Key;
import accord.api.KeyRange;
-import accord.topology.Shard;
+import accord.topology.KeyRanges;
import accord.topology.Shards;
-import accord.utils.WrapAroundList;
-import accord.utils.WrapAroundSet;
import java.util.*;
public class TopologyFactory<K extends Key<K>>
{
public final int rf;
+ // TODO: convert to KeyRanges
final KeyRange<K>[] ranges;
public TopologyFactory(int rf, KeyRange<K>... ranges)
@@ -25,25 +23,7 @@ public class TopologyFactory<K extends Key<K>>
public Shards toShards(Node.Id[] cluster)
{
- final Map<Node.Id, Integer> lookup = new HashMap<>();
- for (int i = 0 ; i < cluster.length ; ++i)
- lookup.put(cluster[i], i);
-
- List<WrapAroundList<Id>> electorates = new ArrayList<>();
- List<Set<Node.Id>> fastPathElectorates = new ArrayList<>();
-
- for (int i = 0 ; i < cluster.length + rf - 1 ; ++i)
- {
- WrapAroundList<Node.Id> electorate = new WrapAroundList<>(cluster, i % cluster.length, (i + rf) % cluster.length);
- Set<Node.Id> fastPathElectorate = new WrapAroundSet<>(lookup, electorate);
- electorates.add(electorate);
- fastPathElectorates.add(fastPathElectorate);
- }
-
- final List<Shard> shards = new ArrayList<>();
- for (int i = 0 ; i < ranges.length ; ++i)
- shards.add(new Shard(ranges[i], electorates.get(i % electorates.size()), fastPathElectorates.get(i % fastPathElectorates.size())));
- return new Shards(shards.toArray(Shard[]::new));
+ return TopologyUtils.initialTopology(cluster, new KeyRanges(ranges), rf); // same electorate layout as the removed code, now shared via TopologyUtils
}
public Shards toShards(List<Node.Id> cluster)
diff --git a/accord-core/src/test/java/accord/impl/TopologyUtils.java b/accord-core/src/test/java/accord/impl/TopologyUtils.java
new file mode 100644
index 0000000..ade2863
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/TopologyUtils.java
@@ -0,0 +1,57 @@
+package accord.impl;
+
+import accord.api.Key;
+import accord.api.KeyRange;
+import accord.local.Node;
+import accord.topology.KeyRanges;
+import accord.topology.Shard;
+import accord.topology.Shards;
+import accord.utils.WrapAroundList;
+import accord.utils.WrapAroundSet;
+
+import java.util.*;
+
+public class TopologyUtils // static helpers that build initial ranges and shard layouts for tests
+{
+ public static KeyRanges initialRanges(int num, int maxKey) // num IntKey ranges from 0 to maxKey; the last one absorbs any remainder
+ {
+ int rangeSize = maxKey / num; // NOTE(review): num <= 0 is not guarded (division by zero / negative array size)
+ KeyRange<IntKey>[] ranges = new KeyRange[num]; // unchecked: generic arrays cannot be created directly
+ int end = 0;
+ for (int i=0; i<num; i++)
+ {
+ int start = end;
+ end = i == num - 1 ? maxKey : start + rangeSize;
+ ranges[i] = IntKey.range(start, end);
+ }
+ return new KeyRanges(ranges);
+ }
+
+ public static <K extends Key<K>> Shards initialTopology(Node.Id[] cluster, KeyRanges ranges, int rf) // NOTE(review): type parameter K is unused
+ {
+ final Map<Node.Id, Integer> lookup = new HashMap<>(); // node id -> position in cluster
+ for (int i = 0 ; i < cluster.length ; ++i)
+ lookup.put(cluster[i], i);
+
+ List<WrapAroundList<Node.Id>> electorates = new ArrayList<>();
+ List<Set<Node.Id>> fastPathElectorates = new ArrayList<>();
+
+ for (int i = 0 ; i < cluster.length + rf - 1 ; ++i)
+ {
+ WrapAroundList<Node.Id> electorate = new WrapAroundList<>(cluster, i % cluster.length, (i + rf) % cluster.length); // presumably rf consecutive nodes from position i, wrapping around
+ Set<Node.Id> fastPathElectorate = new WrapAroundSet<>(lookup, electorate); // presumably the same nodes viewed as a set
+ electorates.add(electorate);
+ fastPathElectorates.add(fastPathElectorate);
+ }
+
+ final List<Shard> shards = new ArrayList<>();
+ for (int i = 0 ; i < ranges.size() ; ++i)
+ shards.add(new Shard(ranges.get(i), electorates.get(i % electorates.size()), fastPathElectorates.get(i % fastPathElectorates.size()))); // range i uses electorate i, cycling when ranges outnumber electorates
+ return new Shards(shards.toArray(Shard[]::new));
+ }
+
+ public static <K extends Key<K>> Shards initialTopology(List<Node.Id> cluster, KeyRanges ranges, int rf) // list overload; delegates to the array version
+ {
+ return initialTopology(cluster.toArray(Node.Id[]::new), ranges, rf);
+ }
+}
diff --git a/accord-core/src/test/java/accord/impl/TopologyUtilsTest.java b/accord-core/src/test/java/accord/impl/TopologyUtilsTest.java
new file mode 100644
index 0000000..66f9bea
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/TopologyUtilsTest.java
@@ -0,0 +1,17 @@
+package accord.impl;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static accord.Utils.ranges;
+import static accord.impl.IntKey.range;
+
+public class TopologyUtilsTest
+{
+ @Test
+ void initialRangesTest() // maxKey 300 over 3 ranges gives three ranges of 100 keys each
+ {
+ Assertions.assertEquals(ranges(range(0, 100), range(100, 200), range(200, 300)),
+ TopologyUtils.initialRanges(3, 300));
+ }
+}
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 5ffafe9..a00727b 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -17,6 +17,7 @@ import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
+import accord.local.CommandStore;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.Scheduler;
@@ -30,7 +31,6 @@ import accord.topology.Shards;
public class Cluster implements Scheduler
{
-
final Function<Id, Node> lookup;
final PendingQueue pending;
final Consumer<Packet> responseSink;
@@ -143,23 +143,30 @@ public class Cluster implements Scheduler
{
Shards shards = topologyFactory.toShards(nodes);
Map<Id, Node> lookup = new HashMap<>();
- Cluster sinks = new Cluster(queueSupplier, lookup::get, responseSink, stderr);
- for (Id node : nodes)
- lookup.put(node, new Node(node, shards, shards.forNode(node), sinks.create(node, randomSupplier.get()),
- randomSupplier.get(), nowSupplier.get(), ListStore::new, ListAgent.INSTANCE, sinks));
-
- List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
- sinks.recurring(() ->
+ try
{
- Collections.shuffle(nodesList, randomSupplier.get());
- int partitionSize = randomSupplier.get().nextInt((topologyFactory.rf+1)/2);
- sinks.partitionSet = new HashSet<>(nodesList.subList(0, partitionSize));
- }, 5L, TimeUnit.SECONDS);
-
- Packet next;
- while ((next = in.get()) != null)
- sinks.add(next);
-
- while (sinks.processPending());
+ Cluster sinks = new Cluster(queueSupplier, lookup::get, responseSink, stderr);
+ for (Id node : nodes)
+ lookup.put(node, new Node(node, shards, sinks.create(node, randomSupplier.get()), randomSupplier.get(),
+ nowSupplier.get(), ListStore::new, ListAgent.INSTANCE, sinks, CommandStore.Factory.SYNCHRONIZED));
+
+ List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
+ sinks.recurring(() ->
+ {
+ Collections.shuffle(nodesList, randomSupplier.get());
+ int partitionSize = randomSupplier.get().nextInt((topologyFactory.rf+1)/2);
+ sinks.partitionSet = new HashSet<>(nodesList.subList(0, partitionSize));
+ }, 5L, TimeUnit.SECONDS);
+
+ Packet next;
+ while ((next = in.get()) != null)
+ sinks.add(next);
+
+ while (sinks.processPending());
+ }
+ finally
+ {
+ lookup.values().forEach(Node::shutdown);
+ }
}
}
diff --git a/accord-core/src/test/java/accord/impl/basic/NodeSink.java b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
index 0482567..f48877e 100644
--- a/accord-core/src/test/java/accord/impl/basic/NodeSink.java
+++ b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
@@ -6,10 +6,10 @@ import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import accord.coordinate.Timeout;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.MessageSink;
-import accord.messages.Timeout;
import accord.messages.Callback;
import accord.messages.Reply;
import accord.messages.Request;
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java b/accord-core/src/test/java/accord/impl/list/ListRead.java
index f70ffeb..57c5317 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRead.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -1,6 +1,7 @@
package accord.impl.list;
import accord.api.*;
+import accord.topology.KeyRanges;
import accord.txn.Keys;
import static java.lang.Math.max;
@@ -15,15 +16,20 @@ public class ListRead implements Read
}
@Override
- public Data read(KeyRange range, Store store)
+ public Data read(KeyRanges ranges, Store store) // reads the values of those entries of `keys` that fall in any of ranges
{
ListStore s = (ListStore)store;
ListData result = new ListData();
- int lowIdx = range.lowKeyIndex(keys);
- if (lowIdx < 0)
- return result;
- for (int i = lowIdx, limit = range.higherKeyIndex(keys) ; i < limit ; ++i)
- result.put(keys.get(i), s.get(keys.get(i)));
+ for (KeyRange range : ranges)
+ {
+ int lowIdx = range.lowKeyIndex(keys);
+ if (lowIdx < -keys.size()) // NOTE(review): presumably "range lies beyond the last key"; stopping assumes ranges are sorted
+ return result;
+ if (lowIdx < 0) // presumably: no key falls inside this range
+ continue;
+ for (int i = lowIdx, limit = range.higherKeyIndex(keys) ; i < limit ; ++i)
+ result.put(keys.get(i), s.get(keys.get(i)));
+ }
return result;
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java b/accord-core/src/test/java/accord/impl/list/ListWrite.java
index 20694f1..0f9aeaf 100644
--- a/accord-core/src/test/java/accord/impl/list/ListWrite.java
+++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java
@@ -8,18 +8,22 @@ import accord.api.Key;
import accord.api.KeyRange;
import accord.api.Store;
import accord.api.Write;
+import accord.topology.KeyRanges;
import accord.txn.Timestamp;
import accord.utils.Timestamped;
public class ListWrite extends TreeMap<Key, int[]> implements Write
{
@Override
- public void apply(KeyRange range, Timestamp executeAt, Store store)
+ public void apply(KeyRanges ranges, Timestamp executeAt, Store store) // applies the entries of this map whose keys fall in any of ranges
{
ListStore s = (ListStore) store;
- NavigableMap<Key, int[]> selection = subMap(range.start(), range.startInclusive(),
- range.end(), range.endInclusive());
- for (Map.Entry<Key, int[]> e : selection.entrySet())
- s.data.merge(e.getKey(), new Timestamped<>(executeAt, e.getValue()), Timestamped::merge);
+ for (KeyRange range : ranges)
+ {
+ NavigableMap<Key, int[]> selection = subMap(range.start(), range.startInclusive(),
+ range.end(), range.endInclusive()); // view of this map inside range, honouring its inclusivity flags
+ for (Map.Entry<Key, int[]> e : selection.entrySet())
+ s.data.merge(e.getKey(), new Timestamped<>(executeAt, e.getValue()), Timestamped::merge); // each value is stamped with executeAt before merging into the store
+ }
}
}
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 8580448..a55b533 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -1,19 +1,20 @@
package accord.impl.mock;
import accord.NetworkFilter;
+import accord.coordinate.Timeout;
+import accord.impl.TopologyUtils;
+import accord.local.CommandStore;
import accord.local.Node;
import accord.local.Node.Id;
-import accord.messages.Timeout;
+import accord.topology.KeyRanges;
import accord.utils.ThreadPoolScheduler;
import accord.txn.TxnId;
import accord.messages.Callback;
import accord.messages.Reply;
import accord.messages.Request;
-import accord.impl.IntKey;
import accord.impl.TestAgent;
import accord.topology.Shards;
import accord.topology.Topology;
-import accord.impl.TopologyFactory;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,7 +26,7 @@ import java.util.function.LongSupplier;
import static accord.Utils.id;
-public class MockCluster implements Network
+public class MockCluster implements Network, AutoCloseable
{
private static final Logger logger = LoggerFactory.getLogger(MockCluster.class);
@@ -46,6 +47,12 @@ public class MockCluster implements Network
init();
}
+ @Override
+ public void close() // shuts down every node held in nodes
+ {
+ nodes.values().forEach(Node::shutdown);
+ }
+
private synchronized Id nextNodeId()
{
return id(nextNodeId++);
@@ -61,35 +68,34 @@ public class MockCluster implements Network
return System.currentTimeMillis();
}
- private Node createNode(Id id, Shards local, Topology topology)
+ private Node createNode(Id id, Topology topology) // one node backed by its own MockStore, using CommandStore.Factory.SINGLE_THREAD
{
MockStore store = new MockStore();
return new Node(id,
- topology,
- local,
- new SimpleMessageSink(id, this),
- new Random(random.nextLong()),
- this::now,
- () -> store,
- new TestAgent(),
- new ThreadPoolScheduler());
+ topology,
+ new SimpleMessageSink(id, this),
+ new Random(random.nextLong()), // seeded from the cluster's random
+ this::now,
+ () -> store,
+ new TestAgent(),
+ new ThreadPoolScheduler(),
+ CommandStore.Factory.SINGLE_THREAD);
}
private void init()
{
- Preconditions.checkArgument(config.initialNodes == config.replication, "TODO");
List<Id> ids = new ArrayList<>(config.initialNodes);
for (int i=0; i<config.initialNodes; i++)
{
Id nextId = nextNodeId();
ids.add(nextId);
}
- TopologyFactory<IntKey> topologyFactory = new TopologyFactory<>(config.replication, IntKey.range(0, config.maxKey));
- Shards topology = topologyFactory.toShards(ids);
+ KeyRanges ranges = TopologyUtils.initialRanges(config.initialNodes, config.maxKey); // one key range per initial node, up to maxKey
+ Shards topology = TopologyUtils.initialTopology(ids, ranges, config.replication); // rf = config.replication
for (int i=0; i<config.initialNodes; i++)
{
Id id = ids.get(i);
- Node node = createNode(id, topology.forNode(id), topology);
+ Node node = createNode(id, topology);
nodes.put(id, node);
}
}
diff --git a/accord-core/src/test/java/accord/local/CommandStoreTest.java b/accord-core/src/test/java/accord/local/CommandStoreTest.java
new file mode 100644
index 0000000..8974475
--- /dev/null
+++ b/accord-core/src/test/java/accord/local/CommandStoreTest.java
@@ -0,0 +1,92 @@
+package accord.local;
+
+import accord.Utils;
+import accord.api.KeyRange;
+import accord.impl.IntKey;
+import accord.impl.TopologyUtils;
+import accord.topology.KeyRanges;
+import accord.topology.Shard;
+import accord.topology.Topology;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static accord.impl.IntKey.key;
+
+public class CommandStoreTest // CommandStore topology updates and range-to-shard mapping
+{
+ @Test
+ void topologyChangeTest()
+ {
+ List<Node.Id> ids = Utils.ids(5);
+ KeyRanges ranges = TopologyUtils.initialRanges(5, 500);
+ Topology topology = TopologyUtils.initialTopology(ids, ranges, 3);
+ Topology local = topology.forNode(ids.get(0));
+
+ KeyRanges shards = CommandStores.shardRanges(local.ranges(), 10).get(0); // first group: the first 10-key slice of each local range
+ Assertions.assertEquals(ranges(r(0, 10), r(300, 310), r(400, 410)), shards);
+
+ CommandStore commandStore = new CommandStore.Synchronized(0, ids.get(0), null, null, null); // remaining collaborators left null; only topology and per-key state are used here
+ commandStore.updateTopology(topology, shards.union(r(350, 360)), KeyRanges.EMPTY);
+ commandStore.commandsForKey(key(355)); // presumably creates the per-key entry, as the assertions below expect
+ commandStore.commandsForKey(key(356));
+ commandStore.commandsForKey(key(357));
+
+ Assertions.assertTrue(commandStore.hasCommandsForKey(key(355)));
+ Assertions.assertTrue(commandStore.hasCommandsForKey(key(356)));
+ Assertions.assertTrue(commandStore.hasCommandsForKey(key(357)));
+
+ commandStore.updateTopology(topology, KeyRanges.EMPTY, ranges(r(355, 360))); // removing (355, 360] must drop 356 and 357 but keep 355
+
+ Assertions.assertTrue(commandStore.hasCommandsForKey(key(355)));
+ Assertions.assertFalse(commandStore.hasCommandsForKey(key(356)));
+ Assertions.assertFalse(commandStore.hasCommandsForKey(key(357)));
+ }
+
+ private static Shard[] shards(Topology topology, int... indexes) // the shards of topology at the given indexes, in order
+ {
+ Shard[] shards = new Shard[indexes.length];
+
+ for (int i=0; i<indexes.length; i++)
+ shards[i] = topology.get(indexes[i]);
+ return shards;
+ }
+
+ private static void assertMapping(KeyRanges ranges, Shard[] shards, CommandStore.RangeMapping mapping) // compares both halves of a RangeMapping
+ {
+ Assertions.assertEquals(ranges, mapping.ranges);
+ Assertions.assertArrayEquals(shards, mapping.shards);
+ }
+
+ private static KeyRanges ranges(KeyRange... ranges)
+ {
+ return new KeyRanges(ranges);
+ }
+
+ private static KeyRange<IntKey> r(int start, int end)
+ {
+ return IntKey.range(start, end);
+ }
+
+ @Test
+ void mapRangesTest() // each range is paired with the local shard containing it; (390, 410] is split at 400
+ {
+ List<Node.Id> ids = Utils.ids(5);
+ KeyRanges ranges = TopologyUtils.initialRanges(5, 500);
+ Topology topology = TopologyUtils.initialTopology(ids, ranges, 3);
+ Topology local = topology.forNode(ids.get(0));
+
+ KeyRanges shards = CommandStores.shardRanges(local.ranges(), 10).get(0);
+ Assertions.assertEquals(ranges(r(0, 10), r(300, 310), r(400, 410)), shards);
+
+ assertMapping(shards, shards(local, 0, 1, 2),
+ CommandStore.mapRanges(shards, local));
+ assertMapping(ranges(r(0, 10), r(300, 310), r(390, 400), r(400, 410)), shards(local, 0, 1, 1, 2),
+ CommandStore.mapRanges(ranges(r(0, 10), r(300, 310), r(390, 410)), local));
+
+ assertMapping(ranges(r(0, 10), r(300, 310), r(350, 360), r(400, 410)), shards(local, 0, 1, 1, 2),
+ CommandStore.mapRanges(ranges(r(0, 10), r(300, 310), r(350, 360), r(400, 410)), local));
+
+ }
+}
diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
index 3082f70..cdba24b 100644
--- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
+++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
@@ -1,6 +1,5 @@
package accord.messages;
-import accord.local.Instance;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.MessageSink;
@@ -42,7 +41,8 @@ public class PreAcceptTest
Random random = new Random();
MockStore store = new MockStore();
Scheduler scheduler = new ThreadPoolScheduler();
- return new Node(nodeId, TOPOLOGY, TOPOLOGY.forNode(nodeId), messageSink, random, clock, () -> store, new TestAgent(), scheduler);
+ return new Node(nodeId, TOPOLOGY, messageSink, random, clock, () -> store,
+ new TestAgent(), scheduler, CommandStore.Factory.SINGLE_THREAD);
}
@Test
@@ -52,23 +52,30 @@ public class PreAcceptTest
Clock clock = new Clock(100);
Node node = createNode(ID1, messageSink, clock);
- IntKey key = IntKey.key(10);
- Instance instance = node.local(key).orElseThrow();
- Assertions.assertFalse(instance.hasCommandsForKey(key));
-
- TxnId txnId = clock.idForNode(ID2);
- Txn txn = writeTxn(Keys.of(key));
- PreAccept preAccept = new PreAccept(txnId, txn);
- clock.increment(10);
- preAccept.process(node, ID2, 0);
-
- Command command = instance.commandsForKey(key).uncommitted.get(txnId);
- Assertions.assertEquals(Status.PreAccepted, command.status());
-
- messageSink.assertHistorySizes(0, 1);
- Assertions.assertEquals(ID2, messageSink.responses.get(0).to);
- Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId, new Dependencies()),
- messageSink.responses.get(0).payload);
+ try
+ {
+ IntKey key = IntKey.key(10);
+ CommandStore commandStore = node.local(key).orElseThrow();
+ Assertions.assertFalse(commandStore.hasCommandsForKey(key));
+
+ TxnId txnId = clock.idForNode(ID2);
+ Txn txn = writeTxn(Keys.of(key));
+ PreAccept preAccept = new PreAccept(txnId, txn);
+ clock.increment(10);
+ preAccept.process(node, ID2, 0);
+
+ Command command = commandStore.commandsForKey(key).uncommitted.get(txnId);
+ Assertions.assertEquals(Status.PreAccepted, command.status());
+
+ messageSink.assertHistorySizes(0, 1);
+ Assertions.assertEquals(ID2, messageSink.responses.get(0).to);
+ Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId, new Dependencies()),
+ messageSink.responses.get(0).payload);
+ }
+ finally
+ {
+ node.shutdown();
+ }
}
@Test
@@ -77,15 +84,21 @@ public class PreAcceptTest
RecordingMessageSink messageSink = new RecordingMessageSink(ID1, Network.BLACK_HOLE);
Clock clock = new Clock(100);
Node node = createNode(ID1, messageSink, clock);
-
- IntKey key = IntKey.key(10);
- Instance instance = node.local(key).orElseThrow();
- Assertions.assertFalse(instance.hasCommandsForKey(key));
-
- TxnId txnId = clock.idForNode(ID2);
- Txn txn = writeTxn(Keys.of(key));
- PreAccept preAccept = new PreAccept(txnId, txn);
- preAccept.process(node, ID2, 0);
+ try
+ {
+ IntKey key = IntKey.key(10);
+ CommandStore commandStore = node.local(key).orElseThrow();
+ Assertions.assertFalse(commandStore.hasCommandsForKey(key));
+
+ TxnId txnId = clock.idForNode(ID2);
+ Txn txn = writeTxn(Keys.of(key));
+ PreAccept preAccept = new PreAccept(txnId, txn);
+ preAccept.process(node, ID2, 0);
+ }
+ finally
+ {
+ node.shutdown();
+ }
}
@Test
@@ -99,22 +112,29 @@ public class PreAcceptTest
RecordingMessageSink messageSink = new RecordingMessageSink(ID1, Network.BLACK_HOLE);
Clock clock = new Clock(100);
Node node = createNode(ID1, messageSink, clock);
-
- IntKey key1 = IntKey.key(10);
- PreAccept preAccept1 = new PreAccept(clock.idForNode(ID2), writeTxn(Keys.of(key1)));
- preAccept1.process(node, ID2, 0);
-
- messageSink.clearHistory();
- IntKey key2 = IntKey.key(11);
- TxnId txnId2 = new TxnId(50, 0, ID3);
- PreAccept preAccept2 = new PreAccept(txnId2, writeTxn(Keys.of(key1, key2)));
- clock.increment(10);
- preAccept2.process(node, ID3, 0);
-
- messageSink.assertHistorySizes(0, 1);
- Assertions.assertEquals(ID3, messageSink.responses.get(0).to);
- Dependencies expectedDeps = new Dependencies();
- Assertions.assertEquals(new PreAccept.PreAcceptOk(new TxnId(110, 0, ID1), expectedDeps),
- messageSink.responses.get(0).payload);
+ try
+ {
+
+ IntKey key1 = IntKey.key(10);
+ PreAccept preAccept1 = new PreAccept(clock.idForNode(ID2), writeTxn(Keys.of(key1)));
+ preAccept1.process(node, ID2, 0);
+
+ messageSink.clearHistory();
+ IntKey key2 = IntKey.key(11);
+ TxnId txnId2 = new TxnId(50, 0, ID3);
+ PreAccept preAccept2 = new PreAccept(txnId2, writeTxn(Keys.of(key1, key2)));
+ clock.increment(10);
+ preAccept2.process(node, ID3, 0);
+
+ messageSink.assertHistorySizes(0, 1);
+ Assertions.assertEquals(ID3, messageSink.responses.get(0).to);
+ Dependencies expectedDeps = new Dependencies();
+ Assertions.assertEquals(new PreAccept.PreAcceptOk(new TxnId(110, 0, ID1), expectedDeps),
+ messageSink.responses.get(0).payload);
+ }
+ finally
+ {
+ node.shutdown();
+ }
}
}
diff --git a/accord-core/src/test/java/accord/topology/TopologyTest.java b/accord-core/src/test/java/accord/topology/TopologyTest.java
index 76a1fa9..154e0f6 100644
--- a/accord-core/src/test/java/accord/topology/TopologyTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyTest.java
@@ -18,7 +18,6 @@ import static accord.impl.IntKey.range;
public class TopologyTest
{
-
private static void assertRangeForKey(Topology topology, int key, int start, int end)
{
Key expectedKey = key(key);
@@ -55,12 +54,28 @@ public class TopologyTest
return IntKey.range(start, end);
}
+ private static void assertNoRangeForKey(Topology topology, int key)
+ {
+ try
+ {
+ topology.forKey(key(key));
+ Assertions.fail("Expected exception");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // noop
+ }
+ }
+
@Test
void forKeyTest()
{
- Topology topology = topology(r(0, 100), r(100, 200), r(200, 300));
+ Topology topology = topology(r(0, 100), r(100, 200), r(300, 400));
+ assertNoRangeForKey(topology, -50);
assertRangeForKey(topology, 50, 0, 100);
assertRangeForKey(topology, 100, 0, 100);
+ assertNoRangeForKey(topology, 250);
+ assertRangeForKey(topology, 350, 300, 400);
}
@Test
@@ -68,10 +83,4 @@ public class TopologyTest
{
}
-
- @Test
- void sequentialRanges()
- {
- // TODO: confirm non-sequential ranges are handled properly
- }
}
diff --git a/accord-core/src/test/java/accord/utils/KeyRangeTest.java b/accord-core/src/test/java/accord/utils/KeyRangeTest.java
index b6ce4a4..b14efa4 100644
--- a/accord-core/src/test/java/accord/utils/KeyRangeTest.java
+++ b/accord-core/src/test/java/accord/utils/KeyRangeTest.java
@@ -3,6 +3,7 @@ package accord.utils;
import accord.api.Key;
import accord.api.KeyRange;
import accord.impl.IntKey;
+import accord.topology.KeyRanges;
import accord.txn.Keys;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -14,14 +15,59 @@ public class KeyRangeTest
return new IntKey(v);
}
+ private static KeyRange<IntKey> r(int start, int end)
+ {
+ return IntKey.range(start, end);
+ }
+
+ private static class EndInclusiveIntRange extends KeyRange.EndInclusive<IntKey>
+ {
+ public EndInclusiveIntRange(IntKey start, IntKey end)
+ {
+ super(start, end);
+ }
+
+ @Override
+ public KeyRange<IntKey> subRange(IntKey start, IntKey end)
+ {
+ return new EndInclusiveIntRange(start, end);
+ }
+
+ @Override
+ public KeyRanges split(int count)
+ {
+ return IntKey.splitRange(this, count, EndInclusiveIntRange::new);
+ }
+ }
+
+ private static class StartInclusiveIntRange extends KeyRange.StartInclusive<IntKey>
+ {
+ public StartInclusiveIntRange(IntKey start, IntKey end)
+ {
+ super(start, end);
+ }
+
+ @Override
+ public KeyRange<IntKey> subRange(IntKey start, IntKey end)
+ {
+ return new StartInclusiveIntRange(start, end);
+ }
+
+ @Override
+ public KeyRanges split(int count)
+ {
+ return IntKey.splitRange(this, count, StartInclusiveIntRange::new);
+ }
+ }
+
static KeyRange<IntKey> rangeEndIncl(int start, int end)
{
- return new KeyRange.EndInclusive<>(k(start), k(end)) {};
+ return new EndInclusiveIntRange(k(start), k(end));
}
static KeyRange<IntKey> rangeStartIncl(int start, int end)
{
- return new KeyRange.StartInclusive<>(k(start), k(end)) {};
+ return new StartInclusiveIntRange(k(start), k(end));
}
static Keys keys(int... values)
@@ -108,7 +154,7 @@ public class KeyRangeTest
assertHigherKeyIndex(7, rangeStartIncl(20, 25), keys);
}
- private static void assertLowKeyIndex(int expectedIdx, KeyRange range, Keys keys)
+ private static void assertLowKeyIndex(int expectedIdx, KeyRange range, Keys keys, int lowerBound, int upperBound)
{
if (expectedIdx >= 0 && expectedIdx < keys.size())
{
@@ -116,14 +162,20 @@ public class KeyRangeTest
}
else
{
- Assertions.assertFalse(range.containsKey(keys.get(0)));
- Assertions.assertFalse(range.containsKey(keys.get(keys.size() - 1)));
+ Assertions.assertFalse(range.containsKey(keys.get(lowerBound)));
+ Assertions.assertFalse(range.containsKey(keys.get(upperBound - 1)));
}
- int actualIdx = range.lowKeyIndex(keys);
+ int actualIdx = range.lowKeyIndex(keys, lowerBound, upperBound);
Assertions.assertEquals(expectedIdx, actualIdx);
}
+ private static void assertLowKeyIndex(int expectedIdx, KeyRange range, Keys keys)
+ {
+
+ assertLowKeyIndex(expectedIdx, range, keys, 0, keys.size());
+ }
+
@Test
void lowKeyIndexTest()
{
@@ -142,9 +194,98 @@ public class KeyRangeTest
assertLowKeyIndex(6, rangeEndIncl(15, 20), keys);
assertLowKeyIndex(5, rangeStartIncl(15, 20), keys);
- assertLowKeyIndex(7, rangeEndIncl(16, 20), keys);
+ assertLowKeyIndex(-8, rangeEndIncl(16, 20), keys);
assertLowKeyIndex(6, rangeStartIncl(16, 20), keys);
- assertLowKeyIndex(7, rangeEndIncl(20, 25), keys);
- assertLowKeyIndex(7, rangeStartIncl(20, 25), keys);
+ assertLowKeyIndex(-8, rangeEndIncl(20, 25), keys);
+ assertLowKeyIndex(-8, rangeStartIncl(20, 25), keys);
+
+ // non-intersecting
+ assertLowKeyIndex(-2, rangeStartIncl(12, 14), keys(10, 16));
+ assertLowKeyIndex(-2, rangeStartIncl(12, 14), keys(10, 16, 18), 1, 3);
+ assertLowKeyIndex(-3, rangeStartIncl(12, 14), keys(10, 16, 18), 2, 3);
+ }
+
+ @Test
+ void fullyContainsTest()
+ {
+ Assertions.assertTrue(r(100, 200).fullyContains(r(100, 200)));
+ Assertions.assertTrue(r(100, 200).fullyContains(r(150, 200)));
+ Assertions.assertTrue(r(100, 200).fullyContains(r(100, 150)));
+ Assertions.assertTrue(r(100, 200).fullyContains(r(125, 175)));
+
+ Assertions.assertFalse(r(100, 200).fullyContains(r(50, 60)));
+ Assertions.assertFalse(r(100, 200).fullyContains(r(100, 250)));
+ Assertions.assertFalse(r(100, 200).fullyContains(r(150, 250)));
+ Assertions.assertFalse(r(100, 200).fullyContains(r(50, 200)));
+ Assertions.assertFalse(r(100, 200).fullyContains(r(50, 150)));
+ Assertions.assertFalse(r(100, 200).fullyContains(r(250, 260)));
+ }
+
+ @Test
+ void compareIntersectingTest()
+ {
+ Assertions.assertEquals(1, r(100, 200).compareIntersecting(r(0, 100)));
+ Assertions.assertEquals(1, r(100, 200).compareIntersecting(r(0, 99)));
+
+ Assertions.assertEquals(0, r(100, 200).compareIntersecting(r(0, 101)));
+
+ Assertions.assertEquals(0, r(100, 200).compareIntersecting(r(99, 199)));
+ Assertions.assertEquals(0, r(100, 200).compareIntersecting(r(99, 200)));
+ Assertions.assertEquals(0, r(100, 200).compareIntersecting(r(99, 201)));
+ Assertions.assertEquals(0, r(100, 200).compareIntersecting(r(101, 199)));
+ Assertions.assertEquals(0, r(100, 200).compareIntersecting(r(125, 175)));
+ Assertions.assertEquals(0, r(100, 200).compareIntersecting(r(100, 201)));
+ Assertions.assertEquals(0, r(100, 200).compareIntersecting(r(101, 201)));
+
+ Assertions.assertEquals(0, r(100, 200).compareIntersecting(r(199, 300)));
+
+ Assertions.assertEquals(-1, r(100, 200).compareIntersecting(r(200, 300)));
+ Assertions.assertEquals(-1, r(100, 200).compareIntersecting(r(201, 300)));
+ }
+
+ private static void assertIntersection(KeyRange<IntKey> expected, KeyRange<IntKey> a, KeyRange<IntKey> b)
+ {
+ Assertions.assertEquals(expected, a.intersection(b));
+ Assertions.assertEquals(expected, b.intersection(a));
+ }
+
+ @Test
+ void intersectionTest()
+ {
+ assertIntersection(r(25, 75), r(0, 75), r(25, 100));
+ assertIntersection(r(0, 75), r(0, 75), r(0, 100));
+ assertIntersection(r(25, 100), r(0, 100), r(25, 100));
+ assertIntersection(r(25, 75), r(0, 100), r(25, 75));
+ assertIntersection(r(0, 100), r(0, 100), r(0, 100));
+ }
+
+ @Test
+ void intersectsTest()
+ {
+ KeyRange range = r(100, 200);
+ Assertions.assertTrue(range.intersects(keys(50, 150, 250)));
+ Assertions.assertTrue(range.intersects(keys(150, 250)));
+ Assertions.assertTrue(range.intersects(keys(50, 150)));
+
+ Assertions.assertFalse(range.intersects(keys()));
+ Assertions.assertFalse(range.intersects(keys(50, 75)));
+ Assertions.assertFalse(range.intersects(keys(50, 75, 250, 300)));
+ Assertions.assertFalse(range.intersects(keys(250, 300)));
+ }
+
+ @Test
+ void tryMergeTest()
+ {
+ // touching
+ Assertions.assertEquals(r(0, 100), r(0, 50).tryMerge(r(50, 100)));
+ Assertions.assertEquals(r(0, 100), r(50, 100).tryMerge(r(0, 50)));
+
+ // intersecting
+ Assertions.assertEquals(r(0, 100), r(0, 75).tryMerge(r(25, 100)));
+ Assertions.assertEquals(r(0, 100), r(25, 100).tryMerge(r(0, 75)));
+
+ // can't merge
+ Assertions.assertNull(r(0, 40).tryMerge(r(60, 100)));
+ Assertions.assertNull(r(60, 100).tryMerge(r(0, 40)));
}
}
diff --git a/accord-core/src/test/java/accord/utils/KeyRangesTest.java b/accord-core/src/test/java/accord/utils/KeyRangesTest.java
index 4b3990e..421a73f 100644
--- a/accord-core/src/test/java/accord/utils/KeyRangesTest.java
+++ b/accord-core/src/test/java/accord/utils/KeyRangesTest.java
@@ -28,4 +28,56 @@ public class KeyRangesTest
Assertions.assertEquals(1, ranges.rangeIndexForKey(IntKey.key(350)));
Assertions.assertEquals(-3, ranges.rangeIndexForKey(IntKey.key(450)));
}
+
+ @Test
+ void differenceTest()
+ {
+ Assertions.assertEquals(ranges(r(100, 125), r(175, 200)),
+ ranges(r(100, 200)).difference(
+ ranges(r(125, 175))));
+ Assertions.assertEquals(ranges(r(125, 175)),
+ ranges(r(100, 200)).difference(
+ ranges(r(100, 125), r(175, 200))));
+ Assertions.assertEquals(ranges(r(100, 175)),
+ ranges(r(100, 200)).difference(
+ ranges(r(0, 75), r(175, 200))));
+ Assertions.assertEquals(ranges(r(100, 200)),
+ ranges(r(100, 200)).difference(
+ ranges(r(0, 75), r(200, 205))));
+
+ Assertions.assertEquals(ranges(r(125, 175), r(300, 350)),
+ ranges(r(100, 200), r(250, 350)).difference(
+ ranges(r(0, 125), r(175, 300))));
+ Assertions.assertEquals(ranges(r(125, 200), r(300, 350)),
+ ranges(r(100, 200), r(250, 350)).difference(
+ ranges(r(0, 125), r(225, 300))));
+
+ Assertions.assertEquals(ranges(r(125, 135), r(140, 160), r(175, 200)),
+ ranges(r(100, 200)).difference(
+ ranges(r(0, 125), r(135, 140), r(160, 170), r(170, 175))));
+ }
+
+ @Test
+ void addTest()
+ {
+ Assertions.assertEquals(ranges(r(0, 50), r(50, 100), r(100, 150), r(150, 200)),
+ ranges(r(0, 50), r(100, 150)).union(ranges(r(50, 100), r(150, 200))));
+
+ Assertions.assertThrows(IllegalArgumentException.class, () -> ranges(r(0, 50)).union(ranges(r(25, 75))));
+ }
+
+ @Test
+ void mergeTouchingTest()
+ {
+ Assertions.assertEquals(ranges(r(0, 400)), ranges(r(0, 100), r(100, 200), r(200, 300), r(300, 400)).mergeTouching());
+ Assertions.assertEquals(ranges(r(0, 200), r(300, 400)), ranges(r(0, 100), r(100, 200), r(300, 400)).mergeTouching());
+ Assertions.assertEquals(ranges(r(0, 100), r(200, 400)), ranges(r(0, 100), r(200, 300), r(300, 400)).mergeTouching());
+ }
+
+ @Test
+ void selectTest()
+ {
+ KeyRanges testRanges = ranges(r(0, 100), r(100, 200), r(200, 300), r(300, 400), r(400, 500));
+ Assertions.assertEquals(ranges(testRanges.get(1), testRanges.get(3)), testRanges.select(new int[]{1, 3}));
+ }
}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index d9585a2..e35235b 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -13,10 +13,11 @@ import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
+import accord.coordinate.Timeout;
+import accord.local.CommandStore;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.MessageSink;
-import accord.messages.Timeout;
import accord.messages.Callback;
import accord.messages.Reply;
import accord.messages.Request;
@@ -254,23 +255,30 @@ public class Cluster implements Scheduler
{
Shards shards = topologyFactory.toShards(nodes);
Map<Id, Node> lookup = new HashMap<>();
- Cluster sinks = new Cluster(queueSupplier, lookup::get, responseSink, stderr);
- for (Id node : nodes)
- lookup.put(node, new Node(node, shards, shards.forNode(node), sinks.create(node, randomSupplier.get()),
- randomSupplier.get(), nowSupplier.get(), MaelstromStore::new, MaelstromAgent.INSTANCE, sinks));
-
- List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
- sinks.recurring(() ->
+ try
{
- Collections.shuffle(nodesList, randomSupplier.get());
- int partitionSize = randomSupplier.get().nextInt((topologyFactory.rf+1)/2);
- sinks.partitionSet = new HashSet<>(nodesList.subList(0, partitionSize));
- }, 5L, TimeUnit.SECONDS);
+ Cluster sinks = new Cluster(queueSupplier, lookup::get, responseSink, stderr);
+ for (Id node : nodes)
+ lookup.put(node, new Node(node, shards, sinks.create(node, randomSupplier.get()), randomSupplier.get(),
+ nowSupplier.get(), MaelstromStore::new, MaelstromAgent.INSTANCE, sinks, CommandStore.Factory.SINGLE_THREAD));
+
+ List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
+ sinks.recurring(() ->
+ {
+ Collections.shuffle(nodesList, randomSupplier.get());
+ int partitionSize = randomSupplier.get().nextInt((topologyFactory.rf+1)/2);
+ sinks.partitionSet = new HashSet<>(nodesList.subList(0, partitionSize));
+ }, 5L, TimeUnit.SECONDS);
- Packet next;
- while ((next = in.get()) != null)
- sinks.add(next);
+ Packet next;
+ while ((next = in.get()) != null)
+ sinks.add(next);
- while (sinks.processPending());
+ while (sinks.processPending());
+ }
+ finally
+ {
+ lookup.values().forEach(Node::shutdown);
+ }
}
}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
index 208df48..c557016 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
@@ -4,6 +4,9 @@ import java.io.IOException;
import accord.api.Key;
import accord.api.KeyRange;
+import accord.topology.KeyRanges;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
@@ -16,6 +19,36 @@ public class MaelstromKey extends Datum<MaelstromKey> implements Key<MaelstromKe
{
super(start, end);
}
+
+ @Override
+ public KeyRange<MaelstromKey> subRange(MaelstromKey start, MaelstromKey end)
+ {
+ return new Range(start, end);
+ }
+
+ @Override
+ public KeyRanges split(int count)
+ {
+ Preconditions.checkArgument(start().kind == Kind.HASH);
+ Preconditions.checkArgument(end().kind == Kind.HASH);
+ long startHash = ((Hash) start().value).hash;
+ long endHash = end().value != null ? ((Hash) end().value).hash : Integer.MAX_VALUE;
+ long currentSize = endHash - startHash;
+ if (currentSize < count)
+ return new KeyRanges(new KeyRange[]{this});
+ long interval = currentSize / count;
+
+ long subEnd = 0;
+ KeyRange[] ranges = new KeyRange[count];
+ for (int i=0; i<count; i++)
+ {
+ long subStart = i > 0 ? subEnd : startHash;
+ subEnd = i < count - 1 ? subStart + interval : endHash;
+ ranges[i] = new Range(new MaelstromKey(Kind.HASH, new Hash(Ints.checkedCast(subStart))),
+ i < count - 1 ? new MaelstromKey(Kind.HASH, new Hash(Ints.checkedCast(subEnd))) : end());
+ }
+ return new KeyRanges(ranges);
+ }
}
public MaelstromKey(Kind kind, Object value)
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
index 7773ba8..142532e 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
@@ -1,6 +1,7 @@
package accord.maelstrom;
import accord.api.*;
+import accord.topology.KeyRanges;
import accord.txn.Keys;
import static java.lang.Math.max;
@@ -15,15 +16,20 @@ public class MaelstromRead implements Read
}
@Override
- public Data read(KeyRange range, Store store)
+ public Data read(KeyRanges ranges, Store store)
{
MaelstromStore s = (MaelstromStore)store;
MaelstromData result = new MaelstromData();
- int lowIdx = range.lowKeyIndex(keys);
- if (lowIdx < 0)
- return result;
- for (int i = lowIdx, limit = range.higherKeyIndex(keys) ; i < limit ; ++i)
- result.put(keys.get(i), s.get(keys.get(i)));
+ for (KeyRange range : ranges)
+ {
+ int lowIdx = range.lowKeyIndex(keys);
+ if (lowIdx < -keys.size())
+ return result;
+ if (lowIdx < 0)
+ continue;
+ for (int i = lowIdx, limit = range.higherKeyIndex(keys) ; i < limit ; ++i)
+ result.put(keys.get(i), s.get(keys.get(i)));
+ }
return result;
}
}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
index 31ec395..b2216aa 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
@@ -8,18 +8,22 @@ import accord.api.Key;
import accord.api.KeyRange;
import accord.api.Store;
import accord.api.Write;
+import accord.topology.KeyRanges;
import accord.txn.Timestamp;
import accord.utils.Timestamped;
public class MaelstromWrite extends TreeMap<Key, Value> implements Write
{
@Override
- public void apply(KeyRange range, Timestamp executeAt, Store store)
+ public void apply(KeyRanges ranges, Timestamp executeAt, Store store)
{
MaelstromStore s = (MaelstromStore) store;
- NavigableMap<Key, Value> selection = subMap(range.start(), range.startInclusive(),
- range.end(), range.endInclusive());
- for (Map.Entry<Key, Value> e : selection.entrySet())
- s.data.merge(e.getKey(), new Timestamped<>(executeAt, e.getValue()), Timestamped::merge);
+ for (KeyRange range : ranges)
+ {
+ NavigableMap<Key, Value> selection = subMap(range.start(), range.startInclusive(),
+ range.end(), range.endInclusive());
+ for (Map.Entry<Key, Value> e : selection.entrySet())
+ s.data.merge(e.getKey(), new Timestamped<>(executeAt, e.getValue()), Timestamped::merge);
+ }
}
}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 19b1ee6..30e0e3a 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -13,10 +13,11 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
+import accord.coordinate.Timeout;
+import accord.local.CommandStore;
import accord.local.Node;
import accord.local.Node.Id;
import accord.api.Scheduler;
-import accord.messages.Timeout;
import accord.utils.ThreadPoolScheduler;
import accord.maelstrom.Packet.Type;
import accord.api.MessageSink;
@@ -137,43 +138,50 @@ public class Main
MaelstromInit init = (MaelstromInit) packet.body;
shards = topologyFactory.toShards(init.cluster);
sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err);
- on = new Node(init.self, shards, shards.forNode(init.self), sink, new Random(), System::currentTimeMillis, MaelstromStore::new, MaelstromAgent.INSTANCE, scheduler);
+ on = new Node(init.self, shards, sink, new Random(), System::currentTimeMillis, MaelstromStore::new, MaelstromAgent.INSTANCE, scheduler, CommandStore.Factory.SINGLE_THREAD);
err.println("Initialized node " + init.self);
err.flush();
sink.send(packet.src, new Body(Type.init_ok, Body.SENTINEL_MSG_ID, init.msg_id));
}
- while (true)
+ try
{
- String line = in.get();
- if (line == null)
+ while (true)
{
- err.println("Received EOF; terminating");
- err.flush();
- scheduler.stop();
- err.println("Terminated");
+ String line = in.get();
+ if (line == null)
+ {
+ err.println("Received EOF; terminating");
+ err.flush();
+ scheduler.stop();
+ err.println("Terminated");
+ err.flush();
+ return;
+ }
+ err.println("Received " + (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)) + " " + line);
err.flush();
- return;
- }
- err.println("Received " + (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)) + " " + line);
- err.flush();
- Packet next = Packet.parse(line);
- switch (next.body.type)
- {
- case txn:
- on.receive((MaelstromRequest)next.body, next.src, next.body.msg_id);
- break;
- default:
- if (next.body.in_reply_to > Body.SENTINEL_MSG_ID)
- {
- Reply reply = (Reply)((Wrapper)next.body).body;
- CallbackInfo callback = reply.isFinal() ? sink.callbacks.remove(next.body.in_reply_to)
- : sink.callbacks.get(next.body.in_reply_to);
- if (callback != null)
- scheduler.now(() -> callback.callback.onSuccess(next.src, reply));
- }
- else on.receive((Request)((Wrapper)next.body).body, next.src, next.body.msg_id);
+ Packet next = Packet.parse(line);
+ switch (next.body.type)
+ {
+ case txn:
+ on.receive((MaelstromRequest)next.body, next.src, next.body.msg_id);
+ break;
+ default:
+ if (next.body.in_reply_to > Body.SENTINEL_MSG_ID)
+ {
+ Reply reply = (Reply)((Wrapper)next.body).body;
+ CallbackInfo callback = reply.isFinal() ? sink.callbacks.remove(next.body.in_reply_to)
+ : sink.callbacks.get(next.body.in_reply_to);
+ if (callback != null)
+ scheduler.now(() -> callback.callback.onSuccess(next.src, reply));
+ }
+ else on.receive((Request)((Wrapper)next.body).body, next.src, next.body.msg_id);
+ }
}
}
+ finally
+ {
+ on.shutdown();
+ }
}
public static void main(String[] args) throws IOException
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org