You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2021/10/29 11:29:42 UTC
[cassandra-accord] 01/02: initial commit
This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 93533f5f701dfff084f7db08a0a55e64bb7f3651
Author: Benedict Elliott Smith <be...@apple.com>
AuthorDate: Sat Sep 4 23:39:44 2021 +0100
initial commit
---
.gitignore | 4 +
accord-core/build.gradle | 42 ++
accord-core/src/main/java/accord/api/Agent.java | 30 ++
accord-core/src/main/java/accord/api/Data.java | 13 +
accord-core/src/main/java/accord/api/Key.java | 8 +
.../src/main/java/accord/api/MessageSink.java | 13 +
accord-core/src/main/java/accord/api/Query.java | 13 +
accord-core/src/main/java/accord/api/Read.java | 11 +
accord-core/src/main/java/accord/api/Result.java | 10 +
.../src/main/java/accord/api/Scheduler.java | 18 +
accord-core/src/main/java/accord/api/Store.java | 9 +
accord-core/src/main/java/accord/api/Update.java | 11 +
accord-core/src/main/java/accord/api/Write.java | 13 +
.../main/java/accord/coordinate/AcceptPhase.java | 103 +++++
.../src/main/java/accord/coordinate/Agree.java | 172 +++++++
.../src/main/java/accord/coordinate/Agreed.java | 31 ++
.../main/java/accord/coordinate/Coordinate.java | 280 ++++++++++++
.../src/main/java/accord/coordinate/Execute.java | 158 +++++++
.../src/main/java/accord/coordinate/Preempted.java | 9 +
.../src/main/java/accord/coordinate/Recover.java | 205 +++++++++
.../src/main/java/accord/coordinate/Timeout.java | 8 +
.../src/main/java/accord/local/Command.java | 363 +++++++++++++++
.../src/main/java/accord/local/CommandsForKey.java | 46 ++
.../src/main/java/accord/local/Instance.java | 58 +++
.../src/main/java/accord/local/Listener.java | 6 +
.../src/main/java/accord/local/Listeners.java | 7 +
accord-core/src/main/java/accord/local/Node.java | 278 ++++++++++++
accord-core/src/main/java/accord/local/Status.java | 11 +
.../src/main/java/accord/messages/Accept.java | 111 +++++
.../src/main/java/accord/messages/Apply.java | 50 +++
.../main/java/accord/messages/BeginRecovery.java | 227 ++++++++++
.../src/main/java/accord/messages/Callback.java | 9 +
.../src/main/java/accord/messages/Commit.java | 41 ++
.../src/main/java/accord/messages/Message.java | 5 +
.../src/main/java/accord/messages/PreAccept.java | 137 ++++++
.../src/main/java/accord/messages/ReadData.java | 233 ++++++++++
.../src/main/java/accord/messages/Reply.java | 6 +
.../src/main/java/accord/messages/Request.java | 9 +
.../main/java/accord/messages/WaitOnCommit.java | 102 +++++
.../src/main/java/accord/topology/Shard.java | 57 +++
.../src/main/java/accord/topology/Shards.java | 22 +
.../src/main/java/accord/topology/Topology.java | 245 ++++++++++
accord-core/src/main/java/accord/txn/Ballot.java | 18 +
.../src/main/java/accord/txn/Dependencies.java | 107 +++++
accord-core/src/main/java/accord/txn/Keys.java | 160 +++++++
.../src/main/java/accord/txn/Timestamp.java | 65 +++
accord-core/src/main/java/accord/txn/Txn.java | 164 +++++++
accord-core/src/main/java/accord/txn/TxnId.java | 16 +
accord-core/src/main/java/accord/txn/Writes.java | 34 ++
.../accord/utils/DeterministicIdentitySet.java | 95 ++++
.../main/java/accord/utils/IndexedConsumer.java | 6 +
.../src/main/java/accord/utils/KeyRange.java | 20 +
.../java/accord/utils/ThreadPoolScheduler.java | 83 ++++
.../src/main/java/accord/utils/Timestamped.java | 20 +
.../src/main/java/accord/utils/WrapAroundList.java | 32 ++
.../src/main/java/accord/utils/WrapAroundSet.java | 40 ++
.../src/test/java/accord/NetworkFilter.java | 74 +++
accord-core/src/test/java/accord/Utils.java | 48 ++
.../src/test/java/accord/burn/BurnTest.java | 274 ++++++++++++
.../java/accord/burn/ReconcilingOutputStreams.java | 73 +++
.../java/accord/coordinate/CoordinateTest.java | 45 ++
.../test/java/accord/coordinate/RecoverTest.java | 113 +++++
.../src/test/java/accord/impl/IntHashKey.java | 87 ++++
accord-core/src/test/java/accord/impl/IntKey.java | 66 +++
.../src/test/java/accord/impl/TestAgent.java | 24 +
.../src/test/java/accord/impl/TopologyFactory.java | 58 +++
.../src/test/java/accord/impl/basic/Cluster.java | 165 +++++++
.../src/test/java/accord/impl/basic/NodeSink.java | 60 +++
.../src/test/java/accord/impl/basic/Packet.java | 45 ++
.../src/test/java/accord/impl/basic/Pending.java | 5 +
.../test/java/accord/impl/basic/PendingQueue.java | 11 +
.../java/accord/impl/basic/PendingRunnable.java | 5 +
.../java/accord/impl/basic/RandomDelayQueue.java | 85 ++++
.../impl/basic/RecurringPendingRunnable.java | 38 ++
.../java/accord/impl/basic/UniformRandomQueue.java | 80 ++++
.../src/test/java/accord/impl/list/ListAgent.java | 28 ++
.../src/test/java/accord/impl/list/ListData.java | 16 +
.../src/test/java/accord/impl/list/ListQuery.java | 35 ++
.../src/test/java/accord/impl/list/ListRead.java | 33 ++
.../test/java/accord/impl/list/ListRequest.java | 31 ++
.../src/test/java/accord/impl/list/ListResult.java | 40 ++
.../src/test/java/accord/impl/list/ListStore.java | 20 +
.../src/test/java/accord/impl/list/ListUpdate.java | 29 ++
.../src/test/java/accord/impl/list/ListWrite.java | 21 +
.../test/java/accord/impl/mock/MockCluster.java | 269 +++++++++++
.../src/test/java/accord/impl/mock/MockStore.java | 26 ++
.../src/test/java/accord/impl/mock/Network.java | 27 ++
.../accord/impl/mock/RecordingMessageSink.java | 69 +++
.../java/accord/impl/mock/SimpleMessageSink.java | 37 ++
.../test/java/accord/messages/PreAcceptTest.java | 120 +++++
.../src/test/java/accord/topology/ShardTest.java | 108 +++++
.../test/java/accord/verify/HistoryViolation.java | 12 +
.../accord/verify/LinearizabilityVerifier.java | 334 ++++++++++++++
.../accord/verify/SerializabilityVerifier.java | 496 +++++++++++++++++++++
.../accord/verify/SerializabilityVerifierTest.java | 154 +++++++
accord-maelstrom/build.gradle | 49 ++
.../src/main/java/accord/maelstrom/Body.java | 159 +++++++
.../src/main/java/accord/maelstrom/Cluster.java | 276 ++++++++++++
.../src/main/java/accord/maelstrom/Datum.java | 277 ++++++++++++
.../src/main/java/accord/maelstrom/Error.java | 30 ++
.../src/main/java/accord/maelstrom/Json.java | 482 ++++++++++++++++++++
.../main/java/accord/maelstrom/MaelstromAgent.java | 28 ++
.../main/java/accord/maelstrom/MaelstromData.java | 16 +
.../main/java/accord/maelstrom/MaelstromInit.java | 33 ++
.../main/java/accord/maelstrom/MaelstromKey.java | 57 +++
.../main/java/accord/maelstrom/MaelstromQuery.java | 36 ++
.../main/java/accord/maelstrom/MaelstromRead.java | 27 ++
.../main/java/accord/maelstrom/MaelstromReply.java | 100 +++++
.../java/accord/maelstrom/MaelstromRequest.java | 119 +++++
.../java/accord/maelstrom/MaelstromResult.java | 147 ++++++
.../main/java/accord/maelstrom/MaelstromStore.java | 25 ++
.../java/accord/maelstrom/MaelstromUpdate.java | 21 +
.../main/java/accord/maelstrom/MaelstromWrite.java | 21 +
.../src/main/java/accord/maelstrom/Main.java | 183 ++++++++
.../src/main/java/accord/maelstrom/Packet.java | 141 ++++++
.../java/accord/maelstrom/TopologyFactory.java | 66 +++
.../src/main/java/accord/maelstrom/Value.java | 115 +++++
.../src/main/java/accord/maelstrom/Wrapper.java | 25 ++
.../src/test/java/accord/maelstrom/Runner.java | 299 +++++++++++++
.../java/accord/maelstrom/SimpleRandomTest.java | 65 +++
build.gradle | 22 +
gradle/wrapper/gradle-wrapper.jar | Bin 0 -> 59203 bytes
gradle/wrapper/gradle-wrapper.properties | 5 +
gradlew | 185 ++++++++
gradlew.bat | 89 ++++
settings.gradle | 3 +
126 files changed, 10406 insertions(+)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2657cfd
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+.idea/
+.gradle/
+accord-core/build/
+accord-maelstrom/build/
diff --git a/accord-core/build.gradle b/accord-core/build.gradle
new file mode 100644
index 0000000..1d4802e
--- /dev/null
+++ b/accord-core/build.gradle
@@ -0,0 +1,42 @@
+plugins {
+ id 'java'
+}
+
+group 'accord'
+version '1.0-SNAPSHOT'
+
+repositories {
+ mavenCentral()
+}
+
+compileJava {
+ sourceCompatibility = JavaVersion.VERSION_11
+}
+
+configurations {
+ testClasses {
+ extendsFrom(testRuntime)
+ }
+}
+
+task testJar(type: Jar) {
+ classifier = 'test'
+ from sourceSets.test.output
+}
+
+// add the jar generated by the testJar task to the testClasses dependency
+artifacts {
+ testClasses testJar
+}
+
+dependencies {
+ implementation 'com.google.guava:guava:30.1.1-jre'
+ implementation 'com.google.code.gson:gson:2.8.7'
+ implementation 'ch.qos.logback:logback-classic:1.2.3'
+ testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
+ testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
+}
+
+test {
+ useJUnitPlatform()
+}
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/api/Agent.java b/accord-core/src/main/java/accord/api/Agent.java
new file mode 100644
index 0000000..62d6e7b
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/Agent.java
@@ -0,0 +1,30 @@
+package accord.api;
+
+import accord.local.Node;
+import accord.local.Command;
+import accord.txn.Timestamp;
+
+/**
+ * Facility for augmenting node behaviour at specific points
+ */
+public interface Agent
+{
+
+ /**
+ * For use by implementations to decide what to do about successfully recovered transactions.
+ * Specifically intended to define if and how they should inform clients of the result.
+ * e.g. in Maelstrom we send the full result directly, in other impls we may simply acknowledge success via the coordinator
+ *
+ * Note: may be invoked multiple times in different places
+ */
+ void onRecover(Node node, Result success, Throwable fail);
+
+ /**
+ * For use by implementations to decide what to do about timestamp inconsistency, i.e. two different timestamps
+ * committed for the same transaction. This is a protocol consistency violation, potentially leading to non-linearizable
+ * histories. In test cases this is used to fail the transaction, whereas in real systems this will likely be used for
+ * reporting the violation, as it is no more correct at this point to refuse the operation than it is to complete it.
+ */
+ void onInconsistentTimestamp(Command command, Timestamp prev, Timestamp next);
+
+}
diff --git a/accord-core/src/main/java/accord/api/Data.java b/accord-core/src/main/java/accord/api/Data.java
new file mode 100644
index 0000000..382712d
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/Data.java
@@ -0,0 +1,13 @@
+package accord.api;
+
+/**
+ * The result of some (potentially partial) {@link Read} from some {@link Store}
+ */
+public interface Data
+{
+ /**
+ * Combine the contents of the parameter with this object and return the resultant object.
+ * This method may modify the current object and return itself.
+ */
+ Data merge(Data data);
+}
diff --git a/accord-core/src/main/java/accord/api/Key.java b/accord-core/src/main/java/accord/api/Key.java
new file mode 100644
index 0000000..2c22972
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/Key.java
@@ -0,0 +1,8 @@
+package accord.api;
+
+/**
+ * A routing key for determining which shards are involved in a transaction
+ */
+public interface Key<K extends Key<K>> extends Comparable<K>
+{
+}
diff --git a/accord-core/src/main/java/accord/api/MessageSink.java b/accord-core/src/main/java/accord/api/MessageSink.java
new file mode 100644
index 0000000..4fb3da9
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/MessageSink.java
@@ -0,0 +1,13 @@
+package accord.api;
+
+import accord.local.Node.Id;
+import accord.messages.Callback;
+import accord.messages.Reply;
+import accord.messages.Request;
+
+public interface MessageSink
+{
+ void send(Id to, Request request);
+ void send(Id to, Request request, Callback callback);
+ void reply(Id replyingToNode, long replyingToMessage, Reply reply);
+}
diff --git a/accord-core/src/main/java/accord/api/Query.java b/accord-core/src/main/java/accord/api/Query.java
new file mode 100644
index 0000000..da68d87
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/Query.java
@@ -0,0 +1,13 @@
+package accord.api;
+
+/**
+ * The computational/transformation part of a client query
+ */
+public interface Query
+{
+ /**
+ * Perform some transformation on the complete {@link Data} result of a {@link Read}
+ * from some {@link Store}, to produce a {@link Result} to return to the client.
+ */
+ Result compute(Data data);
+}
diff --git a/accord-core/src/main/java/accord/api/Read.java b/accord-core/src/main/java/accord/api/Read.java
new file mode 100644
index 0000000..ff4fd2f
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/Read.java
@@ -0,0 +1,11 @@
+package accord.api;
+
+/**
+ * A read to be performed on potentially multiple shards, the results of which may be fed to a {@link Query}
+ *
+ * TODO: support splitting the read into per-shard portions
+ */
+public interface Read
+{
+ Data read(Key start, Key end, Store store);
+}
diff --git a/accord-core/src/main/java/accord/api/Result.java b/accord-core/src/main/java/accord/api/Result.java
new file mode 100644
index 0000000..f4decfd
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/Result.java
@@ -0,0 +1,10 @@
+package accord.api;
+
+/**
+ * A result to be returned to a client, or be stored in a node's command state.
+ *
+ * TODO: support minimizing the result for storage in a node's command state (e.g. to only retain success/failure)
+ */
+public interface Result
+{
+}
diff --git a/accord-core/src/main/java/accord/api/Scheduler.java b/accord-core/src/main/java/accord/api/Scheduler.java
new file mode 100644
index 0000000..8af6bcf
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/Scheduler.java
@@ -0,0 +1,18 @@
+package accord.api;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A simple task execution interface
+ */
+public interface Scheduler
+{
+ interface Scheduled
+ {
+ void cancel();
+ }
+
+ Scheduled recurring(Runnable run, long delay, TimeUnit units);
+ Scheduled once(Runnable run, long delay, TimeUnit units);
+ void now(Runnable run);
+}
diff --git a/accord-core/src/main/java/accord/api/Store.java b/accord-core/src/main/java/accord/api/Store.java
new file mode 100644
index 0000000..898aa67
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/Store.java
@@ -0,0 +1,9 @@
+package accord.api;
+
+/**
+ * A marker interface for a shard instance's storage, that is passed to
+ * {@link Read} and {@link Write} objects for execution
+ */
+public interface Store
+{
+}
diff --git a/accord-core/src/main/java/accord/api/Update.java b/accord-core/src/main/java/accord/api/Update.java
new file mode 100644
index 0000000..d07c087
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/Update.java
@@ -0,0 +1,11 @@
+package accord.api;
+
+/**
+ * A client-defined update operation (the write equivalent of a query).
+ * Takes as input the data returned by {@code Read}, and returns a {@code Write}
+ * representing new information to be distributed to each shard's stores.
+ */
+public interface Update
+{
+ Write apply(Data data);
+}
diff --git a/accord-core/src/main/java/accord/api/Write.java b/accord-core/src/main/java/accord/api/Write.java
new file mode 100644
index 0000000..4404ee6
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/Write.java
@@ -0,0 +1,13 @@
+package accord.api;
+
+import accord.txn.Timestamp;
+
+/**
+ * A collection of data to write to one or more stores
+ *
+ * TODO: support splitting so as to minimise duplication of data across shards
+ */
+public interface Write
+{
+ void apply(Key start, Key end, Timestamp executeAt, Store store);
+}
diff --git a/accord-core/src/main/java/accord/coordinate/AcceptPhase.java b/accord-core/src/main/java/accord/coordinate/AcceptPhase.java
new file mode 100644
index 0000000..eb79af7
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/AcceptPhase.java
@@ -0,0 +1,103 @@
+package accord.coordinate;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import accord.messages.Preempted;
+import accord.txn.Ballot;
+import accord.messages.Callback;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.topology.Shards;
+import accord.txn.Timestamp;
+import accord.txn.Dependencies;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+import accord.messages.Accept;
+import accord.messages.Accept.AcceptOk;
+import accord.messages.Accept.AcceptReply;
+
+class AcceptPhase extends CompletableFuture<Agreed>
+{
+ final Node node;
+ final Ballot ballot;
+ final TxnId txnId;
+ final Txn txn;
+ final Shards shards;
+
+ private List<AcceptOk> acceptOks;
+ private Timestamp proposed;
+ private int[] accepts;
+ private int[] failures;
+ private int acceptQuorums;
+
+ AcceptPhase(Node node, Ballot ballot, TxnId txnId, Txn txn, Shards shards)
+ {
+ this.node = node;
+ this.ballot = ballot;
+ this.txnId = txnId;
+ this.txn = txn;
+ this.shards = shards;
+ }
+
+ protected void startAccept(Timestamp executeAt, Dependencies deps)
+ {
+ this.proposed = executeAt;
+ this.acceptOks = new ArrayList<>();
+ this.accepts = new int[shards.size()];
+ this.failures = new int[shards.size()];
+ node.send(shards, new Accept(ballot, txnId, txn, executeAt, deps), new Callback<AcceptReply>()
+ {
+ @Override
+ public void onSuccess(Id from, AcceptReply response)
+ {
+ onAccept(from, response);
+ }
+
+ @Override
+ public void onFailure(Id from, Throwable throwable)
+ {
+ shards.forEachOn(from, (i, shard) -> {
+ if (++failures[i] >= shard.slowPathQuorumSize)
+ completeExceptionally(new accord.messages.Timeout());
+ });
+ }
+ });
+ }
+
+ private void onAccept(Id from, AcceptReply reply)
+ {
+ if (isDone())
+ return;
+
+ if (!reply.isOK())
+ {
+ completeExceptionally(new Preempted());
+ return;
+ }
+
+ AcceptOk ok = (AcceptOk) reply;
+ acceptOks.add(ok);
+ shards.forEachOn(from, txn.keys(), (i, shard) -> {
+ if (++accepts[i] == shard.slowPathQuorumSize)
+ ++acceptQuorums;
+ });
+
+ if (acceptQuorums == shards.size())
+ onAccepted();
+ }
+
+ private void onAccepted()
+ {
+ Dependencies deps = new Dependencies();
+ for (AcceptOk acceptOk : acceptOks)
+ deps.addAll(acceptOk.deps);
+ agreed(proposed, deps);
+ }
+
+ protected void agreed(Timestamp executeAt, Dependencies deps)
+ {
+ complete(new Agreed(txnId, txn, executeAt, deps, shards, null, null));
+ }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/Agree.java b/accord-core/src/main/java/accord/coordinate/Agree.java
new file mode 100644
index 0000000..0d726b8
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/Agree.java
@@ -0,0 +1,172 @@
+package accord.coordinate;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+
+import accord.messages.Preempted;
+import accord.messages.Timeout;
+import accord.txn.Ballot;
+import accord.messages.Callback;
+import accord.local.Node;
+import accord.txn.Dependencies;
+import accord.txn.Keys;
+import accord.local.Node.Id;
+import accord.txn.Timestamp;
+import accord.messages.PreAccept;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+import accord.messages.PreAccept.PreAcceptReply;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ */
+class Agree extends AcceptPhase implements Callback<PreAcceptReply>
+{
+ final Keys keys;
+
+ public enum PreacceptOutcome { COMMIT, ACCEPT }
+
+ // TODO: handle reconfigurations
+ private int[] preAccepts;
+ private int[] fastPathPreAccepts;
+ private int[] failures;
+ private int[] responsesOutstanding;
+
+ private int preAccepted;
+ private int fastPathAccepted;
+ private int noOutstandingResponses;
+ private PreacceptOutcome preacceptOutcome;
+ private final List<PreAcceptOk> preAcceptOks = new ArrayList<>();
+
+ // TODO: hybrid fast path? or at least short-circuit accept if we gain a fast-path quorum _and_ proposed one by accept
+ boolean permitHybridFastPath;
+
+ private Agree(Node node, TxnId txnId, Txn txn)
+ {
+ super(node, Ballot.ZERO, txnId, txn, node.cluster().forKeys(txn.keys()));
+ this.keys = txn.keys();
+ this.failures = new int[shards.size()];
+ this.preAccepts = new int[shards.size()];
+ this.fastPathPreAccepts = new int[shards.size()];
+ this.responsesOutstanding = new int[shards.size()];
+ shards.forEach((i, shard) -> {
+ this.responsesOutstanding[i] = shard.nodes.size();
+ });
+
+
+ node.send(shards, new PreAccept(txnId, txn), this);
+ }
+
+ private void messageReceived(int shard)
+ {
+ if (--responsesOutstanding[shard] == 0)
+ noOutstandingResponses++;
+ }
+
+ @Override
+ public void onSuccess(Id from, PreAcceptReply response)
+ {
+ onPreAccept(from, response);
+ }
+
+ @Override
+ public void onFailure(Id from, Throwable throwable)
+ {
+ if (isDone() || isPreAccepted())
+ return;
+
+ shards.forEachOn(from, (i, shard) -> {
+ messageReceived(i);
+ if (++failures[i] >= shard.slowPathQuorumSize)
+ completeExceptionally(new Timeout());
+ });
+
+ // if no other responses are expected and the slow quorum has been satisfied, proceed
+ if (shouldSlowPathAccept())
+ onPreAccepted();
+ }
+
+ private synchronized void onPreAccept(Id from, PreAcceptReply receive)
+ {
+ if (isDone() || isPreAccepted())
+ return;
+
+ if (!receive.isOK())
+ {
+ // we've been preempted by a recovery coordinator; defer to it, and wait to hear any result
+ completeExceptionally(new Preempted());
+ return;
+ }
+
+ PreAcceptOk ok = (PreAcceptOk) receive;
+ preAcceptOks.add(ok);
+
+ boolean fastPath = ok.witnessedAt.compareTo(txnId) == 0;
+ shards.forEachOn(from, (i, shard) -> {
+ messageReceived(i);
+ if (fastPath && shard.fastPathElectorate.contains(from) && ++fastPathPreAccepts[i] == shard.fastPathQuorumSize)
+ ++fastPathAccepted;
+
+ if (++preAccepts[i] == shard.slowPathQuorumSize)
+ ++preAccepted;
+ });
+
+ if (isFastPathAccepted() || shouldSlowPathAccept())
+ onPreAccepted();
+ }
+
+ private void onPreAccepted()
+ {
+ if (isFastPathAccepted())
+ {
+ preacceptOutcome = PreacceptOutcome.COMMIT;
+ Dependencies deps = new Dependencies();
+ for (PreAcceptOk preAcceptOk : preAcceptOks)
+ {
+ if (preAcceptOk.witnessedAt.equals(txnId))
+ deps.addAll(preAcceptOk.deps);
+ }
+ agreed(txnId, deps);
+ }
+ else
+ {
+ preacceptOutcome = PreacceptOutcome.ACCEPT;
+ Timestamp executeAt = Timestamp.NONE;
+ Dependencies deps = new Dependencies();
+ for (PreAcceptOk preAcceptOk : preAcceptOks)
+ {
+ deps.addAll(preAcceptOk.deps);
+ executeAt = Timestamp.max(executeAt, preAcceptOk.witnessedAt);
+ }
+
+ // TODO: perhaps don't submit Accept immediately if we almost have enough for fast-path,
+ // but by sending accept we rule out hybrid fast-path
+ permitHybridFastPath = executeAt.compareTo(txnId) == 0;
+
+ startAccept(executeAt, deps);
+ }
+ }
+
+ private boolean isFastPathAccepted()
+ {
+ return fastPathAccepted == shards.size();
+ }
+
+ private boolean shouldSlowPathAccept()
+ {
+ return noOutstandingResponses == shards.size() && preAccepted == shards.size();
+ }
+
+ private boolean isPreAccepted()
+ {
+ return preacceptOutcome != null;
+ }
+
+ static CompletionStage<Agreed> agree(Node node, TxnId txnId, Txn txn)
+ {
+ return new Agree(node, txnId, txn);
+ }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/Agreed.java b/accord-core/src/main/java/accord/coordinate/Agreed.java
new file mode 100644
index 0000000..7121d4f
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/Agreed.java
@@ -0,0 +1,31 @@
+package accord.coordinate;
+
+import accord.api.Result;
+import accord.txn.Writes;
+import accord.topology.Shards;
+import accord.txn.Timestamp;
+import accord.txn.Dependencies;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+
+class Agreed
+{
+ public final TxnId txnId;
+ public final Txn txn;
+ public final Timestamp executeAt;
+ public final Dependencies deps;
+ public final Shards shards;
+ public final Writes applied;
+ public final Result result;
+
+ public Agreed(TxnId txnId, Txn txn, Timestamp executeAt, Dependencies deps, Shards shards, Writes applied, Result result)
+ {
+ this.txnId = txnId;
+ this.txn = txn;
+ this.executeAt = executeAt;
+ this.deps = deps;
+ this.shards = shards;
+ this.applied = applied;
+ this.result = result;
+ }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/Coordinate.java b/accord-core/src/main/java/accord/coordinate/Coordinate.java
new file mode 100644
index 0000000..0cd117b
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/Coordinate.java
@@ -0,0 +1,280 @@
+package accord.coordinate;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import accord.local.Node;
+import accord.api.Result;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+import accord.txn.Ballot;
+
+public class Coordinate
+{
+ private static CompletionStage<Result> andThenExecute(Node node, CompletionStage<Agreed> agree)
+ {
+ DebugCompletionStage<Result> result = new DebugCompletionStage<>(agree);
+ result.wrapped = agree.thenCompose(agreed -> {
+ CompletionStage<Result> execute = Execute.execute(node, agreed);
+ result.debug2 = execute;
+ return execute;
+ });
+ return result;
+ }
+
+ public static CompletionStage<Result> execute(Node node, TxnId txnId, Txn txn)
+ {
+ return andThenExecute(node, Agree.agree(node, txnId, txn));
+ }
+
+ public static CompletionStage<Result> recover(Node node, TxnId txnId, Txn txn)
+ {
+ return andThenExecute(node, new Recover(node, new Ballot(node.uniqueNow()), txnId, txn));
+ }
+
+ private static class DebugCompletionStage<T> implements CompletionStage<T>
+ {
+ final Object debug1;
+ Object debug2;
+ CompletionStage<T> wrapped;
+
+ @Override
+ public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn)
+ {
+ return wrapped.thenApply(fn);
+ }
+
+ @Override
+ public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn)
+ {
+ return wrapped.thenApplyAsync(fn);
+ }
+
+ @Override
+ public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)
+ {
+ return wrapped.thenApplyAsync(fn, executor);
+ }
+
+ @Override
+ public CompletionStage<Void> thenAccept(Consumer<? super T> action)
+ {
+ return wrapped.thenAccept(action);
+ }
+
+ @Override
+ public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action)
+ {
+ return wrapped.thenAcceptAsync(action);
+ }
+
+ @Override
+ public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
+ {
+ return wrapped.thenAcceptAsync(action, executor);
+ }
+
+ @Override
+ public CompletionStage<Void> thenRun(Runnable action)
+ {
+ return wrapped.thenRun(action);
+ }
+
+ @Override
+ public CompletionStage<Void> thenRunAsync(Runnable action)
+ {
+ return wrapped.thenRunAsync(action);
+ }
+
+ @Override
+ public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor)
+ {
+ return wrapped.thenRunAsync(action, executor);
+ }
+
+ @Override
+ public <U, V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
+ {
+ return wrapped.thenCombine(other, fn);
+ }
+
+ @Override
+ public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
+ {
+ return wrapped.thenCombineAsync(other, fn);
+ }
+
+ @Override
+ public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn, Executor executor)
+ {
+ return wrapped.thenCombineAsync(other, fn, executor);
+ }
+
+ @Override
+ public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
+ {
+ return wrapped.thenAcceptBoth(other, action);
+ }
+
+ @Override
+ public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
+ {
+ return wrapped.thenAcceptBothAsync(other, action);
+ }
+
+ @Override
+ public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
+ {
+ return wrapped.thenAcceptBothAsync(other, action, executor);
+ }
+
+ @Override
+ public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
+ {
+ return wrapped.runAfterBoth(other, action);
+ }
+
+ @Override
+ public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
+ {
+ return wrapped.runAfterBothAsync(other, action);
+ }
+
+ @Override
+ public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
+ {
+ return wrapped.runAfterBothAsync(other, action, executor);
+ }
+
+ @Override
+ public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
+ {
+ return wrapped.applyToEither(other, fn);
+ }
+
+ @Override
+ public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
+ {
+ return wrapped.applyToEitherAsync(other, fn);
+ }
+
+ @Override
+ public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)
+ {
+ return wrapped.applyToEitherAsync(other, fn, executor);
+ }
+
+ @Override
+ public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
+ {
+ return wrapped.acceptEither(other, action);
+ }
+
+ @Override
+ public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
+ {
+ return wrapped.acceptEitherAsync(other, action);
+ }
+
+ @Override
+ public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
+ {
+ return wrapped.acceptEitherAsync(other, action, executor);
+ }
+
+ @Override
+ public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action)
+ {
+ return wrapped.runAfterEither(other, action);
+ }
+
+ @Override
+ public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action)
+ {
+ return wrapped.runAfterEitherAsync(other, action);
+ }
+
+ @Override
+ public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)
+ {
+ return wrapped.runAfterEitherAsync(other, action, executor);
+ }
+
+ @Override
+ public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
+ {
+ return wrapped.thenCompose(fn);
+ }
+
+ @Override
+ public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
+ {
+ return wrapped.thenComposeAsync(fn);
+ }
+
+ @Override
+ public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
+ {
+ return wrapped.thenComposeAsync(fn, executor);
+ }
+
+ @Override
+ public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
+ {
+ return wrapped.handle(fn);
+ }
+
+ @Override
+ public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
+ {
+ return wrapped.handleAsync(fn);
+ }
+
+ @Override
+ public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
+ {
+ return wrapped.handleAsync(fn, executor);
+ }
+
+ @Override
+ public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
+ {
+ return wrapped.whenComplete(action);
+ }
+
+ @Override
+ public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
+ {
+ return wrapped.whenCompleteAsync(action);
+ }
+
+ @Override
+ public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
+ {
+ return wrapped.whenCompleteAsync(action, executor);
+ }
+
+ @Override
+ public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn)
+ {
+ return wrapped.exceptionally(fn);
+ }
+
+ @Override
+ public CompletableFuture<T> toCompletableFuture()
+ {
+ return wrapped.toCompletableFuture();
+ }
+
+ private DebugCompletionStage(Object debug1)
+ {
+ this.debug1 = debug1;
+ }
+ }
+
+}
diff --git a/accord-core/src/main/java/accord/coordinate/Execute.java b/accord-core/src/main/java/accord/coordinate/Execute.java
new file mode 100644
index 0000000..502e2e0
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/Execute.java
@@ -0,0 +1,158 @@
+package accord.coordinate;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+import accord.api.Data;
+import accord.messages.Preempted;
+import accord.api.Result;
+import accord.messages.Callback;
+import accord.local.Node;
+import accord.txn.Dependencies;
+import accord.messages.Apply;
+import accord.messages.ReadData.ReadReply;
+import accord.messages.ReadData.ReadWaiting;
+import accord.topology.Shard;
+import accord.topology.Shards;
+import accord.local.Node.Id;
+import accord.txn.Timestamp;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+import accord.txn.Keys;
+import accord.messages.Commit;
+import accord.messages.ReadData;
+import accord.messages.ReadData.ReadOk;
+
+/**
+ * Coordinator-side execution of a transaction whose execution timestamp and dependencies have been agreed.
+ *
+ * If the supplied {@link Agreed} already carries applied writes, they are re-broadcast in an {@link Apply} and
+ * this future completes immediately with the known result. Otherwise a {@link Commit} is sent to every replica
+ * of every shard, with one replica per shard additionally asked to read (its reply is delivered to this
+ * callback). Once every shard has contributed data the result is computed, an {@link Apply} is broadcast and
+ * this future completes with the result.
+ *
+ * The future completes exceptionally with {@link Preempted} if a final reply is not OK, or with the reported
+ * failure once a shard has neither data nor any read in flight after a retry has been attempted.
+ */
+class Execute extends CompletableFuture<Result> implements Callback<ReadReply>
+{
+    final Node node;
+    final TxnId txnId;
+    final Txn txn;
+    final Timestamp executeAt;
+    final Shards shards;
+    final Keys keys;
+    final Dependencies deps;
+    // per shard: number of retry reads issued by read(); the initial read sent by the constructor is not counted
+    final int[] attempts;
+    // per shard: number of read requests sent to a replica of the shard that have not yet been answered
+    final int[] inFlight;
+    // per shard: whether a successful read reply covering the shard has been merged into data
+    final boolean[] hasData;
+    private Data data;
+    // offset of the replica consulted first within each shard (taken modulo the shard's replica count)
+    final int replicaIndex;
+    // number of shards for which hasData is true
+    int count = 0;
+
+    private Execute(Node node, Agreed agreed)
+    {
+        this.node = node;
+        this.txnId = agreed.txnId;
+        this.txn = agreed.txn;
+        this.keys = txn.keys();
+        this.deps = agreed.deps;
+        this.executeAt = agreed.executeAt;
+        this.shards = agreed.shards;
+        this.attempts = new int[shards.size()];
+        this.inFlight = new int[shards.size()];
+        this.hasData = new boolean[shards.size()];
+        this.replicaIndex = node.random().nextInt(shards.get(0).nodes.size());
+
+        // TODO: perhaps compose these different behaviours differently?
+        if (agreed.applied != null)
+        {
+            // the outcome is already known (e.g. discovered during recovery): just make sure everyone applies it
+            Apply send = new Apply(txnId, txn, executeAt, agreed.deps, agreed.applied, agreed.result);
+            node.send(shards, send);
+            complete(agreed.result);
+        }
+        else
+        {
+            // TODO: we're sending duplicate commits
+            shards.forEach((i, shard) -> {
+                for (int n = 0 ; n < shard.nodes.size() ; ++n)
+                {
+                    Id to = shard.nodes.get(n);
+                    // TODO: Topology needs concept of locality/distance
+                    boolean read = n == replicaIndex % shard.nodes.size();
+                    Commit send = new Commit(txnId, txn, executeAt, agreed.deps, read);
+                    if (read)
+                    {
+                        node.send(to, send, this);
+                        // the reply from this node may carry data for every shard it replicates
+                        shards.forEachOn(to, (j, s) -> ++inFlight[j]);
+                    }
+                    else
+                    {
+                        node.send(to, send);
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * Handles a read reply: a non-final reply triggers recovery of the transaction the replica is waiting on;
+     * a final, non-OK reply fails this execution with {@link Preempted}; an OK reply is merged into the
+     * accumulated data and, once every shard is covered, the result is computed, applied and published.
+     */
+    @Override
+    public void onSuccess(Id from, ReadReply reply)
+    {
+        if (isDone())
+            return;
+
+        if (!reply.isFinal())
+        {
+            ReadWaiting waiting = (ReadWaiting) reply;
+            // TODO first see if we can collect newer information (from ourselves or others), and if so send it
+            // otherwise, try to complete the transaction
+            node.recover(waiting.txnId, waiting.txn);
+            return;
+        }
+
+        if (!reply.isOK())
+        {
+            completeExceptionally(new Preempted());
+            return;
+        }
+
+        data = data == null ? ((ReadOk) reply).data
+                            : data.merge(((ReadOk) reply).data);
+
+        shards.forEachOn(from, (i, shard) -> {
+            --inFlight[i];
+            if (!hasData[i]) ++count;
+            hasData[i] = true;
+        });
+
+        if (count == shards.size())
+        {
+            Result result = txn.result(data);
+            node.send(shards, new Apply(txnId, txn, executeAt, deps, txn.execute(executeAt, data), result));
+            complete(result);
+        }
+    }
+
+    /**
+     * Handles a failed read: for every shard the failed node replicates, if no other read is in flight and the
+     * shard still has no data, another replica is tried; if none could be contacted the execution fails with
+     * the supplied throwable.
+     */
+    @Override
+    public void onFailure(Id from, Throwable throwable)
+    {
+        // try again with another random node
+        // TODO: API hooks
+        if (!(throwable instanceof accord.messages.Timeout))
+            throwable.printStackTrace();
+
+        // once the outcome is decided there is nothing left to retry (mirrors the guard in onSuccess)
+        if (isDone())
+            return;
+
+        shards.forEachOn(from, (i, shard) -> {
+            // TODO: less naive selection of replica to consult
+            if (--inFlight[i] == 0 && !hasData[i])
+                read(i);
+            // read(i) bumps inFlight[i] when it manages to contact another replica; still zero means we ran out
+            if (inFlight[i] == 0 && !hasData[i])
+                completeExceptionally(throwable);
+        });
+    }
+
+    /**
+     * Sends a retry read for the given shard to the next replica in rotation, or does nothing if every replica
+     * other than the one consulted by the constructor has already been retried.
+     */
+    private void read(int shardIndex)
+    {
+        Shard shard = shards.get(shardIndex);
+        // the replica at replicaIndex was already asked by the constructor, so only size-1 retries are possible
+        if (attempts[shardIndex] >= shard.nodes.size() - 1)
+            return;
+
+        // pre-increment so the first retry lands on the replica *after* the one that was consulted initially
+        int nodeIndex = (replicaIndex + ++attempts[shardIndex]) % shard.nodes.size();
+        Node.Id to = shard.nodes.get(nodeIndex);
+        shards.forEachOn(to, (i, s) -> ++inFlight[i]);
+        node.send(to, new ReadData(txnId, txn), this);
+    }
+
+    /**
+     * Starts executing the agreed transaction on behalf of the given node.
+     *
+     * @return a stage completed with the transaction's result, or exceptionally as described on the class
+     */
+    static CompletionStage<Result> execute(Node instance, Agreed agreed)
+    {
+        return new Execute(instance, agreed);
+    }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/Preempted.java b/accord-core/src/main/java/accord/coordinate/Preempted.java
new file mode 100644
index 0000000..ad96801
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/Preempted.java
@@ -0,0 +1,9 @@
+package accord.messages;
+
+/**
+ * Thrown when a coordinator is preempted by another recovery
+ * coordinator intending to complete the transaction.
+ *
+ * This is an unchecked exception, like {@link Timeout}: it is delivered through
+ * {@code completeExceptionally} rather than thrown across method boundaries, and
+ * failure types should not subclass {@link Throwable} directly.
+ */
+public class Preempted extends RuntimeException
+{
+}
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java b/accord-core/src/main/java/accord/coordinate/Recover.java
new file mode 100644
index 0000000..816c5a3
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -0,0 +1,205 @@
+package accord.coordinate;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import accord.messages.Preempted;
+import accord.txn.Ballot;
+import accord.messages.Callback;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.topology.Shards;
+import accord.txn.Timestamp;
+import accord.txn.Dependencies;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+import accord.messages.BeginRecovery;
+import accord.messages.BeginRecovery.RecoverOk;
+import accord.messages.BeginRecovery.RecoverReply;
+import accord.messages.WaitOnCommit;
+import accord.messages.WaitOnCommit.WaitOnCommitOk;
+
+import static accord.local.Status.Accepted;
+
+// TODO: rename to Recover (verb); rename Recover message to not clash
+/**
+ * Recovery coordinator: sends {@link BeginRecovery} for a transaction to all of its shards and, once a slow-path
+ * quorum of every shard has replied, decides how to finish it. The most advanced Accepted proposal is
+ * re-proposed; an already committed (or later) outcome completes this future directly; otherwise a new proposal
+ * is derived from the PreAccept state reported by the replicas.
+ *
+ * All callback state is guarded by this object's monitor.
+ */
+class Recover extends AcceptPhase implements Callback<RecoverReply>
+{
+    /**
+     * Waits for the given transactions to commit and then restarts recovery with the same ballot, forwarding the
+     * outcome of the restarted recovery to the enclosing instance.
+     *
+     * NOTE(review): a single pair of per-shard counters is shared across all awaited transactions, so replies
+     * for different transactions count towards the same quorum - confirm this is the intended approximation.
+     */
+    class RetryAfterCommits implements Callback<WaitOnCommitOk>
+    {
+        final int[] failures;
+        final int[] commits;
+        int commitQuorums;
+
+        RetryAfterCommits(Dependencies waitOn)
+        {
+            // the counters are indexed by shard in the callbacks below, so they must be sized by the number of
+            // shards rather than the number of transactions being waited on
+            commits = new int[shards.size()];
+            failures = new int[shards.size()];
+            for (Map.Entry<TxnId, Txn> e : waitOn)
+                node.send(shards, new WaitOnCommit(e.getKey(), e.getValue().keys()), this);
+        }
+
+        @Override
+        public void onSuccess(Id from, WaitOnCommitOk response)
+        {
+            synchronized (Recover.this)
+            {
+                if (isDone() || commitQuorums == commits.length)
+                    return;
+
+                shards.forEachOn(from, (i, shard) -> {
+                    if (++commits[i] == shard.slowPathQuorumSize)
+                        ++commitQuorums;
+                });
+
+                if (commitQuorums == commits.length)
+                {
+                    // every shard has reported a quorum of commits: start over and adopt the new attempt's outcome
+                    new Recover(node, ballot, txnId, txn, shards).handle((success, failure) -> {
+                        if (success != null) complete(success);
+                        else completeExceptionally(failure);
+                        return null;
+                    });
+                }
+            }
+        }
+
+        @Override
+        public void onFailure(Id from, Throwable throwable)
+        {
+            synchronized (Recover.this)
+            {
+                if (isDone())
+                    return;
+
+                shards.forEachOn(from, (i, shard) -> {
+                    if (++failures[i] >= shard.slowPathQuorumSize)
+                        completeExceptionally(new accord.messages.Timeout());
+                });
+            }
+        }
+    }
+
+    final List<RecoverOk> recoverOks = new ArrayList<>();
+    // per shard: number of failed BeginRecovery requests
+    int[] failure;
+    // per shard: number of successful BeginRecovery replies
+    int[] recovery;
+    // per shard: number of successful replies whose executeAt equals the txnId
+    int[] recoveryWithFastPath;
+    int recoveryWithFastPathQuorums = 0;
+    int recoveryQuorums = 0;
+
+    public Recover(Node node, Ballot ballot, TxnId txnId, Txn txn)
+    {
+        this(node, ballot, txnId, txn, node.cluster().forKeys(txn.keys()));
+    }
+
+    private Recover(Node node, Ballot ballot, TxnId txnId, Txn txn, Shards shards)
+    {
+        super(node, ballot, txnId, txn, shards);
+        this.failure = new int[this.shards.size()];
+        this.recovery = new int[this.shards.size()];
+        this.recoveryWithFastPath = new int[this.shards.size()];
+        node.send(this.shards, new BeginRecovery(txnId, txn, ballot), this);
+    }
+
+    /**
+     * Records a recovery reply. A non-OK reply fails this recovery with {@link Preempted}; otherwise the reply
+     * is tallied per shard and, when the last shard reaches its slow-path quorum, {@link #recover()} runs.
+     */
+    @Override
+    public synchronized void onSuccess(Id from, RecoverReply response)
+    {
+        if (isDone() || recoveryQuorums == shards.size())
+            return;
+
+        if (!response.isOK())
+        {
+            completeExceptionally(new Preempted());
+            return;
+        }
+
+        RecoverOk ok = (RecoverOk) response;
+        recoverOks.add(ok);
+        boolean fastPath = ok.executeAt.compareTo(txnId) == 0;
+        shards.forEachOn(from, (i, shard) -> {
+            if (fastPath && ++recoveryWithFastPath[i] == shard.recoveryFastPathSize)
+                ++recoveryWithFastPathQuorums;
+
+            if (++recovery[i] == shard.slowPathQuorumSize)
+                ++recoveryQuorums;
+        });
+
+        if (recoveryQuorums == shards.size())
+            recover();
+    }
+
+    /**
+     * Decides how to complete the transaction from the collected replies; invoked once per instance, when every
+     * shard has reached its slow-path quorum.
+     */
+    private void recover()
+    {
+        // first look for the most recent Accept; if present, go straight to proposing it again
+        RecoverOk acceptOrCommit = null;
+        for (RecoverOk ok : recoverOks)
+        {
+            if (ok.status.compareTo(Accepted) >= 0)
+            {
+                if (acceptOrCommit == null) acceptOrCommit = ok;
+                else if (acceptOrCommit.status.compareTo(ok.status) < 0) acceptOrCommit = ok;
+                else if (acceptOrCommit.status == ok.status && acceptOrCommit.accepted.compareTo(ok.accepted) < 0) acceptOrCommit = ok;
+            }
+        }
+
+        if (acceptOrCommit != null)
+        {
+            switch (acceptOrCommit.status)
+            {
+                case Accepted:
+                    startAccept(acceptOrCommit.executeAt, acceptOrCommit.deps);
+                    return;
+                case Committed:
+                case ReadyToExecute:
+                case Executed:
+                case Applied:
+                    complete(new Agreed(txnId, txn, acceptOrCommit.executeAt, acceptOrCommit.deps, shards, acceptOrCommit.writes, acceptOrCommit.result));
+                    return;
+            }
+        }
+
+        // should all be PreAccept
+        Timestamp maxExecuteAt = txnId;
+        Dependencies deps = new Dependencies();
+        Dependencies earlierAcceptedNoWitness = new Dependencies();
+        Dependencies earlierCommittedWitness = new Dependencies();
+        boolean rejectsFastPath = false;
+        for (RecoverOk ok : recoverOks)
+        {
+            deps.addAll(ok.deps);
+            earlierAcceptedNoWitness.addAll(ok.earlierAcceptedNoWitness);
+            earlierCommittedWitness.addAll(ok.earlierCommittedWitness);
+            maxExecuteAt = Timestamp.max(maxExecuteAt, ok.executeAt);
+            rejectsFastPath |= ok.rejectsFastPath;
+        }
+
+        Timestamp executeAt;
+        if (rejectsFastPath || recoveryWithFastPathQuorums < shards.size())
+        {
+            executeAt = maxExecuteAt;
+        }
+        else
+        {
+            earlierAcceptedNoWitness.removeAll(earlierCommittedWitness);
+            if (!earlierAcceptedNoWitness.isEmpty())
+            {
+                // these transactions were accepted without witnessing us and are not yet known to have committed
+                // with us as a dependency, so they are the ones whose commit we must wait for before retrying
+                new RetryAfterCommits(earlierAcceptedNoWitness);
+                return;
+            }
+            executeAt = txnId;
+        }
+
+        startAccept(executeAt, deps);
+    }
+
+    /**
+     * Tallies a failed request; once any shard has lost a slow-path quorum's worth of replicas the recovery
+     * fails with a timeout. Synchronized like {@link #onSuccess} because it mutates the shared counters.
+     */
+    @Override
+    public synchronized void onFailure(Id from, Throwable throwable)
+    {
+        if (isDone())
+            return;
+
+        shards.forEachOn(from, (i, shard) -> {
+            if (++failure[i] >= shard.slowPathQuorumSize)
+                completeExceptionally(new accord.messages.Timeout());
+        });
+    }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/Timeout.java b/accord-core/src/main/java/accord/coordinate/Timeout.java
new file mode 100644
index 0000000..d405fa7
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/Timeout.java
@@ -0,0 +1,8 @@
+package accord.messages;
+
+/**
+ * Thrown when a transaction exceeds its specified timeout for obtaining a result for a client.
+ *
+ * Unchecked; carries no message or cause of its own.
+ */
+public class Timeout extends RuntimeException
+{
+}
diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java
new file mode 100644
index 0000000..e8d6891
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -0,0 +1,363 @@
+package accord.local;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+
+import accord.api.Result;
+import accord.txn.Ballot;
+import accord.txn.Dependencies;
+import accord.txn.Timestamp;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+import accord.txn.Writes;
+
+import static accord.local.Status.Accepted;
+import static accord.local.Status.Applied;
+import static accord.local.Status.Committed;
+import static accord.local.Status.Executed;
+import static accord.local.Status.NotWitnessed;
+import static accord.local.Status.PreAccepted;
+import static accord.local.Status.ReadyToExecute;
+
+/**
+ * Per-instance state of a single transaction: its ballots, proposed execution timestamp, dependencies,
+ * outcome and {@link Status}. The status advances through witness / accept / commit / apply, and registered
+ * listeners are notified after each transition. A Command is itself a Listener so that it can track the
+ * progress of the commands it depends on.
+ */
+public class Command implements Listener, Consumer<Listener>
+{
+ public final Instance instance;
+ private final TxnId txnId;
+ private Txn txn;
+ private Ballot promised = Ballot.ZERO, accepted = Ballot.ZERO;
+ private Timestamp executeAt;
+ private Dependencies deps = new Dependencies();
+ private Writes writes;
+ private Result result;
+
+ private Status status = NotWitnessed;
+
+ // dependencies that have not yet been seen to commit, keyed by their txnId; null when there are none
+ private NavigableMap<TxnId, Command> waitingOnCommit;
+ // committed dependencies executing no later than us that have not yet applied, keyed by their executeAt;
+ // null once neither map has any entries left
+ private NavigableMap<Timestamp, Command> waitingOnApply;
+
+ private final Listeners listeners = new Listeners();
+
+ public Command(Instance instance, TxnId id)
+ {
+ this.instance = instance;
+ this.txnId = id;
+ }
+
+ public TxnId txnId()
+ {
+ return txnId;
+ }
+
+ public Txn txn()
+ {
+ return txn;
+ }
+
+ public Ballot promised()
+ {
+ return promised;
+ }
+
+ public Ballot accepted()
+ {
+ return accepted;
+ }
+
+ public Timestamp executeAt()
+ {
+ return executeAt;
+ }
+
+ public Dependencies savedDeps()
+ {
+ return deps;
+ }
+
+ public Writes writes()
+ {
+ return writes;
+ }
+
+ public Result result()
+ {
+ return result;
+ }
+
+ public Status status()
+ {
+ return status;
+ }
+
+ // true if the current status is the given status or any later one in Status declaration order
+ public boolean hasBeen(Status status)
+ {
+ return this.status.compareTo(status) >= 0;
+ }
+
+ public boolean is(Status status)
+ {
+ return this.status == status;
+ }
+
+ // requires that command != null
+ // relies on mutual exclusion for each key
+ // Returns false if any ballot above ZERO has been promised; returns true without changes if the command is
+ // already at least PreAccepted; otherwise records the txn, picks an executeAt, moves to PreAccepted,
+ // registers with the txn and notifies listeners.
+ public boolean witness(Txn txn)
+ {
+ if (promised.compareTo(Ballot.ZERO) > 0)
+ return false;
+
+ if (hasBeen(PreAccepted))
+ return true;
+
+ Timestamp max = txn.maxConflict(instance);
+ // unlike in the Accord paper, we partition shards within a node, so that to ensure a total order we must either:
+ // - use a global logical clock to issue new timestamps; or
+ // - assign each shard _and_ process a unique id, and use both as components of the timestamp
+ Timestamp witnessed = txnId.compareTo(max) > 0 ? txnId : instance.node().uniqueNow(max);
+
+ this.txn = txn;
+ this.executeAt = witnessed;
+ this.status = PreAccepted;
+
+ txn.register(instance, this);
+ listeners.forEach(this);
+ return true;
+ }
+
+ // Accepts a proposal under the given ballot. Rejected (false) if a higher ballot has been promised or the
+ // command has already committed; otherwise the proposal is recorded, both promised and accepted are set to
+ // the ballot, and listeners are notified. The return value of witness(txn) is deliberately not consulted.
+ public boolean accept(Ballot ballot, Txn txn, Timestamp executeAt, Dependencies deps)
+ {
+ if (this.promised.compareTo(ballot) > 0)
+ return false;
+
+ if (hasBeen(Committed))
+ return false;
+
+ witness(txn);
+ this.deps = deps;
+ this.executeAt = executeAt;
+ promised = accepted = ballot;
+ status = Accepted;
+ listeners.forEach(this);
+ return true;
+ }
+
+ // relies on mutual exclusion for each key
+ // Commits the transaction at executeAt with the given deps. A repeat commit with the same executeAt is a
+ // no-op (false); with a different executeAt the agent is informed and the commit is processed again.
+ // Dependencies on this instance's shard are sorted into waitingOnCommit / waitingOnApply before listeners
+ // are notified and execution is attempted.
+ public boolean commit(Txn txn, Dependencies deps, Timestamp executeAt)
+ {
+ if (hasBeen(Committed))
+ {
+ if (executeAt.equals(this.executeAt))
+ return false;
+
+ instance.node().agent().onInconsistentTimestamp(this, this.executeAt, executeAt);
+ }
+
+ witness(txn);
+ this.status = Committed;
+ this.deps = deps;
+ this.executeAt = executeAt;
+ this.waitingOnCommit = new TreeMap<>();
+ this.waitingOnApply = new TreeMap<>();
+
+ for (TxnId id : savedDeps().on(instance.shard))
+ {
+ Command command = instance.command(id);
+ switch (command.status)
+ {
+ default:
+ throw new IllegalStateException();
+ case NotWitnessed:
+ command.witness(deps.get(command.txnId));
+ // intentional fall-through: a freshly witnessed dependency is tracked like any other uncommitted one
+ case PreAccepted:
+ case Accepted:
+ // we don't know when these dependencies will execute, and cannot execute until we do
+ waitingOnCommit.put(id, command);
+ command.addListener(this);
+ break;
+ case Committed:
+ // TODO: split into ReadyToRead and ReadyToWrite;
+ // the distributed read can be performed as soon as those keys are ready, and in parallel with any other reads
+ // the client can even ACK immediately after; only the write needs to be postponed until other in-progress reads complete
+ case ReadyToExecute:
+ case Executed:
+ case Applied:
+ command.addListener(this);
+ updatePredecessor(command);
+ break;
+ }
+ }
+ // keep the "null means nothing to wait for" convention that maybeExecute and directlyBlockedBy rely on
+ if (waitingOnCommit.isEmpty())
+ {
+ waitingOnCommit = null;
+ if (waitingOnApply.isEmpty())
+ waitingOnApply = null;
+ }
+ listeners.forEach(this);
+ maybeExecute();
+ return true;
+ }
+
+ // Records the outcome (writes and result) of the transaction. A repeat for an already executed command with
+ // the same executeAt is a no-op (false). If not yet committed the command is committed first; if committed at
+ // a different executeAt the agent is informed. The status is then set to Executed, listeners are notified
+ // and the writes are applied as soon as nothing remains to wait for.
+ public boolean apply(Txn txn, Dependencies deps, Timestamp executeAt, Writes writes, Result result)
+ {
+ if (hasBeen(Executed) && executeAt.equals(this.executeAt))
+ return false;
+ else if (!hasBeen(Committed))
+ commit(txn, deps, executeAt);
+ else if (!executeAt.equals(this.executeAt))
+ instance.node().agent().onInconsistentTimestamp(this, this.executeAt, executeAt);
+
+ this.executeAt = executeAt;
+ this.writes = writes;
+ this.result = result;
+ this.status = Executed;
+ this.listeners.forEach(this);
+ maybeExecute();
+ return true;
+ }
+
+ // Promises the given ballot to a recovery coordinator unless a higher ballot has already been promised.
+ // The command is witnessed first, while the previous promise is still in force.
+ public boolean recover(Txn txn, Ballot ballot)
+ {
+ if (this.promised.compareTo(ballot) > 0)
+ return false;
+
+ witness(txn);
+ this.promised = ballot;
+ return true;
+ }
+
+ public Command addListener(Listener listener)
+ {
+ listeners.add(listener);
+ return this;
+ }
+
+ public void removeListener(Listener listener)
+ {
+ listeners.remove(listener);
+ }
+
+ // Invoked when a command we listen to changes. Only committed-or-later states matter: the dependency is
+ // moved out of waitingOnCommit and re-evaluated as a predecessor; if we are no longer waiting on anything we
+ // stop listening to it. Statuses before Committed are ignored.
+ @Override
+ public void onChange(Command command)
+ {
+ switch (command.status)
+ {
+ case Committed:
+ case ReadyToExecute:
+ case Executed:
+ case Applied:
+ if (waitingOnApply != null)
+ {
+ updatePredecessor(command);
+ if (waitingOnCommit != null)
+ {
+ if (waitingOnCommit.remove(command.txnId) != null && waitingOnCommit.isEmpty())
+ waitingOnCommit = null;
+ }
+ if (waitingOnCommit == null && waitingOnApply.isEmpty())
+ waitingOnApply = null;
+ }
+ else
+ {
+ command.removeListener(this);
+ }
+ maybeExecute();
+ break;
+ }
+ }
+
+ // Advances Committed -> ReadyToExecute, or Executed -> Applied (applying the writes to the instance), once
+ // waitingOnApply has been cleared; does nothing in any other status or while still waiting.
+ private void maybeExecute()
+ {
+ if (status != Committed && status != Executed)
+ return;
+
+ if (waitingOnApply != null)
+ return;
+
+ switch (status)
+ {
+ case Committed:
+ // TODO: maintain distinct ReadyToRead and ReadyToWrite states
+ status = ReadyToExecute;
+ listeners.forEach(this);
+ break;
+ case Executed:
+ writes.apply(instance);
+ status = Applied;
+ listeners.forEach(this);
+ }
+ }
+
+ // Re-classifies a committed dependency: one that executes after us is dropped, one that has applied is
+ // removed from waitingOnApply, and anything else is (idempotently) added to waitingOnApply.
+ // Callers ensure waitingOnApply is non-null.
+ private void updatePredecessor(Command committed)
+ {
+ if (committed.executeAt.compareTo(executeAt) > 0)
+ {
+ // cannot be a predecessor if we execute later
+ committed.removeListener(this);
+ }
+ else if (committed.hasBeen(Applied))
+ {
+ waitingOnApply.remove(committed.executeAt);
+ committed.removeListener(this);
+ }
+ else
+ {
+ waitingOnApply.putIfAbsent(committed.executeAt, committed);
+ }
+ }
+
+ // Follows the chain of directlyBlockedBy() to its end and returns the last command in it, or null if this
+ // command is not blocked at all.
+ public Command blockedBy()
+ {
+ Command cur = directlyBlockedBy();
+ if (cur == null)
+ return null;
+
+ Command next;
+ while (null != (next = cur.directlyBlockedBy()))
+ cur = next;
+ return cur;
+ }
+
+ // Returns the first dependency that is still uncommitted, else the first that is still unapplied, else null.
+ // Stale entries are purged on the way by replaying onChange, so this method can mutate the waiting maps and
+ // may advance this command's status via maybeExecute.
+ private Command directlyBlockedBy()
+ {
+ // firstly we're waiting on every dep to commit
+ while (waitingOnCommit != null)
+ {
+ // TODO: when we change our liveness mechanism this may not be a problem
+ // cannot guarantee that listener updating this set is invoked before this method by another listener
+ // so we must check the entry is still valid, and potentially remove it if not
+ Command waitingOn = waitingOnCommit.firstEntry().getValue();
+ if (!waitingOn.hasBeen(Committed)) return waitingOn;
+ onChange(waitingOn);
+ }
+
+ while (waitingOnApply != null)
+ {
+ // TODO: when we change our liveness mechanism this may not be a problem
+ // cannot guarantee that listener updating this set is invoked before this method by another listener
+ // so we must check the entry is still valid, and potentially remove it if not
+ Command waitingOn = waitingOnApply.firstEntry().getValue();
+ if (!waitingOn.hasBeen(Applied)) return waitingOn;
+ onChange(waitingOn);
+ }
+
+ return null;
+ }
+
+ // Consumer<Listener> implementation: lets `listeners.forEach(this)` notify each listener of our change
+ @Override
+ public void accept(Listener listener)
+ {
+ listener.onChange(this);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Command{" +
+ "txnId=" + txnId +
+ ", txn=" + txn +
+ ", executeAt=" + executeAt +
+ ", deps=" + deps +
+ ", status=" + status +
+ '}';
+ }
+}
diff --git a/accord-core/src/main/java/accord/local/CommandsForKey.java b/accord-core/src/main/java/accord/local/CommandsForKey.java
new file mode 100644
index 0000000..36802e9
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/CommandsForKey.java
@@ -0,0 +1,46 @@
+package accord.local;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import accord.txn.Timestamp;
+import accord.txn.TxnId;
+
+/**
+ * Index of the commands registered against a single key: those not yet committed (by txnId) and those that
+ * have committed (by txnId and by execution timestamp), together with the largest execution timestamp seen.
+ */
+public class CommandsForKey implements Listener
+{
+    // TODO: efficiency
+    public final NavigableMap<Timestamp, Command> uncommitted = new TreeMap<>();
+    public final NavigableMap<TxnId, Command> committedById = new TreeMap<>();
+    public final NavigableMap<Timestamp, Command> committedByExecuteAt = new TreeMap<>();
+
+    private Timestamp max = Timestamp.NONE;
+
+    /**
+     * @return the largest executeAt observed for any command registered with, or reported to, this key
+     */
+    public Timestamp max()
+    {
+        return max;
+    }
+
+    /**
+     * Tracks the command's latest executeAt and, once it has reached any committed state, moves it from the
+     * uncommitted index to the committed indexes and stops listening to it.
+     */
+    @Override
+    public void onChange(Command command)
+    {
+        max = Timestamp.max(max, command.executeAt());
+        switch (command.status())
+        {
+            case Applied:
+            case Executed:
+            // ReadyToExecute is also a committed state (Command.onChange treats it as such); handle it so a
+            // command first observed in that state is not left in the uncommitted index
+            case ReadyToExecute:
+            case Committed:
+                uncommitted.remove(command.txnId());
+                committedById.put(command.txnId(), command);
+                committedByExecuteAt.put(command.executeAt(), command);
+                command.removeListener(this);
+                break;
+        }
+    }
+
+    /**
+     * Starts tracking the command as uncommitted and subscribes to its status changes.
+     */
+    public void register(Command command)
+    {
+        max = Timestamp.max(max, command.executeAt());
+        uncommitted.put(command.txnId(), command);
+        command.addListener(this);
+    }
+}
diff --git a/accord-core/src/main/java/accord/local/Instance.java b/accord-core/src/main/java/accord/local/Instance.java
new file mode 100644
index 0000000..c842b1e
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/Instance.java
@@ -0,0 +1,58 @@
+package accord.local;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import accord.api.Key;
+import accord.api.Store;
+import accord.topology.Shard;
+import accord.txn.TxnId;
+
+/**
+ * Node-local Accord metadata for one shard: the shard description, the owning node, the backing store, and
+ * lazily populated indexes of commands by transaction id and by key.
+ */
+public class Instance
+{
+    public final Shard shard;
+    private final Node node;
+    private final Store store;
+    private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
+    private final NavigableMap<Key, CommandsForKey> commandsForKey = new TreeMap<>();
+
+    public Instance(Shard shard, Node node, Store store)
+    {
+        this.store = store;
+        this.node = node;
+        this.shard = shard;
+    }
+
+    /**
+     * @return the command for the given transaction id, creating and remembering an empty one on first use
+     */
+    public Command command(TxnId txnId)
+    {
+        Command known = commands.get(txnId);
+        if (known != null)
+            return known;
+
+        Command created = new Command(this, txnId);
+        commands.put(txnId, created);
+        return created;
+    }
+
+    /**
+     * @return true if a command has already been created for the given transaction id
+     */
+    public boolean hasCommand(TxnId txnId)
+    {
+        boolean present = commands.containsKey(txnId);
+        return present;
+    }
+
+    /**
+     * @return the per-key command index for the given key, creating and remembering an empty one on first use
+     */
+    public CommandsForKey commandsForKey(Key key)
+    {
+        CommandsForKey known = commandsForKey.get(key);
+        if (known != null)
+            return known;
+
+        CommandsForKey created = new CommandsForKey();
+        commandsForKey.put(key, created);
+        return created;
+    }
+
+    /**
+     * @return true if a per-key command index has already been created for the given key
+     */
+    public boolean hasCommandsForKey(Key key)
+    {
+        boolean present = commandsForKey.containsKey(key);
+        return present;
+    }
+
+    public Node node()
+    {
+        return node;
+    }
+
+    public Store store()
+    {
+        return store;
+    }
+}
diff --git a/accord-core/src/main/java/accord/local/Listener.java b/accord-core/src/main/java/accord/local/Listener.java
new file mode 100644
index 0000000..19c47b1
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/Listener.java
@@ -0,0 +1,6 @@
+package accord.local;
+
+/**
+ * Callback notified when a {@link Command} it is registered with changes state.
+ */
+public interface Listener
+{
+ // invoked with the command that changed; implementations inspect its current status
+ void onChange(Command command);
+}
diff --git a/accord-core/src/main/java/accord/local/Listeners.java b/accord-core/src/main/java/accord/local/Listeners.java
new file mode 100644
index 0000000..19bed62
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/Listeners.java
@@ -0,0 +1,7 @@
+package accord.local;
+
+import accord.utils.DeterministicIdentitySet;
+
+// Collection of Listener instances registered on a Command; adds nothing to DeterministicIdentitySet.
+// NOTE(review): presumably identity-based membership with a deterministic iteration order, as the
+// superclass name suggests - confirm against DeterministicIdentitySet.
+class Listeners extends DeterministicIdentitySet<Listener>
+{
+}
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
new file mode 100644
index 0000000..7724186
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -0,0 +1,278 @@
+package accord.local;
+
+import java.util.*;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+import accord.api.Agent;
+import accord.api.Key;
+import accord.api.MessageSink;
+import accord.api.Result;
+import accord.api.Scheduler;
+import accord.api.Store;
+import accord.coordinate.Coordinate;
+import accord.messages.Callback;
+import accord.messages.Request;
+import accord.messages.Reply;
+import accord.topology.Shard;
+import accord.topology.Shards;
+import accord.topology.Topology;
+import accord.txn.Keys;
+import accord.txn.Timestamp;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+
+/**
+ * A participant in the Accord protocol: owns the local per-shard {@link Instance}s, issues unique timestamps,
+ * sends and receives messages through the {@link MessageSink}, and starts coordination or recovery of
+ * transactions.
+ */
+public class Node
+{
+    /**
+     * Identifier of a node; ordered and compared by its numeric value.
+     */
+    public static class Id implements Comparable<Id>
+    {
+        public static final Id NONE = new Id(0);
+        public static final Id MAX = new Id(Long.MAX_VALUE);
+
+        public final long id;
+
+        public Id(long id)
+        {
+            this.id = id;
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Long.hashCode(id);
+        }
+
+        @Override
+        public boolean equals(Object that)
+        {
+            return that instanceof Id && equals((Id) that);
+        }
+
+        // null-safe, like equals(Object): a null argument is simply not equal
+        public boolean equals(Id that)
+        {
+            return that != null && id == that.id;
+        }
+
+        @Override
+        public int compareTo(Id that)
+        {
+            return Long.compare(this.id, that.id);
+        }
+
+        @Override
+        public String toString()
+        {
+            return Long.toString(id);
+        }
+    }
+
+    private final Id id;
+    private final Topology cluster;
+    private final Shards local;
+    private final Instance[] instances;
+    private final MessageSink messageSink;
+    private final Random random;
+
+    private final LongSupplier nowSupplier;
+    private final AtomicReference<Timestamp> now;
+    private final Agent agent;
+
+    // TODO: this really needs to be thought through some more, as it needs to be per-instance in some cases, and per-node in others
+    private final Scheduler scheduler;
+
+    // transactions this node is currently coordinating or recovering, with the stage that will carry their result
+    private final Map<TxnId, CompletionStage<Result>> coordinating = new ConcurrentHashMap<>();
+    // transactions for which a delayed recovery attempt has been scheduled and has not yet started
+    private final Set<TxnId> pendingRecovery = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    /**
+     * Creates the node and one {@link Instance} per local shard, each backed by a store obtained from
+     * {@code dataSupplier}.
+     */
+    public Node(Id id, Topology cluster, Shards local, MessageSink messageSink, Random random, LongSupplier nowSupplier, Supplier<Store> dataSupplier, Agent agent, Scheduler scheduler)
+    {
+        this.id = id;
+        this.cluster = cluster;
+        this.random = random;
+        this.agent = agent;
+        this.now = new AtomicReference<>(new Timestamp(nowSupplier.getAsLong(), 0, id));
+        this.local = local;
+        this.messageSink = messageSink;
+        this.instances = new Instance[local.size()];
+        this.nowSupplier = nowSupplier;
+        this.scheduler = scheduler;
+        for (int i = 0 ; i < instances.length ; ++i)
+            instances[i] = new Instance(local.get(i), this, dataSupplier.get());
+    }
+
+    /**
+     * @return a timestamp strictly greater than any previously issued by this node: the supplier's current
+     * time if it has advanced, otherwise the previous real time with the logical component incremented
+     */
+    public Timestamp uniqueNow()
+    {
+        return now.updateAndGet(cur -> {
+            // TODO: this diverges from proof; either show isomorphism or make consistent
+            long now = nowSupplier.getAsLong();
+            if (now > cur.real) return new Timestamp(now, 0, id);
+            else return new Timestamp(cur.real, cur.logical + 1, id);
+        });
+    }
+
+    /**
+     * @return a unique timestamp, as per {@link #uniqueNow()}, that is also greater than {@code atLeast}
+     */
+    public Timestamp uniqueNow(Timestamp atLeast)
+    {
+        // first drag the clock past atLeast if it is behind, then issue the next unique timestamp as usual
+        if (now.get().compareTo(atLeast) < 0)
+            now.accumulateAndGet(atLeast, (a, b) -> a.compareTo(b) < 0 ? new Timestamp(b.real, b.logical + 1, id) : a);
+
+        return uniqueNow();
+    }
+
+    public long now()
+    {
+        return nowSupplier.getAsLong();
+    }
+
+    public Topology cluster()
+    {
+        return cluster;
+    }
+
+    /**
+     * @return the local instances selected by the local shards for the given keys
+     */
+    public Stream<Instance> local(Keys keys)
+    {
+        // TODO: efficiency
+        return Stream.of(local.select(keys, instances, Instance[]::new));
+    }
+
+    /**
+     * @return the single local instance for the key, or empty if there is none
+     * @throws IllegalStateException if more than one local instance is selected for the key
+     */
+    public Optional<Instance> local(Key key)
+    {
+        return local(Keys.of(key)).reduce((i1, i2) -> {
+            throw new IllegalStateException("more than one instance encountered for key");
+        });
+    }
+
+    // send to every distinct node replicating any of the shards; a node serving several shards is contacted once
+    // NOTE(review): nothing here excludes this node itself - confirm whether the MessageSink is expected to
+    public void send(Shards shards, Request send)
+    {
+        Set<Id> contacted = new HashSet<>();
+        shards.forEach(shard -> send(shard, send, contacted));
+    }
+
+    // send to every node of the shard
+    public void send(Shard shard, Request send)
+    {
+        shard.nodes.forEach(node -> messageSink.send(node, send));
+    }
+
+    // send to every node of the shard that has not been contacted yet, recording those we do contact
+    private void send(Shard shard, Request send, Set<Id> alreadyContacted)
+    {
+        shard.nodes.forEach(node -> {
+            if (alreadyContacted.add(node))
+                send(node, send);
+        });
+    }
+
+    // send to every distinct node replicating any of the shards, delivering each reply to the callback
+    public <T> void send(Shards shards, Request send, Callback<T> callback)
+    {
+        // TODO efficiency
+        Set<Id> contacted = new HashSet<>();
+        shards.forEach(shard -> send(shard, send, callback, contacted));
+    }
+
+    // send to every node of the shard, delivering each reply to the callback
+    public <T> void send(Shard shard, Request send, Callback<T> callback)
+    {
+        shard.nodes.forEach(node -> send(node, send, callback));
+    }
+
+    // as above, but skipping (and recording) nodes that have already been contacted
+    private <T> void send(Shard shard, Request send, Callback<T> callback, Set<Id> alreadyContacted)
+    {
+        shard.nodes.forEach(node -> {
+            if (alreadyContacted.add(node))
+                send(node, send, callback);
+        });
+    }
+
+    // send to a specific node
+    public <T> void send(Id to, Request send, Callback<T> callback)
+    {
+        messageSink.send(to, send, callback);
+    }
+
+    // send to a specific node
+    public void send(Id to, Request send)
+    {
+        messageSink.send(to, send);
+    }
+
+    public void reply(Id replyingToNode, long replyingToMessage, Reply send)
+    {
+        messageSink.reply(replyingToNode, replyingToMessage, send);
+    }
+
+    /**
+     * Starts coordinating a new transaction under a freshly issued transaction id.
+     *
+     * @return the stage that will carry the transaction's result; if it fails, a recovery attempt is
+     * scheduled 30 seconds later
+     */
+    public CompletionStage<Result> coordinate(Txn txn)
+    {
+        TxnId txnId = new TxnId(uniqueNow());
+        CompletionStage<Result> result = Coordinate.execute(this, txnId, txn);
+        coordinating.put(txnId, result);
+        result.handle((success, fail) ->
+        {
+            coordinating.remove(txnId);
+            // if we don't succeed, try again in 30s to make sure somebody finishes it
+            // TODO: this is an ugly liveness mechanism
+            if (fail != null && pendingRecovery.add(txnId))
+                scheduler.once(() -> { pendingRecovery.remove(txnId); recover(txnId, txn); } , 30L, TimeUnit.SECONDS);
+            return null;
+        });
+        return result;
+    }
+
+    /**
+     * Attempts to complete a transaction on behalf of its coordinator. If this node is already coordinating
+     * or recovering the transaction, the existing stage is returned instead of starting another attempt.
+     * The agent is informed of the outcome, and a failed attempt is retried 30 seconds later.
+     */
+    // TODO: encapsulate in Coordinate, so we can request that e.g. commits be re-sent?
+    public CompletionStage<Result> recover(TxnId txnId, Txn txn)
+    {
+        CompletionStage<Result> result = coordinating.get(txnId);
+        if (result != null)
+            return result;
+
+        result = Coordinate.recover(this, txnId, txn);
+        coordinating.putIfAbsent(txnId, result);
+        result.handle((success, fail) -> {
+            coordinating.remove(txnId);
+            agent.onRecover(this, success, fail);
+            // if we don't succeed, try again in 30s to make sure somebody finishes it
+            // TODO: this is an ugly liveness mechanism
+            if (fail != null && pendingRecovery.add(txnId))
+                scheduler.once(() -> { pendingRecovery.remove(txnId); recover(txnId, txn); } , 30L, TimeUnit.SECONDS);
+            return null;
+        });
+        return result;
+    }
+
+    // hands an incoming request to the scheduler, which runs its processing against this node
+    public void receive(Request request, Id from, long messageId)
+    {
+        scheduler.now(() -> request.process(this, from, messageId));
+    }
+
+    public Scheduler scheduler()
+    {
+        return scheduler;
+    }
+
+    public Random random()
+    {
+        return random;
+    }
+
+    public Agent agent()
+    {
+        return agent;
+    }
+
+    public Id id()
+    {
+        return id;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Node{" + id + '}';
+    }
+}
diff --git a/accord-core/src/main/java/accord/local/Status.java b/accord-core/src/main/java/accord/local/Status.java
new file mode 100644
index 0000000..f19478d
--- /dev/null
+++ b/accord-core/src/main/java/accord/local/Status.java
@@ -0,0 +1,11 @@
+package accord.local;
+
+/**
+ * Lifecycle states of a command, declared in the order in which a command progresses through them.
+ */
+public enum Status
+{
+    NotWitnessed, PreAccepted, Accepted, Committed, ReadyToExecute, Executed, Applied;
+
+    /**
+     * @return whichever of the two statuses is declared later; {@code a} when they are the same
+     */
+    public static Status max(Status a, Status b)
+    {
+        if (a.compareTo(b) < 0)
+            return b;
+        return a;
+    }
+}
diff --git a/accord-core/src/main/java/accord/messages/Accept.java b/accord-core/src/main/java/accord/messages/Accept.java
new file mode 100644
index 0000000..c8708a7
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/Accept.java
@@ -0,0 +1,111 @@
+package accord.messages;
+
+import accord.messages.Reply;
+import accord.messages.Request;
+import accord.txn.Ballot;
+import accord.local.Node;
+import accord.txn.Timestamp;
+import accord.local.Command;
+import accord.txn.Dependencies;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+
+import static accord.messages.PreAccept.calculateDeps;
+
+/**
+ * Request asking every local instance touched by {@code txn} to accept {@code executeAt} and
+ * {@code deps} for {@code txnId} under {@code ballot}.
+ *
+ * The node answers with a single {@link AcceptReply}: a rejection if any instance rejected the
+ * ballot, otherwise an {@link AcceptOk} holding the combined dependencies computed per instance.
+ */
+public class Accept implements Request
+{
+    public final Ballot ballot;
+    public final TxnId txnId;
+    public final Txn txn;
+    public final Timestamp executeAt;
+    public final Dependencies deps;
+
+    public Accept(Ballot ballot, TxnId txnId, Txn txn, Timestamp executeAt, Dependencies deps)
+    {
+        this.ballot = ballot;
+        this.txnId = txnId;
+        this.txn = txn;
+        this.executeAt = executeAt;
+        this.deps = deps;
+    }
+
+    /**
+     * Applies the accept to each local instance, merges the per-instance replies and sends the
+     * merged reply back to {@code replyToNode}.
+     *
+     * @param on             the node processing the request
+     * @param replyToNode    the node to reply to
+     * @param replyToMessage the id of the message being answered
+     */
+    public void process(Node on, Node.Id replyToNode, long replyToMessage)
+    {
+        AcceptReply reply = txn.local(on).map(instance -> {
+            Command command = instance.command(txnId);
+            if (command.accept(ballot, txn, executeAt, deps))
+                return new AcceptOk(calculateDeps(instance, txnId, txn, executeAt));
+            return new AcceptNack(command.promised());
+        }).reduce((left, right) -> {
+            // any rejection wins over a success
+            if (!left.isOK()) return left;
+            if (!right.isOK()) return right;
+
+            AcceptOk first = (AcceptOk) left;
+            AcceptOk second = (AcceptOk) right;
+            if (first.deps.isEmpty())
+                return second;
+            if (!second.deps.isEmpty())
+                first.deps.addAll(second.deps);
+            return first;
+        }).orElseThrow();
+
+        on.reply(replyToNode, replyToMessage, reply);
+    }
+
+    /** Common type of the replies to an {@link Accept}. */
+    public interface AcceptReply extends Reply
+    {
+        /** @return true if the accept succeeded */
+        boolean isOK();
+    }
+
+    /** Successful reply, carrying the dependencies computed by the replica. */
+    public static class AcceptOk implements AcceptReply
+    {
+        public final Dependencies deps;
+
+        public AcceptOk(Dependencies deps)
+        {
+            this.deps = deps;
+        }
+
+        @Override
+        public boolean isOK()
+        {
+            return true;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "AcceptOk{" + deps + "}";
+        }
+    }
+
+    /** Rejection, carrying the timestamp the replica reported when refusing the ballot. */
+    public static class AcceptNack implements AcceptReply
+    {
+        public final Timestamp reject;
+
+        public AcceptNack(Timestamp reject)
+        {
+            this.reject = reject;
+        }
+
+        @Override
+        public boolean isOK()
+        {
+            return false;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "AcceptNack{" + reject + "}";
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Accept{ballot: " + ballot
+               + ", txnId: " + txnId
+               + ", txn: " + txn
+               + ", executeAt: " + executeAt
+               + ", deps: " + deps
+               + "}";
+    }
+}
diff --git a/accord-core/src/main/java/accord/messages/Apply.java b/accord-core/src/main/java/accord/messages/Apply.java
new file mode 100644
index 0000000..75d7da9
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/Apply.java
@@ -0,0 +1,50 @@
+package accord.messages;
+
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.messages.Request;
+import accord.api.Result;
+import accord.txn.Dependencies;
+import accord.txn.Timestamp;
+import accord.txn.Writes;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+
+/**
+ * Request carrying the outcome of a transaction (writes and result) together with its
+ * dependencies and execution timestamp, to be applied on every local instance touched by
+ * {@code txn}. No reply is sent.
+ */
+public class Apply implements Request
+{
+    final TxnId txnId;
+    final Txn txn;
+    // TODO: these only need to be sent if we don't know if this node has witnessed a Commit
+    final Dependencies deps;
+    final Timestamp executeAt;
+    final Writes writes;
+    final Result result;
+
+    public Apply(TxnId txnId, Txn txn, Timestamp executeAt, Dependencies deps, Writes writes, Result result)
+    {
+        this.txnId = txnId;
+        this.txn = txn;
+        this.executeAt = executeAt;
+        this.deps = deps;
+        this.writes = writes;
+        this.result = result;
+    }
+
+    /**
+     * Invokes {@code apply} on the command for {@code txnId} in each local instance.
+     *
+     * @param node           the node processing the request
+     * @param replyToNode    unused
+     * @param replyToMessage unused
+     */
+    public void process(Node node, Id replyToNode, long replyToMessage)
+    {
+        txn.local(node).forEach(instance -> {
+            instance.command(txnId).apply(txn, deps, executeAt, writes, result);
+        });
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Apply{txnId: " + txnId
+               + ", txn: " + txn
+               + ", deps: " + deps
+               + ", executeAt: " + executeAt
+               + ", writes: " + writes
+               + ", result: " + result
+               + "}";
+    }
+}
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java
new file mode 100644
index 0000000..2c3aeb2
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -0,0 +1,227 @@
+package accord.messages;
+
+import accord.messages.Reply;
+import accord.messages.Request;
+import accord.api.Result;
+import accord.txn.Writes;
+import accord.txn.Ballot;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.txn.Timestamp;
+import accord.local.Command;
+import accord.txn.Dependencies;
+import accord.local.Status;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+
+import static accord.local.Status.Accepted;
+import static accord.local.Status.Applied;
+import static accord.local.Status.Committed;
+import static accord.local.Status.NotWitnessed;
+import static accord.local.Status.PreAccepted;
+import static accord.messages.PreAccept.calculateDeps;
+
+/**
+ * Request asking a replica to take part in the recovery of {@code txnId} under {@code ballot}.
+ *
+ * Every local instance touched by {@code txn} either rejects the ballot with a
+ * {@link RecoverNack} or answers with a {@link RecoverOk} describing what it knows about the
+ * transaction. The per-instance answers are merged into one reply which is sent to the requester.
+ */
+public class BeginRecovery implements Request
+{
+    final TxnId txnId;
+    final Txn txn;
+    final Ballot ballot;
+
+    public BeginRecovery(TxnId txnId, Txn txn, Ballot ballot)
+    {
+        this.txnId = txnId;
+        this.txn = txn;
+        this.ballot = ballot;
+    }
+
+    /**
+     * Builds the recovery reply for this node and sends it to {@code replyToNode}. If the merged
+     * reply shows the transaction as {@code Applied}, an {@link Apply} is additionally sent to the
+     * shards owning the transaction's keys.
+     *
+     * @param node           the node processing the request
+     * @param replyToNode    the node to reply to
+     * @param replyToMessage the id of the message being answered
+     */
+    public void process(Node node, Id replyToNode, long replyToMessage)
+    {
+        RecoverReply reply = txn.local(node).map(instance -> {
+            Command command = instance.command(txnId);
+
+            if (!command.recover(txn, ballot))
+                return new RecoverNack(command.promised());
+
+            // a command that is only PreAccepted has its deps recomputed; otherwise use the saved ones
+            Dependencies deps = command.status() == PreAccepted ? calculateDeps(instance, txnId, txn, txnId)
+                                                                : command.savedDeps();
+
+            boolean rejectsFastPath;
+            Dependencies earlierCommittedWitness, earlierAcceptedNoWitness;
+
+            if (command.hasBeen(Committed))
+            {
+                rejectsFastPath = false;
+                earlierCommittedWitness = earlierAcceptedNoWitness = new Dependencies();
+            }
+            else
+            {
+                // a later transaction that is at least Accepted (or is committed to execute after us)
+                // without having us as a dependency rules out the fast path
+                rejectsFastPath = txn.uncommittedStartedAfter(instance, txnId)
+                                     .filter(c -> c.hasBeen(Accepted))
+                                     .anyMatch(c -> !c.savedDeps().contains(txnId));
+                if (!rejectsFastPath)
+                    rejectsFastPath = txn.committedExecutesAfter(instance, txnId)
+                                         .anyMatch(c -> !c.savedDeps().contains(txnId));
+
+                // committed transactions that started before us and list us as a dependency
+                earlierCommittedWitness = txn.committedStartedBefore(instance, txnId)
+                                             .filter(c -> c.savedDeps().contains(txnId))
+                                             .collect(Dependencies::new, Dependencies::add, Dependencies::addAll);
+
+                // accepted transactions that started before us and do NOT list us as a dependency.
+                // NOTE: this pipeline previously carried a second filter requiring the opposite
+                // (savedDeps contains txnId), which made the result unconditionally empty.
+                earlierAcceptedNoWitness = txn.uncommittedStartedBefore(instance, txnId)
+                                              .filter(c -> c.is(Accepted) && !c.savedDeps().contains(txnId))
+                                              .collect(Dependencies::new, Dependencies::add, Dependencies::addAll);
+            }
+            return new RecoverOk(command.status(), command.accepted(), command.executeAt(), deps, earlierCommittedWitness, earlierAcceptedNoWitness, rejectsFastPath, command.writes(), command.result());
+        }).reduce((r1, r2) -> {
+            // any rejection wins over a success
+            if (!r1.isOK()) return r1;
+            if (!r2.isOK()) return r2;
+            RecoverOk ok1 = (RecoverOk) r1;
+            RecoverOk ok2 = (RecoverOk) r2;
+
+            // set ok1 to the most recent of the two
+            if (ok1.status.compareTo(ok2.status) < 0)
+            { RecoverOk tmp = ok1; ok1 = ok2; ok2 = tmp; }
+
+            switch (ok1.status)
+            {
+                default: throw new IllegalStateException();
+                case PreAccepted:
+                    if (ok2.status == NotWitnessed)
+                        throw new IllegalStateException();
+                    break;
+
+                case Accepted:
+                    // we currently replicate all deps to every shard, so all Accepted should have the same information
+                    // but we must pick the one with the newest ballot
+                    if (ok2.status == Accepted)
+                        return ok1.accepted.compareTo(ok2.accepted) >= 0 ? ok1 : ok2;
+                    // intentional fall-through: ok2 is behind ok1, so ok1 is returned as-is
+
+                case Committed:
+                case ReadyToExecute:
+                case Executed:
+                case Applied:
+                    // we currently replicate all deps to every shard, so all Committed should have the same information
+                    return ok1;
+            }
+
+            // ok1 and ok2 both PreAccepted
+            Dependencies deps;
+            if (ok1.deps.equals(ok2.deps))
+            {
+                deps = ok1.deps;
+            }
+            else
+            {
+                deps = new Dependencies();
+                deps.addAll(ok1.deps);
+                deps.addAll(ok2.deps);
+            }
+            ok1.earlierCommittedWitness.addAll(ok2.earlierCommittedWitness);
+            ok1.earlierAcceptedNoWitness.addAll(ok2.earlierAcceptedNoWitness);
+            // a transaction seen committed-with-witness anywhere no longer needs to be waited for
+            ok1.earlierAcceptedNoWitness.removeAll(ok1.earlierCommittedWitness);
+            return new RecoverOk(
+                ok1.status,
+                Ballot.max(ok1.accepted, ok2.accepted),
+                Timestamp.max(ok1.executeAt, ok2.executeAt),
+                deps,
+                ok1.earlierCommittedWitness,
+                ok1.earlierAcceptedNoWitness,
+                ok1.rejectsFastPath || ok2.rejectsFastPath,
+                ok1.writes, ok1.result);
+        }).orElseThrow();
+
+        node.reply(replyToNode, replyToMessage, reply);
+        if (reply instanceof RecoverOk && ((RecoverOk) reply).status == Applied)
+        {
+            // disseminate directly
+            RecoverOk ok = (RecoverOk) reply;
+            node.send(node.cluster().forKeys(txn.keys), new Apply(txnId, txn, ok.executeAt, ok.deps, ok.writes, ok.result));
+        }
+    }
+
+    /** Common type of the replies to a {@link BeginRecovery}. */
+    public interface RecoverReply extends Reply
+    {
+        /** @return true if the replica accepted the recovery ballot */
+        boolean isOK();
+    }
+
+    /** Successful reply describing the replica's knowledge of the transaction being recovered. */
+    public static class RecoverOk implements RecoverReply
+    {
+        public final Status status;
+        public final Ballot accepted;
+        public final Timestamp executeAt;
+        public final Dependencies deps;
+        public final Dependencies earlierCommittedWitness;  // counter-point to earlierAcceptedNoWitness
+        public final Dependencies earlierAcceptedNoWitness; // wait for these to commit
+        public final boolean rejectsFastPath;
+        public final Writes writes;
+        public final Result result;
+
+        RecoverOk(Status status, Ballot accepted, Timestamp executeAt, Dependencies deps, Dependencies earlierCommittedWitness, Dependencies earlierAcceptedNoWitness, boolean rejectsFastPath, Writes writes, Result result)
+        {
+            this.accepted = accepted;
+            this.executeAt = executeAt;
+            this.status = status;
+            this.deps = deps;
+            this.earlierCommittedWitness = earlierCommittedWitness;
+            this.earlierAcceptedNoWitness = earlierAcceptedNoWitness;
+            this.rejectsFastPath = rejectsFastPath;
+            this.writes = writes;
+            this.result = result;
+        }
+
+        @Override
+        public boolean isOK()
+        {
+            return true;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "RecoverOk{" +
+                   "status:" + status +
+                   ", accepted:" + accepted +
+                   ", executeAt:" + executeAt +
+                   ", deps:" + deps +
+                   ", earlierCommittedWitness:" + earlierCommittedWitness +
+                   ", earlierAcceptedNoWitness:" + earlierAcceptedNoWitness +
+                   ", rejectsFastPath:" + rejectsFastPath +
+                   ", writes:" + writes +
+                   ", result:" + result +
+                   '}';
+        }
+    }
+
+    /** Rejection, carrying the ballot the replica reported when refusing the recovery. */
+    public static class RecoverNack implements RecoverReply
+    {
+        final Ballot supersededBy;
+        private RecoverNack(Ballot supersededBy)
+        {
+            this.supersededBy = supersededBy;
+        }
+
+        @Override
+        public boolean isOK()
+        {
+            return false;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "RecoverNack{" +
+                   "supersededBy:" + supersededBy +
+                   '}';
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "BeginRecovery{" +
+               "txnId:" + txnId +
+               ", txn:" + txn +
+               ", ballot:" + ballot +
+               '}';
+    }
+}
diff --git a/accord-core/src/main/java/accord/messages/Callback.java b/accord-core/src/main/java/accord/messages/Callback.java
new file mode 100644
index 0000000..4c1d830
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/Callback.java
@@ -0,0 +1,9 @@
+package accord.messages;
+
+import accord.local.Node.Id;
+
+/**
+ * Receives the outcome of an operation that involves a remote node.
+ *
+ * @param <T> the type of a successful response
+ */
+public interface Callback<T>
+{
+    /**
+     * @param from     the node the response is attributed to
+     * @param response the response
+     */
+    void onSuccess(Id from, T response);
+
+    /**
+     * @param from      the node the failure is attributed to
+     * @param throwable the cause of the failure
+     */
+    void onFailure(Id from, Throwable throwable);
+}
diff --git a/accord-core/src/main/java/accord/messages/Commit.java b/accord-core/src/main/java/accord/messages/Commit.java
new file mode 100644
index 0000000..927c29a
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -0,0 +1,41 @@
+package accord.messages;
+
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.messages.Request;
+import accord.txn.Timestamp;
+import accord.txn.Dependencies;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+
+// TODO: CommitOk responses, so we can send again if no reply received? Or leave to recovery?
+/**
+ * Request that commits {@code txnId} with the given execution timestamp and dependencies on each
+ * local instance, and optionally continues with the {@link ReadData} behaviour it inherits.
+ */
+public class Commit extends ReadData implements Request
+{
+    final Timestamp executeAt;
+    final Dependencies deps;
+    final boolean read;
+
+    public Commit(TxnId txnId, Txn txn, Timestamp executeAt, Dependencies deps, boolean read)
+    {
+        super(txnId, txn);
+        this.executeAt = executeAt;
+        this.deps = deps;
+        this.read = read;
+    }
+
+    /**
+     * Commits the command on every local instance, then performs the read when {@code read} is set.
+     *
+     * @param node      the node processing the request
+     * @param from      the node that sent the request
+     * @param messageId the id of the message carrying the request
+     */
+    public void process(Node node, Id from, long messageId)
+    {
+        txn.local(node).forEach(instance -> {
+            instance.command(txnId).commit(txn, deps, executeAt);
+        });
+
+        if (read)
+        {
+            super.process(node, from, messageId);
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Commit{executeAt: " + executeAt
+               + ", deps: " + deps
+               + ", read: " + read
+               + "}";
+    }
+}
diff --git a/accord-core/src/main/java/accord/messages/Message.java b/accord-core/src/main/java/accord/messages/Message.java
new file mode 100644
index 0000000..928fcc7
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/Message.java
@@ -0,0 +1,5 @@
+package accord.messages;
+
+/**
+ * Marker interface with no methods; {@code Request} and {@code Reply} both extend it.
+ */
+public interface Message
+{
+}
diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java b/accord-core/src/main/java/accord/messages/PreAccept.java
new file mode 100644
index 0000000..0ae2e76
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/PreAccept.java
@@ -0,0 +1,137 @@
+package accord.messages;
+
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import accord.local.Instance;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.messages.Reply;
+import accord.messages.Request;
+import accord.txn.Timestamp;
+import accord.local.Command;
+import accord.txn.Dependencies;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+
+/**
+ * Request asking every local instance touched by {@code txn} to witness the transaction.
+ *
+ * The node answers with a single {@link PreAcceptReply}: {@link PreAcceptNack} if any instance
+ * refused, otherwise a {@link PreAcceptOk} with the greatest {@code witnessedAt} reported and the
+ * combined dependencies of all instances.
+ */
+public class PreAccept implements Request
+{
+    public final TxnId txnId;
+    public final Txn txn;
+
+    public PreAccept(TxnId txnId, Txn txn)
+    {
+        this.txnId = txnId;
+        this.txn = txn;
+    }
+
+    /**
+     * Witnesses the transaction on each local instance, merges the per-instance replies and sends
+     * the merged reply back to {@code from}.
+     *
+     * @param node      the node processing the request
+     * @param from      the node to reply to
+     * @param messageId the id of the message being answered
+     */
+    public void process(Node node, Id from, long messageId)
+    {
+        PreAcceptReply reply = txn.local(node).map(instance -> {
+            Command command = instance.command(txnId);
+            if (command.witness(txn))
+            {
+                // TODO: only lookup keys relevant to this instance
+                // TODO: why don't we calculate deps from the executeAt timestamp??
+                return new PreAcceptOk(command.executeAt(), calculateDeps(instance, txnId, txn, txnId));
+            }
+            return PreAcceptNack.INSTANCE;
+        }).reduce((left, right) -> {
+            // any rejection wins over a success
+            if (!left.isOK()) return left;
+            if (!right.isOK()) return right;
+
+            PreAcceptOk ok1 = (PreAcceptOk) left;
+            PreAcceptOk ok2 = (PreAcceptOk) right;
+            boolean firstWins = ok1.witnessedAt.compareTo(ok2.witnessedAt) >= 0;
+            PreAcceptOk winner = firstWins ? ok1 : ok2;
+            PreAcceptOk loser = firstWins ? ok2 : ok1;
+            if (loser != winner && !loser.deps.isEmpty())
+                winner.deps.addAll(loser.deps);
+            return winner;
+        }).orElseThrow();
+
+        node.reply(from, messageId, reply);
+    }
+
+    /** Common type of the replies to a {@link PreAccept}. */
+    public interface PreAcceptReply extends Reply
+    {
+        /** @return true if the transaction was witnessed */
+        boolean isOK();
+    }
+
+    /** Successful reply: the timestamp reported by the replica and the dependencies it computed. */
+    public static class PreAcceptOk implements PreAcceptReply
+    {
+        public final Timestamp witnessedAt;
+        public final Dependencies deps;
+
+        public PreAcceptOk(Timestamp witnessedAt, Dependencies deps)
+        {
+            this.witnessedAt = witnessedAt;
+            this.deps = deps;
+        }
+
+        @Override
+        public boolean isOK()
+        {
+            return true;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (o == this)
+                return true;
+            if (o == null || o.getClass() != getClass())
+                return false;
+
+            PreAcceptOk other = (PreAcceptOk) o;
+            if (!witnessedAt.equals(other.witnessedAt))
+                return false;
+            return deps.equals(other.deps);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(witnessedAt, deps);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "PreAcceptOk{witnessedAt=" + witnessedAt + ", deps=" + deps + "}";
+        }
+    }
+
+    /** Rejection; a stateless singleton. */
+    public static class PreAcceptNack implements PreAcceptReply
+    {
+        public static final PreAcceptNack INSTANCE = new PreAcceptNack();
+
+        private PreAcceptNack() {}
+
+        @Override
+        public boolean isOK()
+        {
+            return false;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "PreAcceptNack{}";
+        }
+    }
+
+    /**
+     * Collects the transactions reported by {@code txn.conflictsMayExecuteBefore(instance, executeAt)},
+     * excluding {@code txnId} itself and keeping only pairs where at least one side is a write.
+     *
+     * @param instance  the instance whose commands are consulted
+     * @param txnId     the transaction the dependencies are computed for
+     * @param txn       the definition of that transaction
+     * @param executeAt the timestamp passed to the conflict lookup
+     * @return a new {@link Dependencies} holding the selected conflicts
+     */
+    static Dependencies calculateDeps(Instance instance, TxnId txnId, Txn txn, Timestamp executeAt)
+    {
+        NavigableMap<TxnId, Txn> found = new TreeMap<>();
+        txn.conflictsMayExecuteBefore(instance, executeAt).forEach(conflict -> {
+            boolean isSelf = conflict.txnId().equals(txnId);
+            if (!isSelf && (txn.isWrite() || conflict.txn().isWrite()))
+                found.put(conflict.txnId(), conflict.txn());
+        });
+        return new Dependencies(found);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "PreAccept{txnId: " + txnId + ", txn: " + txn + "}";
+    }
+}
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
new file mode 100644
index 0000000..45f9f30
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -0,0 +1,233 @@
+package accord.messages;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import accord.local.Instance;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.api.Data;
+import accord.messages.Reply;
+import accord.messages.Request;
+import accord.local.Command;
+import accord.local.Listener;
+import accord.local.Status;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+import accord.txn.Timestamp;
+import accord.api.Scheduler.Scheduled;
+import accord.utils.DeterministicIdentitySet;
+
+/**
+ * Request to read the data of {@code txn} on this node. The read is performed on each local
+ * instance once its command for {@code txnId} is ready to execute, and the merged data is sent
+ * back in a {@link ReadOk}.
+ */
+public class ReadData implements Request
+{
+    /**
+     * Tracks one read across the local instances touched by the transaction.
+     *
+     * It listens on commands that are not yet ready, reads from those that are, and replies with
+     * {@link ReadOk} once no instance is left in {@code waitingOn}. If any command is found to be
+     * already executed or applied the read is abandoned and a {@link ReadNack} is sent instead.
+     */
+    static class LocalRead implements Listener
+    {
+        final TxnId txnId;
+        final Node node;
+        final Node.Id replyToNode;
+        final long replyToMessage;
+
+        Data data;
+        boolean isObsolete; // TODO: respond with the Executed result we have stored?
+        Set<Instance> waitingOn;
+        Scheduled waitingOnReporter;
+
+        LocalRead(TxnId txnId, Node node, Id replyToNode, long replyToMessage)
+        {
+            this.txnId = txnId;
+            this.node = node;
+            this.replyToNode = replyToNode;
+            this.replyToMessage = replyToMessage;
+            // TODO: this is messy, we want a complete separate liveness mechanism that ensures progress for all transactions
+            this.waitingOnReporter = node.scheduler().once(new ReportWaiting(), 1L, TimeUnit.SECONDS);
+        }
+
+        /**
+         * Tells the requester which command this read is blocked by. It first runs from the
+         * scheduler one second after the read was created, and afterwards re-runs whenever the
+         * command it last reported changes.
+         */
+        class ReportWaiting implements Listener, Runnable
+        {
+            @Override
+            public void onChange(Command command)
+            {
+                command.removeListener(this);
+                run();
+            }
+
+            @Override
+            public void run()
+            {
+                // Once the read is obsolete the final ReadNack has been sent; this reporter may still
+                // be registered as a listener on a blocking command, so do not follow the final
+                // reply with further ReadWaiting messages.
+                if (isObsolete)
+                    return;
+
+                // find the first instance whose command reports something it is blocked by
+                Iterator<Instance> i = waitingOn.iterator();
+                Command blockedBy = null;
+                while (i.hasNext() && null == (blockedBy = i.next().command(txnId).blockedBy()));
+                if (blockedBy == null) return;
+                blockedBy.addListener(this);
+                assert blockedBy.status().compareTo(Status.NotWitnessed) > 0;
+                node.reply(replyToNode, replyToMessage, new ReadWaiting(blockedBy.txnId(), blockedBy.txn(), blockedBy.executeAt(), blockedBy.status()));
+            }
+        }
+
+        /**
+         * Reacts to a status change of a command this read is waiting on: ignores statuses before
+         * ReadyToExecute, marks the read obsolete for Executed/Applied, and otherwise reads.
+         */
+        @Override
+        public synchronized void onChange(Command command)
+        {
+            switch (command.status())
+            {
+                case NotWitnessed:
+                case PreAccepted:
+                case Accepted:
+                case Committed:
+                    return;
+
+                case Executed:
+                case Applied:
+                    obsolete(command);
+                    // intentional fall-through: the listener is removed below in both cases
+                case ReadyToExecute:
+            }
+
+            command.removeListener(this);
+            if (!isObsolete)
+                read(command);
+        }
+
+        /**
+         * Reads from {@code command}, merges the result into {@code data}, and replies with
+         * {@link ReadOk} when this was the last instance being waited on.
+         */
+        private void read(Command command)
+        {
+            // TODO: threading/futures (don't want to perform expensive reads within this mutually exclusive context)
+            Data next = command.txn().read(command);
+            data = data == null ? next : data.merge(next);
+
+            waitingOn.remove(command.instance);
+            if (waitingOn.isEmpty())
+            {
+                waitingOnReporter.cancel();
+                node.reply(replyToNode, replyToMessage, new ReadOk(data));
+            }
+        }
+
+        /**
+         * Abandons the read (at most once): cancels the waiting reporter, sends an {@link Apply}
+         * for the command to its instance's shard and replies with {@link ReadNack}.
+         */
+        void obsolete(Command command)
+        {
+            if (!isObsolete)
+            {
+                isObsolete = true;
+                waitingOnReporter.cancel();
+                node.send(command.instance.shard, new Apply(command.txnId(), command.txn(), command.executeAt(), command.savedDeps(), command.writes(), command.result()));
+                node.reply(replyToNode, replyToMessage, new ReadNack());
+            }
+        }
+
+        /**
+         * Collects the local instances for {@code txn} and, per instance, registers a listener,
+         * reads immediately, or marks the read obsolete depending on the command's status.
+         *
+         * @throws IllegalStateException if a command is still NotWitnessed after being witnessed
+         */
+        synchronized void setup(TxnId txnId, Txn txn)
+        {
+            // TODO: simple hash set supporting concurrent modification, or else avoid concurrent modification
+            waitingOn = txn.local(node).collect(Collectors.toCollection(() -> new DeterministicIdentitySet<>()));
+            // iterate over a snapshot: read() removes the current instance from waitingOn, and the
+            // set must not be mutated while it is being traversed
+            for (Instance instance : waitingOn.toArray(new Instance[0]))
+            {
+                Command command = instance.command(txnId);
+                command.witness(txn);
+                switch (command.status())
+                {
+                    case NotWitnessed:
+                        throw new IllegalStateException();
+                    case PreAccepted:
+                    case Accepted:
+                    case Committed:
+                        command.addListener(this);
+                        break;
+
+                    case Executed:
+                    case Applied:
+                        obsolete(command);
+                        break;
+
+                    case ReadyToExecute:
+                        if (!isObsolete)
+                            read(command);
+                }
+            }
+        }
+    }
+
+    final TxnId txnId;
+    final Txn txn;
+
+    public ReadData(TxnId txnId, Txn txn)
+    {
+        this.txnId = txnId;
+        this.txn = txn;
+    }
+
+    /**
+     * Creates a {@link LocalRead} for this request and sets it up.
+     *
+     * @param node      the node processing the request
+     * @param from      the node to reply to
+     * @param messageId the id of the message being answered
+     */
+    public void process(Node node, Node.Id from, long messageId)
+    {
+        new LocalRead(txnId, node, from, messageId).setup(txnId, txn);
+    }
+
+    /** Base type of the replies to a {@link ReadData}; reports OK unless overridden. */
+    public static class ReadReply implements Reply
+    {
+        public boolean isOK()
+        {
+            return true;
+        }
+    }
+
+    /** Reply sent when the read was abandoned. */
+    public static class ReadNack extends ReadReply
+    {
+        @Override
+        public boolean isOK()
+        {
+            return false;
+        }
+    }
+
+    /** Reply carrying the data that was read. */
+    public static class ReadOk extends ReadReply
+    {
+        public final Data data;
+        public ReadOk(Data data)
+        {
+            this.data = data;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "ReadOk{" + data + '}';
+        }
+    }
+
+    /** Non-final reply naming the command the read is currently blocked by. */
+    public static class ReadWaiting extends ReadReply
+    {
+        public final TxnId txnId;
+        public final Txn txn;
+        public final Timestamp executeAt;
+        public final Status status;
+
+        public ReadWaiting(TxnId txnId, Txn txn, Timestamp executeAt, Status status)
+        {
+            this.txnId = txnId;
+            this.txn = txn;
+            this.executeAt = executeAt;
+            this.status = status;
+        }
+
+        @Override
+        public boolean isFinal()
+        {
+            return false;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "ReadWaiting{" +
+                   "txnId:" + txnId +
+                   ", txn:" + txn +
+                   ", executeAt:" + executeAt +
+                   ", status:" + status +
+                   '}';
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "ReadData{" +
+               "txnId:" + txnId +
+               ", txn:" + txn +
+               '}';
+    }
+}
diff --git a/accord-core/src/main/java/accord/messages/Reply.java b/accord-core/src/main/java/accord/messages/Reply.java
new file mode 100644
index 0000000..b516a4a
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/Reply.java
@@ -0,0 +1,6 @@
+package accord.messages;
+
+/**
+ * A message sent in answer to a request.
+ */
+public interface Reply extends Message
+{
+    /**
+     * @return true by default; an implementation may override this to return false
+     */
+    default boolean isFinal()
+    {
+        return true;
+    }
+}
diff --git a/accord-core/src/main/java/accord/messages/Request.java b/accord-core/src/main/java/accord/messages/Request.java
new file mode 100644
index 0000000..50ffbd5
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/Request.java
@@ -0,0 +1,9 @@
+package accord.messages;
+
+import accord.local.Node;
+import accord.local.Node.Id;
+
+/**
+ * A message that a node can process.
+ */
+public interface Request extends Message
+{
+    /**
+     * Processes this request.
+     *
+     * @param on        the node processing the request
+     * @param from      the node the request came from
+     * @param messageId the id of the message carrying the request
+     */
+    void process(Node on, Id from, long messageId);
+}
diff --git a/accord-core/src/main/java/accord/messages/WaitOnCommit.java b/accord-core/src/main/java/accord/messages/WaitOnCommit.java
new file mode 100644
index 0000000..2007215
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/WaitOnCommit.java
@@ -0,0 +1,102 @@
+package accord.messages;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import accord.local.Instance;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.messages.Reply;
+import accord.messages.Request;
+import accord.local.Command;
+import accord.local.Listener;
+import accord.txn.TxnId;
+import accord.txn.Keys;
+
+/**
+ * Request that is answered with a {@link WaitOnCommitOk} once the command for {@code txnId} has
+ * reached at least {@code Committed} on every local instance owning one of {@code keys}.
+ */
+public class WaitOnCommit implements Request
+{
+    /**
+     * Counts down the local instances whose command has not yet committed and replies when the
+     * count reaches zero.
+     */
+    static class LocalWait implements Listener
+    {
+        final Node node;
+        final Id replyToNode;
+        final long replyToMessage;
+
+        int waitingOn;
+
+        LocalWait(Node node, Id replyToNode, long replyToMessage)
+        {
+            this.node = node;
+            this.replyToNode = replyToNode;
+            this.replyToMessage = replyToMessage;
+        }
+
+        /** Stops listening and acknowledges once {@code command} has reached Committed or later. */
+        @Override
+        public synchronized void onChange(Command command)
+        {
+            switch (command.status())
+            {
+                case Committed:
+                case ReadyToExecute:
+                case Executed:
+                case Applied:
+                    command.removeListener(this);
+                    ack();
+                    return;
+
+                case NotWitnessed:
+                case PreAccepted:
+                case Accepted:
+                    // not committed yet; keep listening
+                    return;
+
+                default:
+                    throw new IllegalStateException();
+            }
+        }
+
+        /** Records one committed instance and replies when none remain. */
+        private void ack()
+        {
+            waitingOn -= 1;
+            if (waitingOn == 0)
+            {
+                node.reply(replyToNode, replyToMessage, new WaitOnCommitOk());
+            }
+        }
+
+        /**
+         * Looks up the local instances for {@code keys}; listens on commands that are not yet
+         * committed and acknowledges the others immediately.
+         */
+        synchronized void setup(TxnId txnId, Keys keys)
+        {
+            List<Instance> instances = node.local(keys).collect(Collectors.toList());
+            waitingOn = instances.size();
+            for (Instance instance : instances)
+            {
+                Command command = instance.command(txnId);
+                switch (command.status())
+                {
+                    case Committed:
+                    case ReadyToExecute:
+                    case Executed:
+                    case Applied:
+                        ack();
+                        break;
+
+                    case NotWitnessed:
+                    case PreAccepted:
+                    case Accepted:
+                        command.addListener(this);
+                        break;
+                }
+            }
+        }
+    }
+
+    final TxnId txnId;
+    final Keys keys;
+
+    public WaitOnCommit(TxnId txnId, Keys keys)
+    {
+        this.txnId = txnId;
+        this.keys = keys;
+    }
+
+    /**
+     * Creates a {@link LocalWait} for this request and sets it up.
+     *
+     * @param node           the node processing the request
+     * @param replyToNode    the node to reply to
+     * @param replyToMessage the id of the message being answered
+     */
+    public void process(Node node, Id replyToNode, long replyToMessage)
+    {
+        LocalWait wait = new LocalWait(node, replyToNode, replyToMessage);
+        wait.setup(txnId, keys);
+    }
+
+    /** Reply sent once every local instance has seen the commit. */
+    public static class WaitOnCommitOk implements Reply
+    {
+    }
+}
diff --git a/accord-core/src/main/java/accord/topology/Shard.java b/accord-core/src/main/java/accord/topology/Shard.java
new file mode 100644
index 0000000..925308c
--- /dev/null
+++ b/accord-core/src/main/java/accord/topology/Shard.java
@@ -0,0 +1,57 @@
+package accord.topology;
+
+import java.util.List;
+import java.util.Set;
+
+import accord.local.Node.Id;
+import accord.api.Key;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * A key range {@code [start, end)} together with the nodes replicating it and the quorum sizes
+ * derived from the number of replicas and the size of the fast path electorate.
+ */
+public class Shard
+{
+    public final Key start, end;
+    public final List<Id> nodes;
+    public final Set<Id> fastPathElectorate;
+    public final int recoveryFastPathSize;
+    public final int fastPathQuorumSize;
+    public final int slowPathQuorumSize;
+
+    /**
+     * @param start              inclusive lower bound of the range
+     * @param end                exclusive upper bound of the range
+     * @param nodes              the replicas of this shard
+     * @param fastPathElectorate the nodes whose votes count towards the fast path
+     * @throws IllegalArgumentException if the electorate is smaller than {@code replicas - f}
+     */
+    public Shard(Key start, Key end, List<Id> nodes, Set<Id> fastPathElectorate)
+    {
+        int replicas = nodes.size();
+        int maxFailures = maxToleratedFailures(replicas);
+        int electorateSize = fastPathElectorate.size();
+
+        this.start = start;
+        this.end = end;
+        this.nodes = nodes;
+        this.fastPathElectorate = fastPathElectorate;
+        this.recoveryFastPathSize = (maxFailures + 1) / 2;
+        this.slowPathQuorumSize = maxFailures + 1;
+        this.fastPathQuorumSize = fastPathQuorumSize(replicas, electorateSize, maxFailures);
+    }
+
+    /** @return {@code (replicas - 1) / 2}, using integer division */
+    @VisibleForTesting
+    static int maxToleratedFailures(int replicas)
+    {
+        int others = replicas - 1;
+        return others / 2;
+    }
+
+    /**
+     * @param replicas   number of replicas of the shard
+     * @param electorate size of the fast path electorate
+     * @param f          the value returned by {@link #maxToleratedFailures} for {@code replicas}
+     * @return {@code (f + electorate) / 2 + 1}
+     * @throws IllegalArgumentException if {@code electorate < replicas - f}
+     */
+    @VisibleForTesting
+    static int fastPathQuorumSize(int replicas, int electorate, int f)
+    {
+        int minElectorate = replicas - f;
+        Preconditions.checkArgument(electorate >= minElectorate);
+        return 1 + (f + electorate) / 2;
+    }
+
+    /** @return true if {@code start <= key < end} */
+    public boolean contains(Key key)
+    {
+        if (key.compareTo(start) < 0)
+            return false;
+        return key.compareTo(end) < 0;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Shard[" + start + "," + end + "]";
+    }
+}
diff --git a/accord-core/src/main/java/accord/topology/Shards.java b/accord-core/src/main/java/accord/topology/Shards.java
new file mode 100644
index 0000000..6c5eadf
--- /dev/null
+++ b/accord-core/src/main/java/accord/topology/Shards.java
@@ -0,0 +1,22 @@
+package accord.topology;
+
+import java.util.Collections;
+import java.util.Map;
+
+import accord.local.Node.Id;
+import accord.txn.Keys;
+
+/**
+ * A {@link Topology} under another name; adds the {@link #EMPTY} constant and forwards both
+ * constructors unchanged.
+ */
+public class Shards extends Topology
+{
+    /** A topology with no keys, no shards and no nodes. */
+    public static final Shards EMPTY = new Shards(Keys.EMPTY, new Shard[0], Collections.emptyMap(), Keys.EMPTY, new int[0]);
+
+    /** @param shards the shards making up the topology */
+    public Shards(Shard... shards)
+    {
+        super(shards);
+    }
+
+    /** Passes all five arguments straight to the matching {@link Topology} constructor. */
+    public Shards(Keys starts, Shard[] shards, Map<Id, NodeInfo> nodeLookup, Keys subsetOfStarts, int[] supersetIndexes)
+    {
+        super(starts, shards, nodeLookup, subsetOfStarts, supersetIndexes);
+    }
+}
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
new file mode 100644
index 0000000..c3308b7
--- /dev/null
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -0,0 +1,245 @@
+package accord.topology;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import accord.local.Node.Id;
+import accord.api.Key;
+import accord.txn.Keys;
+import accord.utils.IndexedConsumer;
+
+/**
+ * A collection of {@link Shard}s, optionally restricted to a subset of a larger topology.
+ *
+ * {@code shards} and {@code starts} describe the full topology, with {@code starts.get(i)} being
+ * the start key of {@code shards[i]}. {@code supersetIndexes} lists the positions within
+ * {@code shards} that are visible through this view, and {@code subsetOfStarts} holds the start
+ * keys of those positions. {@code nodeLookup} maps each node to the shards it replicates.
+ */
+public class Topology extends AbstractCollection<Shard>
+{
+    // TODO: introduce range version of Keys
+    final Keys starts;
+    final Shard[] shards;
+    final Map<Id, Shards.NodeInfo> nodeLookup;
+    final Keys subsetOfStarts;
+    final int[] supersetIndexes;
+
+    /** The shards replicated by one node: their start keys and their positions in {@code shards}. */
+    static class NodeInfo
+    {
+        final Keys starts;
+        final int[] supersetIndexes;
+
+        NodeInfo(Keys starts, int[] supersetIndexes)
+        {
+            this.starts = starts;
+            this.supersetIndexes = supersetIndexes;
+        }
+
+        @Override
+        public String toString()
+        {
+            return starts.toString();
+        }
+    }
+
+    /**
+     * Builds a complete topology from the given shards.
+     *
+     * The shards are copied and ordered by start key so that position {@code i} refers to the
+     * same shard in both {@code shards} and {@code starts}, whatever order the caller used.
+     *
+     * @param shards the shards making up the topology
+     */
+    public Topology(Shard... shards)
+    {
+        Shard[] ordered = shards.clone();
+        Arrays.sort(ordered, (a, b) -> a.start.compareTo(b.start));
+
+        this.starts = new Keys(Arrays.stream(ordered).map(shard -> shard.start).collect(Collectors.toList()));
+        this.shards = ordered;
+        this.subsetOfStarts = starts;
+        this.supersetIndexes = IntStream.range(0, ordered.length).toArray();
+        this.nodeLookup = new HashMap<>();
+
+        // collect, per node, the positions of the shards it replicates
+        Map<Id, List<Integer>> build = new HashMap<>();
+        for (int i = 0 ; i < ordered.length ; ++i)
+        {
+            for (Id node : ordered[i].nodes)
+                build.computeIfAbsent(node, ignore -> new ArrayList<>()).add(i);
+        }
+        for (Map.Entry<Id, List<Integer>> e : build.entrySet())
+        {
+            int[] supersetIndexes = e.getValue().stream().mapToInt(i -> i).toArray();
+            Keys starts = this.starts.select(supersetIndexes);
+            nodeLookup.put(e.getKey(), new Shards.NodeInfo(starts, supersetIndexes));
+        }
+    }
+
+    /** Creates a view from already-computed parts; the arguments are stored as given. */
+    public Topology(Keys starts, Shard[] shards, Map<Id, Shards.NodeInfo> nodeLookup, Keys subsetOfStarts, int[] supersetIndexes)
+    {
+        this.starts = starts;
+        this.shards = shards;
+        this.nodeLookup = nodeLookup;
+        this.subsetOfStarts = subsetOfStarts;
+        this.supersetIndexes = supersetIndexes;
+    }
+
+    /**
+     * @param node the node of interest
+     * @return the result of {@link #forKeys} for the start keys of the shards replicated by
+     *         {@code node}, or {@link Shards#EMPTY} if the node is unknown
+     */
+    public Shards forNode(Id node)
+    {
+        NodeInfo info = nodeLookup.get(node);
+        if (info == null)
+            return Shards.EMPTY;
+        return forKeys(info.starts);
+    }
+
+    /** @return the shard at the floor index of {@code key} within the full set of start keys */
+    public Shard forKey(Key key)
+    {
+        int i = starts.floorIndex(key);
+        return shards[i];
+    }
+
+    /**
+     * @param select the keys of interest
+     * @return a view over the same underlying topology limited to the shards selected for
+     *         {@code select}
+     */
+    public Shards forKeys(Keys select)
+    {
+        int subsetIndex = 0;
+        int count = 0;
+        int[] newSubset = new int[Math.min(select.size(), subsetOfStarts.size())];
+        for (int i = 0 ; i < select.size() ; )
+        {
+            subsetIndex = subsetOfStarts.floorIndex(subsetIndex, subsetOfStarts.size(), select.get(i));
+            int supersetIndex = supersetIndexes[subsetIndex];
+            newSubset[count++] = supersetIndex;
+            Shard shard = shards[supersetIndex];
+            // continue from the position ceilIndex reports for this shard's end key
+            i = select.ceilIndex(i, select.size(), shard.end);
+        }
+        if (count != newSubset.length)
+            newSubset = Arrays.copyOf(newSubset, count);
+        Keys subsetOfKeys = starts.select(newSubset);
+        return new Shards(starts, shards, nodeLookup, subsetOfKeys, newSubset);
+    }
+
+    /**
+     * Visits the shards of this view that are replicated by {@code on} and contain a selected key.
+     * Does nothing if {@code on} is not part of the topology.
+     *
+     * @param on the node to limit our selection to
+     * @param select may be a superSet of the keys owned by {@code on} but not of this {@code Shards}
+     * @param consumer receives each matching shard with its index within this view
+     */
+    public void forEachOn(Id on, Keys select, IndexedConsumer<Shard> consumer)
+    {
+        Shards.NodeInfo info = nodeLookup.get(on);
+        if (info == null)
+            return; // unknown node: it replicates nothing, consistent with forNode
+
+        // three-way merge: i walks the keys, j this view's shards, k the node's shards
+        for (int i = 0, j = 0, k = 0 ; i < select.size() && j < supersetIndexes.length && k < info.supersetIndexes.length ;)
+        {
+            Key key = select.get(i);
+            Shard shard = shards[supersetIndexes[j]];
+            int c = supersetIndexes[j] - info.supersetIndexes[k];
+            if (c < 0) ++j;
+            else if (c > 0) ++k;
+            else if (key.compareTo(shard.start) < 0) ++i;
+            else if (key.compareTo(shard.end) < 0) { consumer.accept(j, shard); i++; j++; k++; }
+            else { j++; k++; }
+        }
+    }
+
+    /**
+     * Visits every shard of this view that is replicated by {@code on}. Does nothing if
+     * {@code on} is not part of the topology.
+     *
+     * @param on the node to limit our selection to
+     * @param consumer receives each matching shard with its index within this view
+     */
+    public void forEachOn(Id on, IndexedConsumer<Shard> consumer)
+    {
+        // TODO: this can be done by divide-and-conquer splitting of the lists and recursion, which should be more efficient
+        Shards.NodeInfo info = nodeLookup.get(on);
+        if (info == null)
+            return; // unknown node: it replicates nothing, consistent with forNode
+
+        int[] a = supersetIndexes, b = info.supersetIndexes;
+        int ai = 0, bi = 0;
+        while (ai < a.length && bi < b.length)
+        {
+            if (a[ai] == b[bi])
+            {
+                consumer.accept(ai, shards[a[ai]]);
+                ++ai; ++bi;
+            }
+            else if (a[ai] < b[bi])
+            {
+                // skip ahead in a; a negative result encodes the insertion point
+                ai = Arrays.binarySearch(a, ai + 1, a.length, b[bi]);
+                if (ai < 0) ai = -1 -ai;
+            }
+            else
+            {
+                bi = Arrays.binarySearch(b, bi + 1, b.length, a[ai]);
+                if (bi < 0) bi = -1 -bi;
+            }
+        }
+    }
+
+    /** Visits every shard of this view together with its index within the view. */
+    public void forEach(IndexedConsumer<Shard> consumer)
+    {
+        for (int i = 0 ; i < supersetIndexes.length ; ++i)
+            consumer.accept(i, shards[supersetIndexes[i]]);
+    }
+
+    /**
+     * Picks the entries of {@code indexedByShard} that belong to shards containing a selected key.
+     *
+     * @param select         the keys of interest
+     * @param indexedByShard values indexed by shard position within this view
+     * @param constructor    creates the result array
+     * @return the selected values, in shard order
+     */
+    public <T> T[] select(Keys select, T[] indexedByShard, IntFunction<T[]> constructor)
+    {
+        List<T> selection = new ArrayList<>();
+        // two-way merge: i walks the keys, j the shards of this view
+        for (int i = 0, j = 0 ; i < select.size() && j < supersetIndexes.length ;)
+        {
+            Key k = select.get(i);
+            Shard shard = shards[supersetIndexes[j]];
+            int c = k.compareTo(shard.start);
+            if (c < 0) ++i;
+            else if (k.compareTo(shard.end) < 0) { selection.add(indexedByShard[j++]); i++; }
+            else j++;
+        }
+
+        return selection.toArray(constructor);
+    }
+
+    @Override
+    public Iterator<Shard> iterator()
+    {
+        return IntStream.of(supersetIndexes).mapToObj(i -> shards[i]).iterator();
+    }
+
+    @Override
+    public int size()
+    {
+        return subsetOfStarts.size();
+    }
+
+    /** @return the shard at position {@code index} within this view */
+    public Shard get(int index)
+    {
+        return shards[supersetIndexes[index]];
+    }
+}
diff --git a/accord-core/src/main/java/accord/txn/Ballot.java b/accord-core/src/main/java/accord/txn/Ballot.java
new file mode 100644
index 0000000..4482357
--- /dev/null
+++ b/accord-core/src/main/java/accord/txn/Ballot.java
@@ -0,0 +1,18 @@
+package accord.txn;
+
+import accord.local.Node.Id;
+
+/**
+ * A {@link Timestamp} subtype used where a ballot is expected; it adds no state or behaviour of its own.
+ */
+public class Ballot extends Timestamp
+{
+    /** Ballot carrying the same components as {@link Timestamp#NONE}. */
+    public static final Ballot ZERO = new Ballot(Timestamp.NONE);
+
+    /** Creates a ballot from its individual timestamp components. */
+    public Ballot(long real, int logical, Id node)
+    {
+        super(real, logical, node);
+    }
+
+    /** Creates a ballot with the same components as {@code from}. */
+    public Ballot(Timestamp from)
+    {
+        super(from);
+    }
+}
diff --git a/accord-core/src/main/java/accord/txn/Dependencies.java b/accord-core/src/main/java/accord/txn/Dependencies.java
new file mode 100644
index 0000000..0c6fa83
--- /dev/null
+++ b/accord-core/src/main/java/accord/txn/Dependencies.java
@@ -0,0 +1,107 @@
+package accord.txn;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import accord.local.Command;
+import accord.topology.Shard;
+import com.google.common.annotations.VisibleForTesting;
+
+// TODO: do not send Txn
+// TODO: implementation efficiency
+/**
+ * A mutable collection of transactions, keyed by {@link TxnId}, that some other transaction depends on.
+ * Iteration yields the entries of the backing map.
+ */
+public class Dependencies implements Iterable<Entry<TxnId, Txn>>
+{
+    // TODO: encapsulate
+    /** Backing map from transaction id to transaction; exposed directly and mutated by this class. */
+    public final NavigableMap<TxnId, Txn> deps;
+
+    /** Creates an empty dependency set backed by a {@link TreeMap}. */
+    public Dependencies()
+    {
+        this.deps = new TreeMap<>();
+    }
+
+    /**
+     * Wraps the supplied map without copying it, so changes made through either reference are shared.
+     *
+     * @param deps the map to use as backing storage
+     */
+    public Dependencies(NavigableMap<TxnId, Txn> deps)
+    {
+        this.deps = deps;
+    }
+
+    /** Records {@code command} as a dependency, using its id and transaction. */
+    public void add(Command command)
+    {
+        add(command.txnId(), command.txn());
+    }
+
+    /**
+     * Records a dependency, replacing any previous transaction stored for the same id.
+     *
+     * @return this instance, to allow chaining
+     */
+    @VisibleForTesting
+    public Dependencies add(TxnId txnId, Txn txn)
+    {
+        deps.put(txnId, txn);
+        return this;
+    }
+
+    /** Copies every entry of {@code add} into this set. */
+    public void addAll(Dependencies add)
+    {
+        this.deps.putAll(add.deps);
+    }
+
+    /** Removes every transaction id that is present in the supplied set (the parameter is the set to remove). */
+    public void removeAll(Dependencies add)
+    {
+        this.deps.keySet().removeAll(add.deps.keySet());
+    }
+
+    /** Returns true if {@code txnId} is recorded as a dependency. */
+    public boolean contains(TxnId txnId)
+    {
+        return deps.containsKey(txnId);
+    }
+
+    public boolean isEmpty()
+    {
+        return deps.isEmpty();
+    }
+
+    /** Returns the transaction stored for {@code txnId}, or null if there is none. */
+    public Txn get(TxnId txnId)
+    {
+        return deps.get(txnId);
+    }
+
+    /**
+     * Returns the ids of those dependencies that have at least one key for which {@code shard.contains} is true.
+     * The result is a live view: every call to {@code iterator()} re-evaluates the filter against the current map.
+     *
+     * @param shard the shard whose keys are of interest
+     * @return an iterable that may be traversed any number of times
+     */
+    public Iterable<TxnId> on(Shard shard)
+    {
+        // TODO: efficiency
+        // A fresh stream is built per iteration: a Stream can only be consumed once, so handing out
+        // "stream::iterator" would make the second traversal fail with IllegalStateException.
+        return () -> deps.entrySet()
+                         .stream()
+                         .filter(e -> e.getValue().keys().stream().anyMatch(shard::contains))
+                         .map(Entry::getKey)
+                         .iterator();
+    }
+
+    @Override
+    public Iterator<Entry<TxnId, Txn>> iterator()
+    {
+        return deps.entrySet().iterator();
+    }
+
+    public int size()
+    {
+        return deps.size();
+    }
+
+    /** Renders only the transaction ids, not the transactions themselves. */
+    @Override
+    public String toString()
+    {
+        return deps.keySet().toString();
+    }
+
+    /** Two instances are equal when they are of the same class and their backing maps are equal. */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Dependencies entries = (Dependencies) o;
+        return deps.equals(entries.deps);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(deps);
+    }
+}
diff --git a/accord-core/src/main/java/accord/txn/Keys.java b/accord-core/src/main/java/accord/txn/Keys.java
new file mode 100644
index 0000000..4e49a8c
--- /dev/null
+++ b/accord-core/src/main/java/accord/txn/Keys.java
@@ -0,0 +1,160 @@
+package accord.txn;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import accord.api.Key;
+
+/**
+ * An array-backed collection of keys that is searched with binary search, and is therefore expected to be
+ * held in ascending natural order. Index-returning methods refer to positions in that array.
+ */
+@SuppressWarnings("rawtypes")
+public class Keys implements Iterable<Key>
+{
+    public static final Keys EMPTY = new Keys(new Key[0]);
+
+    final Key[] keys;
+
+    /**
+     * Copies the set's elements in its iteration order, without re-sorting.
+     * NOTE(review): presumably the set is ordered by the keys' natural ordering, which the
+     * binary searches below rely on - confirm against callers.
+     */
+    public Keys(SortedSet<? extends Key> keys)
+    {
+        this.keys = keys.toArray(Key[]::new);
+    }
+
+    /** Copies the collection into a new array and sorts the copy. */
+    public Keys(Collection<? extends Key> keys)
+    {
+        this.keys = keys.toArray(Key[]::new);
+        Arrays.sort(this.keys);
+    }
+
+    /**
+     * Adopts the supplied array as backing storage and sorts it in place.
+     * NOTE: the caller's array is both reordered and retained, not copied.
+     */
+    public Keys(Key[] keys)
+    {
+        this.keys = keys;
+        Arrays.sort(keys);
+    }
+
+    /**
+     * Binary-searches for {@code key}.
+     *
+     * @return the index of the key if present, otherwise the negative value defined by {@link Arrays#binarySearch(Object[], Object)}
+     */
+    public int indexOf(Key key)
+    {
+        return Arrays.binarySearch(keys, key);
+    }
+
+    /** Returns the key stored at the given array position. */
+    public Key get(int indexOf)
+    {
+        return keys[indexOf];
+    }
+
+    /**
+     * Streams the keys lying between {@code start} and {@code end}.
+     *
+     * @param isInclusiveStart whether a key equal to {@code start} is included
+     * @param isInclusiveEnd whether a key equal to {@code end} is included
+     */
+    public Stream<Key> subSet(Key start, boolean isInclusiveStart, Key end, boolean isInclusiveEnd)
+    {
+        int i = Arrays.binarySearch(keys, start);
+        if (i < 0) i = -1 - i;               // absent: begin at the insertion point
+        else if (!isInclusiveStart) ++i;     // present but excluded: skip it
+        int j = Arrays.binarySearch(keys, end);
+        if (j < 0) j = -1 - j;               // absent: stop at the insertion point (exclusive bound)
+        else if (isInclusiveEnd) ++j;        // present and included: extend the exclusive bound past it
+        return Arrays.stream(keys, i, j);
+    }
+
+    /** Builds a new instance from the keys found at the given positions; the selection is re-sorted by the constructor. */
+    public Keys select(int[] indexes)
+    {
+        Key[] selection = new Key[indexes.length];
+        for (int i = 0 ; i < indexes.length ; ++i)
+            selection[i] = keys[indexes[i]];
+        return new Keys(selection);
+    }
+
+    public int size()
+    {
+        return keys.length;
+    }
+
+    /**
+     * Within {@code [lowerBound, upperBound)}, returns the index of {@code key} if present, otherwise the index
+     * of the first larger key; this is {@code upperBound} when every key in the range is smaller.
+     */
+    public int ceilIndex(int lowerBound, int upperBound, Key key)
+    {
+        int i = Arrays.binarySearch(keys, lowerBound, upperBound, key);
+        if (i < 0) i = -1 - i;
+        return i;
+    }
+
+    public int ceilIndex(Key key)
+    {
+        return ceilIndex(0, keys.length, key);
+    }
+
+    /**
+     * Within {@code [lowerBound, upperBound)}, returns the index immediately after {@code key} if present,
+     * otherwise the index of the first larger key (the insertion point).
+     */
+    public int higherIndex(int lowerBound, int upperBound, Key key)
+    {
+        int i = Arrays.binarySearch(keys, lowerBound, upperBound, key);
+        if (i < 0) i = -1 - i;
+        else ++i;
+        return i;
+    }
+
+    public int higherIndex(Key key)
+    {
+        return higherIndex(0, keys.length, key);
+    }
+
+    /**
+     * Within {@code [lowerBound, upperBound)}, returns the index of {@code key} if present, otherwise the index
+     * just before its insertion point; this is {@code lowerBound - 1} when every key in the range is larger.
+     */
+    public int floorIndex(int lowerBound, int upperBound, Key key)
+    {
+        int i = Arrays.binarySearch(keys, lowerBound, upperBound, key);
+        if (i < 0) i = -2 - i;
+        return i;
+    }
+
+    public int floorIndex(Key key)
+    {
+        return floorIndex(0, keys.length, key);
+    }
+
+    /**
+     * Within {@code [lowerBound, upperBound)}, returns the index immediately before {@code key} if present,
+     * otherwise the index just before its insertion point.
+     */
+    public int lowerIndex(int lowerBound, int upperBound, Key key)
+    {
+        int i = Arrays.binarySearch(keys, lowerBound, upperBound, key);
+        if (i < 0) i = -2 - i;
+        else --i;
+        return i;
+    }
+
+    public int lowerIndex(Key key)
+    {
+        return lowerIndex(0, keys.length, key);
+    }
+
+    public Stream<Key> stream()
+    {
+        return Stream.of(keys);
+    }
+
+    /** Iterates the keys in array order; removal is not supported. */
+    @Override
+    public Iterator<Key> iterator()
+    {
+        return new Iterator<>()
+        {
+            int i = 0;
+            @Override
+            public boolean hasNext()
+            {
+                return i < keys.length;
+            }
+
+            @Override
+            public Key next()
+            {
+                // honour the Iterator contract instead of leaking ArrayIndexOutOfBoundsException
+                if (i >= keys.length)
+                    throw new java.util.NoSuchElementException();
+                return keys[i++];
+            }
+        };
+    }
+
+    @Override
+    public String toString()
+    {
+        return stream().map(Object::toString).collect(Collectors.joining(",", "[", "]"));
+    }
+
+    /** Builds an instance from one or more keys given in any order. */
+    public static Keys of(Key k0, Key... kn)
+    {
+        Key[] keys = new Key[kn.length + 1];
+        keys[0] = k0;
+        for (int i=0; i<kn.length; i++)
+            keys[i + 1] = kn[i];
+
+        return new Keys(keys);
+    }
+}
diff --git a/accord-core/src/main/java/accord/txn/Timestamp.java b/accord-core/src/main/java/accord/txn/Timestamp.java
new file mode 100644
index 0000000..f85986c
--- /dev/null
+++ b/accord-core/src/main/java/accord/txn/Timestamp.java
@@ -0,0 +1,65 @@
+package accord.txn;
+
+import accord.local.Node.Id;
+
+/**
+ * An immutable triple of {@code real}, {@code logical} and {@code node}, ordered by those components in that order.
+ */
+public class Timestamp implements Comparable<Timestamp>
+{
+    public static final Timestamp NONE = new Timestamp(0, 0, Id.NONE);
+    public static final Timestamp MAX = new Timestamp(Long.MAX_VALUE, Integer.MAX_VALUE, Id.MAX);
+
+    public final long real;
+    public final int logical;
+    public final Id node;
+
+    public Timestamp(long real, int logical, Id node)
+    {
+        this.real = real;
+        this.logical = logical;
+        this.node = node;
+    }
+
+    /** Copy constructor: takes all three components from {@code copy}. */
+    public Timestamp(Timestamp copy)
+    {
+        this.real = copy.real;
+        this.logical = copy.logical;
+        this.node = copy.node;
+    }
+
+    /** Compares by {@code real}, then {@code logical}, then {@code node}. */
+    @Override
+    public int compareTo(Timestamp that)
+    {
+        int c = Long.compare(this.real, that.real);
+        if (c == 0) c = Integer.compare(this.logical, that.logical);
+        if (c == 0) c = this.node.compareTo(that.node);
+        return c;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return (int) (((real * 31) + node.hashCode()) * 31 + logical);
+    }
+
+    /**
+     * Typed overload of {@link #equals(Object)}: true when all three components match.
+     * The runtime class is not compared, so instances of different subclasses can be equal.
+     *
+     * @param that the timestamp to compare with; null yields false, as the equals contract requires
+     */
+    public boolean equals(Timestamp that)
+    {
+        // a statically typed null argument binds to this overload, so it must be null-safe itself
+        return that != null && this.real == that.real && this.logical == that.logical && this.node.equals(that.node);
+    }
+
+    @Override
+    public boolean equals(Object that)
+    {
+        return that instanceof Timestamp && equals((Timestamp) that);
+    }
+
+    /** Returns the larger of the two arguments, preferring {@code a} when they compare equal. */
+    public static <T extends Timestamp> T max(T a, T b)
+    {
+        return a.compareTo(b) >= 0 ? a : b;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "[" + real + ',' + logical + ',' + node + ']';
+    }
+
+}
diff --git a/accord-core/src/main/java/accord/txn/Txn.java b/accord-core/src/main/java/accord/txn/Txn.java
new file mode 100644
index 0000000..f41daeb
--- /dev/null
+++ b/accord-core/src/main/java/accord/txn/Txn.java
@@ -0,0 +1,164 @@
+package accord.txn;
+
+import java.util.Comparator;
+import java.util.stream.Stream;
+
+import accord.api.Data;
+import accord.api.Query;
+import accord.api.Read;
+import accord.api.Result;
+import accord.api.Update;
+import accord.api.Key;
+import accord.api.Store;
+import accord.local.Command;
+import accord.local.CommandsForKey;
+import accord.local.Instance;
+import accord.local.Node;
+
+/**
+ * A transaction: the keys it touches, the read and query it performs and, for writes, the update it applies.
+ * Also offers helpers that look up related commands in an {@link Instance}'s per-key indexes.
+ */
+public class Txn
+{
+    enum Kind { READ, WRITE, RECONFIGURE }
+
+    final Kind kind;
+    public final Keys keys;
+    public final Read read;
+    public final Query query;
+    public final Update update;
+
+    /** Creates a read-only transaction; {@code update} is left null. */
+    public Txn(Keys keys, Read read, Query query)
+    {
+        this.kind = Kind.READ;
+        this.keys = keys;
+        this.read = read;
+        this.query = query;
+        this.update = null;
+    }
+
+    /** Creates a write transaction carrying the given update. */
+    public Txn(Keys keys, Read read, Query query, Update update)
+    {
+        this.kind = Kind.WRITE;
+        this.keys = keys;
+        this.read = read;
+        this.query = query;
+        this.update = update;
+    }
+
+    /** Returns false for {@code READ} and true for {@code WRITE} and {@code RECONFIGURE}. */
+    public boolean isWrite()
+    {
+        switch (kind)
+        {
+            case WRITE:
+            case RECONFIGURE:
+                return true;
+            case READ:
+                return false;
+            default:
+                throw new IllegalStateException();
+        }
+    }
+
+    /** Computes the client-visible result by passing {@code data} to the query. */
+    public Result result(Data data)
+    {
+        return query.compute(data);
+    }
+
+    /**
+     * Produces the writes of this transaction at {@code executeAt}; the write component is null
+     * when the transaction has no update.
+     */
+    public Writes execute(Timestamp executeAt, Data data)
+    {
+        return new Writes(executeAt, keys, update == null ? null : update.apply(data));
+    }
+
+    public Keys keys()
+    {
+        return keys;
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder description = new StringBuilder("read:").append(read.toString());
+        if (update != null)
+            description.append(", update:").append(update);
+        return description.toString();
+    }
+
+    /** Delegates to the read over the range {@code start}..{@code end} of {@code store}. */
+    public Data read(Key start, Key end, Store store)
+    {
+        return read.read(start, end, store);
+    }
+
+    /** Reads over the shard range and store of the instance that owns {@code command}. */
+    public Data read(Command command)
+    {
+        Instance owner = command.instance;
+        return read(owner.shard.start, owner.shard.end, owner.store());
+    }
+
+    // TODO: move these somewhere else?
+    /** Returns what {@code node.local} reports for this transaction's keys. */
+    public Stream<Instance> local(Node node)
+    {
+        return node.local(keys());
+    }
+
+    /** Returns the largest per-key {@code max()} over this transaction's keys, or {@link Timestamp#NONE}. */
+    public Timestamp maxConflict(Instance instance)
+    {
+        return maxConflict(instance, keys());
+    }
+
+    /**
+     * For each key: the uncommitted commands, followed by the committed-by-execution-time commands,
+     * whose map key is strictly below {@code mayExecuteBefore}.
+     */
+    public Stream<Command> conflictsMayExecuteBefore(Instance instance, Timestamp mayExecuteBefore)
+    {
+        return keys().stream()
+                     .map(instance::commandsForKey)
+                     .flatMap(forKey -> Stream.concat(
+                         forKey.uncommitted.headMap(mayExecuteBefore, false).values().stream(),
+                         // TODO: only return latest of Committed?
+                         forKey.committedByExecuteAt.headMap(mayExecuteBefore, false).values().stream()));
+    }
+
+    /** For each key: the uncommitted commands whose map key is strictly below {@code startedBefore}. */
+    public Stream<Command> uncommittedStartedBefore(Instance instance, TxnId startedBefore)
+    {
+        return keys().stream()
+                     .map(instance::commandsForKey)
+                     .flatMap(forKey -> forKey.uncommitted.headMap(startedBefore, false).values().stream());
+    }
+
+    /** For each key: the commands of {@code committedById} whose map key is strictly below {@code startedBefore}. */
+    public Stream<Command> committedStartedBefore(Instance instance, TxnId startedBefore)
+    {
+        return keys().stream()
+                     .map(instance::commandsForKey)
+                     .flatMap(forKey -> forKey.committedById.headMap(startedBefore, false).values().stream());
+    }
+
+    /** For each key: the uncommitted commands whose map key is strictly above {@code startedAfter}. */
+    public Stream<Command> uncommittedStartedAfter(Instance instance, TxnId startedAfter)
+    {
+        return keys().stream()
+                     .map(instance::commandsForKey)
+                     .flatMap(forKey -> forKey.uncommitted.tailMap(startedAfter, false).values().stream());
+    }
+
+    /** For each key: the commands of {@code committedByExecuteAt} whose map key is strictly above {@code startedAfter}. */
+    public Stream<Command> committedExecutesAfter(Instance instance, TxnId startedAfter)
+    {
+        return keys().stream()
+                     .map(instance::commandsForKey)
+                     .flatMap(forKey -> forKey.committedByExecuteAt.tailMap(startedAfter, false).values().stream());
+    }
+
+    /** Registers {@code command} with the per-key index of every key of this transaction. */
+    public void register(Instance instance, Command command)
+    {
+        assert instance == command.instance;
+        for (Key key : keys())
+            instance.commandsForKey(key).register(command);
+    }
+
+    protected Timestamp maxConflict(Instance instance, Keys keys)
+    {
+        Stream<CommandsForKey> perKey = keys.stream().map(instance::commandsForKey);
+        return perKey.map(CommandsForKey::max)
+                     .max(Comparator.naturalOrder())
+                     .orElse(Timestamp.NONE);
+    }
+
+}
diff --git a/accord-core/src/main/java/accord/txn/TxnId.java b/accord-core/src/main/java/accord/txn/TxnId.java
new file mode 100644
index 0000000..0ba1565
--- /dev/null
+++ b/accord-core/src/main/java/accord/txn/TxnId.java
@@ -0,0 +1,16 @@
+package accord.txn;
+
+import accord.local.Node.Id;
+
+/**
+ * A {@link Timestamp} subtype used where a transaction id is expected; it adds no state or behaviour of its own.
+ */
+public class TxnId extends Timestamp
+{
+    /** Creates an id from its individual timestamp components. */
+    public TxnId(long real, int logical, Id node)
+    {
+        super(real, logical, node);
+    }
+
+    /** Creates an id with the same components as {@code timestamp}. */
+    public TxnId(Timestamp timestamp)
+    {
+        super(timestamp);
+    }
+}
diff --git a/accord-core/src/main/java/accord/txn/Writes.java b/accord-core/src/main/java/accord/txn/Writes.java
new file mode 100644
index 0000000..d17b8e6
--- /dev/null
+++ b/accord-core/src/main/java/accord/txn/Writes.java
@@ -0,0 +1,34 @@
+package accord.txn;
+
+import accord.api.Write;
+import accord.local.Instance;
+
+/**
+ * The outcome of executing a transaction: the execution timestamp, the keys involved and an optional write.
+ */
+public class Writes
+{
+    public final Timestamp executeAt;
+    public final Keys keys;
+    /** The write to perform; may be null, in which case {@link #apply} does nothing. */
+    public final Write write;
+
+    public Writes(Timestamp executeAt, Keys keys, Write write)
+    {
+        this.executeAt = executeAt;
+        this.keys = keys;
+        this.write = write;
+    }
+
+    /** Applies the write, if any, to the instance's store over the instance's shard range. */
+    public void apply(Instance instance)
+    {
+        if (write == null)
+            return;
+
+        write.apply(instance.shard.start, instance.shard.end, executeAt, instance.store());
+    }
+
+    @Override
+    public String toString()
+    {
+        String fields = "executeAt:" + executeAt + ", keys:" + keys + ", write:" + write;
+        return "TxnWrites{" + fields + '}';
+    }
+}
diff --git a/accord-core/src/main/java/accord/utils/DeterministicIdentitySet.java b/accord-core/src/main/java/accord/utils/DeterministicIdentitySet.java
new file mode 100644
index 0000000..b05b62a
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/DeterministicIdentitySet.java
@@ -0,0 +1,95 @@
+package accord.utils;
+
+import java.util.AbstractSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Consumer;
+
+/**
+ * A set that identifies its members by reference (via {@link IdentityHashMap}) and iterates them in a
+ * fixed order given by a circular doubly-linked list: most recently added first.
+ */
+public class DeterministicIdentitySet<T> extends AbstractSet<T>
+{
+    /** A node of the circular linked list; {@code prev} stays null until the node has been linked. */
+    static class Entry<T>
+    {
+        final T item;
+        Entry<T> prev;
+        Entry<T> next;
+
+        Entry(T item)
+        {
+            this.item = item;
+        }
+    }
+
+    // TODO: an identity hash map that doesn't mind concurrent modification / iteration
+    final IdentityHashMap<T, Entry<T>> lookup = new IdentityHashMap<>();
+    /** Sentinel node: the list is empty when it points at itself. */
+    final Entry<T> head = new Entry<T>(null);
+
+    public DeterministicIdentitySet()
+    {
+        head.next = head;
+        head.prev = head;
+    }
+
+    /** Walks the list from the sentinel's successor until the sentinel is reached again. */
+    @Override
+    public Iterator<T> iterator()
+    {
+        return new Iterator<T>()
+        {
+            Entry<T> cursor = head.next;
+
+            @Override
+            public boolean hasNext()
+            {
+                return cursor != head;
+            }
+
+            @Override
+            public T next()
+            {
+                if (cursor == head)
+                    throw new NoSuchElementException();
+                Entry<T> current = cursor;
+                cursor = current.next;
+                return current.item;
+            }
+        };
+    }
+
+    @Override
+    public int size()
+    {
+        return lookup.size();
+    }
+
+    // new items are linked directly after the sentinel, i.e. behind the position of any iterator already
+    // in progress, so adding and removing while iterating does not alter the remainder being iterated
+    @Override
+    public boolean add(T item)
+    {
+        if (lookup.get(item) != null)
+            return false;
+
+        Entry<T> created = new Entry<>(item);
+        lookup.put(item, created);
+
+        Entry<T> first = head.next;
+        created.prev = head;
+        created.next = first;
+        head.next = created;
+        first.prev = created;
+        return true;
+    }
+
+    /** Unlinks the item; the removed node keeps its own pointers, so an iterator positioned on it can still advance. */
+    @Override
+    public boolean remove(Object item)
+    {
+        Entry<T> removed = lookup.remove(item);
+        if (removed == null)
+            return false;
+
+        Entry<T> before = removed.prev;
+        Entry<T> after = removed.next;
+        before.next = after;
+        after.prev = before;
+        return true;
+    }
+
+    @Override
+    public void forEach(Consumer<? super T> consumer)
+    {
+        for (Entry<T> entry = head.next; entry != head; entry = entry.next)
+            consumer.accept(entry.item);
+    }
+}
diff --git a/accord-core/src/main/java/accord/utils/IndexedConsumer.java b/accord-core/src/main/java/accord/utils/IndexedConsumer.java
new file mode 100644
index 0000000..c8f92b9
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/IndexedConsumer.java
@@ -0,0 +1,6 @@
+package accord.utils;
+
+/**
+ * A callback that receives a value together with an integer index.
+ *
+ * @param <V> the type of value accepted
+ */
+@FunctionalInterface
+public interface IndexedConsumer<V>
+{
+    /**
+     * @param i the index accompanying the value
+     * @param v the value to consume
+     */
+    void accept(int i, V v);
+}
diff --git a/accord-core/src/main/java/accord/utils/KeyRange.java b/accord-core/src/main/java/accord/utils/KeyRange.java
new file mode 100644
index 0000000..6ade671
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/KeyRange.java
@@ -0,0 +1,20 @@
+package accord.utils;
+
+import accord.api.Key;
+
+/**
+ * An immutable pair of keys, {@code start} and {@code end}; instances are obtained through {@link #of}.
+ */
+public class KeyRange<K extends Key<K>>
+{
+    public final K start;
+    public final K end;
+
+    /** Static factory: pairs {@code start} with {@code end}. */
+    public static <K extends Key<K>> KeyRange<K> of(K start, K end)
+    {
+        return new KeyRange<>(start, end);
+    }
+
+    private KeyRange(K start, K end)
+    {
+        this.start = start;
+        this.end = end;
+    }
+}
diff --git a/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java b/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java
new file mode 100644
index 0000000..1ba217f
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java
@@ -0,0 +1,83 @@
+package accord.utils;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import accord.api.Scheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link Scheduler} backed by a {@link ScheduledThreadPoolExecutor} with a single thread.
+ */
+public class ThreadPoolScheduler implements Scheduler
+{
+    private static final Logger logger = LoggerFactory.getLogger(ThreadPoolScheduler.class);
+    final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);
+    public ThreadPoolScheduler()
+    {
+        exec.setMaximumPoolSize(1);
+    }
+
+    /** Adapts a {@link ScheduledFuture} to the {@code Scheduled} handle returned to callers. */
+    static class FutureAsScheduled implements Scheduled
+    {
+        final ScheduledFuture<?> f;
+
+        FutureAsScheduled(ScheduledFuture<?> f)
+        {
+            this.f = f;
+        }
+
+        /** Cancels the task, interrupting it if it is currently running. */
+        @Override
+        public void cancel()
+        {
+            f.cancel(true);
+        }
+    }
+
+    /**
+     * Decorates {@code runnable} so that anything it throws is logged before being rethrown.
+     * Because the throwable is rethrown, a recurring task that fails is not run again by the executor.
+     */
+    static Runnable wrap(Runnable runnable)
+    {
+        return () ->
+        {
+            try
+            {
+                runnable.run();
+            }
+            catch (Throwable t)
+            {
+                logger.error("Unhandled Exception", t);
+                throw t;
+            }
+        };
+    }
+
+    /** Runs {@code run} repeatedly, first after {@code delay} and then with {@code delay} between the end of one run and the start of the next. */
+    @Override
+    public Scheduled recurring(Runnable run, long delay, TimeUnit units)
+    {
+        return new FutureAsScheduled(exec.scheduleWithFixedDelay(wrap(run), delay, delay, units));
+    }
+
+    /** Runs {@code run} once after {@code delay}. */
+    @Override
+    public Scheduled once(Runnable run, long delay, TimeUnit units)
+    {
+        return new FutureAsScheduled(exec.schedule(wrap(run), delay, units));
+    }
+
+    /** Submits {@code run} for execution without delay. */
+    @Override
+    public void now(Runnable run)
+    {
+        exec.execute(wrap(run));
+    }
+
+    /**
+     * Shuts the executor down and waits up to one minute for it to terminate.
+     *
+     * @throws IllegalStateException if termination does not complete in time, or if the waiting thread is
+     *         interrupted (in which case the thread's interrupt status is set again before throwing)
+     */
+    public void stop()
+    {
+        exec.shutdown();
+        try
+        {
+            if (!exec.awaitTermination(1L, TimeUnit.MINUTES))
+                throw new IllegalStateException("did not terminate");
+        }
+        catch (InterruptedException e)
+        {
+            // catching InterruptedException clears the flag; restore it so callers can still observe the interrupt
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException(e);
+        }
+    }
+}
diff --git a/accord-core/src/main/java/accord/utils/Timestamped.java b/accord-core/src/main/java/accord/utils/Timestamped.java
new file mode 100644
index 0000000..528389f
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/Timestamped.java
@@ -0,0 +1,20 @@
+package accord.utils;
+
+import accord.txn.Timestamp;
+
+/**
+ * An immutable pairing of a value with the {@link Timestamp} it is associated with.
+ */
+public class Timestamped<T>
+{
+    public final Timestamp timestamp;
+    public final T data;
+
+    public Timestamped(Timestamp timestamp, T data)
+    {
+        this.timestamp = timestamp;
+        this.data = data;
+    }
+
+    /** Returns whichever argument has the larger timestamp, preferring {@code a} when they compare equal. */
+    public static <T> Timestamped<T> merge(Timestamped<T> a, Timestamped<T> b)
+    {
+        boolean bIsNewer = a.timestamp.compareTo(b.timestamp) < 0;
+        if (bIsNewer)
+            return b;
+        return a;
+    }
+}
diff --git a/accord-core/src/main/java/accord/utils/WrapAroundList.java b/accord-core/src/main/java/accord/utils/WrapAroundList.java
new file mode 100644
index 0000000..13be283
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/WrapAroundList.java
@@ -0,0 +1,32 @@
+package accord.utils;
+
+import java.util.AbstractList;
+
+/**
+ * A read-only list view over a window of an array, where the window may wrap around the end of the
+ * array back to its beginning.
+ */
+public class WrapAroundList<T> extends AbstractList<T>
+{
+    final T[] contents;
+    final int start, end, size;
+
+    /**
+     * @param contents the backing array; it is not copied
+     * @param start index of the first element of the window
+     * @param end index one past the last element of the window; when {@code end <= start} the window
+     *            runs from {@code start} to the end of the array and continues from index 0 up to {@code end},
+     *            so {@code end == start} covers the whole array
+     */
+    public WrapAroundList(T[] contents, int start, int end)
+    {
+        this.contents = contents;
+        this.start = start;
+        this.end = end;
+        this.size = end > start ? end - start : end + (contents.length - start);
+    }
+
+
+    /**
+     * Returns the element {@code index} positions after {@code start}, wrapping past the end of the array.
+     *
+     * @throws IndexOutOfBoundsException if {@code index} is negative or not less than {@link #size()}
+     */
+    @Override
+    public T get(int index)
+    {
+        // without this check an out-of-range index would silently return an element outside the window
+        if (index < 0 || index >= size)
+            throw new IndexOutOfBoundsException("Index " + index + " out of bounds for length " + size);
+        int i = start + index;
+        if (i >= contents.length) i -= contents.length;
+        return contents[i];
+    }
+
+    @Override
+    public int size()
+    {
+        return size;
+    }
+}
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/utils/WrapAroundSet.java b/accord-core/src/main/java/accord/utils/WrapAroundSet.java
new file mode 100644
index 0000000..5bb3b9e
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/WrapAroundSet.java
@@ -0,0 +1,40 @@
+package accord.utils;
+
+import java.util.AbstractSet;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A read-only set view over a {@link WrapAroundList}; membership is answered from a map of element to
+ * array index rather than by scanning.
+ */
+public class WrapAroundSet<T> extends AbstractSet<T>
+{
+    /** Maps an element to an index that is compared against the list's {@code start} and {@code end}. */
+    final Map<T, Integer> lookup;
+    final WrapAroundList<T> list;
+
+    public WrapAroundSet(Map<T, Integer> lookup, WrapAroundList<T> list)
+    {
+        this.lookup = lookup;
+        this.list = list;
+    }
+
+    /** Returns true when {@code o} has an index in {@code lookup} and that index lies inside the list's window. */
+    @Override
+    public boolean contains(Object o)
+    {
+        Integer position = lookup.get(o);
+        if (position == null)
+            return false;
+
+        boolean atOrAfterStart = position >= list.start;
+        boolean beforeEnd = position < list.end;
+        // a contiguous window needs both bounds to hold; a wrapped window needs only one of them
+        boolean contiguous = list.end > list.start;
+        return contiguous ? (atOrAfterStart && beforeEnd) : (atOrAfterStart || beforeEnd);
+    }
+
+    @Override
+    public Iterator<T> iterator()
+    {
+        return list.iterator();
+    }
+
+    @Override
+    public int size()
+    {
+        return list.size;
+    }
+}
diff --git a/accord-core/src/test/java/accord/NetworkFilter.java b/accord-core/src/test/java/accord/NetworkFilter.java
new file mode 100644
index 0000000..b9cd0c2
--- /dev/null
+++ b/accord-core/src/test/java/accord/NetworkFilter.java
@@ -0,0 +1,74 @@
+package accord;
+
+import accord.local.Node.Id;
+import accord.messages.Message;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.function.Predicate;
+
+/**
+ * Test helper that decides whether a message between two nodes should be dropped, based on a set of
+ * registered discard predicates.
+ */
+public class NetworkFilter
+{
+    private final Logger logger = LoggerFactory.getLogger(NetworkFilter.class);
+
+    /** A rule that returns true when the given message should be discarded. */
+    private interface DiscardPredicate
+    {
+        boolean check(Id from, Id to, Message message);
+    }
+
+    Set<DiscardPredicate> discardPredicates = Sets.newConcurrentHashSet();
+
+    /** Returns true if any registered rule matches the message. */
+    public boolean shouldDiscard(Id from, Id to, Message message)
+    {
+        for (DiscardPredicate rule : discardPredicates)
+        {
+            if (rule.check(from, to, message))
+                return true;
+        }
+        return false;
+    }
+
+    /** Registers a rule discarding every message sent from or to {@code node}. */
+    public void isolate(Id node)
+    {
+        logger.info("Isolating node {}", node);
+        DiscardPredicate touchesNode = (from, to, msg) -> from.equals(node) || to.equals(node);
+        discardPredicates.add(touchesNode);
+    }
+
+    /** Isolates each of the given nodes in turn. */
+    public void isolate(Iterable<Id> ids)
+    {
+        ids.forEach(id -> isolate(id));
+    }
+
+    /** A predicate that accepts every id. */
+    public static Predicate<Id> anyId()
+    {
+        return id -> true;
+    }
+
+    /** A predicate that accepts the ids contained in a snapshot of {@code ids} taken at call time. */
+    public static Predicate<Id> isId(Iterable<Id> ids)
+    {
+        Set<Id> snapshot = ImmutableSet.copyOf(ids);
+        return snapshot::contains;
+    }
+
+    /** A predicate that accepts messages whose class is {@code klass} or a subtype of it. */
+    public static Predicate<Message> isMessageType(Class<? extends Message> klass)
+    {
+        return msg -> {
+            Class<?> actual = msg.getClass();
+            return klass.isAssignableFrom(actual);
+        };
+    }
+
+    /** The negation of {@link #isMessageType}. */
+    public static Predicate<Message> notMessageType(Class<? extends Message> klass)
+    {
+        return isMessageType(klass).negate();
+    }
+
+    /**
+     * message will be discarded if all predicates apply
+     */
+    public void addFilter(Predicate<Id> fromPredicate, Predicate<Id> toPredicate, Predicate<Message> messagePredicate)
+    {
+        DiscardPredicate combined = (from, to, msg) ->
+            fromPredicate.test(from) && toPredicate.test(to) && messagePredicate.test(msg);
+        discardPredicates.add(combined);
+    }
+
+    /** Removes every registered rule. */
+    public void clear()
+    {
+        logger.info("Clearing network filters");
+        discardPredicates.clear();
+    }
+}
diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java
new file mode 100644
index 0000000..3421753
--- /dev/null
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -0,0 +1,48 @@
+package accord;
+
+import accord.local.Node;
+import accord.impl.mock.MockStore;
+import accord.txn.Txn;
+import accord.txn.Keys;
+import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Static helpers for tests: node id construction and canned read/write transactions built on {@link MockStore}.
+ */
+public class Utils
+{
+    /** Wraps {@code i} in a {@link Node.Id}. */
+    public static Node.Id id(int i)
+    {
+        return new Node.Id(i);
+    }
+
+    /** Returns the ids 1..{@code num} in ascending order. */
+    public static List<Node.Id> ids(int num)
+    {
+        List<Node.Id> result = new ArrayList<>(num);
+        int next = 1;
+        while (result.size() < num)
+            result.add(id(next++));
+        return result;
+    }
+
+    /**
+     * Returns the ids {@code first}..{@code last}, both inclusive, in ascending order.
+     *
+     * @throws IllegalArgumentException if {@code last < first}
+     */
+    public static List<Node.Id> ids(int first, int last)
+    {
+        Preconditions.checkArgument(last >= first);
+        int expected = last - first + 1;
+        List<Node.Id> result = new ArrayList<>(expected);
+        for (int value = first; value <= last; value++)
+        {
+            result.add(id(value));
+        }
+        return result;
+    }
+
+    /** Builds a write transaction over {@code keys} from the mock read, query and update. */
+    public static Txn writeTxn(Keys keys)
+    {
+        return new Txn(keys, MockStore.READ, MockStore.QUERY, MockStore.UPDATE);
+    }
+
+    /** Builds a read-only transaction over {@code keys} from the mock read and query. */
+    public static Txn readTxn(Keys keys)
+    {
+        return new Txn(keys, MockStore.READ, MockStore.QUERY);
+    }
+}
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java
new file mode 100644
index 0000000..8f3821e
--- /dev/null
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -0,0 +1,274 @@
+package accord.burn;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import accord.impl.IntHashKey;
+import accord.impl.basic.Cluster;
+import accord.impl.basic.RandomDelayQueue.Factory;
+import accord.impl.IntKey;
+import accord.impl.TopologyFactory;
+import accord.impl.basic.Packet;
+import accord.impl.basic.PendingQueue;
+import accord.impl.list.ListQuery;
+import accord.impl.list.ListRead;
+import accord.impl.list.ListRequest;
+import accord.impl.list.ListResult;
+import accord.impl.list.ListUpdate;
+import accord.verify.SerializabilityVerifier;
+import accord.verify.LinearizabilityVerifier;
+import accord.verify.LinearizabilityVerifier.Observation;
+import accord.local.Node.Id;
+import accord.api.Key;
+import accord.txn.Txn;
+import accord.txn.Keys;
+
+public class BurnTest
+{
+ /**
+  * Generates the request packets for one burn run.
+  * Each request is sent from a random client to a random node, reads one or two distinct keys and
+  * writes zero to two distinct keys; the value written to a key is one greater than the last value
+  * generated for that key.
+  *
+  * @param random source of all randomness, so a seed reproduces the same requests
+  * @param clients candidate senders
+  * @param nodes candidate recipients
+  * @param keyCount number of distinct keys to draw from
+  * @param operations number of packets to generate
+  * @return the packets, with request ids 0..operations-1 in generation order
+  */
+ static List<Packet> generate(Random random, List<Id> clients, List<Id> nodes, int keyCount, int operations)
+ {
+     List<Key> keys = new ArrayList<>();
+     for (int i = 0 ; i < keyCount ; ++i)
+         keys.add(IntHashKey.key(i));
+
+     List<Packet> packets = new ArrayList<>();
+     // next[i] is the last value generated for key i
+     int[] next = new int[keyCount];
+
+     for (int count = 0 ; count < operations ; ++count)
+     {
+         Id client = clients.get(random.nextInt(clients.size()));
+         // bound by the node list itself: using clients.size() here would only ever pick from the
+         // first few nodes, and would fail outright if there were more clients than nodes
+         Id node = nodes.get(random.nextInt(nodes.size()));
+
+         int readCount = 1 + random.nextInt(2);
+         int writeCount = random.nextInt(3);
+
+         TreeSet<Key> requestKeys = new TreeSet<>();
+         while (readCount-- > 0)
+             requestKeys.add(randomKey(random, keys, requestKeys));
+
+         ListUpdate update = new ListUpdate();
+         while (writeCount-- > 0)
+         {
+             int i = randomKeyIndex(random, keys, update.keySet());
+             update.put(keys.get(i), ++next[i]);
+         }
+
+         // the transaction covers both the keys read and the keys written
+         requestKeys.addAll(update.keySet());
+         ListRead read = new ListRead(new Keys(requestKeys));
+         ListQuery query = new ListQuery(client, count, read.keys, update);
+         ListRequest request = new ListRequest(new Txn(new Keys(requestKeys), read, query, update));
+         packets.add(new Packet(client, node, count, request));
+     }
+
+     return packets;
+ }
+
+ /** Picks a random key from {@code keys} that is not contained in {@code notIn}. */
+ private static Key randomKey(Random random, List<Key> keys, Set<Key> notIn)
+ {
+     int index = randomKeyIndex(random, keys, notIn);
+     return keys.get(index);
+ }
+
+ /**
+  * Draws random indexes into {@code keys} until one refers to a key not contained in {@code notIn}.
+  * Does not terminate if every key is excluded.
+  */
+ private static int randomKeyIndex(Random random, List<Key> keys, Set<Key> notIn)
+ {
+     int candidate;
+     do
+     {
+         candidate = random.nextInt(keys.size());
+     }
+     while (notIn.contains(keys.get(candidate)));
+     return candidate;
+ }
+
+ /** Runs a burn with a freshly chosen seed, which is printed to standard output so the run can be repeated. */
+ static void burn(TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency) throws IOException
+ {
+     long seed = new Random().nextLong();
+     System.out.println(seed);
+     Random seeded = new Random(seed);
+     burn(seeded, topologyFactory, clients, nodes, keyCount, operations, concurrency);
+ }
+
+ /** Runs a burn with the given seed, reporting to {@code System.out} and {@code System.err}. */
+ static void burn(long seed, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency) throws IOException
+ {
+     PrintStream out = System.out;
+     PrintStream err = System.err;
+     burn(seed, topologyFactory, clients, nodes, keyCount, operations, concurrency, out, err);
+ }
+
+ /**
+  * Runs a burn with the given seed and output streams.
+  * The seed itself is always printed to {@code System.out}, not to {@code out}.
+  */
+ static void burn(long seed, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency, PrintStream out, PrintStream err) throws IOException
+ {
+     System.out.println(seed);
+     Random seeded = new Random(seed);
+     burn(seeded, topologyFactory, clients, nodes, keyCount, operations, concurrency, out, err);
+ }
+
+ /** Runs a burn driven by {@code random}, reporting to {@code System.out} and {@code System.err}. */
+ static void burn(Random random, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency)
+ {
+     PrintStream out = System.out;
+     PrintStream err = System.err;
+     burn(random, topologyFactory, clients, nodes, keyCount, operations, concurrency, out, err);
+ }
+
+ /**
+  * Runs the same seeded burn twice, concurrently, routing each run's output through a
+  * {@link ReconcilingOutputStreams} so that the two outputs are compared with each other.
+  * Blocks until both runs have finished, rethrowing a failure of either through {@code Future.get}.
+  */
+ static void reconcile(long seed, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency) throws IOException, ExecutionException, InterruptedException, TimeoutException
+ {
+     ReconcilingOutputStreams streams = new ReconcilingOutputStreams(System.out, System.err, 2);
+     Random firstRandom = new Random(seed);
+     Random secondRandom = new Random(seed);
+     PrintStream firstOut = new PrintStream(streams.get(0));
+     PrintStream secondOut = new PrintStream(streams.get(1));
+
+     ExecutorService exec = Executors.newFixedThreadPool(2);
+     Future<?> first = exec.submit(() -> burn(firstRandom, topologyFactory, clients, nodes, keyCount, operations, concurrency, firstOut, firstOut));
+     Future<?> second = exec.submit(() -> burn(secondRandom, topologyFactory, clients, nodes, keyCount, operations, concurrency, secondOut, secondOut));
+     exec.shutdown();
+
+     first.get();
+     second.get();
+ }
+
+ /**
+  * Core burn driver: generates {@code operations} requests, feeds them to a cluster with at most
+  * {@code concurrency} initially in flight, and passes every first reply to the serializability and
+  * linearizability verifiers.
+  *
+  * @param stdout receives each reply and the final summary
+  * @param stderr handed to {@code Cluster.run}
+  * @throws AssertionError if the logical clock does not end at exactly twice the number of operations,
+  *         i.e. one tick per request started plus one tick per reply accepted
+  */
+ static void burn(Random random, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int operations, int concurrency, PrintStream stdout, PrintStream stderr)
+ {
+ PendingQueue queue = new Factory(random).get();
+
+ SerializabilityVerifier serializable = new SerializabilityVerifier(keyCount);
+ Map<Integer, LinearizabilityVerifier> linearizableMap = new HashMap<>();
+
+ Packet[] requests = generate(random, clients, nodes, keyCount, operations).toArray(Packet[]::new);
+ // starts[i] is the clock value at which request i was queued; replies[i] is its first reply
+ int[] starts = new int[requests.length];
+ Packet[] replies = new Packet[requests.length];
+
+ AtomicInteger clock = new AtomicInteger();
+ AtomicInteger requestIndex = new AtomicInteger();
+ // queue the initial batch of requests, limited by the concurrency setting
+ for (int max = Math.min(concurrency, requests.length) ; requestIndex.get() < max ; )
+ {
+ int i = requestIndex.getAndIncrement();
+ starts[i] = clock.incrementAndGet();
+ queue.add(requests[i]);
+ }
+
+ // not used for atomicity, just for encapsulation
+ Consumer<Packet> responseSink = packet -> {
+ ListResult reply = (ListResult) packet.message;
+ // only the first reply to a request is processed
+ if (replies[(int)packet.replyId] != null)
+ return;
+
+ // each accepted reply releases the next request, if any remain
+ if (requestIndex.get() < requests.length)
+ {
+ int i = requestIndex.getAndIncrement();
+ starts[i] = clock.incrementAndGet();
+ queue.add(requests[i]);
+ }
+
+ stdout.println(reply);
+ serializable.begin();
+
+ ListUpdate update = (ListUpdate) ((ListRequest) requests[(int)packet.replyId].message).txn.update;
+ int start = starts[(int)packet.replyId];
+ int end = clock.incrementAndGet();
+ replies[(int)packet.replyId] = packet;
+
+ for (int i = 0 ; i < reply.read.length ; ++i)
+ {
+ Key key = reply.keys.get(i);
+ int k = key(key);
+
+ int[] read = reply.read[i];
+ // -1 marks a key that this transaction did not write
+ int write = reply.update.getOrDefault(key, -1);
+
+ if (read != null)
+ {
+ // TODO: standardise read to include/exclude write
+ serializable.witnessRead(k, read);
+ // the linearizability verifier is given the read with this transaction's own write appended
+ if (write >= 0)
+ read = append(read, write);
+ linearizableMap.computeIfAbsent(k, LinearizabilityVerifier::new)
+ .witnessRead(new Observation(read, start, end));
+ }
+ if (write >= 0)
+ {
+ serializable.witnessWrite(k, write);
+ linearizableMap.computeIfAbsent(k, LinearizabilityVerifier::new)
+ .witnessWrite(write, start, end, true);
+ }
+ }
+
+ serializable.apply();
+ };
+
+ Cluster.run(nodes.toArray(Id[]::new), () -> queue,
+ responseSink, () -> new Random(random.nextLong()), () -> new AtomicLong()::incrementAndGet,
+ topologyFactory, () -> null, stderr);
+
+ stdout.printf("Received %d acks to %d operations\n", clock.get() - operations, operations);
+ if (clock.get() != operations * 2)
+ {
+ // dump every request with its reply (or null) to help locate the missing responses
+ for (int i = 0 ; i < requests.length ; ++i)
+ {
+ stdout.println(requests[i]);
+ stdout.println("\t\t" + replies[i]);
+ }
+ throw new AssertionError("Incomplete set of responses");
+ }
+ }
+
+ /**
+  * Runs burn tests forever, each with a new random seed that is printed first so a failing run can be
+  * reproduced. Cluster size, topology parameters, key count and concurrency are all drawn from the
+  * seeded generator; each run performs 100 operations and its output is discarded.
+  */
+ public static void main(String[] args) throws Exception
+ {
+ // a stream that drops everything written to it
+ PrintStream devnull = new PrintStream(new OutputStream()
+ {
+ @Override
+ public void write(int b) throws IOException
+ {
+ }
+ });
+
+ while (true)
+ {
+ long seed = ThreadLocalRandom.current().nextLong();
+// long seed = 5844871443302548687L;
+ System.out.println("Seed " + seed);
+ Random random = new Random(seed);
+ // the order of the draws below determines what a given seed reproduces
+ List<Id> clients = generateIds(true, 1 + random.nextInt(4));
+ List<Id> nodes = generateIds(false, 5 + random.nextInt(5));
+ burn(random, new TopologyFactory<>(nodes.size() == 5 ? 3 : (2 + random.nextInt(3)), IntHashKey.ranges(4 + random.nextInt(12))),
+ clients,
+ nodes,
+ 5 + random.nextInt(15),
+ 100,
+ 10 + random.nextInt(30),
+// System.out,
+// System.err
+ devnull, devnull
+ );
+ }
+ }
+
+ /**
+  * Creates {@code count} ids numbered from 1; client ids use the negated number, node ids the
+  * number itself.
+  */
+ private static List<Id> generateIds(boolean clients, int count)
+ {
+     List<Id> generated = new ArrayList<>();
+     for (int number = 1; number <= count ; ++number)
+     {
+         int value = clients ? -number : number;
+         generated.add(new Id(value));
+     }
+     return generated;
+ }
+
+ /** Extracts the int value of a key, which must be an {@code IntKey}. */
+ private static int key(Key key)
+ {
+     IntKey intKey = (IntKey) key;
+     return intKey.key;
+ }
+
+ /** Returns a copy of {@code to}, one element longer, with {@code append} in the last position. */
+ private static int[] append(int[] to, int append)
+ {
+     int originalLength = to.length;
+     int[] extended = Arrays.copyOf(to, originalLength + 1);
+     extended[originalLength] = append;
+     return extended;
+ }
+}
diff --git a/accord-core/src/test/java/accord/burn/ReconcilingOutputStreams.java b/accord-core/src/test/java/accord/burn/ReconcilingOutputStreams.java
new file mode 100644
index 0000000..9788c92
--- /dev/null
+++ b/accord-core/src/test/java/accord/burn/ReconcilingOutputStreams.java
@@ -0,0 +1,73 @@
+package accord.burn;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Arrays;
+
+/**
+ * A fixed group of in-memory output streams whose contents are compared every time all of them
+ * have been flushed.
+ *
+ * Each {@link ReconcilingOutputStream#flush()} call blocks until every stream in the group has
+ * reached its own flush. The last stream to arrive compares all buffers byte-for-byte: if they are
+ * identical the content is written once to {@code matches}, otherwise every buffer is written to
+ * {@code mismatches} in stream-index order. All buffers are then cleared and the next round begins.
+ *
+ * The counters {@code waiting} and {@code epoch} are only touched while holding this object's monitor.
+ */
+class ReconcilingOutputStreams
+{
+    final PrintStream matches;
+    final PrintStream mismatches;
+    final ReconcilingOutputStream[] streams;
+    /** Number of streams that have entered flush() in the current round. */
+    int waiting;
+    /** Number of completed rounds; waiters use it to detect that their round has finished. */
+    int epoch;
+
+    class ReconcilingOutputStream extends ByteArrayOutputStream
+    {
+        /**
+         * Rendezvous with the other streams of the group, then clear this stream's buffer.
+         *
+         * @throws IOException      if writing the reconciled output fails
+         * @throws RuntimeException wrapping the InterruptedException if the thread is interrupted
+         *                          while waiting; the thread's interrupt status is restored first
+         */
+        @Override
+        public void flush() throws IOException
+        {
+            synchronized (ReconcilingOutputStreams.this)
+            {
+                ++waiting;
+                if (waiting == streams.length)
+                {
+                    // last arrival: reconcile, then release everybody waiting on this round
+                    reconcile();
+                    waiting = 0;
+                    epoch++;
+                    ReconcilingOutputStreams.this.notifyAll();
+                }
+                else
+                {
+                    int until = epoch + 1;
+                    try
+                    {
+                        // loop guards against spurious wake-ups
+                        while (epoch < until) ReconcilingOutputStreams.this.wait();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        // wait() clears the interrupt status when it throws; restore it so callers
+                        // further up the stack can still observe the interruption.
+                        // Note that 'waiting' is left incremented, exactly as before.
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException(e);
+                    }
+                }
+
+                reset();
+            }
+        }
+    }
+
+    ReconcilingOutputStreams(PrintStream matches, PrintStream mismatches, int count)
+    {
+        this.matches = matches;
+        this.mismatches = mismatches;
+        this.streams = new ReconcilingOutputStream[count];
+        for (int i = 0; i < count; ++i)
+            streams[i] = new ReconcilingOutputStream();
+    }
+
+    /** Returns the i-th stream of the group. */
+    ReconcilingOutputStream get(int i)
+    {
+        return streams[i];
+    }
+
+    /** Compares all buffers and emits them to the appropriate sink; caller must hold the monitor. */
+    private void reconcile() throws IOException
+    {
+        byte[] check = streams[0].toByteArray();
+        boolean equal = true;
+        for (int i = 1; equal && i < streams.length; ++i)
+            equal = Arrays.equals(check, streams[i].toByteArray());
+
+        if (equal)
+        {
+            matches.write(check);
+            return;
+        }
+
+        mismatches.write(check);
+        for (int i = 1; i < streams.length; ++i)
+            mismatches.write(streams[i].toByteArray());
+    }
+}
diff --git a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
new file mode 100644
index 0000000..d819b94
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
@@ -0,0 +1,45 @@
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.impl.mock.MockCluster;
+import accord.impl.IntKey;
+import accord.api.Result;
+import accord.impl.mock.MockStore;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static accord.Utils.ids;
+import static accord.Utils.writeTxn;
+
+/**
+ * Smoke tests for {@link Coordinate#execute}: a single-key write coordinated by node 1 must
+ * complete with {@link MockStore#RESULT}.
+ */
+public class CoordinateTest
+{
+    /** Default mock cluster, no network interference. */
+    @Test
+    void simpleTest() throws Throwable
+    {
+        MockCluster cluster = MockCluster.builder().build();
+        executeWriteAndAssertResult(cluster);
+    }
+
+    /** Seven nodes, replication factor seven, with the nodes selected by {@code ids(5, 7)} isolated. */
+    @Test
+    void slowPathTest() throws Throwable
+    {
+        MockCluster cluster = MockCluster.builder().nodes(7).replication(7).build();
+        cluster.networkFilter.isolate(ids(5, 7));
+        executeWriteAndAssertResult(cluster);
+    }
+
+    /** Coordinates a write to key 10 from node 1 and checks the result of the completed future. */
+    private static void executeWriteAndAssertResult(MockCluster cluster) throws Throwable
+    {
+        Node coordinator = cluster.get(1);
+        Assertions.assertNotNull(coordinator);
+
+        TxnId txnId = new TxnId(100, 0, coordinator.id());
+        Txn txn = writeTxn(IntKey.keys(10));
+        Result outcome = Coordinate.execute(coordinator, txnId, txn).toCompletableFuture().get();
+        Assertions.assertEquals(MockStore.RESULT, outcome);
+    }
+}
diff --git a/accord-core/src/test/java/accord/coordinate/RecoverTest.java b/accord-core/src/test/java/accord/coordinate/RecoverTest.java
new file mode 100644
index 0000000..3f39b51
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/RecoverTest.java
@@ -0,0 +1,113 @@
+package accord.coordinate;
+
+import accord.local.Instance;
+import accord.local.Node;
+import accord.impl.mock.MockCluster;
+import accord.impl.IntKey;
+import accord.api.Key;
+import accord.messages.Timeout;
+import accord.local.*;
+import accord.txn.Keys;
+import accord.messages.PreAccept;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+
+import static accord.NetworkFilter.*;
+import static accord.Utils.id;
+import static accord.Utils.ids;
+import static accord.Utils.writeTxn;
+
+/**
+ * Scenario test for transaction recovery on a mock cluster, plus small assertion helpers that
+ * inspect per-node command state.
+ */
+public class RecoverTest
+{
+
+ // Returns the node's local Instance for the key; orElseThrow() fails if the node has none.
+ private static Instance getInstance(Node node, Key key)
+ {
+ return node.local(key).orElseThrow();
+ }
+
+ // Asserts that the node's instance for the key knows txnId, then returns that command.
+ private static Command getCommand(Node node, Key key, TxnId txnId)
+ {
+ Instance instance = getInstance(node, key);
+ Assertions.assertTrue(instance.hasCommand(txnId));
+ return instance.command(txnId);
+ }
+
+ // Asserts that the command for txnId exists on the node and is in exactly the given status.
+ private static void assertStatus(Node node, Key key, TxnId txnId, Status status)
+ {
+ Command command = getCommand(node, key, txnId);
+
+ Assertions.assertNotNull(command);
+ Assertions.assertEquals(status, command.status());
+ }
+
+ // Asserts that the node's instance for the key has no record of txnId.
+ private static void assertMissing(Node node, Key key, TxnId txnId)
+ {
+ Instance instance = getInstance(node, key);
+ Assertions.assertFalse(instance.hasCommand(txnId));
+ }
+
+ // Blocks on the stage and requires it to fail with an ExecutionException whose cause is exactly
+ // a Timeout; normal completion fails the test.
+ private static void assertTimeout(CompletionStage<?> f)
+ {
+ try
+ {
+ f.toCompletableFuture().get();
+ Assertions.fail("expected timeout");
+ }
+ catch (ExecutionException e)
+ {
+ // timeout expected
+ Assertions.assertEquals(Timeout.class, e.getCause().getClass());
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
+ // NOTE(review): this method carries no @Test annotation, so the test runner will not pick it up;
+ // presumably that is what the TODO below refers to - confirm before enabling.
+ // TODO
+ void conflictTest() throws Throwable
+ {
+ Key key = IntKey.key(10);
+ MockCluster cluster = MockCluster.builder().nodes(9).replication(9).build();
+ // Phase 1: coordinate txn1 from node 1 under a restricted network and expect it to time out.
+ // NOTE(review): ids(a, b) appears to denote the id range a..b (see the assertions below) - confirm in Utils.
+ cluster.networkFilter.isolate(ids(7, 9));
+ cluster.networkFilter.addFilter(anyId(), isId(ids(5, 6)), notMessageType(PreAccept.class));
+
+ TxnId txnId1 = new TxnId(100, 0, id(100));
+ Txn txn1 = writeTxn(Keys.of(key));
+ assertTimeout(Coordinate.execute(cluster.get(1), txnId1, txn1));
+
+ // Phase 2: reset the filter, isolate ids(1, 7), and coordinate txn2 from node 9; it must also time out.
+ TxnId txnId2 = new TxnId(50, 0, id(101));
+ Txn txn2 = writeTxn(Keys.of(key));
+ cluster.networkFilter.clear();
+ cluster.networkFilter.isolate(ids(1, 7));
+ assertTimeout(Coordinate.execute(cluster.get(9), txnId2, txn2));
+
+ // Expected state after both attempts: txn1 Accepted / PreAccepted / unknown depending on the node group,
+ // txn2 PreAccepted only on the nodes selected by ids(8, 9).
+ cluster.nodes(ids(1, 4)).forEach(n -> assertStatus(n, key, txnId1, Status.Accepted));
+ cluster.nodes(ids(5, 6)).forEach(n -> assertStatus(n, key, txnId1, Status.PreAccepted));
+ cluster.nodes(ids(7, 9)).forEach(n -> assertMissing(n, key, txnId1));
+
+ cluster.nodes(ids(1, 7)).forEach(n -> assertMissing(n, key, txnId2));
+ cluster.nodes(ids(8, 9)).forEach(n -> assertStatus(n, key, txnId2, Status.PreAccepted));
+
+ // Phase 3: isolate ids(1, 4) and recover txn2 from node 8, waiting for the recovery to complete.
+ //
+ cluster.networkFilter.clear();
+ cluster.networkFilter.isolate(ids(1, 4));
+ Coordinate.recover(cluster.get(8), txnId2, txn2).toCompletableFuture().get();
+
+ // txn2 has the smaller id, must end up Applied on the nodes selected by ids(5, 9), and its
+ // executeAt is expected to equal txnId1.
+ List<Node> nodes = cluster.nodes(ids(5, 9));
+ Assertions.assertTrue(txnId2.compareTo(txnId1) < 0);
+ nodes.forEach(n -> assertStatus(n, key, txnId2, Status.Applied));
+ nodes.forEach(n -> {
+ // (this status check repeats the assertion made on the previous line)
+ assertStatus(n, key, txnId2, Status.Applied);
+ Command command = getCommand(n, key, txnId2);
+ Assertions.assertEquals(txnId1, command.executeAt());
+ });
+ }
+}
diff --git a/accord-core/src/test/java/accord/impl/IntHashKey.java b/accord-core/src/test/java/accord/impl/IntHashKey.java
new file mode 100644
index 0000000..79460e1
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/IntHashKey.java
@@ -0,0 +1,87 @@
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.CRC32C;
+
+import accord.api.Key;
+import accord.utils.KeyRange;
+import accord.txn.Keys;
+
+public class IntHashKey implements Key<IntHashKey>
+{
+ public final int key;
+ public final int hash;
+
+ private IntHashKey(int key)
+ {
+ this.key = key;
+ this.hash = hash(key);
+ }
+
+ private IntHashKey(int key, int hash)
+ {
+ assert hash != hash(key);
+ this.key = key;
+ this.hash = hash;
+ }
+
+ @Override
+ public int compareTo(IntHashKey that)
+ {
+ return Integer.compare(this.hash, that.hash);
+ }
+
+ public static IntHashKey key(int k)
+ {
+ return new IntHashKey(k);
+ }
+
+ public static Keys keys(int k0, int... kn)
+ {
+ Key[] keys = new Key[kn.length + 1];
+ keys[0] = key(k0);
+ for (int i=0; i<kn.length; i++)
+ keys[i + 1] = key(kn[i]);
+
+ return new Keys(keys);
+ }
+
+ public static KeyRange<IntHashKey> range(int start, int end)
+ {
+ return KeyRange.of(key(start), key(end));
+ }
+
+ public static KeyRange<IntHashKey>[] ranges(int count)
+ {
+ List<KeyRange<IntHashKey>> result = new ArrayList<>();
+ long delta = (Integer.MAX_VALUE - (long)Integer.MIN_VALUE) / count;
+ long start = Integer.MIN_VALUE;
+ IntHashKey prev = new IntHashKey(Integer.MIN_VALUE, (int)start);
+ for (int i = 1 ; i < count ; ++i)
+ {
+ IntHashKey next = new IntHashKey(Integer.MIN_VALUE, (int)Math.min(Integer.MAX_VALUE, start + i * delta));
+ result.add(KeyRange.of(prev, next));
+ prev = next;
+ }
+ result.add(KeyRange.of(prev, new IntHashKey(Integer.MIN_VALUE, Integer.MAX_VALUE)));
+ return result.toArray(KeyRange[]::new);
+ }
+
+ @Override
+ public String toString()
+ {
+ if (key == Integer.MIN_VALUE && hash(key) != hash) return "#" + hash;
+ return Integer.toString(key);
+ }
+
+ private static int hash(int key)
+ {
+ CRC32C crc32c = new CRC32C();
+ crc32c.update(key);
+ crc32c.update(key >> 8);
+ crc32c.update(key >> 16);
+ crc32c.update(key >> 24);
+ return (int)crc32c.getValue();
+ }
+}
diff --git a/accord-core/src/test/java/accord/impl/IntKey.java b/accord-core/src/test/java/accord/impl/IntKey.java
new file mode 100644
index 0000000..c24472b
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/IntKey.java
@@ -0,0 +1,66 @@
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import accord.api.Key;
+import accord.utils.KeyRange;
+import accord.txn.Keys;
+
+/**
+ * Test key that wraps a single int and is ordered by that int's natural order.
+ */
+public class IntKey implements Key<IntKey>
+{
+    public final int key;
+
+    private IntKey(int key)
+    {
+        this.key = key;
+    }
+
+    @Override
+    public int compareTo(IntKey that)
+    {
+        return Integer.compare(this.key, that.key);
+    }
+
+    public static IntKey key(int k)
+    {
+        return new IntKey(k);
+    }
+
+    /** Wraps one or more ints as keys and bundles them, in argument order, into a {@link Keys}. */
+    public static Keys keys(int k0, int... kn)
+    {
+        Key[] all = new Key[1 + kn.length];
+        all[0] = key(k0);
+        int at = 1;
+        for (int k : kn)
+            all[at++] = key(k);
+
+        return new Keys(all);
+    }
+
+    public static KeyRange<IntKey> range(int start, int end)
+    {
+        return KeyRange.of(key(start), key(end));
+    }
+
+    /**
+     * Splits [Integer.MIN_VALUE, Integer.MAX_VALUE] into {@code count} contiguous ranges of
+     * (nearly) equal width; the final range always ends at Integer.MAX_VALUE.
+     */
+    public static KeyRange<IntKey>[] ranges(int count)
+    {
+        long start = Integer.MIN_VALUE;
+        long delta = (Integer.MAX_VALUE - start) / count;
+        List<KeyRange<IntKey>> result = new ArrayList<>();
+        IntKey lower = new IntKey((int) start);
+        int i = 1;
+        while (i < count)
+        {
+            long boundary = Math.min(Integer.MAX_VALUE, start + i * delta);
+            IntKey upper = new IntKey((int) boundary);
+            result.add(KeyRange.of(lower, upper));
+            lower = upper;
+            ++i;
+        }
+        result.add(KeyRange.of(lower, IntKey.key(Integer.MAX_VALUE)));
+        return result.toArray(KeyRange[]::new);
+    }
+
+    @Override
+    public String toString()
+    {
+        return Integer.toString(key);
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/TestAgent.java b/accord-core/src/test/java/accord/impl/TestAgent.java
new file mode 100644
index 0000000..22bd120
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/TestAgent.java
@@ -0,0 +1,24 @@
+package accord.impl;
+
+import accord.local.Node;
+import accord.api.Agent;
+import accord.api.Result;
+import accord.local.Command;
+import accord.txn.Timestamp;
+
+/**
+ * Minimal {@link Agent} for tests: ignores recovery outcomes and treats any inconsistent
+ * timestamp as a test failure.
+ */
+public class TestAgent implements Agent
+{
+    /**
+     * Intentionally empty. This hook exists so that implementations can decide what to do about
+     * recovered transactions, in particular whether and how to inform clients of the result
+     * (e.g. in Maelstrom the full result is sent directly; other implementations may simply
+     * acknowledge success via the coordinator).
+     */
+    @Override
+    public void onRecover(Node node, Result success, Throwable fail)
+    {
+    }
+
+    /** Always fails with an {@link AssertionError}. */
+    @Override
+    public void onInconsistentTimestamp(Command command, Timestamp prev, Timestamp next)
+    {
+        throw new AssertionError();
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/TopologyFactory.java b/accord-core/src/test/java/accord/impl/TopologyFactory.java
new file mode 100644
index 0000000..a8dad3b
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/TopologyFactory.java
@@ -0,0 +1,58 @@
+package accord.impl;
+
+
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.api.Key;
+import accord.utils.KeyRange;
+import accord.topology.Shard;
+import accord.topology.Shards;
+import accord.utils.WrapAroundList;
+import accord.utils.WrapAroundSet;
+
+import java.util.*;
+
+public class TopologyFactory<K extends Key<K>>
+{
+ public final int rf;
+ final KeyRange<K>[] ranges;
+
+ public TopologyFactory(int rf, KeyRange<K>... ranges)
+ {
+ this.rf = rf;
+ this.ranges = ranges;
+ }
+
+ public Shards toShards(Node.Id[] cluster)
+ {
+ final Map<Node.Id, Integer> lookup = new HashMap<>();
+ for (int i = 0 ; i < cluster.length ; ++i)
+ lookup.put(cluster[i], i);
+
+ List<WrapAroundList<Id>> electorates = new ArrayList<>();
+ List<Set<Node.Id>> fastPathElectorates = new ArrayList<>();
+
+ for (int i = 0 ; i < cluster.length + rf - 1 ; ++i)
+ {
+ WrapAroundList<Node.Id> electorate = new WrapAroundList<>(cluster, i % cluster.length, (i + rf) % cluster.length);
+ Set<Node.Id> fastPathElectorate = new WrapAroundSet<>(lookup, electorate);
+ electorates.add(electorate);
+ fastPathElectorates.add(fastPathElectorate);
+ }
+
+ final List<Shard> shards = new ArrayList<>();
+ for (int i = 0 ; i < ranges.length ; ++i)
+ shards.add(new Shard(ranges[i].start, ranges[i].end, electorates.get(i % electorates.size()), fastPathElectorates.get(i % fastPathElectorates.size())));
+ return new Shards(shards.toArray(Shard[]::new));
+ }
+
+ public Shards toShards(List<Node.Id> cluster)
+ {
+ return toShards(cluster.toArray(Node.Id[]::new));
+ }
+
+ public static <K extends Key<K>> Shards toShards(List<Node.Id> cluster, int rf, KeyRange<K>... ranges)
+ {
+ return new TopologyFactory<>(rf, ranges).toShards(cluster);
+ }
+}
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java
new file mode 100644
index 0000000..5ffafe9
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -0,0 +1,165 @@
+package accord.impl.basic;
+
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.api.Scheduler;
+import accord.impl.TopologyFactory;
+import accord.impl.list.ListAgent;
+import accord.impl.list.ListStore;
+import accord.messages.Callback;
+import accord.messages.Reply;
+import accord.messages.Request;
+import accord.topology.Shards;
+
+/**
+ * Simulated cluster for tests: packets and scheduled tasks are placed on a single
+ * {@link PendingQueue} and handled one at a time by {@link #processPending()}.
+ * The cluster also acts as the {@link Scheduler} implementation for its nodes.
+ */
+public class Cluster implements Scheduler
+{
+
+ // Resolves an id to a node of this cluster; returns null for ids that are not cluster nodes.
+ final Function<Id, Node> lookup;
+ // Everything waiting to be delivered or run.
+ final PendingQueue pending;
+ // Receives packets addressed to ids that lookup cannot resolve.
+ final Consumer<Packet> responseSink;
+ final Map<Id, NodeSink> sinks = new HashMap<>();
+ // Event log (SEND / DROP / RECV lines), flushed after every line.
+ final PrintWriter err;
+ // Sequence number prefixed to every logged event.
+ int clock;
+ // Number of recurring tasks registered via recurring().
+ int recurring;
+ // One side of the current network partition; see processPending() for how it is applied.
+ Set<Id> partitionSet;
+
+ public Cluster(Supplier<PendingQueue> queueSupplier, Function<Id, Node> lookup, Consumer<Packet> responseSink, OutputStream stderr)
+ {
+ this.pending = queueSupplier.get();
+ this.lookup = lookup;
+ this.responseSink = responseSink;
+ this.err = new PrintWriter(stderr);
+ this.partitionSet = new HashSet<>();
+ }
+
+ // Creates the message sink for one node and remembers it so replies can find the node's callbacks.
+ NodeSink create(Id self, Random random)
+ {
+ NodeSink sink = new NodeSink(self, lookup, this, random);
+ sinks.put(self, sink);
+ return sink;
+ }
+
+ // Logs the send, then either hands the packet to responseSink (destination is not a cluster node)
+ // or queues it for later delivery.
+ private void add(Packet packet)
+ {
+ err.println(clock++ + " SEND " + packet);
+ err.flush();
+ if (lookup.apply(packet.dst) == null) responseSink.accept(packet);
+ else pending.add(packet);
+ }
+
+ void add(Id from, Id to, long messageId, Request send)
+ {
+ add(new Packet(from, to, messageId, send));
+ }
+
+ void add(Id from, Id to, long replyId, Reply send)
+ {
+ add(new Packet(from, to, replyId, send));
+ }
+
+ /**
+  * Takes one item off the queue and handles it.
+  *
+  * @return false when the queue size equals the number of registered recurring tasks
+  *         (presumably meaning only recurring tasks remain) or the queue is empty; true otherwise
+  */
+ public boolean processPending()
+ {
+ if (pending.size() == recurring)
+ return false;
+
+ Object next = pending.poll();
+ if (next == null)
+ return false;
+
+ if (next instanceof Packet)
+ {
+ Packet deliver = (Packet) next;
+ Node on = lookup.apply(deliver.dst);
+ // Drop the message if it goes across the partition
+ // (packets whose source id is negative are never dropped; src and dst are "on the same side"
+ // when both are in partitionSet or both are outside it)
+ boolean drop = ((Packet) next).src.id >= 0 &&
+ !(partitionSet.contains(deliver.src) && partitionSet.contains(deliver.dst)
+ || !partitionSet.contains(deliver.src) && !partitionSet.contains(deliver.dst));
+ if (drop)
+ {
+ err.println(clock++ + " DROP " + deliver);
+ err.flush();
+ return true;
+ }
+ err.println(clock++ + " RECV " + deliver);
+ err.flush();
+ if (deliver.message instanceof Reply)
+ {
+ Reply reply = (Reply) deliver.message;
+ // a final reply unregisters the callback; a non-final one leaves it registered
+ Callback callback = reply.isFinal() ? sinks.get(deliver.dst).callbacks.remove(deliver.replyId)
+ : sinks.get(deliver.dst).callbacks.get(deliver.replyId);
+ // a missing callback (e.g. already removed by the timeout task in NodeSink) means the reply is ignored
+ if (callback != null)
+ on.scheduler().now(() -> callback.onSuccess(deliver.src, reply));
+ }
+ else on.receive((Request)deliver.message, deliver.src, deliver.requestId);
+ }
+ else
+ {
+ // anything that is not a Packet is expected to be a Runnable task
+ ((Runnable) next).run();
+ }
+ return true;
+ }
+
+ // Queues a task that re-queues itself with the same delay after every run, and counts it in 'recurring'.
+ @Override
+ public Scheduled recurring(Runnable run, long delay, TimeUnit units)
+ {
+ RecurringPendingRunnable result = new RecurringPendingRunnable(pending, run, true, delay, units);
+ ++recurring;
+ pending.add(result, delay, units);
+ return result;
+ }
+
+ // Queues a task that runs a single time: it is given no queue to re-add itself to.
+ @Override
+ public Scheduled once(Runnable run, long delay, TimeUnit units)
+ {
+ RecurringPendingRunnable result = new RecurringPendingRunnable(null, run, false, delay, units);
+ pending.add(result, delay, units);
+ return result;
+ }
+
+ // Runs the task immediately on the calling thread.
+ @Override
+ public void now(Runnable run)
+ {
+ run.run();
+ }
+
+ /**
+  * Builds a cluster with one Node per id, registers a recurring task that re-draws the
+  * partition set, feeds in every packet supplied by {@code in} (until it returns null),
+  * and then processes the queue until processPending() returns false.
+  */
+ public static void run(Id[] nodes, Supplier<PendingQueue> queueSupplier, Consumer<Packet> responseSink, Supplier<Random> randomSupplier, Supplier<LongSupplier> nowSupplier, TopologyFactory topologyFactory, Supplier<Packet> in, OutputStream stderr)
+ {
+ Shards shards = topologyFactory.toShards(nodes);
+ Map<Id, Node> lookup = new HashMap<>();
+ Cluster sinks = new Cluster(queueSupplier, lookup::get, responseSink, stderr);
+ // each node takes two Randoms from the supplier: one for its NodeSink, one for the Node itself
+ for (Id node : nodes)
+ lookup.put(node, new Node(node, shards, shards.forNode(node), sinks.create(node, randomSupplier.get()),
+ randomSupplier.get(), nowSupplier.get(), ListStore::new, ListAgent.INSTANCE, sinks));
+
+ List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
+ // every 5 seconds (in queue time) pick a new random partition set of fewer than (rf+1)/2 nodes
+ sinks.recurring(() ->
+ {
+ Collections.shuffle(nodesList, randomSupplier.get());
+ int partitionSize = randomSupplier.get().nextInt((topologyFactory.rf+1)/2);
+ sinks.partitionSet = new HashSet<>(nodesList.subList(0, partitionSize));
+ }, 5L, TimeUnit.SECONDS);
+
+ Packet next;
+ while ((next = in.get()) != null)
+ sinks.add(next);
+
+ while (sinks.processPending());
+ }
+}
diff --git a/accord-core/src/test/java/accord/impl/basic/NodeSink.java b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
new file mode 100644
index 0000000..0482567
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
@@ -0,0 +1,60 @@
+package accord.impl.basic;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.api.MessageSink;
+import accord.messages.Timeout;
+import accord.messages.Callback;
+import accord.messages.Reply;
+import accord.messages.Request;
+
+import static accord.impl.basic.Packet.SENTINEL_MESSAGE_ID;
+
+/**
+ * {@link MessageSink} of a single simulated node. Outgoing messages are handed to the owning
+ * {@link Cluster}; requests sent with a callback are additionally given a timeout task.
+ */
+public class NodeSink implements MessageSink
+{
+    final Id self;
+    final Function<Id, Node> lookup;
+    final Cluster parent;
+    final Random random;
+
+    /** Id assigned to the next request that is sent with a callback. */
+    int nextMessageId = 0;
+    /** Callbacks still awaiting a final reply or a timeout, keyed by message id. */
+    Map<Long, Callback> callbacks = new LinkedHashMap<>();
+
+    public NodeSink(Id self, Function<Id, Node> lookup, Cluster parent, Random random)
+    {
+        this.random = random;
+        this.parent = parent;
+        this.lookup = lookup;
+        this.self = self;
+    }
+
+    /** Fire-and-forget send: the request carries the sentinel message id and no callback is registered. */
+    @Override
+    public synchronized void send(Id to, Request send)
+    {
+        parent.add(self, to, SENTINEL_MESSAGE_ID, send);
+    }
+
+    /**
+     * Sends a request and registers {@code callback} under a fresh message id. A task queued
+     * 1000-10999 ms ahead removes the callback and, if it was still registered at that point,
+     * fails it with a {@link Timeout}.
+     */
+    @Override
+    public void send(Id to, Request send, Callback callback)
+    {
+        long messageId = nextMessageId++;
+        callbacks.put(messageId, callback);
+        parent.add(self, to, messageId, send);
+
+        PendingRunnable expire = () -> {
+            Callback registered = callbacks.remove(messageId);
+            if (registered == callback)
+                callback.onFailure(to, new Timeout());
+        };
+        int timeoutMillis = 1000 + random.nextInt(10000);
+        parent.pending.add(expire, timeoutMillis, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void reply(Id replyToNode, long replyToMessage, Reply reply)
+    {
+        parent.add(self, replyToNode, replyToMessage, reply);
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/basic/Packet.java b/accord-core/src/test/java/accord/impl/basic/Packet.java
new file mode 100644
index 0000000..3b56159
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/Packet.java
@@ -0,0 +1,45 @@
+package accord.impl.basic;
+
+import accord.local.Node.Id;
+import accord.messages.Message;
+import accord.messages.Reply;
+import accord.messages.Request;
+
+/**
+ * A message in flight between two ids of the simulated cluster. A packet carries either a
+ * {@link Request} or a {@link Reply}; the id field that does not apply holds
+ * {@link #SENTINEL_MESSAGE_ID}.
+ */
+public class Packet implements Pending
+{
+    static final int SENTINEL_MESSAGE_ID = Integer.MIN_VALUE;
+
+    public final Id src;
+    public final Id dst;
+    /** If the message is a Request, the id it was sent with; otherwise the sentinel. */
+    public final long requestId;
+    /** If the message is a Reply, the id of the message being replied to; otherwise the sentinel. */
+    public final long replyId;
+    public final Message message;
+
+    public Packet(Id src, Id dst, long requestId, Request request)
+    {
+        this.src = src;
+        this.dst = dst;
+        this.message = request;
+        this.requestId = requestId;
+        this.replyId = SENTINEL_MESSAGE_ID;
+    }
+
+    public Packet(Id src, Id dst, long replyId, Reply reply)
+    {
+        this.src = src;
+        this.dst = dst;
+        this.message = reply;
+        this.requestId = SENTINEL_MESSAGE_ID;
+        this.replyId = replyId;
+    }
+
+    /** Renders the packet as {@code {from:…, to:…, [id:…, ][replyTo:…, ]body:…}}, omitting sentinel ids. */
+    @Override
+    public String toString()
+    {
+        StringBuilder text = new StringBuilder();
+        text.append("{from:").append(src).append(", ");
+        text.append("to:").append(dst).append(", ");
+        if (requestId != SENTINEL_MESSAGE_ID)
+            text.append("id:").append(requestId).append(", ");
+        if (replyId != SENTINEL_MESSAGE_ID)
+            text.append("replyTo:").append(replyId).append(", ");
+        text.append("body:").append(message).append("}");
+        return text.toString();
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/basic/Pending.java b/accord-core/src/test/java/accord/impl/basic/Pending.java
new file mode 100644
index 0000000..b4f491d
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/Pending.java
@@ -0,0 +1,5 @@
+package accord.impl.basic;
+
+/**
+ * Marker type for anything that can be placed on a {@link PendingQueue}.
+ * It declares no methods; consumers distinguish the concrete kinds by type.
+ */
+public interface Pending
+{
+}
diff --git a/accord-core/src/test/java/accord/impl/basic/PendingQueue.java b/accord-core/src/test/java/accord/impl/basic/PendingQueue.java
new file mode 100644
index 0000000..64b21a1
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/PendingQueue.java
@@ -0,0 +1,11 @@
+package accord.impl.basic;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Queue of {@link Pending} items awaiting processing. The order in which items come back out
+ * is decided by the implementation.
+ */
+public interface PendingQueue
+{
+ // Adds an item without a caller-specified delay.
+ void add(Pending item);
+ // Adds an item with a requested delay; how (or whether) the delay is honoured is up to the implementation.
+ void add(Pending item, long delay, TimeUnit units);
+ // Removes and returns the next item, or null when the queue is empty.
+ Pending poll();
+ // Number of items currently queued.
+ int size();
+}
diff --git a/accord-core/src/test/java/accord/impl/basic/PendingRunnable.java b/accord-core/src/test/java/accord/impl/basic/PendingRunnable.java
new file mode 100644
index 0000000..b8392cc
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/PendingRunnable.java
@@ -0,0 +1,5 @@
+package accord.impl.basic;
+
+/**
+ * A {@link Pending} item that is also a {@link Runnable}, i.e. a task that can be queued.
+ * Having a single abstract method (inherited from Runnable) it can be written as a lambda.
+ */
+public interface PendingRunnable extends Pending, Runnable
+{
+}
diff --git a/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java b/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
new file mode 100644
index 0000000..e9b4012
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java
@@ -0,0 +1,85 @@
+package accord.impl.basic;
+
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * {@link PendingQueue} driven by simulated time. Every item is stamped with a due time in
+ * milliseconds; {@link #poll()} returns the item with the earliest due time (insertion order
+ * breaks ties) and advances the queue's clock to that time.
+ */
+public class RandomDelayQueue<T> implements PendingQueue
+{
+    /** Supplies queues whose random generators are seeded from one shared source of seeds. */
+    public static class Factory implements Supplier<PendingQueue>
+    {
+        final Random seeds;
+
+        public Factory(Random seeds)
+        {
+            this.seeds = seeds;
+        }
+
+        @Override
+        public PendingQueue get()
+        {
+            long seed = seeds.nextLong();
+            return new RandomDelayQueue<>(new Random(seed));
+        }
+    }
+
+    /** Queue entry ordered by due time, then by insertion sequence. */
+    static class Item implements Comparable<Item>
+    {
+        final long time;
+        final int seq;
+        final Pending item;
+
+        Item(long time, int seq, Pending item)
+        {
+            this.time = time;
+            this.seq = seq;
+            this.item = item;
+        }
+
+        @Override
+        public int compareTo(Item that)
+        {
+            if (this.time != that.time)
+                return Long.compare(this.time, that.time);
+            return Integer.compare(this.seq, that.seq);
+        }
+    }
+
+    final PriorityQueue<Item> queue = new PriorityQueue<>();
+    final Random random;
+    /** Current simulated time in milliseconds: the due time of the most recently polled item. */
+    long now;
+    /** Sequence number given to the next item added. */
+    int seq;
+
+    RandomDelayQueue(Random random)
+    {
+        this.random = random;
+    }
+
+    /** Adds an item with a random delay of 0-499 ms. */
+    @Override
+    public void add(Pending item)
+    {
+        int delayMillis = random.nextInt(500);
+        add(item, delayMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /** Adds an item that becomes due {@code delay} after the current simulated time. */
+    @Override
+    public void add(Pending item, long delay, TimeUnit units)
+    {
+        long due = now + units.toMillis(delay);
+        queue.add(new Item(due, seq++, item));
+    }
+
+    @Override
+    public Pending poll()
+    {
+        Item head = queue.poll();
+        if (head != null)
+        {
+            now = head.time;
+            return head.item;
+        }
+        return null;
+    }
+
+    @Override
+    public int size()
+    {
+        return queue.size();
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/basic/RecurringPendingRunnable.java b/accord-core/src/test/java/accord/impl/basic/RecurringPendingRunnable.java
new file mode 100644
index 0000000..06e07cd
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/RecurringPendingRunnable.java
@@ -0,0 +1,38 @@
+package accord.impl.basic;
+
+import java.util.concurrent.TimeUnit;
+
+import accord.api.Scheduler.Scheduled;
+
+/**
+ * A queued task that can be cancelled and that, when given a queue, puts itself back on that
+ * queue with the same delay after every run.
+ */
+class RecurringPendingRunnable implements PendingRunnable, Scheduled
+{
+    /** Queue to re-add this task to after each run; null for a one-shot task. */
+    final PendingQueue requeue;
+    final long delay;
+    final TimeUnit units;
+    /** The work to perform; set to null by {@link #cancel()}. */
+    Runnable run;
+
+    // The 'recurring' flag is accepted but not consulted: recurrence is determined solely by
+    // whether 'requeue' is non-null.
+    RecurringPendingRunnable(PendingQueue requeue, Runnable run, boolean recurring, long delay, TimeUnit units)
+    {
+        this.units = units;
+        this.delay = delay;
+        this.run = run;
+        this.requeue = requeue;
+    }
+
+    /** Does nothing once cancelled; otherwise runs the task and re-queues it if it is recurring. */
+    @Override
+    public void run()
+    {
+        if (run == null)
+            return;
+
+        run.run();
+        if (requeue != null)
+            requeue.add(this, delay, units);
+    }
+
+    @Override
+    public void cancel()
+    {
+        run = null;
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/basic/UniformRandomQueue.java b/accord-core/src/test/java/accord/impl/basic/UniformRandomQueue.java
new file mode 100644
index 0000000..75fd4d2
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/UniformRandomQueue.java
@@ -0,0 +1,80 @@
+package accord.impl.basic;
+
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+public class UniformRandomQueue<T> implements PendingQueue
+{
+ public static class Factory implements Supplier<PendingQueue>
+ {
+ final Random seeds;
+
+ public Factory(Random seeds)
+ {
+ this.seeds = seeds;
+ }
+
+ @Override
+ public PendingQueue get()
+ {
+ return new UniformRandomQueue<>(new Random(seeds.nextLong()));
+ }
+ }
+
+ static class Item implements Comparable<Item>
+ {
+ final double priority;
+ final Pending value;
+
+ Item(double priority, Pending value)
+ {
+ this.priority = priority;
+ this.value = value;
+ }
+
+ @Override
+ public int compareTo(Item that)
+ {
+ return Double.compare(this.priority, that.priority);
+ }
+ }
+
+ final PriorityQueue<Item> queue = new PriorityQueue<>();
+ final Random random;
+
+ public UniformRandomQueue(Random random)
+ {
+ this.random = random;
+ }
+
+ @Override
+ public int size()
+ {
+ return queue.size();
+ }
+
+ @Override
+ public void add(Pending item)
+ {
+ queue.add(new Item(random.nextDouble(), item));
+ }
+
+ @Override
+ public void add(Pending item, long delay, TimeUnit units)
+ {
+ queue.add(new Item(random.nextDouble(), item));
+ }
+
+ @Override
+ public Pending poll()
+ {
+ return unwrap(queue.poll());
+ }
+
+ private static Pending unwrap(Item e)
+ {
+ return e == null ? null : e.value;
+ }
+}
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java
new file mode 100644
index 0000000..e9ce693
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -0,0 +1,28 @@
+package accord.impl.list;
+
+import accord.local.Node;
+import accord.api.Agent;
+import accord.api.Result;
+import accord.local.Command;
+import accord.txn.Timestamp;
+
+/**
+ * {@link Agent} for the list-based test store: forwards the result of a successfully recovered
+ * transaction to the client recorded in that result.
+ */
+public class ListAgent implements Agent
+{
+    public static final ListAgent INSTANCE = new ListAgent();
+
+    /**
+     * On success, replies to the client and request id carried inside the {@link ListResult}.
+     * A null {@code success} is ignored, and {@code fail} is never inspected.
+     */
+    @Override
+    public void onRecover(Node node, Result success, Throwable fail)
+    {
+        if (success == null)
+            return;
+
+        ListResult recovered = (ListResult) success;
+        node.reply(recovered.client, recovered.requestId, recovered);
+    }
+
+    /** Always fails with an {@link AssertionError}. */
+    @Override
+    public void onInconsistentTimestamp(Command command, Timestamp prev, Timestamp next)
+    {
+        throw new AssertionError();
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/list/ListData.java b/accord-core/src/test/java/accord/impl/list/ListData.java
new file mode 100644
index 0000000..35ef8d5
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/list/ListData.java
@@ -0,0 +1,16 @@
+package accord.impl.list;
+
+import java.util.TreeMap;
+
+import accord.api.Data;
+import accord.api.Key;
+
+/**
+ * Read result for the list-based test store: a sorted map from key to the int list read for it.
+ */
+public class ListData extends TreeMap<Key, int[]> implements Data
+{
+    /**
+     * Copies every entry of {@code data} (which must be a {@code ListData}) into this map,
+     * overwriting entries for keys already present, and returns this same instance.
+     */
+    @Override
+    public Data merge(Data data)
+    {
+        ListData other = (ListData) data;
+        putAll(other);
+        return this;
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/list/ListQuery.java b/accord-core/src/test/java/accord/impl/list/ListQuery.java
new file mode 100644
index 0000000..f7d2c31
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/list/ListQuery.java
@@ -0,0 +1,35 @@
+package accord.impl.list;
+
+import java.util.Map;
+
+import accord.local.Node.Id;
+import accord.api.Data;
+import accord.api.Key;
+import accord.api.Query;
+import accord.api.Result;
+import accord.txn.Keys;
+
+/**
+ * {@link Query} for the list-based test store: turns the data that was read into a
+ * {@link ListResult} addressed to the originating client request.
+ */
+public class ListQuery implements Query
+{
+    final Id client;
+    final long requestId;
+    final Keys read;
+    final ListUpdate update; // the writes are carried along so that they can be returned with the result
+
+    public ListQuery(Id client, long requestId, Keys read, ListUpdate update)
+    {
+        this.update = update;
+        this.read = read;
+        this.requestId = requestId;
+        this.client = client;
+    }
+
+    /**
+     * Lays the values out in an array parallel to {@code read}: slot i holds the value read for
+     * the i-th key, and slots for keys absent from {@code data} stay null.
+     *
+     * @param data must be a {@link ListData}
+     */
+    @Override
+    public Result compute(Data data)
+    {
+        int[][] values = new int[read.size()][];
+        ListData listData = (ListData) data;
+        listData.forEach((key, value) -> values[read.indexOf(key)] = value);
+        return new ListResult(client, requestId, read, values, update);
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java b/accord-core/src/test/java/accord/impl/list/ListRead.java
new file mode 100644
index 0000000..3bed2dd
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -0,0 +1,33 @@
+package accord.impl.list;
+
+import accord.api.Data;
+import accord.api.Key;
+import accord.api.Store;
+import accord.api.Read;
+import accord.txn.Keys;
+
+public class ListRead implements Read
+{
+ public final Keys keys;
+
+ public ListRead(Keys keys)
+ {
+ this.keys = keys;
+ }
+
+ @Override
+ public Data read(Key start, Key end, Store store)
+ {
+ ListStore s = (ListStore)store;
+ ListData result = new ListData();
+ for (int i = keys.ceilIndex(start), limit = keys.ceilIndex(end) ; i < limit ; ++i)
+ result.put(keys.get(i), s.get(keys.get(i)));
+ return result;
+ }
+
+ @Override
+ public String toString()
+ {
+ return keys.toString();
+ }
+}
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/test/java/accord/impl/list/ListRequest.java
new file mode 100644
index 0000000..c79f8e7
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -0,0 +1,31 @@
+package accord.impl.list;
+
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.txn.Txn;
+import accord.messages.Request;
+
+/**
+ * Client request of the list test workload: asks the receiving node to coordinate {@link #txn}
+ * and replies to the client with the resulting {@link ListResult}.
+ */
+public class ListRequest implements Request
+{
+    public final Txn txn;
+
+    public ListRequest(Txn txn)
+    {
+        this.txn = txn;
+    }
+
+    /**
+     * Coordinates the transaction on {@code node} and, once it completes with a non-null result,
+     * sends that result back to {@code client} in reply to {@code messageId}.
+     */
+    public void process(Node node, Id client, long messageId)
+    {
+        node.coordinate(txn).handle((result, failure) -> {
+            // only a successful outcome is sent back; a failure produces no reply at all
+            if (result == null)
+                return null;
+
+            node.reply(client, messageId, (ListResult) result);
+            return null;
+        });
+    }
+
+    @Override
+    public String toString()
+    {
+        return txn.toString();
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/list/ListResult.java b/accord-core/src/test/java/accord/impl/list/ListResult.java
new file mode 100644
index 0000000..bcb9030
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/list/ListResult.java
@@ -0,0 +1,40 @@
+package accord.impl.list;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import accord.local.Node.Id;
+import accord.api.Result;
+import accord.txn.Keys;
+import accord.messages.Reply;
+
+/**
+ * Result (and client reply) of the list test workload: the keys, the values read for them
+ * (index-aligned with {@link #keys}) and the update that accompanied the query.
+ */
+public class ListResult implements Result, Reply
+{
+    public final Id client;
+    public final long requestId;
+    public final Keys keys;
+    public final int[][] read;
+    public final ListUpdate update;
+
+    public ListResult(Id client, long requestId, Keys keys, int[][] read, ListUpdate update)
+    {
+        this.client = client;
+        this.requestId = requestId;
+        this.keys = keys;
+        this.read = read;
+        this.update = update;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "{client:" + client + ", "
+             + "requestId:" + requestId + ", "
+             + "reads:" + describeReads() + ", "
+             + "writes:" + update + "}";
+    }
+
+    // renders "key:[values]" for every key whose slot in read is non-null, comma separated inside braces
+    private String describeReads()
+    {
+        return IntStream.range(0, keys.size())
+                        .filter(i -> read[i] != null)
+                        .mapToObj(i -> keys.get(i) + ":" + Arrays.toString(read[i]))
+                        .collect(Collectors.joining(", ", "{", "}"));
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java b/accord-core/src/test/java/accord/impl/list/ListStore.java
new file mode 100644
index 0000000..ee6b05e
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/list/ListStore.java
@@ -0,0 +1,20 @@
+package accord.impl.list;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import accord.api.Key;
+import accord.api.Store;
+import accord.utils.Timestamped;
+
+/**
+ * Store of the list test workload: maps each key to a timestamped {@code int[]}.
+ */
+public class ListStore implements Store
+{
+    static final int[] EMPTY = new int[0];
+    final Map<Key, Timestamped<int[]>> data = new ConcurrentHashMap<>();
+
+    /**
+     * @return the array stored for {@code key}, or the shared {@link #EMPTY} array if the key has no entry
+     */
+    public int[] get(Key key)
+    {
+        Timestamped<int[]> current = data.get(key);
+        if (current != null)
+            return current.data;
+        return EMPTY;
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/list/ListUpdate.java b/accord-core/src/test/java/accord/impl/list/ListUpdate.java
new file mode 100644
index 0000000..b5db73e
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/list/ListUpdate.java
@@ -0,0 +1,29 @@
+package accord.impl.list;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+import accord.api.Key;
+import accord.api.Data;
+import accord.api.Update;
+
+/**
+ * Update of the list test workload: for each key, the single value to append to that key's list.
+ */
+public class ListUpdate extends TreeMap<Key, Integer> implements Update
+{
+    /**
+     * Produces the write for this update: every key of this map is mapped to the list found for it
+     * in {@code read} with this map's value appended.
+     *
+     * @param read the data read; cast to {@link ListData}
+     */
+    @Override
+    public ListWrite apply(Data read)
+    {
+        ListWrite write = new ListWrite();
+        Map<Key, int[]> existing = (ListData) read;
+        forEach((key, value) -> write.put(key, append(existing.get(key), value)));
+        return write;
+    }
+
+    // returns a copy of the array that is one element longer, with the new value in the last slot
+    private static int[] append(int[] to, int append)
+    {
+        int[] extended = Arrays.copyOf(to, to.length + 1);
+        extended[to.length] = append;
+        return extended;
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java b/accord-core/src/test/java/accord/impl/list/ListWrite.java
new file mode 100644
index 0000000..bc02241
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java
@@ -0,0 +1,21 @@
+package accord.impl.list;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import accord.api.Key;
+import accord.api.Store;
+import accord.api.Write;
+import accord.txn.Timestamp;
+import accord.utils.Timestamped;
+
+/**
+ * Write of the list test workload: the complete new list for each key written.
+ */
+public class ListWrite extends TreeMap<Key, int[]> implements Write
+{
+    /**
+     * Merges the entries with keys in {@code [start, end)} into the store, each wrapped in a
+     * {@link Timestamped} carrying {@code executeAt}; an existing entry is combined with the new one
+     * via {@code Timestamped::merge}.
+     *
+     * @param store the store to write to; cast to {@link ListStore}
+     */
+    @Override
+    public void apply(Key start, Key end, Timestamp executeAt, Store store)
+    {
+        ListStore listStore = (ListStore) store;
+        Map<Key, int[]> inRange = subMap(start, true, end, false);
+        inRange.forEach((key, value) ->
+            listStore.data.merge(key, new Timestamped<>(executeAt, value), Timestamped::merge));
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
new file mode 100644
index 0000000..d407531
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -0,0 +1,269 @@
+package accord.impl.mock;
+
+import accord.NetworkFilter;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.messages.*;
+import accord.utils.ThreadPoolScheduler;
+import accord.txn.TxnId;
+import accord.utils.KeyRange;
+import accord.messages.Callback;
+import accord.messages.Reply;
+import accord.messages.Request;
+import accord.impl.IntKey;
+import accord.impl.TestAgent;
+import accord.topology.Shards;
+import accord.topology.Topology;
+import accord.impl.TopologyFactory;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+
+import static accord.Utils.id;
+
+/**
+ * In-process {@link Network} for tests: owns a fixed set of {@link Node}s and passes requests and
+ * replies between them, subject to {@link #networkFilter}.
+ */
+public class MockCluster implements Network
+{
+    private static final Logger logger = LoggerFactory.getLogger(MockCluster.class);
+
+    private final Random random;
+    private final Config config;
+    private final Map<Id, Node> nodes = new ConcurrentHashMap<>();
+    private int nextNodeId = 1;
+    public NetworkFilter networkFilter = new NetworkFilter();
+
+    private long nextMessageId = 0;
+    // callbacks awaiting a reply, keyed by the id of the message they were sent with
+    Map<Long, Callback> callbacks = new ConcurrentHashMap<>();
+
+    private MockCluster(Builder builder)
+    {
+        this.config = new Config(builder);
+        this.random = new Random(config.seed);
+
+        init();
+    }
+
+    private synchronized Id nextNodeId()
+    {
+        return id(nextNodeId++);
+    }
+
+    private synchronized long nextMessageId()
+    {
+        return nextMessageId++;
+    }
+
+    private long now()
+    {
+        return System.currentTimeMillis();
+    }
+
+    // each node gets its own store and its own Random, seeded from the cluster's Random
+    private Node createNode(Id id, Shards local, Topology topology)
+    {
+        MockStore store = new MockStore();
+        return new Node(id,
+                        topology,
+                        local,
+                        new SimpleMessageSink(id, this),
+                        new Random(random.nextLong()),
+                        this::now,
+                        () -> store,
+                        new TestAgent(),
+                        new ThreadPoolScheduler());
+    }
+
+    // allocates the node ids, builds the topology over [0, maxKey) and creates one node per id
+    private void init()
+    {
+        Preconditions.checkArgument(config.initialNodes == config.replication, "TODO");
+        List<Id> ids = new ArrayList<>(config.initialNodes);
+        for (int i=0; i<config.initialNodes; i++)
+        {
+            Id nextId = nextNodeId();
+            ids.add(nextId);
+        }
+        TopologyFactory<IntKey> topologyFactory = new TopologyFactory<>(config.replication, KeyRange.of(IntKey.key(0), IntKey.key(config.maxKey)));
+        Shards topology = topologyFactory.toShards(ids);
+        for (int i=0; i<config.initialNodes; i++)
+        {
+            Id id = ids.get(i);
+            Node node = createNode(id, topology.forNode(id), topology);
+            nodes.put(id, node);
+        }
+    }
+
+    /**
+     * Delivers {@code request} to node {@code to}. A request to an unknown node is dropped without
+     * notifying the callback; a request rejected by the network filter fails the callback (if any)
+     * with a {@code Timeout}. Otherwise the callback is registered under a fresh message id and the
+     * request is handed to the receiving node.
+     */
+    @Override
+    public void send(Id from, Id to, Request request, Callback callback)
+    {
+        Node node = nodes.get(to);
+        if (node == null)
+        {
+            logger.info("dropping message to unknown node {}: {} from {}", to, request, from);
+            return;
+        }
+
+        if (networkFilter.shouldDiscard(from, to, request))
+        {
+            // TODO: more flexible timeouts
+            if (callback != null)
+                callback.onFailure(to, new Timeout());
+            logger.info("discarding filtered message from {} to {}: {}", from, to, request);
+            return;
+        }
+
+        long messageId = nextMessageId();
+        if (callback != null)
+        {
+            callbacks.put(messageId, callback);
+        }
+
+        logger.info("processing message from {} to {}: {}", from, to, request);
+        node.receive(request, from, messageId);
+    }
+
+    /**
+     * Delivers {@code reply} to the callback registered for {@code replyingToMessage}. The callback
+     * is removed from the registry before the network filter is consulted, so a filtered reply also
+     * consumes it (failing it with a {@code Timeout}). A successful reply is run on the scheduler of
+     * the node being replied to.
+     */
+    @Override
+    public void reply(Id from, Id replyingToNode, long replyingToMessage, Reply reply)
+    {
+        Node node = nodes.get(replyingToNode);
+        if (node == null)
+        {
+            logger.info("dropping reply to unknown node {}: {} from {}", replyingToNode, reply, from);
+            return;
+        }
+
+        Callback callback = callbacks.remove(replyingToMessage);
+
+        if (networkFilter.shouldDiscard(from, replyingToNode, reply))
+        {
+            // log the destination node, not the reply a second time
+            logger.info("discarding filtered reply from {} to {}: {}", from, replyingToNode, reply);
+            if (callback != null)
+                callback.onFailure(from, new Timeout());
+            return;
+        }
+
+        if (callback == null)
+        {
+            logger.warn("Callback not found for reply from {} to {}: {} (msgid: {})", from, replyingToNode, reply, replyingToMessage);
+            return;
+        }
+
+        logger.info("processing reply from {} to {}: {}", from, replyingToNode, reply);
+        node.scheduler().now(() -> callback.onSuccess(from, reply));
+    }
+
+    /**
+     * @throws NoSuchElementException if the cluster has no node with the given id
+     */
+    public Node get(Id id)
+    {
+        Node node = nodes.get(id);
+        if (node == null)
+            throw new NoSuchElementException("No node exists with id " + id);
+        return node;
+    }
+
+    public Node get(int i)
+    {
+        return get(id(i));
+    }
+
+    /**
+     * @return the nodes for {@code ids}, in iteration order
+     * @throws NoSuchElementException if any id is unknown
+     */
+    public List<Node> nodes(Iterable<Id> ids)
+    {
+        List<Node> rlist = new ArrayList<>();
+        for (Id id : ids)
+            rlist.add(get(id));
+        return rlist;
+    }
+
+    /** Immutable snapshot of the settings chosen on a {@link Builder}. */
+    public static class Config
+    {
+        private final long seed;
+        private final int initialNodes;
+        private final int replication;
+        private final int maxKey;
+
+        private Config(MockCluster.Builder builder)
+        {
+            this.seed = builder.seed;
+            this.initialNodes = builder.initialNodes;
+            this.replication = builder.replication;
+            this.maxKey = builder.maxKey;
+        }
+    }
+
+    /** Fluent builder; defaults are seed 0, 3 nodes, replication 3 and maxKey 10000. */
+    public static class Builder
+    {
+        private long seed = 0;
+        private int initialNodes = 3;
+        private int replication = 3;
+        private int maxKey = 10000;
+
+        public Builder seed(long seed)
+        {
+            this.seed = seed;
+            return this;
+        }
+
+        public Builder nodes(int initialNodes)
+        {
+            this.initialNodes = initialNodes;
+            return this;
+        }
+
+        public Builder replication(int replication)
+        {
+            this.replication = replication;
+            return this;
+        }
+
+        public Builder maxKey(int max)
+        {
+            this.maxKey = max;
+            return this;
+        }
+
+        /**
+         * @throws IllegalArgumentException if nodes or replication is not positive, if maxKey is
+         *         negative, or (checked while the cluster initialises) if nodes != replication
+         */
+        public MockCluster build()
+        {
+            Preconditions.checkArgument(initialNodes > 0);
+            Preconditions.checkArgument(replication > 0);
+            Preconditions.checkArgument(maxKey >= 0);
+            return new MockCluster(this);
+        }
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    /** Manually advanced clock for tests; only moves when {@link #increment} is called. */
+    public static class Clock implements LongSupplier
+    {
+        private final AtomicLong now;
+
+        public Clock(long now)
+        {
+            this.now = new AtomicLong(now);
+        }
+
+        /** Advances the clock by {@code by} and returns the new time. */
+        public long increment(long by)
+        {
+            return now.addAndGet(by);
+        }
+
+        public long now()
+        {
+            return now.get();
+        }
+
+        @Override
+        public long getAsLong()
+        {
+            return now();
+        }
+
+        /** @return a {@code TxnId} built from the current time, 0 and {@code id} */
+        public TxnId idForNode(Id id)
+        {
+            return new TxnId(now.get(), 0, id);
+        }
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java b/accord-core/src/test/java/accord/impl/mock/MockStore.java
new file mode 100644
index 0000000..3a27958
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/mock/MockStore.java
@@ -0,0 +1,26 @@
+package accord.impl.mock;
+
+import accord.api.Data;
+import accord.api.Query;
+import accord.api.Read;
+import accord.api.Result;
+import accord.api.Store;
+import accord.api.Update;
+import accord.api.Write;
+
+/**
+ * Stateless {@link Store} for tests, together with no-op singletons for the other API types:
+ * every read yields {@link #DATA}, every query {@link #RESULT}, every update {@link #WRITE}.
+ */
+public class MockStore implements Store
+{
+    // merging with anything yields the same singleton again
+    public static final Data DATA = new Data()
+    {
+        @Override
+        public Data merge(Data data)
+        {
+            return DATA;
+        }
+    };
+
+    public static final Result RESULT = new Result() {};
+    public static final Read READ = (from, to, ignoredStore) -> DATA;
+    public static final Query QUERY = ignoredData -> RESULT;
+    public static final Write WRITE = (from, to, ignoredExecuteAt, ignoredStore) -> {};
+    public static final Update UPDATE = ignoredData -> WRITE;
+}
diff --git a/accord-core/src/test/java/accord/impl/mock/Network.java b/accord-core/src/test/java/accord/impl/mock/Network.java
new file mode 100644
index 0000000..5eda959
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/mock/Network.java
@@ -0,0 +1,27 @@
+package accord.impl.mock;
+
+import accord.local.Node.Id;
+import accord.messages.Callback;
+import accord.messages.Reply;
+import accord.messages.Request;
+
+// Transport abstraction used by the mock message sinks: carries requests and replies between node ids.
+public interface Network
+{
+ // Sends request from one node to another; callback may be null (callers in this package pass null when no reply is expected).
+ void send(Id from, Id to, Request request, Callback callback);
+ // Sends reply from one node in answer to message replyingToMessage previously received from replyingToNode.
+ void reply(Id from, Id replyingToNode, long replyingToMessage, Reply reply);
+
+ // Network that does nothing with whatever it is given: neither method delivers, records or calls back.
+ Network BLACK_HOLE = new Network()
+ {
+ @Override
+ public void send(Id from, Id to, Request request, Callback callback)
+ {
+ // TODO: log
+ }
+
+ @Override
+ public void reply(Id from, Id replyingToNode, long replyingToMessage, Reply reply)
+ {
+ // TODO: log
+ }
+ };
+}
diff --git a/accord-core/src/test/java/accord/impl/mock/RecordingMessageSink.java b/accord-core/src/test/java/accord/impl/mock/RecordingMessageSink.java
new file mode 100644
index 0000000..55cfa40
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/mock/RecordingMessageSink.java
@@ -0,0 +1,69 @@
+package accord.impl.mock;
+
+import accord.local.Node;
+import accord.messages.Callback;
+import accord.messages.Reply;
+import accord.messages.Request;
+
+import org.junit.jupiter.api.Assertions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link SimpleMessageSink} that remembers every request and reply passing through it before
+ * forwarding it to the network, so tests can inspect what a node sent.
+ */
+public class RecordingMessageSink extends SimpleMessageSink
+{
+    /** A recorded message: its destination, its payload and the callback it was sent with (may be null). */
+    public static class Envelope<T>
+    {
+        public final Node.Id to;
+        public final T payload;
+        public final Callback callback;
+
+        public Envelope(Node.Id to, T payload, Callback callback)
+        {
+            this.to = to;
+            this.payload = payload;
+            this.callback = callback;
+        }
+    }
+
+    public final List<Envelope<Request>> requests = new ArrayList<>();
+    public final List<Envelope<Reply>> responses = new ArrayList<>();
+
+    public RecordingMessageSink(Node.Id node, Network network)
+    {
+        super(node, network);
+    }
+
+    @Override
+    public void send(Node.Id to, Request request)
+    {
+        recordRequest(to, request, null);
+        super.send(to, request);
+    }
+
+    @Override
+    public void send(Node.Id to, Request request, Callback callback)
+    {
+        recordRequest(to, request, callback);
+        super.send(to, request, callback);
+    }
+
+    @Override
+    public void reply(Node.Id replyingToNode, long replyingToMessage, Reply reply)
+    {
+        Envelope<Reply> recorded = new Envelope<>(replyingToNode, reply, null);
+        responses.add(recorded);
+        super.reply(replyingToNode, replyingToMessage, reply);
+    }
+
+    /** Asserts the number of requests and replies recorded since construction or the last {@link #clearHistory()}. */
+    public void assertHistorySizes(int requests, int responses)
+    {
+        Assertions.assertEquals(requests, this.requests.size());
+        Assertions.assertEquals(responses, this.responses.size());
+    }
+
+    public void clearHistory()
+    {
+        requests.clear();
+        responses.clear();
+    }
+
+    // appends one outgoing request to the history
+    private void recordRequest(Node.Id to, Request request, Callback callback)
+    {
+        Envelope<Request> recorded = new Envelope<>(to, request, callback);
+        requests.add(recorded);
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java b/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
new file mode 100644
index 0000000..8ddbeca
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/mock/SimpleMessageSink.java
@@ -0,0 +1,37 @@
+package accord.impl.mock;
+
+import accord.local.Node;
+import accord.api.MessageSink;
+import accord.messages.Callback;
+import accord.messages.Reply;
+import accord.messages.Request;
+
+/**
+ * {@link MessageSink} that forwards everything to a {@link Network}, stamping each message with
+ * the id of the node it belongs to as the sender.
+ */
+public class SimpleMessageSink implements MessageSink
+{
+    public final Node.Id node;
+    public final Network network;
+
+    public SimpleMessageSink(Node.Id node, Network network)
+    {
+        this.node = node;
+        this.network = network;
+    }
+
+    @Override
+    public void send(Node.Id to, Request request)
+    {
+        // goes straight to the network with a null callback rather than through the overload below,
+        // so a subclass overriding both overloads sees each send exactly once
+        this.network.send(this.node, to, request, null);
+    }
+
+    @Override
+    public void send(Node.Id to, Request request, Callback callback)
+    {
+        this.network.send(this.node, to, request, callback);
+    }
+
+    @Override
+    public void reply(Node.Id replyingToNode, long replyingToMessage, Reply reply)
+    {
+        this.network.reply(this.node, replyingToNode, replyingToMessage, reply);
+    }
+}
diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
new file mode 100644
index 0000000..3082f70
--- /dev/null
+++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
@@ -0,0 +1,120 @@
+package accord.messages;
+
+import accord.local.Instance;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.api.MessageSink;
+import accord.api.Scheduler;
+import accord.impl.mock.MockCluster.Clock;
+import accord.txn.Dependencies;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+import accord.utils.ThreadPoolScheduler;
+import accord.local.*;
+import accord.txn.Keys;
+import accord.impl.IntKey;
+import accord.impl.mock.Network;
+import accord.impl.mock.RecordingMessageSink;
+import accord.impl.TestAgent;
+import accord.impl.mock.MockStore;
+import accord.topology.Shards;
+import accord.impl.TopologyFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Random;
+
+import static accord.Utils.id;
+import static accord.Utils.writeTxn;
+
+// Tests PreAccept.process against a single node (ID1) whose outgoing messages are recorded and
+// then dropped (the sink forwards to Network.BLACK_HOLE).
+public class PreAcceptTest
+{
+ private static final Id ID1 = id(1);
+ private static final Id ID2 = id(2);
+ private static final Id ID3 = id(3);
+ private static final List<Id> IDS = List.of(ID1, ID2, ID3);
+ private static final Shards TOPOLOGY = TopologyFactory.toShards(IDS, 3, IntKey.range(0, 100));
+
+ // Builds a node over the shared TOPOLOGY with its own Random, MockStore and ThreadPoolScheduler.
+ // NOTE(review): the Random is unseeded, so anything derived from it differs between runs.
+ private static Node createNode(Id nodeId, MessageSink messageSink, Clock clock)
+ {
+ Random random = new Random();
+ MockStore store = new MockStore();
+ Scheduler scheduler = new ThreadPoolScheduler();
+ return new Node(nodeId, TOPOLOGY, TOPOLOGY.forNode(nodeId), messageSink, random, clock, () -> store, new TestAgent(), scheduler);
+ }
+
+ // A PreAccept from ID2 for a key with no prior commands: the command must end up PreAccepted and
+ // exactly one reply, equal to PreAcceptOk(txnId, empty Dependencies), must be sent to ID2.
+ @Test
+ void initialCommandTest()
+ {
+ RecordingMessageSink messageSink = new RecordingMessageSink(ID1, Network.BLACK_HOLE);
+ Clock clock = new Clock(100);
+ Node node = createNode(ID1, messageSink, clock);
+
+ IntKey key = IntKey.key(10);
+ Instance instance = node.local(key).orElseThrow();
+ Assertions.assertFalse(instance.hasCommandsForKey(key));
+
+ // the txn id is taken at clock time 100; the clock is moved to 110 before the message is processed
+ TxnId txnId = clock.idForNode(ID2);
+ Txn txn = writeTxn(Keys.of(key));
+ PreAccept preAccept = new PreAccept(txnId, txn);
+ clock.increment(10);
+ preAccept.process(node, ID2, 0);
+
+ Command command = instance.commandsForKey(key).uncommitted.get(txnId);
+ Assertions.assertEquals(Status.PreAccepted, command.status());
+
+ messageSink.assertHistorySizes(0, 1);
+ Assertions.assertEquals(ID2, messageSink.responses.get(0).to);
+ Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId, new Dependencies()),
+ messageSink.responses.get(0).payload);
+ }
+
+ // NOTE(review): processes a PreAccept but asserts nothing afterwards, so it only checks that
+ // processing does not throw; the nack scenario implied by the name is not exercised yet.
+ @Test
+ void nackTest()
+ {
+ RecordingMessageSink messageSink = new RecordingMessageSink(ID1, Network.BLACK_HOLE);
+ Clock clock = new Clock(100);
+ Node node = createNode(ID1, messageSink, clock);
+
+ IntKey key = IntKey.key(10);
+ Instance instance = node.local(key).orElseThrow();
+ Assertions.assertFalse(instance.hasCommandsForKey(key));
+
+ TxnId txnId = clock.idForNode(ID2);
+ Txn txn = writeTxn(Keys.of(key));
+ PreAccept preAccept = new PreAccept(txnId, txn);
+ preAccept.process(node, ID2, 0);
+ }
+
+ // NOTE(review): empty placeholder; passes trivially.
+ @Test
+ void singleKeyTimestampUpdate()
+ {
+ }
+
+ // A first PreAccept (txn id taken at clock time 100) touches key1; a second one with the older
+ // id TxnId(50, 0, ID3) touches key1 and key2 and is processed after the clock reaches 110.
+ // The single reply to ID3 must equal a PreAcceptOk built from TxnId(110, 0, ID1) and empty Dependencies.
+ @Test
+ void multiKeyTimestampUpdate()
+ {
+ RecordingMessageSink messageSink = new RecordingMessageSink(ID1, Network.BLACK_HOLE);
+ Clock clock = new Clock(100);
+ Node node = createNode(ID1, messageSink, clock);
+
+ IntKey key1 = IntKey.key(10);
+ PreAccept preAccept1 = new PreAccept(clock.idForNode(ID2), writeTxn(Keys.of(key1)));
+ preAccept1.process(node, ID2, 0);
+
+ // forget the reply to the first PreAccept so that only the second one is counted below
+ messageSink.clearHistory();
+ IntKey key2 = IntKey.key(11);
+ TxnId txnId2 = new TxnId(50, 0, ID3);
+ PreAccept preAccept2 = new PreAccept(txnId2, writeTxn(Keys.of(key1, key2)));
+ clock.increment(10);
+ preAccept2.process(node, ID3, 0);
+
+ messageSink.assertHistorySizes(0, 1);
+ Assertions.assertEquals(ID3, messageSink.responses.get(0).to);
+ Dependencies expectedDeps = new Dependencies();
+ Assertions.assertEquals(new PreAccept.PreAcceptOk(new TxnId(110, 0, ID1), expectedDeps),
+ messageSink.responses.get(0).payload);
+ }
+}
diff --git a/accord-core/src/test/java/accord/topology/ShardTest.java b/accord-core/src/test/java/accord/topology/ShardTest.java
new file mode 100644
index 0000000..422aeb3
--- /dev/null
+++ b/accord-core/src/test/java/accord/topology/ShardTest.java
@@ -0,0 +1,108 @@
+package accord.topology;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for the quorum arithmetic in {@link Shard}: the number of failures tolerated for a replica
+ * count, and the fast path quorum size for a given fast path electorate.
+ */
+public class ShardTest
+{
+    @Test
+    void toleratedFailures()
+    {
+        Assertions.assertEquals(0, Shard.maxToleratedFailures(1));
+        Assertions.assertEquals(0, Shard.maxToleratedFailures(2));
+        Assertions.assertEquals(1, Shard.maxToleratedFailures(3));
+        Assertions.assertEquals(1, Shard.maxToleratedFailures(4));
+        Assertions.assertEquals(2, Shard.maxToleratedFailures(5));
+        Assertions.assertEquals(2, Shard.maxToleratedFailures(6));
+        Assertions.assertEquals(3, Shard.maxToleratedFailures(7));
+        Assertions.assertEquals(3, Shard.maxToleratedFailures(8));
+        Assertions.assertEquals(4, Shard.maxToleratedFailures(9));
+        Assertions.assertEquals(4, Shard.maxToleratedFailures(10));
+        Assertions.assertEquals(5, Shard.maxToleratedFailures(11));
+        Assertions.assertEquals(5, Shard.maxToleratedFailures(12));
+        Assertions.assertEquals(6, Shard.maxToleratedFailures(13));
+        Assertions.assertEquals(6, Shard.maxToleratedFailures(14));
+        Assertions.assertEquals(7, Shard.maxToleratedFailures(15));
+        Assertions.assertEquals(7, Shard.maxToleratedFailures(16));
+        Assertions.assertEquals(8, Shard.maxToleratedFailures(17));
+        Assertions.assertEquals(8, Shard.maxToleratedFailures(18));
+        Assertions.assertEquals(9, Shard.maxToleratedFailures(19));
+        Assertions.assertEquals(9, Shard.maxToleratedFailures(20));
+    }
+
+    /**
+     * Reference formula the implementation is cross-checked against:
+     * {@code ceil((electorateSize + f + 1) / 2)}, with {@code f} the failures tolerated by
+     * {@code allReplicas} replicas.
+     */
+    int fastPathQuorumSize(int allReplicas, int electorateSize)
+    {
+        int f = Shard.maxToleratedFailures(allReplicas);
+        return (int) Math.ceil((electorateSize + f + 1) / 2.0);
+    }
+
+    /** Asserts that the implementation agrees with both the reference formula and {@code expected}. */
+    void assertFastPathQuorumSize(int expected, int replicas, int fpElectorate)
+    {
+        int f = Shard.maxToleratedFailures(replicas);
+        int actual = Shard.fastPathQuorumSize(replicas, fpElectorate, f);
+        Assertions.assertEquals(fastPathQuorumSize(replicas, fpElectorate), actual);
+        Assertions.assertEquals(expected, actual);
+    }
+
+    /** Asserts that the given electorate size is rejected with an {@link IllegalArgumentException}. */
+    void assertInvalidFastPathElectorateSize(int replicas, int fpElectorate)
+    {
+        int f = Shard.maxToleratedFailures(replicas);
+        // assertThrows replaces the manual try / fail / catch and reports unexpected exception types too
+        Assertions.assertThrows(IllegalArgumentException.class,
+                                () -> Shard.fastPathQuorumSize(replicas, fpElectorate, f),
+                                () -> String.format("Expected exception for fp electorate size %s for replica set size %s (f %s)",
+                                                    fpElectorate, replicas, f));
+    }
+
+    @Test
+    void fastPathQuorumSizeTest()
+    {
+        // rf=3
+        assertFastPathQuorumSize(3, 3, 3);
+        assertFastPathQuorumSize(2, 3, 2);
+        assertInvalidFastPathElectorateSize(3, 1);
+
+        // rf=4
+        assertFastPathQuorumSize(3, 4, 4);
+        assertFastPathQuorumSize(3, 4, 3);
+        assertInvalidFastPathElectorateSize(4, 2);
+
+        // rf=5
+        assertFastPathQuorumSize(4, 5, 5);
+        assertFastPathQuorumSize(4, 5, 4);
+        assertFastPathQuorumSize(3, 5, 3);
+        assertInvalidFastPathElectorateSize(5, 2);
+
+        // rf=6
+        assertFastPathQuorumSize(5, 6, 6);
+        assertFastPathQuorumSize(4, 6, 5);
+        assertFastPathQuorumSize(4, 6, 4);
+        assertInvalidFastPathElectorateSize(6, 3);
+
+        // rf=7
+        assertFastPathQuorumSize(6, 7, 7);
+        assertFastPathQuorumSize(5, 7, 6);
+        assertFastPathQuorumSize(5, 7, 5);
+        assertFastPathQuorumSize(4, 7, 4);
+        assertInvalidFastPathElectorateSize(7, 3);
+
+        // rf=8
+        assertFastPathQuorumSize(6, 8, 8);
+        assertFastPathQuorumSize(6, 8, 7);
+        assertFastPathQuorumSize(5, 8, 6);
+        assertInvalidFastPathElectorateSize(8, 4);
+
+        // rf=9
+        assertFastPathQuorumSize(7, 9, 9);
+        assertFastPathQuorumSize(7, 9, 8);
+        assertFastPathQuorumSize(6, 9, 7);
+        assertFastPathQuorumSize(6, 9, 6);
+        assertFastPathQuorumSize(5, 9, 5);
+        assertInvalidFastPathElectorateSize(9, 4);
+    }
+}
diff --git a/accord-core/src/test/java/accord/verify/HistoryViolation.java b/accord-core/src/test/java/accord/verify/HistoryViolation.java
new file mode 100644
index 0000000..2429066
--- /dev/null
+++ b/accord-core/src/test/java/accord/verify/HistoryViolation.java
@@ -0,0 +1,12 @@
+package accord.verify;
+
+/**
+ * Thrown by the history verifiers when the observed history is inconsistent; carries the primary
+ * key whose history was violated.
+ */
+public class HistoryViolation extends AssertionError
+{
+    final int primaryKey;
+
+    /**
+     * @param primaryKey    the key on which the violation was detected
+     * @param detailMessage passed unchanged to {@link AssertionError#AssertionError(Object)}
+     */
+    public HistoryViolation(int primaryKey, Object detailMessage)
+    {
+        super(detailMessage);
+        this.primaryKey = primaryKey;
+    }
+}
diff --git a/accord-core/src/test/java/accord/verify/LinearizabilityVerifier.java b/accord-core/src/test/java/accord/verify/LinearizabilityVerifier.java
new file mode 100644
index 0000000..1c00ed7
--- /dev/null
+++ b/accord-core/src/test/java/accord/verify/LinearizabilityVerifier.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.verify;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+
+import static accord.verify.LinearizabilityVerifier.Witness.Type.UPDATE_SUCCESS;
+import static accord.verify.LinearizabilityVerifier.Witness.Type.UPDATE_UNKNOWN;
+
+/**
+ * Linearizability checker.
+ * <p>
+ * We simply verify that there is no viewing of histories backwards or forwards in time (i.e. that the time periods each
+ * unique list is witnessable for is disjoint) and that each list is a prefix of any lists witnessed later
+ *
+ * TODO: merge with SerializabilityVerifier.
+ */
+public class LinearizabilityVerifier
+{
+    // A client observation of a sequence, and the logical start
+    // and end times of the client operation that witnessed it
+    public static class Observation extends Witness implements Comparable<Observation>
+    {
+        final int[] sequence;
+
+        public Observation(int[] sequence, int start, int end)
+        {
+            super(Type.READ, start, end);
+            this.sequence = sequence;
+        }
+
+        // computes a PARTIAL ORDER on when the outcome occurred, i.e. for many pair-wise comparisons the answer is 0
+        @Override
+        public int compareTo(Observation that)
+        {
+            if (this.end < that.start)
+                return -1;
+            if (that.end < this.start)
+                return 1;
+            return 0;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("((%3d,%3d), WITNESS, %s)", start, end, Arrays.toString(sequence));
+        }
+    }
+
+    // One entry of an event's log: the kind of client operation and its logical start and end times
+    static class Witness
+    {
+        enum Type { UPDATE_SUCCESS, UPDATE_UNKNOWN, READ }
+
+        final Witness.Type type;
+        final int start;
+        final int end;
+
+        Witness(Witness.Type type, int start, int end)
+        {
+            this.type = type;
+            this.start = start;
+            this.end = end;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("((%3d,%3d),%s)", start, end, type);
+        }
+    }
+
+    // Everything known about one event (one appended value, or the empty sequence with id -1)
+    static class Event
+    {
+        final List<Witness> log = new ArrayList<>();
+
+        final int eventId;
+        int eventPosition = -1; // length of the sequence ending in this event; -1 until witnessed in a sequence
+        int[] sequence;
+        int visibleBy = Integer.MAX_VALUE; // witnessed by at least this time
+        int visibleUntil = -1; // witnessed until at least this time (i.e. witnessed nothing newer by then)
+        Boolean result; // unknown, success or (implied by not being witnessed) failure
+
+        Event(int eventId)
+        {
+            this.eventId = eventId;
+        }
+    }
+
+    final int primaryKey;
+    // events indexed by event id
+    private Event[] byId;
+    // written events that no read has yet placed in a sequence or ruled out
+    private final Queue<Event> unwitnessed = new ArrayDeque<>();
+    // events indexed by position, i.e. by the length of the sequence that ends with them
+    private Event[] events = new Event[16];
+
+    public LinearizabilityVerifier(int primaryKey)
+    {
+        this.primaryKey = primaryKey;
+        byId = new Event[16];
+    }
+
+    /**
+     * Records a read. The event witnessed is identified by the length of the observed sequence
+     * (its position) and by the sequence's last element (its id, or -1 for the empty sequence).
+     * Afterwards every still-unwitnessed write that had to be visible before this read started is
+     * either marked as failed (if its outcome was unknown) or reported as a violation (if it
+     * reported success).
+     *
+     * @throws HistoryViolation if the observation contradicts what has been recorded so far
+     */
+    public void witnessRead(Observation observed)
+    {
+        int eventPosition = observed.sequence.length;
+        int eventId = eventPosition == 0 ? -1 : observed.sequence[eventPosition - 1];
+        Event event = get(eventPosition, eventId);
+        recordWitness(event, observed);
+        recordVisibleBy(event, observed.end);
+        recordVisibleUntil(event, observed.start);
+
+        // see if any of the unwitnessed events can be ruled out
+        if (!unwitnessed.isEmpty())
+        {
+            Iterator<Event> iter = unwitnessed.iterator();
+            while (iter.hasNext())
+            {
+                Event e = iter.next();
+                if (e.visibleBy < observed.start)
+                {
+                    if (e.result == null)
+                    {
+                        // still accessible byId, so if we witness it later we will flag the inconsistency
+                        e.result = Boolean.FALSE;
+                        iter.remove();
+                    }
+                    else if (e.result)
+                    {
+                        throw fail(primaryKey, "%d witnessed as absent at T%d", e.eventId, observed.end);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Records the outcome of a write of {@code eventId} that ran from {@code start} to {@code end}.
+     *
+     * @param success true if the write reported success, false if its outcome is unknown
+     * @throws HistoryViolation if the write succeeded although its absence was already established,
+     *         or if the implied visibility bounds contradict a neighbouring event
+     */
+    public void witnessWrite(int eventId, int start, int end, boolean success)
+    {
+        Event event = ensureById(eventId);
+        if (event == null)
+        {
+            byId[eventId] = event = new Event(eventId);
+            unwitnessed.add(event);
+        }
+
+        event.log.add(new Witness(success ? UPDATE_SUCCESS : UPDATE_UNKNOWN, start, end));
+        recordVisibleUntil(event, start);
+        recordVisibleBy(event, end); // even if the result is unknown, the result must be visible to other operations by the time we terminate
+        if (success)
+        {
+            if (event.result == Boolean.FALSE)
+                throw fail(primaryKey, "witnessed absence of %d but event returned success", eventId);
+            event.result = Boolean.TRUE;
+        }
+    }
+
+    void recordWitness(Event event, Observation witness)
+    {
+        recordWitness(event, witness.sequence.length, witness, witness.sequence);
+    }
+
+    /**
+     * Attaches {@code sequence} to {@code event} and then walks back through its predecessors,
+     * attaching the corresponding prefix to each one that has no sequence yet. The walk stops at
+     * the first event that already has a sequence (which must match) or at position 0.
+     */
+    void recordWitness(Event event, int eventPosition, Observation witness, int[] sequence)
+    {
+        while (true)
+        {
+            event.log.add(witness);
+            if (event.sequence != null)
+            {
+                if (!Arrays.equals(event.sequence, sequence))
+                    throw fail(primaryKey, "%s previously witnessed %s", witness, event.sequence);
+                return;
+            }
+
+            event.sequence = sequence;
+            event.eventPosition = eventPosition;
+
+            event = prev(event);
+            if (event == null)
+                break;
+
+            if (event.sequence != null)
+            {
+                // verify it's a strict prefix
+                if (!equal(event.sequence, sequence, sequence.length - 1))
+                    throw fail(primaryKey, "%s previously witnessed %s", sequence, event.sequence);
+                break;
+            }
+
+            // if our predecessor event hasn't been witnessed directly, witness it by this event, even if
+            // we say nothing about the times it may have been witnessed (besides those implied by the write event)
+            eventPosition -= 1;
+            sequence = Arrays.copyOf(sequence, eventPosition);
+        }
+    }
+
+    /**
+     * Lowers the time by which {@code event} is known to be visible; fails if the predecessor was
+     * still being witnessed as the latest event at or after that time.
+     */
+    void recordVisibleBy(Event event, int visibleBy)
+    {
+        if (visibleBy < event.visibleBy)
+        {
+            event.visibleBy = visibleBy;
+            Event prev = prev(event);
+            if (prev != null && prev.visibleUntil >= visibleBy)
+                // one specifier per argument, mirroring the message in recordVisibleUntil
+                throw fail(primaryKey, "%s %d not witnessed >= %d, but also witnessed <= %d", event.sequence, event.eventId, prev.visibleUntil, event.visibleBy);
+        }
+    }
+
+    /**
+     * Raises the time until which {@code event} is known to have been the latest event; fails if
+     * the successor was already visible by that time.
+     */
+    void recordVisibleUntil(Event event, int visibleUntil)
+    {
+        if (visibleUntil > event.visibleUntil)
+        {
+            event.visibleUntil = visibleUntil;
+            Event next = next(event);
+            if (next != null && visibleUntil >= next.visibleBy)
+                throw fail(primaryKey, "%s %d not witnessed >= %d, but also witnessed <= %d", next.sequence, next.eventId, event.visibleUntil, next.visibleBy);
+        }
+    }
+
+    // grows byId if necessary so that id is a valid index, and returns the (possibly null) entry
+    Event ensureById(int id)
+    {
+        if (byId.length <= id)
+            byId = Arrays.copyOf(byId, id + 1 + (id / 2));
+        return byId[id];
+    }
+
+    /**
+     * Initialise the Event representing both eventPosition and eventId for witnessing
+     */
+    Event get(int eventPosition, int eventId)
+    {
+        if (eventPosition >= events.length)
+            events = Arrays.copyOf(events, Integer.max(eventPosition + 1, events.length * 2));
+
+        Event event = events[eventPosition];
+        if (event == null)
+        {
+            if (eventId < 0)
+            {
+                assert eventId == -1;
+                events[eventPosition] = event = new Event(eventId);
+            }
+            else
+            {
+                event = ensureById(eventId);
+                if (event != null)
+                {
+                    // known from a write (or an earlier read): it must not already have a position
+                    if (event.eventPosition >= 0)
+                        throw fail(primaryKey, "%d occurs at positions %d and %d", eventId, eventPosition, event.eventPosition);
+                    events[eventPosition] = event;
+                    unwitnessed.remove(event);
+                }
+                else
+                {
+                    byId[eventId] = events[eventPosition] = event = new Event(eventId);
+                }
+            }
+        }
+        else
+        {
+            if (eventId != event.eventId)
+                throw fail(primaryKey, "(eventId, eventPosition): (%d, %d) != (%d, %d)", eventId, eventPosition, event.eventId, event.eventPosition);
+            else if (eventPosition != event.eventPosition)
+                throw fail(primaryKey, "%d occurs at positions %d and %d", eventId, eventPosition, event.eventPosition);
+        }
+        return event;
+    }
+
+    Event prev(Event event)
+    {
+        // we can reach here via witnessWrite without knowing our Observation,
+        // in which case we won't know our predecessor event, so we cannot do anything useful
+        if (event.sequence == null)
+            return null;
+
+        int eventPosition = event.eventPosition - 1;
+        if (eventPosition < 0)
+            return null;
+
+        // initialise the event, if necessary importing information from byId
+        return get(eventPosition, eventPosition == 0 ? -1 : event.sequence[eventPosition - 1]);
+    }
+
+    Event next(Event event)
+    {
+        int eventPosition = event.eventPosition + 1;
+        if (eventPosition == 0 || eventPosition >= events.length)
+            return null;
+
+        // we cannot initialise the event meaningfully, so just return what is already known (if anything)
+        return events[eventPosition];
+    }
+
+    // dumps the events by position (up to the first gap) and then every event by id to stderr
+    void print()
+    {
+        for (Event e : events)
+        {
+            if (e == null) break;
+            System.err.printf("%d: (%4d,%4d) %s %s\n", primaryKey, e.visibleBy, e.visibleUntil, Arrays.toString(e.sequence), e.log);
+        }
+        for (Event e : byId)
+        {
+            if (e == null) continue;
+            System.err.printf("%s: %s\n", e.eventId, e.log);
+        }
+    }
+
+    /**
+     * Always throws a {@link HistoryViolation} whose message is {@code message} formatted with
+     * {@code params}; {@code int[]} parameters are replaced in place by their
+     * {@link Arrays#toString(int[])} form first. Declared to return an {@link Error} only so that
+     * call sites can write {@code throw fail(...)}.
+     */
+    static Error fail(int primaryKey, String message, Object ... params)
+    {
+        for (int i = 0 ; i < params.length ; ++i)
+            if (params[i] instanceof int[]) params[i] = Arrays.toString((int[]) params[i]);
+        throw new HistoryViolation(primaryKey, "history violation on " + primaryKey + ": " + String.format(message, params));
+    }
+
+    // true if the first count elements of a and b are equal
+    static boolean equal(int[] a, int [] b, int count)
+    {
+        for (int i = 0 ; i < count ; ++i)
+            if (a[i] != b[i])
+                return false;
+        return true;
+    }
+}
diff --git a/accord-core/src/test/java/accord/verify/SerializabilityVerifier.java b/accord-core/src/test/java/accord/verify/SerializabilityVerifier.java
new file mode 100644
index 0000000..895053b
--- /dev/null
+++ b/accord-core/src/test/java/accord/verify/SerializabilityVerifier.java
@@ -0,0 +1,496 @@
+package accord.verify;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+/**
+ * Nomenclature:
+ * register: the values associated with a given key
+ * step: a logical point in the causal sequence of events for a register
+ * step index: the index of a step; since we observe a strictly growing sequence this translates directly to
+ * the length of an observed sequence for a key. This imposes a total causal order for a given register
+ * (sequences [], [1], [1,2] have step indexes of 0, 1 and 2 respectively, with 2 necessarily happening after 1 and 0)
+ * predecessor: a step for a referent key that must occur before the referring step and key.
+ * two kinds: 1) those values for keys [B,C..) at step index i for a read of key A, precede key A's step index i +1
+ * 2) those values for keys [B,C..) at step index i for a write of key A, precede key A's step index i
+ * max predecessor: the maximum predecessor that may be reached via any predecessor relation
+ *
+ * Ensure there are no cycles in the implied list of predecessors, i.e. that we have a serializable order.
+ * That is, we maintain links to the maximum predecessor step for each key, at each step for each key.
+ * In combination with a linearizability verifier for each register/partition, we verify strict-serializability.
+ *
+ * TODO: find and report a path when we encounter a violation
+ */
+public class SerializabilityVerifier
+{
+ /**
+ * A link to the maximum predecessor node for a given key reachable from the transitive closure of predecessor
+ * relations from a given register's observation (i.e. for a given sequence observed for a given key).
+ *
+ * A predecessor is an absolute happens-before relationship. This is created either:
+ * 1) by witnessing some read for key A coincident with a write for key B,
+ * therefore the write for key B happened strictly after the write for key A; or
+ * 2) any value for key A witnessed alongside a step index i for key B happens before i+1 for key B.
+ *
+ * For every observation step index i for key A, we have an outgoing MaxPredecessor link to every key.
+ * This object both maintains the current maximum predecessor step index reachable via the transitive closure
+ * of happens-before relationships, and represents a back-link from that step index for the referred-to key,
+ * so that when its own memoized maximum predecessor values are updated we can propagate them here.
+ *
+ * In essence, each node in the happens-before graph maintains a link to every possible frontier of its transitive
+ * closure in the graph, so that each time that frontier is updated the internal nodes that reference it are updated
+ * to the new frontier. This ensures ~computationally optimal maintenance of this transitive closure at the expense
+ * of quadratic memory utilisation, but for small numbers of unique keys this remains quite manageable (i.e. steps*keys^2)
+ *
+ * This graph can be interpreted quite simply: if any step index for a key can reach itself (or a successor) via
+ * the transitive closure of happens-before relations then there is a serializability violation.
+ */
+ static class MaxPredecessor implements Comparable<MaxPredecessor>
+ {
+ // intrusive circular doubly-linked list links; a freshly created node is a list containing only itself
+ MaxPredecessor prev = this, next = this;
+
+ // the key and step index of the node that owns this link
+ final int ofKey;
+ final int ofStepIndex;
+
+ // TODO: we probably don't need this field, as it's implied by the node we point to, that we have when we enqueue refresh
+ final int predecessorKey;
+ // highest step index of predecessorKey known to precede the owner; -1 until one is recorded
+ int predecessorStepIndex;
+
+ MaxPredecessor(int ofKey, int ofStepIndex, int predecessorKey)
+ {
+ this.ofKey = ofKey;
+ this.ofStepIndex = ofStepIndex;
+ this.predecessorKey = predecessorKey;
+ this.predecessorStepIndex = -1;
+ }
+
+ // orders by owning step index first, then by owning key; predecessorKey does not take part
+ @Override
+ public int compareTo(MaxPredecessor that)
+ {
+ if (this.ofStepIndex != that.ofStepIndex) return Integer.compare(this.ofStepIndex, that.ofStepIndex);
+ else return Integer.compare(this.ofKey, that.ofKey);
+ }
+
+ /**
+ * Unlink {@code push} from any list in which it presently resides, and link it to this one
+ */
+ void push(MaxPredecessor push)
+ {
+ MaxPredecessor head = this;
+ // unlink push from its current list
+ push.next.prev = push.prev;
+ push.prev.next = push.next;
+ // link push to this list
+ // (inserted directly after the head, ahead of any existing members)
+ push.next = head.next;
+ push.prev = head;
+ head.next = push;
+ push.next.prev = push;
+ }
+
+ /**
+ * Apply {@code forEach} to each element in this list
+ * (the list head itself, i.e. {@code this}, is not visited)
+ */
+ void forEach(Consumer<MaxPredecessor> forEach)
+ {
+ MaxPredecessor next = this.next;
+ while (next != this)
+ {
+ forEach.accept(next);
+ next = next.next;
+ }
+ }
+
+ // renders only the current predecessor step index
+ @Override
+ public String toString()
+ {
+ return Integer.toString(predecessorStepIndex);
+ }
+ }
+
+ /**
+ * Represents the graph state for a step for a single key, and maintains backwards references to all
+ * internal nodes of the graph whose maximum predecessor for this key points to this step.
+ * When this step is updated, we queue all of these internal nodes to update their own max predecessors.
+ */
+ static class Step extends MaxPredecessor
+ {
+ /**
+ * the maximum _step_ of the corresponding key's sequence that was witnessed alongside this step for this key
+ */
+ final int[] maxPeers;
+
+ /**
+ * The maximum _step_ of the corresponding key's sequence that was witnessed by any transitive predecessor of this key for this step.
+ * That is, if we look at the directly preceding step for this key (which must by definition precede this step) and explore all of
+ * its predecessors in the same manner, what is the highest step we can reach for each key.
+ */
+ final MaxPredecessor[] maxPredecessors;
+
+ /**
+ * The next instantiated sequence's observation.
+ * This may not be stepIndex+1, if we have not witnessed stepIndex+1 directly.
+ * i.e. if we have witnessed [0] and [0,1,2] then 0's successor will be 2.
+ * If we later witness [0,1], 0's successor will be updated to 1, whose successor will be 2.
+ */
+ Step successor;
+
+ Step(int key, int stepIndex, int keyCount)
+ {
+ super(key, stepIndex, key);
+ this.maxPeers = new int[keyCount];
+ Arrays.fill(maxPeers, -1);
+ this.maxPredecessors = new MaxPredecessor[keyCount];
+ this.predecessorStepIndex = stepIndex - 1;
+ }
+
+ /**
+ * The maxPredecessor for {@code key}, instantiating it if none currently exists
+ */
+ MaxPredecessor maxPredecessor(int key)
+ {
+ if (maxPredecessors[key] == null)
+ maxPredecessors[key] = new MaxPredecessor(ofKey, ofStepIndex, key);
+ return maxPredecessors[key];
+ }
+
+ @Override
+ public String toString()
+ {
+ return "{" + Arrays.toString(maxPeers) + ", " + Arrays.toString(maxPredecessors) + '}';
+ }
+ }
+
+ /**
+ * The history of observations for a given key, or the set of nodes in the graph of observations for this key.
+ *
+ * TODO: extend LinearizabilityVerifier
+ */
+ class Register
+ {
+     final int key;
+     // the total order sequence for this register
+     int[] sequence = new int[0];
+     // steps instantiated so far, indexed by step index; a slot stays null until that step is requested
+     Step[] steps = new Step[1];
+
+     /**
+      * Transactions that wrote to this key without a coincident read, keyed by the value they wrote.
+      * Their step index is unknown until a later observation reveals where that value sits in this
+      * register's sequence, at which point {@code updateSequence} resolves and processes them.
+      */
+     Map<Integer, List<Deferred>> deferred = new HashMap<>();
+
+     Register(int key)
+     {
+         this.key = key;
+     }
+
+     /**
+      * Validates a newly observed sequence against the known one and adopts it if it is longer.
+      *
+      * @param sequence   the sequence read for this key (excluding any write by the same transaction)
+      * @param maybeWrite the value written by the same transaction, or a negative number if none
+      * @throws HistoryViolation if the observation is not consistent with the known sequence
+      */
+     private void updateSequence(int[] sequence, int maybeWrite)
+     {
+         // the common prefix of the two sequences must agree
+         for (int i = 0, max = Math.min(sequence.length, this.sequence.length) ; i < max ; ++i)
+         {
+             if (sequence[i] != this.sequence[i])
+                 throw new HistoryViolation(key, "Inconsistent sequences: " + Arrays.toString(this.sequence) + " vs " + Arrays.toString(sequence));
+         }
+         if (this.sequence.length > sequence.length)
+         {
+             // we already know what follows the observed prefix; a coincident write must match it
+             if (maybeWrite >= 0 && maybeWrite != this.sequence[sequence.length])
+                 throw new HistoryViolation(key, "Inconsistent sequences: " + Arrays.toString(this.sequence) + " vs " + Arrays.toString(sequence) + "+" + maybeWrite);
+         }
+         else
+         {
+             if (maybeWrite >= 0)
+             {
+                 sequence = Arrays.copyOf(sequence, sequence.length + 1);
+                 sequence[sequence.length - 1] = maybeWrite;
+             }
+             if (sequence.length > this.sequence.length)
+             {
+                 // process any transactions deferred because we didn't know what step they occurred on
+                 for (int i = this.sequence.length ; i < sequence.length ; ++i)
+                 {
+                     // deferred entries are registered under the value written (see registerDeferred),
+                     // so they must be looked up by the value now revealed at position i, not by i itself
+                     List<Deferred> deferreds = deferred.remove(sequence[i]);
+                     if (deferreds != null)
+                     {
+                         for (Deferred pending : deferreds)
+                         {
+                             // the value at position i belongs to step index i + 1
+                             pending.update(key, i + 1);
+                             pending.process(SerializabilityVerifier.this);
+                         }
+                     }
+                 }
+                 this.sequence = sequence;
+             }
+         }
+     }
+
+     /**
+      * Returns the step with the given index, instantiating it and splicing it into the chain of
+      * instantiated steps (via {@code successor}) if it does not exist yet.
+      */
+     Step step(int step)
+     {
+         if (steps.length <= step)
+             steps = Arrays.copyOf(steps, Math.max(step + 1, steps.length * 2));
+
+         if (steps[step] == null)
+         {
+             steps[step] = new Step(key, step, keyCount);
+             // find the nearest instantiated step below the new one
+             int i = step;
+             while (--i >= 0 && steps[i] == null) {}
+             if (i >= 0)
+             {
+                 // insert between steps[i] and its former successor, then inherit steps[i]'s predecessors
+                 steps[step].successor = steps[i].successor;
+                 steps[i].successor = steps[step];
+                 updatePredecessors(steps[step], steps[i], true);
+             }
+             else
+             {
+                 // nothing below: link to the nearest instantiated step above, if any, and feed it our state
+                 i = step;
+                 while (++i < steps.length && steps[i] == null) {}
+                 if (i < steps.length)
+                 {
+                     steps[step].successor = steps[i];
+                     updatePredecessors(steps[i], steps[step], true);
+                 }
+             }
+         }
+         return steps[step];
+     }
+
+     /**
+      * Records one transaction's observations against the step of this key that it witnessed.
+      *
+      * @param newPeers [key]->step index witnessed by the transaction; {@code newPeers[this.key]} selects the step
+      * @param reads    [key]->sequence read by the transaction, or null
+      * @param writes   [key]->value written by the transaction, or a negative number
+      */
+     private void updatePeersAndPredecessors(int[] newPeers, int[][] reads, int[] writes)
+     {
+         int stepIndex = newPeers[key];
+         Step step = step(stepIndex);
+         boolean updated = updatePeers(step, newPeers);
+         updated |= updatePredecessorsOfWrite(step, reads, writes);
+         if (updated)
+             onChange(step);
+     }
+
+     /**
+      * Raises {@code step.maxPeers} to {@code newPeers} wherever the latter is larger.
+      *
+      * @return true if any entry was raised
+      */
+     private boolean updatePeers(Step step, int[] newPeers)
+     {
+         boolean updated = false;
+         for (int key = 0 ; key < keyCount ; ++key)
+         {
+             int newPeer = newPeers[key];
+             int maxPeer = step.maxPeers[key];
+             if (newPeer > maxPeer)
+             {
+                 updated = true;
+                 step.maxPeers[key] = newPeers[key];
+             }
+         }
+         return updated;
+     }
+
+     /**
+      * Merges the max predecessors of {@code fromStep} into {@code updateStep}, re-linking every raised
+      * entry to the step it now points at, and propagates via {@code onChange} if anything was raised.
+      *
+      * @param includeSelf whether {@code fromStep}'s own peers (maxPeers) also count as predecessors
+      */
+     private void updatePredecessors(Step updateStep, Step fromStep, boolean includeSelf)
+     {
+         boolean updated = false;
+         for (int key = 0 ; key < keyCount ; ++key)
+         {
+             MaxPredecessor newPredecessor = fromStep.maxPredecessors[key];
+             int selfPredecessorStepIndex = includeSelf ? fromStep.maxPeers[key] : -1;
+             int newPredecessorStepIndex = newPredecessor == null ? selfPredecessorStepIndex
+                                                                  : Math.max(selfPredecessorStepIndex, newPredecessor.predecessorStepIndex);
+             MaxPredecessor maxPredecessor;
+             if (newPredecessorStepIndex >= 0 && newPredecessorStepIndex > (maxPredecessor = updateStep.maxPredecessor(key)).predecessorStepIndex)
+             {
+                 maxPredecessor.predecessorStepIndex = newPredecessorStepIndex;
+                 // register the back-link so later changes to the referenced step reach updateStep
+                 registers[key].step(newPredecessorStepIndex).push(maxPredecessor);
+                 updated = true;
+             }
+         }
+         if (updated)
+             onChange(updateStep);
+     }
+
+     /**
+      * keys that are written as part of the transaction occur with the transaction,
+      * so those that are only read must precede them
+      *
+      * @return true if any max predecessor of {@code step} was raised; false immediately if the
+      *         transaction did not write this key
+      */
+     private boolean updatePredecessorsOfWrite(Step step, int[][] reads, int[] writes)
+     {
+         if (writes[key] < 0)
+             return false;
+
+         boolean updated = false;
+         for (int key = 0 ; key < writes.length ; ++key)
+         {
+             if (reads[key] == null)
+                 continue;
+
+             // the step index of a read is the length of the sequence it observed
+             int newPredecessorStepIndex = reads[key].length;
+             MaxPredecessor maxPredecessor;
+             if (newPredecessorStepIndex > (maxPredecessor = step.maxPredecessor(key)).predecessorStepIndex)
+             {
+                 maxPredecessor.predecessorStepIndex = newPredecessorStepIndex;
+                 Step fromStep = registers[key].step(newPredecessorStepIndex);
+                 fromStep.push(maxPredecessor);
+                 updatePredecessors(step, fromStep, false);
+                 updated = true;
+             }
+         }
+         return updated;
+     }
+
+     /**
+      * Called after {@code step}'s peers or predecessors changed: checks for a cycle, queues every
+      * link that points at this step for refresh, and pushes the change on to the successor step.
+      *
+      * @throws HistoryViolation if the step can now reach itself or a later step of its own key
+      */
+     void onChange(Step step)
+     {
+         if (step.maxPredecessor(key).predecessorStepIndex >= step.ofStepIndex)
+             throw new HistoryViolation(key, "Cycle detected on key " + key + ", step " + step.ofStepIndex + " " + Arrays.toString(Arrays.copyOf(sequence, step.ofStepIndex)));
+
+         step.forEach(refresh::add);
+         if (step.successor != null)
+             updatePredecessors(step.successor, step, true);
+     }
+
+     /**
+      * Parks a transaction whose write to this key has no known step index yet, keyed by the value written.
+      */
+     void registerDeferred(int unknownStepWriteValue, Deferred deferred)
+     {
+         this.deferred.computeIfAbsent(unknownStepWriteValue, ignore -> new ArrayList<>())
+                      .add(deferred);
+     }
+
+     @Override
+     public String toString()
+     {
+         return Arrays.toString(steps);
+     }
+ }
+
+ // writes without a corresponding read don't know their position in the total order for the register
+ // once this is known we can process the implied predecessor graph for them
+ private static class Deferred
+ {
+ final int[] newPeers;
+ final int[][] reads;
+ final int[] writes;
+
+ Deferred(int[] newPeers, int[][] reads, int[] writes)
+ {
+ this.newPeers = newPeers;
+ this.reads = reads;
+ this.writes = writes;
+ }
+
+ void update(int key, int step)
+ {
+ newPeers[key] = step;
+ }
+
+ void process(SerializabilityVerifier verifier)
+ {
+ for (int k = 0; k < newPeers.length ; ++k)
+ {
+ if (newPeers[k] >= 0)
+ verifier.registers[k].updatePeersAndPredecessors(newPeers, reads, writes);
+ }
+ }
+ }
+
+ final int keyCount;
+ final Register[] registers;
+
+ // TODO: use another intrusive list or intrusive tree
+ final TreeSet<MaxPredecessor> refresh = new TreeSet<>();
+
+ // [key]->the sequence returned by any read performed in this transaction
+ final int[][] bufReads;
+ // [key]->the value of any write performed in this transaction
+ final int[] bufWrites;
+ // [key]->the step witnessed with the current transaction (if known)
+ final int[] bufNewPeerSteps;
+
+ /**
+  * Creates a verifier for keys {@code 0..keyCount-1}, with one register per key and
+  * transaction buffers sized to match.
+  */
+ public SerializabilityVerifier(int keyCount)
+ {
+     this.keyCount = keyCount;
+     this.bufNewPeerSteps = new int[keyCount];
+     this.bufWrites = new int[keyCount];
+     this.bufReads = new int[keyCount][];
+
+     Register[] created = new Register[keyCount];
+     for (int key = 0 ; key < keyCount ; ++key)
+         created[key] = new Register(key);
+     this.registers = created;
+ }
+
+ /**
+ * Start a new set of coincident observations
+ */
+ public void begin()
+ {
+     // reset every per-transaction buffer to its "nothing witnessed" value
+     Arrays.fill(bufReads, null);
+     Arrays.fill(bufNewPeerSteps, -1);
+     Arrays.fill(bufWrites, -1);
+ }
+
+ /**
+ * Buffer a new read observation.
+ *
+ * Note that this should EXCLUDE any witnessed write for this key.
+ * This is to simplify the creation of direct happens-before edges with observations for other keys
+ * that are implied by the witnessing of a write (and is also marginally more efficient).
+ */
+ public void witnessRead(int key, int[] sequence)
+ {
+ if (bufReads[key] != null)
+ throw new IllegalStateException("Can buffer only one read observation for each key");
+ bufReads[key] = sequence;
+ // if we have a write, then for causality sequence is implicitly longer by one to include the write
+ bufNewPeerSteps[key] = bufWrites[key] >= 0 ? sequence.length + 1 : sequence.length;
+ }
+
+ /**
+ * Buffer a new write observation
+ */
+ public void witnessWrite(int key, int id)
+ {
+ if (bufWrites[key] >= 0)
+ throw new IllegalStateException("Can buffer only one write observation for each key");
+ bufWrites[key] = id;
+ if (bufReads[key] != null)
+ bufNewPeerSteps[key] = bufReads[key].length;
+ }
+
+ /**
+ * Apply the pending coincident observations to the verification graph
+ */
+ public void apply()
+ {
+     // a write with no coincident read only has a known step if its value already appears in the
+     // register's sequence; the sequence is in observation order, not sorted, so scan it linearly
+     for (int k = 0; k < bufReads.length ; ++k)
+     {
+         if (bufWrites[k] >= 0 && bufReads[k] == null)
+         {
+             int[] known = registers[k].sequence;
+             for (int i = 0 ; i < known.length ; ++i)
+             {
+                 if (known[i] == bufWrites[k])
+                 {
+                     // the value at position i belongs to step index i + 1
+                     bufNewPeerSteps[k] = i + 1;
+                     break;
+                 }
+             }
+         }
+     }
+
+     // created lazily and shared by every key of this transaction whose step is still unknown
+     Deferred deferred = null;
+     for (int k = 0; k < bufReads.length ; ++k)
+     {
+         if (bufReads[k] != null)
+             registers[k].updateSequence(bufReads[k], bufWrites[k]);
+
+         if (bufNewPeerSteps[k] >= 0)
+         {
+             registers[k].updatePeersAndPredecessors(bufNewPeerSteps, bufReads, bufWrites);
+         }
+         else if (bufWrites[k] >= 0)
+         {
+             // step unknown: park a snapshot of the buffers until the written value is observed
+             if (deferred == null)
+                 deferred = new SerializabilityVerifier.Deferred(bufNewPeerSteps.clone(), bufReads.clone(), bufWrites.clone());
+             registers[k].registerDeferred(bufWrites[k], deferred);
+         }
+     }
+
+     refreshTransitive();
+ }
+
+ /**
+  * Drains the refresh queue in its sorted order, re-deriving each queued link's owner step from the
+  * step the link points at. Processing may enqueue further links; the loop runs until none remain.
+  */
+ private void refreshTransitive()
+ {
+     MaxPredecessor next;
+     while ((next = refresh.pollFirst()) != null)
+     {
+         Register owner = registers[next.ofKey];
+         Step updateStep = owner.step(next.ofStepIndex);
+         Step fromStep = registers[next.predecessorKey].step(next.predecessorStepIndex);
+         owner.updatePredecessors(updateStep, fromStep, false);
+     }
+ }
+}
diff --git a/accord-core/src/test/java/accord/verify/SerializabilityVerifierTest.java b/accord-core/src/test/java/accord/verify/SerializabilityVerifierTest.java
new file mode 100644
index 0000000..17c8ba2
--- /dev/null
+++ b/accord-core/src/test/java/accord/verify/SerializabilityVerifierTest.java
@@ -0,0 +1,154 @@
+package accord.verify;
+
+import java.util.function.Consumer;
+
+import org.junit.jupiter.api.Test;
+
+import static accord.verify.SerializabilityVerifierTest.Observation.r;
+import static accord.verify.SerializabilityVerifierTest.Observation.rw;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class SerializabilityVerifierTest
+{
+ /**
+  * What one transaction observed for a single key: an optional read sequence and an optional write.
+  */
+ static class Observation
+ {
+     // value written, or negative if the transaction did not write this key
+     final int write;
+     // sequence read, or null if the transaction did not read this key
+     final int[] reads;
+
+     Observation(int write, int[] reads)
+     {
+         this.write = write;
+         this.reads = reads;
+     }
+
+     /** a write with no coincident read */
+     static Observation w(int write)
+     {
+         return new Observation(write, null);
+     }
+
+     /** a write coincident with a read of {@code reads} */
+     static Observation rw(int write, int ... reads)
+     {
+         return new Observation(write, reads);
+     }
+
+     /** a read with no write */
+     static Observation r(int ... reads)
+     {
+         return rw(-1, reads);
+     }
+ }
+
+ // Histories that must verify cleanly. Each row is one transaction and each column one key;
+ // assertNoViolation replays the rows in every rotation produced by forEach.
+ @Test
+ public void noCycle()
+ {
+ // key 1 gains a value before key 0 does
+ assertNoViolation(new Observation[] { r( ), r( ) },
+ new Observation[] { r( ), r(1) },
+ new Observation[] { r(1), r(1) });
+ // as above, but each key jumps straight from empty to two values
+ assertNoViolation(new Observation[] { r( ), r( ) },
+ new Observation[] { r( ), r(1,2) },
+ new Observation[] { r(1,2), r(1,2) });
+ // the keys advance one value at a time, alternating
+ assertNoViolation(new Observation[] { r( ), r( ) },
+ new Observation[] { r( ), r(1 ) },
+ new Observation[] { r(1 ), r(1 ) },
+ new Observation[] { r(1,2), r(1 ) },
+ new Observation[] { r(1,2), r(1,2) });
+ }
+
+ // Two transactions that each see the other's key as still empty while seeing a value on their own:
+ // one reads key 0 = [0] with key 1 empty, the other reads key 0 empty with key 1 = [1].
+ @Test
+ public void directCycle()
+ {
+ assertViolation(new Observation[] { r(0), r( ) },
+ new Observation[] { r( ), r(1) });
+ // same conflict with an additional key that is always read as empty
+ assertViolation(new Observation[] { r(0), r( ), r( ) },
+ new Observation[] { r( ), r(1), r( ) });
+ }
+
+ // Histories spanning three or more keys that are expected to be rejected.
+ @Test
+ public void indirectCycle()
+ {
+ // each transaction sees a value on exactly one key and every other key empty
+ assertViolation(new Observation[] { r(0), r( ), r( ) },
+ new Observation[] { r( ), r(1), r( ) },
+ new Observation[] { r( ), r( ), r(2) });
+ // null marks a key the transaction did not observe at all
+ assertViolation(new Observation[] { r( ), r(1), null },
+ new Observation[] { null, r( ), r(2) },
+ new Observation[] { r(0), null, r( ) });
+ // as the first case, but each key holds two values when it is seen as non-empty
+ assertViolation(new Observation[] { r( ), r(1,2), r( ) },
+ new Observation[] { r( ), r( ), r(2,3) },
+ new Observation[] { r(0,1), r( ), r( ) });
+ // the first case extended to five keys
+ assertViolation(new Observation[] { r(0), r( ), r( ), r( ), r( ) },
+ new Observation[] { r( ), r(1), r( ), r( ), r( ) },
+ new Observation[] { r( ), r( ), r(2), r( ), r( ) },
+ new Observation[] { r( ), r( ), r( ), r(3), r( ) },
+ new Observation[] { r( ), r( ), r( ), r( ), r(4) });
+ }
+
+ // Histories involving a write. In every case the first transaction reads both keys as empty
+ // and writes the value 1 to key 1 (rw(1) is a write of 1 coincident with an empty read);
+ // the second transaction then reads non-empty sequences for both keys.
+ @Test
+ public void writeCycle()
+ {
+ assertViolation(new Observation[] { r(), rw(1) },
+ new Observation[] { r(0), r(1)});
+ assertViolation(new Observation[] { r(), rw(1) },
+ new Observation[] { r(0, 1), r(1)});
+ assertViolation(new Observation[] { r(), rw(1) },
+ new Observation[] { r(0, 1), r(1, 2)});
+ }
+
+ /**
+  * Read-only variant: each element of {@code setOfObservations} is one transaction, given as the
+  * sequence read per key (null for a key that was not read). Asserts that replaying them in the
+  * given order raises a {@link HistoryViolation}.
+  */
+ private static void assertViolation(int[][] ... setOfObservations)
+ {
+     assertThrows(HistoryViolation.class, () -> {
+         SerializabilityVerifier verifier = new SerializabilityVerifier(setOfObservations[0].length);
+         for (int[][] observations : setOfObservations)
+         {
+             verifier.begin();
+             int key = 0;
+             for (int[] observation : observations)
+             {
+                 if (observation != null)
+                     verifier.witnessRead(key, observation);
+                 ++key;
+             }
+             verifier.apply();
+         }
+     });
+ }
+
+ /**
+  * Replays the transactions in order against a fresh verifier sized to the first transaction's
+  * key count. For each key the read (if any) is witnessed before the write (if any).
+  */
+ private static void run(Observation[][] setOfObservations)
+ {
+     SerializabilityVerifier verifier = new SerializabilityVerifier(setOfObservations[0].length);
+     for (Observation[] observations : setOfObservations)
+     {
+         verifier.begin();
+         for (int key = 0; key < observations.length; ++key)
+         {
+             Observation observation = observations[key];
+             if (observation == null)
+                 continue;
+
+             if (observation.reads != null)
+                 verifier.witnessRead(key, observation.reads);
+             if (observation.write >= 0)
+                 verifier.witnessWrite(key, observation.write);
+         }
+         verifier.apply();
+     }
+ }
+
+ /**
+  * Invokes {@code forEach} once per rotation of {@code permute}. The same array instance is
+  * refilled and passed on every call, so the consumer must not retain it.
+  */
+ private static void forEach(Observation[][] permute, Consumer<Observation[][]> forEach)
+ {
+     int count = permute.length;
+     Observation[][] rotated = new Observation[count][];
+     // TODO: more permutations
+     for (int offset = 0 ; offset < count ; ++offset)
+     {
+         // rotate left by offset: the tail starting at offset comes first, followed by the head
+         System.arraycopy(permute, offset, rotated, 0, count - offset);
+         System.arraycopy(permute, 0, rotated, count - offset, offset);
+         forEach.accept(rotated);
+     }
+ }
+
+ /**
+  * Replays every rotation of the given transactions and expects none of them to throw.
+  */
+ private static void assertNoViolation(Observation[] ... setOfObservations)
+ {
+     forEach(setOfObservations, permuted -> run(permuted));
+ }
+
+ /**
+  * Replays every rotation of the given transactions and expects each one to raise a {@link HistoryViolation}.
+  */
+ private static void assertViolation(Observation[] ... setOfObservations)
+ {
+     forEach(setOfObservations, permuted -> {
+         assertThrows(HistoryViolation.class, () -> run(permuted));
+     });
+ }
+
+}
diff --git a/accord-maelstrom/build.gradle b/accord-maelstrom/build.gradle
new file mode 100644
index 0000000..b468830
--- /dev/null
+++ b/accord-maelstrom/build.gradle
@@ -0,0 +1,49 @@
+// Build script for the accord-maelstrom module.
+plugins {
+ id 'java'
+}
+
+group 'accord'
+version '1.0-SNAPSHOT'
+
+repositories {
+ mavenCentral()
+}
+
+compileJava {
+ sourceCompatibility = JavaVersion.VERSION_11
+}
+
+dependencies {
+ // NOTE(review): the 'compile' and 'testCompile' configurations are deprecated in recent Gradle
+ // releases - confirm the Gradle version in use before upgrading
+ compile project(':accord-core')
+ implementation 'com.google.code.gson:gson:2.8.7'
+ implementation 'com.google.guava:guava:30.1.1-jre'
+ implementation 'ch.qos.logback:logback-classic:1.2.3'
+ // depends on the 'testClasses' configuration of accord-core
+ testCompile project(path: ':accord-core', configuration: 'testClasses')
+ testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
+ testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
+}
+
+test {
+ useJUnitPlatform()
+}
+
+// the plain jar names accord.maelstrom.Main as its entry point
+jar {
+ manifest {
+ attributes(
+ 'Main-Class': 'accord.maelstrom.Main',
+ )
+ }
+}
+
+// a jar with the 'all' classifier that reuses the manifest above and bundles the runtime classpath,
+// leaving out signature files found under META-INF
+task fatJar(type: Jar) {
+ manifest.from jar.manifest
+ classifier = 'all'
+ from {
+ configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
+ } {
+ exclude "META-INF/*.SF"
+ exclude "META-INF/*.DSA"
+ exclude "META-INF/*.RSA"
+ }
+ with jar
+}
\ No newline at end of file
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Body.java b/accord-maelstrom/src/main/java/accord/maelstrom/Body.java
new file mode 100644
index 0000000..651fc68
--- /dev/null
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Body.java
@@ -0,0 +1,159 @@
+package accord.maelstrom;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.gson.JsonArray;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import accord.local.Node.Id;
+import accord.txn.Txn;
+import accord.maelstrom.Packet.Type;
+
+/**
+ * The body of a packet: a type plus optional message id and reply-to id.
+ * Also hosts the JSON reading logic that turns a body object into the matching subclass.
+ */
+public class Body
+{
+ // marks an absent msg_id / in_reply_to; writeBody only emits ids greater than this
+ public static final long SENTINEL_MSG_ID = Long.MIN_VALUE;
+
+ final Type type;
+ final long msg_id;
+ final long in_reply_to;
+
+ public Body(Type type, long msg_id, long in_reply_to)
+ {
+ this.type = type;
+ this.msg_id = msg_id;
+ this.in_reply_to = in_reply_to;
+ }
+
+ /**
+ * Writes the common fields into an already-open JSON object: always "type", and
+ * "msg_id" / "in_reply_to" only when they are greater than {@link #SENTINEL_MSG_ID}.
+ */
+ void writeBody(JsonWriter out) throws IOException
+ {
+ out.name("type");
+ out.value(type.name());
+ if (msg_id > SENTINEL_MSG_ID)
+ {
+ out.name("msg_id");
+ out.value(msg_id);
+ }
+ if (in_reply_to > SENTINEL_MSG_ID)
+ {
+ out.name("in_reply_to");
+ out.value(in_reply_to);
+ }
+ }
+
+ // adapter that writes a body as a JSON object and reads one with no sender id;
+ // with a null sender, read() throws IllegalStateException on encountering a "txn" field
+ public static final TypeAdapter<Body> GSON_ADAPTER = new TypeAdapter<>()
+ {
+ @Override
+ public void write(JsonWriter out, Body value) throws IOException
+ {
+ out.beginObject();
+ value.writeBody(out);
+ out.endObject();
+ }
+
+ @Override
+ public Body read(JsonReader in) throws IOException
+ {
+ return Body.read(in, null);
+ }
+ };
+
+ // write-only adapter: writes exactly as GSON_ADAPTER does, but refuses to read
+ public static final TypeAdapter<Body> FAIL_READ = new TypeAdapter<>()
+ {
+ @Override
+ public void write(JsonWriter out, Body value) throws IOException
+ {
+ out.beginObject();
+ value.writeBody(out);
+ out.endObject();
+ }
+
+ @Override
+ public Body read(JsonReader in)
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ /**
+ * Reads one body object from {@code in} and builds the subclass matching its "type".
+ *
+ * @param from the sender of the enclosing packet; must be non-null if the body has a "txn" field
+ * @throws IllegalStateException on an unknown field, or on a "txn" field when {@code from} is null
+ */
+ public static Body read(JsonReader in, Id from) throws IOException
+ {
+ Type type = null;
+ // NOTE(review): ids default to 0 here rather than SENTINEL_MSG_ID, so a body read without an
+ // "in_reply_to" field still compares greater than the sentinel - confirm this is intended
+ long msg_id = 0, in_reply_to = 0;
+ int code = -1;
+ String text = null;
+ Txn txn = null;
+ MaelstromResult txn_ok = null;
+ Object body = null;
+ Id node_id = null;
+ List<Id> node_ids = null;
+ String deferredTxn = null;
+
+ in.beginObject();
+ while (in.hasNext())
+ {
+ String field = in.nextName();
+ switch (field)
+ {
+ case "type":
+ String v = in.nextString();
+ type = Type.valueOf(v);
+ break;
+ case "msg_id":
+ msg_id = in.nextLong();
+ break;
+ case "in_reply_to":
+ in_reply_to = in.nextLong();
+ break;
+ case "code":
+ code = in.nextInt();
+ break;
+ case "text":
+ text = in.nextString();
+ break;
+ case "body":
+ // NOTE(review): dereferences type, so this fails with a NullPointerException if "body"
+ // appears before "type" in the input
+ body = Json.GSON.fromJson(in, type.type);
+ break;
+ case "txn":
+ if (from == null)
+ throw new IllegalStateException();
+ // decoding needs both type and msg_id; if either has not been seen yet (msg_id still 0 or
+ // type still null) keep the raw JSON and decode it after the whole object has been read
+ if (msg_id == 0 || type == null) deferredTxn = Json.GSON.fromJson(in, JsonArray.class).toString();
+ else if (type == Type.txn) txn = MaelstromRequest.readTxnExternal(in, from, msg_id);
+ else txn_ok = MaelstromReply.readResultExternal(in, from, msg_id);
+ break;
+ case "node_id":
+ node_id = Json.ID_ADAPTER.read(in);
+ break;
+ case "node_ids":
+ node_ids = new ArrayList<>();
+ in.beginArray();
+ while (in.hasNext())
+ node_ids.add(Json.ID_ADAPTER.read(in));
+ in.endArray();
+ break;
+ default:
+ throw new IllegalStateException("Unexpected field " + field);
+ }
+ }
+ in.endObject();
+
+ // decode a "txn" field that was set aside above, now that all other fields are known
+ if (deferredTxn != null)
+ {
+ JsonReader in2 = new JsonReader(new StringReader(deferredTxn));
+ if (type == Type.txn) txn = MaelstromRequest.readTxnExternal(in2, from, msg_id);
+ else txn_ok = MaelstromReply.readResultExternal(in2, from, msg_id);
+ }
+
+ // NOTE(review): a body without a "type" field reaches this switch with type == null and fails
+ // with a NullPointerException; likewise an init body without "node_ids"
+ switch (type)
+ {
+ case init: return new MaelstromInit(msg_id, node_id, node_ids.toArray(Id[]::new));
+ case init_ok: return new Body(Type.init_ok, msg_id, in_reply_to);
+ case txn: return new MaelstromRequest(msg_id, txn);
+ case txn_ok: return new MaelstromReply(in_reply_to, txn_ok);
+ case error: return new Error(in_reply_to, code, text);
+ default: return new Wrapper(type, msg_id, in_reply_to, body);
+ }
+ }
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
new file mode 100644
index 0000000..d9585a2
--- /dev/null
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -0,0 +1,276 @@
+package accord.maelstrom;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.api.MessageSink;
+import accord.messages.Timeout;
+import accord.messages.Callback;
+import accord.messages.Reply;
+import accord.messages.Request;
+import accord.topology.Shards;
+import accord.api.Scheduler;
+
+// TODO: merge with accord.impl.basic.Cluster
+public class Cluster implements Scheduler
+{
+ /**
+ * The queue of pending work driven by {@code processPending}. Ordering between items is decided
+ * by the implementation supplied through {@link QueueSupplier}.
+ */
+ public interface Queue<T>
+ {
+ // enqueue with no delay
+ void add(T item);
+ // enqueue with a delay; presumably the item becomes eligible only after it has elapsed - confirm against the implementation
+ void add(T item, long delay, TimeUnit units);
+ // take the next item; processPending treats a null result as "nothing to do"
+ T poll();
+ // number of queued items, including delayed ones as far as processPending's bookkeeping assumes - TODO confirm
+ int size();
+ }
+
+ /**
+ * Factory for the {@link Queue} a Cluster uses; called once from the Cluster constructor.
+ */
+ public interface QueueSupplier
+ {
+ <T> Queue<T> get();
+ }
+
+ /**
+  * The {@link MessageSink} of a single node: turns sends and replies into packets queued on the
+  * parent cluster, and tracks callbacks for requests that expect a reply.
+  */
+ public static class InstanceSink implements MessageSink
+ {
+     final Id self;
+     final Function<Id, Node> lookup;
+     final Cluster parent;
+     final Random random;
+
+     // next id to hand out for a request that carries a callback
+     int nextMessageId = 0;
+     // callbacks still awaiting a reply or a timeout, by message id
+     Map<Long, Callback> callbacks = new LinkedHashMap<>();
+
+     public InstanceSink(Id self, Function<Id, Node> lookup, Cluster parent, Random random)
+     {
+         this.random = random;
+         this.parent = parent;
+         this.lookup = lookup;
+         this.self = self;
+     }
+
+     /**
+      * Sends a request for which no reply is expected; it carries the sentinel message id.
+      */
+     // NOTE(review): this overload is synchronized while the callback overload is not - confirm whether that is deliberate
+     @Override
+     public synchronized void send(Id to, Request send)
+     {
+         parent.add(self, to, Body.SENTINEL_MSG_ID, send);
+     }
+
+     /**
+      * Sends a request and registers {@code callback} under a fresh message id. A timeout task is
+      * queued with a delay of 1000 to 10999 ms; if the callback is still registered when it runs,
+      * the callback is removed and failed with a {@link Timeout}.
+      */
+     @Override
+     public void send(Id to, Request send, Callback callback)
+     {
+         long messageId = nextMessageId++;
+         callbacks.put(messageId, callback);
+         parent.add(self, to, messageId, send);
+
+         Runnable onTimeout = () -> {
+             Callback registered = callbacks.remove(messageId);
+             if (callback == registered)
+                 callback.onFailure(to, new Timeout());
+         };
+         int delayMillis = 1000 + random.nextInt(10000);
+         parent.pending.add(onTimeout, delayMillis, TimeUnit.MILLISECONDS);
+     }
+
+     /**
+      * Queues {@code reply} for {@code replyToNode} as the answer to message {@code replyToMessage}.
+      */
+     @Override
+     public void reply(Id replyToNode, long replyToMessage, Reply reply)
+     {
+         parent.add(self, replyToNode, replyToMessage, reply);
+     }
+ }
+
+ final Function<Id, Node> lookup;
+ final Queue<Object> pending;
+ final Consumer<Packet> responseSink;
+ final Map<Id, InstanceSink> sinks = new HashMap<>();
+ final PrintWriter err;
+ int clock;
+ int recurring;
+ Set<Id> partitionSet;
+
+ /**
+  * @param queueSupplier provides the queue of pending packets and scheduled tasks
+  * @param lookup        resolves a node id to a local node, or null for an id that is not a local node
+  * @param responseSink  receives packets addressed to ids that {@code lookup} cannot resolve
+  * @param stderr        destination of the SEND/RECV/DROP trace
+  */
+ public Cluster(QueueSupplier queueSupplier, Function<Id, Node> lookup, Consumer<Packet> responseSink, OutputStream stderr)
+ {
+     this.pending = queueSupplier.get();
+     this.err = new PrintWriter(stderr);
+     this.partitionSet = new HashSet<>();
+     this.responseSink = responseSink;
+     this.lookup = lookup;
+ }
+
+ /**
+  * Creates the message sink for node {@code self} and registers it in {@code sinks}.
+  */
+ InstanceSink create(Id self, Random random)
+ {
+     InstanceSink created = new InstanceSink(self, lookup, this, random);
+     sinks.put(self, created);
+     return created;
+ }
+
+ /**
+  * Traces the packet as sent, then routes it: to {@code responseSink} if the destination is not
+  * a node known to {@code lookup}, otherwise onto the pending queue.
+  */
+ private void add(Packet packet)
+ {
+     int sentAt = clock++;
+     err.println(sentAt + " SEND " + packet);
+     err.flush();
+
+     boolean external = lookup.apply(packet.dest) == null;
+     if (external)
+     {
+         responseSink.accept(packet);
+         return;
+     }
+     pending.add(packet);
+ }
+
+ /**
+  * Wraps a request in a packet from {@code from} to {@code to} and queues it.
+  */
+ void add(Id from, Id to, long messageId, Request send)
+ {
+     Packet request = new Packet(from, to, messageId, send);
+     add(request);
+ }
+
+ /**
+  * Wraps a reply to message {@code replyId} in a packet from {@code from} to {@code to} and queues it.
+  */
+ void add(Id from, Id to, long replyId, Reply send)
+ {
+     Packet reply = new Packet(from, to, replyId, send);
+     add(reply);
+ }
+
+ /**
+ * Processes at most one item from the pending queue.
+ *
+ * @return false if only recurring tasks remain (queue size equals the recurring count) or the
+ * queue yields nothing; true after an item has been handled
+ */
+ public boolean processPending()
+ {
+ if (pending.size() == recurring)
+ return false;
+
+ Object next = pending.poll();
+ if (next == null)
+ return false;
+
+ if (next instanceof Packet)
+ {
+ Packet deliver = (Packet) next;
+ Node on = lookup.apply(deliver.dest);
+ switch (deliver.body.type)
+ {
+ case init:
+ throw new IllegalStateException();
+ case txn:
+ // txn packets are always delivered; the partition check below does not apply to them
+ err.println(clock++ + " RECV " + deliver);
+ err.flush();
+ on.receive((MaelstromRequest)deliver.body, deliver.src, deliver.body.msg_id);
+ break;
+ default:
+ // Drop the message if it goes across the partition
+ // (i.e. unless source and destination are both inside, or both outside, partitionSet)
+ boolean drop = !(partitionSet.contains(deliver.src) && partitionSet.contains(deliver.dest)
+ || !partitionSet.contains(deliver.src) && !partitionSet.contains(deliver.dest));
+ if (drop)
+ {
+ err.println(clock++ + " DROP " + deliver);
+ err.flush();
+ break;
+ }
+ err.println(clock++ + " RECV " + deliver);
+ err.flush();
+ if (deliver.body.in_reply_to > Body.SENTINEL_MSG_ID)
+ {
+ // a reply: hand it to the callback registered by the destination's sink, if still present;
+ // a final reply also unregisters the callback
+ Reply reply = (Reply)((Wrapper)deliver.body).body;
+ Callback callback = reply.isFinal() ? sinks.get(deliver.dest).callbacks.remove(deliver.body.in_reply_to)
+ : sinks.get(deliver.dest).callbacks.get(deliver.body.in_reply_to);
+ if (callback != null)
+ on.scheduler().now(() -> callback.onSuccess(deliver.src, reply));
+ }
+ else on.receive((Request)((Wrapper)deliver.body).body, deliver.src, deliver.body.msg_id);
+ }
+ }
+ else
+ {
+ // anything that is not a Packet is a queued task (a timeout or a scheduled runnable)
+ ((Runnable) next).run();
+ }
+ return true;
+ }
+
+ /**
+  * A scheduled task that can be cancelled. A recurring task re-queues itself after every run;
+  * a cancelled task does nothing when it is next polled.
+  */
+ class CancellableRunnable implements Runnable, Scheduled
+ {
+     final boolean recurring;
+     final long delay;
+     final TimeUnit units;
+     // the work to perform; null once cancelled
+     Runnable run;
+
+     CancellableRunnable(Runnable run, boolean recurring, long delay, TimeUnit units)
+     {
+         this.run = run;
+         this.recurring = recurring;
+         this.delay = delay;
+         this.units = units;
+     }
+
+     @Override
+     public void run()
+     {
+         if (run != null)
+         {
+             run.run();
+             if (recurring)
+                 pending.add(this, delay, units);
+         }
+         else if (recurring)
+         {
+             // A cancelled recurring task is not re-queued, so it must also stop counting towards
+             // Cluster.recurring; otherwise processPending's "pending.size() == recurring" check
+             // would compare against a stale count. A cancelled task is polled at most once,
+             // so the count is decremented exactly once.
+             --Cluster.this.recurring;
+         }
+     }
+
+     @Override
+     public void cancel()
+     {
+         run = null;
+     }
+ }
+
+ /**
+  * Queues {@code run} to execute after {@code delay} and again after each execution, and counts
+  * it among the recurring tasks.
+  */
+ @Override
+ public Scheduled recurring(Runnable run, long delay, TimeUnit units)
+ {
+     CancellableRunnable task = new CancellableRunnable(run, true, delay, units);
+     recurring += 1;
+     pending.add(task, delay, units);
+     return task;
+ }
+
+ /**
+  * Queues {@code run} to execute a single time after {@code delay}.
+  */
+ @Override
+ public Scheduled once(Runnable run, long delay, TimeUnit units)
+ {
+     CancellableRunnable task = new CancellableRunnable(run, false, delay, units);
+     pending.add(task, delay, units);
+     return task;
+ }
+
+ /**
+  * Executes {@code run} immediately on the calling thread, bypassing the pending queue.
+  */
+ @Override
+ public void now(Runnable run)
+ {
+     Runnable task = run;
+     task.run();
+ }
+
+ /**
+  * Runs a cluster fed with packets parsed line by line from {@code stdin}, delegating to the
+  * {@code Supplier<Packet>} overload.
+  *
+  * @throws IOException if closing the reader fails
+  * @throws RuntimeException wrapping any IOException raised while reading a line
+  */
+ public static void run(Id[] nodes, QueueSupplier queueSupplier, Consumer<Packet> responseSink, Supplier<Random> randomSupplier, Supplier<LongSupplier> nowSupplier, TopologyFactory topologyFactory, InputStream stdin, OutputStream stderr) throws IOException
+ {
+     // decode explicitly as UTF-8 rather than with the platform default charset, so parsing does
+     // not depend on the host configuration (packets are presumably JSON text - see the Gson adapters on Body)
+     try (BufferedReader in = new BufferedReader(new InputStreamReader(stdin, java.nio.charset.StandardCharsets.UTF_8)))
+     {
+         Supplier<Packet> packets = () -> {
+             try
+             {
+                 String line = in.readLine();
+                 // readLine returns null at end of input; pass that on as "no more packets"
+                 // instead of relying on how Packet.parse treats a null argument
+                 return line == null ? null : Packet.parse(line);
+             }
+             catch (IOException e)
+             {
+                 throw new RuntimeException(e);
+             }
+         };
+         run(nodes, queueSupplier, responseSink, randomSupplier, nowSupplier, topologyFactory, packets, stderr);
+     }
+ }
+
+ /**
+ * Builds one Node per id, schedules a recurring re-partitioning of the cluster, queues every
+ * packet supplied by {@code in} until it returns null, and then processes pending work until
+ * {@code processPending} reports there is nothing left to do.
+ */
+ public static void run(Id[] nodes, QueueSupplier queueSupplier, Consumer<Packet> responseSink, Supplier<Random> randomSupplier, Supplier<LongSupplier> nowSupplier, TopologyFactory topologyFactory, Supplier<Packet> in, OutputStream stderr)
+ {
+ Shards shards = topologyFactory.toShards(nodes);
+ Map<Id, Node> lookup = new HashMap<>();
+ // the Cluster serves both as the factory of each node's message sink and as each node's scheduler
+ Cluster sinks = new Cluster(queueSupplier, lookup::get, responseSink, stderr);
+ for (Id node : nodes)
+ lookup.put(node, new Node(node, shards, shards.forNode(node), sinks.create(node, randomSupplier.get()),
+ randomSupplier.get(), nowSupplier.get(), MaelstromStore::new, MaelstromAgent.INSTANCE, sinks));
+
+ List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
+ // every 5 seconds pick a new partition: a random subset of fewer than (rf+1)/2 nodes
+ // NOTE(review): nextInt throws IllegalArgumentException if (rf+1)/2 is not positive - confirm rf >= 1
+ sinks.recurring(() ->
+ {
+ Collections.shuffle(nodesList, randomSupplier.get());
+ int partitionSize = randomSupplier.get().nextInt((topologyFactory.rf+1)/2);
+ sinks.partitionSet = new HashSet<>(nodesList.subList(0, partitionSize));
+ }, 5L, TimeUnit.SECONDS);
+
+ Packet next;
+ while ((next = in.get()) != null)
+ sinks.add(next);
+
+ while (sinks.processPending());
+ }
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Datum.java b/accord-maelstrom/src/main/java/accord/maelstrom/Datum.java
new file mode 100644
index 0000000..cb2b1a5
--- /dev/null
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Datum.java
@@ -0,0 +1,277 @@
+package accord.maelstrom;
+
+import java.io.IOException;
+import java.util.function.BiFunction;
+import java.util.zip.CRC32C;
+
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+
+/**
+ * A typed scalar: a string, long, double or pre-computed hash, tagged with its {@link Kind}.
+ *
+ * A datum whose {@code value} is {@code null} acts as an upper-bound sentinel for its kind: it is printed as
+ * {@code KIND:+Inf}, hashes to {@link Integer#MAX_VALUE} for ordering purposes and sorts after any non-null
+ * value of the same kind. When {@link #COMPARE_BY_HASH} is set, ordering is primarily by {@link #hash(Object)}.
+ *
+ * @param <D> the concrete subtype, used as the comparison target of {@link Comparable}
+ */
+public class Datum<D extends Datum<D>> implements Comparable<D>
+{
+    public static final boolean COMPARE_BY_HASH = true;
+
+    /**
+     * A raw hash position; ordered, compared and hashed purely by its {@code int} value.
+     */
+    public static class Hash implements Comparable<Hash>
+    {
+        final int hash;
+
+        public Hash(int hash)
+        {
+            this.hash = hash;
+        }
+
+        @Override
+        public int compareTo(Hash that)
+        {
+            return Integer.compare(this.hash, that.hash);
+        }
+
+        // value-based equality, so two HASH datums wrapping the same hash are equal (consistent with compareTo)
+        @Override
+        public boolean equals(Object that)
+        {
+            return that == this || (that instanceof Hash && ((Hash) that).hash == this.hash);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return hash;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "#" + hash;
+        }
+    }
+
+    public enum Kind
+    {
+        STRING, LONG, DOUBLE, HASH;
+
+        /**
+         * Produces {@code count} keys of this kind, in ascending order of their natural value, that divide the
+         * kind's value space into roughly equal parts. The first key is the lowest value of the space.
+         *
+         * @param count number of keys to produce; must be at least 2
+         * @return an array of {@code count} keys
+         * @throws IllegalArgumentException if {@code count <= 1}
+         */
+        public MaelstromKey[] split(int count)
+        {
+            if (count <= 1)
+                throw new IllegalArgumentException("count must be greater than 1: " + count);
+
+            MaelstromKey[] result = new MaelstromKey[count];
+            switch (this)
+            {
+                case STRING:
+                {
+                    // use only alphanumeric values to compute ranges:
+                    // every 4-character string over the CHARS-symbol alphabet, plus the empty string
+                    long range = (long) CHARS * CHARS * CHARS * CHARS + 1;
+                    long delta = range / count;
+                    for (int i = 0 ; i < count ; ++i)
+                        result[i] = new MaelstromKey(this, toString(i * delta));
+                    break;
+                }
+                case DOUBLE:
+                {
+                    result[0] = new MaelstromKey(Double.NEGATIVE_INFINITY);
+                    if (count == 2)
+                    {
+                        result[1] = new MaelstromKey(0d);
+                    }
+                    else
+                    {
+                        double delta = Double.MAX_VALUE * (2d / count);
+                        double cur = -Double.MAX_VALUE;
+                        for (int i = 1 ; i < count ; ++i)
+                            result[i] = new MaelstromKey(cur += delta);
+                    }
+                    break;
+                }
+                case LONG:
+                {
+                    long delta = 2 * (Long.MAX_VALUE / count);
+                    long start = Long.MIN_VALUE;
+                    // i * delta may wrap, but start + i * delta is always in range so the sum is exact
+                    for (int i = 0 ; i < count ; ++i)
+                        result[i] = new MaelstromKey(this, start + i * delta);
+                    break;
+                }
+                case HASH:
+                {
+                    int delta = 2 * (Integer.MAX_VALUE / count);
+                    int start = Integer.MIN_VALUE;
+                    // as for LONG: intermediate wrap-around cancels out in the final sum
+                    for (int i = 0 ; i < count ; ++i)
+                        result[i] = new MaelstromKey(this, new Hash(start + i * delta));
+                    break;
+                }
+            }
+            return result;
+        }
+
+        // size of the alphabet used by toChar: space, 0-9, A-Z, a-z
+        private static final int CHARS = 63;
+
+        /**
+         * Maps 0 to the empty string and any {@code v > 0} to the 4-character base-{@link #CHARS} rendering
+         * of {@code v - 1}, most significant symbol first.
+         */
+        private static String toString(long v)
+        {
+            if (v == 0) return "";
+            --v;
+            char[] buf = new char[4];
+            for (int i = 3 ; i >= 0 ; --i)
+            {
+                buf[i] = toChar(v % CHARS);
+                v /= CHARS;
+            }
+            return new String(buf);
+        }
+
+        /**
+         * Maps a digit in {@code [0, CHARS)} to its symbol: 0 is space, then '0'-'9', 'A'-'Z', 'a'-'z'.
+         */
+        private static char toChar(long v)
+        {
+            if (v == 0) return ' ';
+            v -= 1;
+            if (v < 10) return (char) ('0' + v);
+            v -= 10;
+            if (v < 26) return (char) ('A' + v);
+            v -= 26;
+            return (char) ('a' + v);
+        }
+
+    }
+
+    public final Kind kind;
+    // null denotes the upper-bound sentinel of the kind (see toString and compareTo)
+    public final Object value;
+
+    Datum(Kind kind, Object value)
+    {
+        this.kind = kind;
+        this.value = value;
+    }
+
+    public Datum(String value)
+    {
+        this(Kind.STRING, value);
+    }
+
+    public Datum(Long value)
+    {
+        this(Kind.LONG, value);
+    }
+
+    public Datum(Double value)
+    {
+        this(Kind.DOUBLE, value);
+    }
+
+    public Datum(Hash value)
+    {
+        this(Kind.HASH, value);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return value == null ? 0 : hash(value);
+    }
+
+    @Override
+    public boolean equals(Object that)
+    {
+        return that == this || (that instanceof Datum && equals((Datum) that));
+    }
+
+    /**
+     * Two datums are equal when they have the same kind and equal values; two sentinel (null-valued) datums of
+     * the same kind are equal to each other.
+     *
+     * @param that the datum to compare with; {@code null} is never equal
+     */
+    public boolean equals(Datum that)
+    {
+        if (that == null || this.kind != that.kind)
+            return false;
+        // value may legitimately be null (the sentinel), so it cannot be dereferenced unconditionally
+        return this.value == null ? that.value == null : this.value.equals(that.value);
+    }
+
+    @Override
+    public String toString()
+    {
+        return value == null ? kind + ":+Inf" : value.toString();
+    }
+
+    /**
+     * Orders by hash (when {@link #COMPARE_BY_HASH}), then by kind, then by value, with a null value sorting
+     * after any non-null value.
+     */
+    @Override
+    public int compareTo(Datum that)
+    {
+        int c = 0;
+        if (COMPARE_BY_HASH)
+            c = Integer.compare(hash(this.value), hash(that.value));
+        if (c == 0) c = this.kind.compareTo(that.kind);
+        if (c != 0) return c;
+        if (this.value == null || that.value == null)
+        {
+            if (this.value == null && that.value == null)
+                return 0;
+            return this.value == null ? 1 : -1;
+        }
+        // kinds are equal at this point, so both values have the same runtime type
+        return ((Comparable)this.value).compareTo(that.value);
+    }
+
+    /**
+     * Computes the ordering hash of a value: {@link Integer#MAX_VALUE} for {@code null}, the wrapped value for a
+     * {@link Hash}, and otherwise the CRC32C of the four bytes of {@code hashCode()}, least significant first.
+     */
+    static int hash(Object object)
+    {
+        if (object == null)
+            return Integer.MAX_VALUE;
+
+        if (object instanceof Hash)
+            return ((Hash) object).hash;
+
+        CRC32C crc32c = new CRC32C();
+        int i = object.hashCode();
+        // update(int) consumes only the low eight bits of its argument
+        crc32c.update(i);
+        crc32c.update(i >> 8);
+        crc32c.update(i >> 16);
+        crc32c.update(i >> 24);
+        return (int)crc32c.getValue();
+    }
+
+    public static Datum read(JsonReader in) throws IOException
+    {
+        return read(in, Datum::new);
+    }
+
+    /**
+     * Writes this datum as JSON. Simple values are written as a bare string or number; a sentinel is written as
+     * the array {@code [KIND]} and a hash as {@code ["HASH", value]}.
+     */
+    public void write(JsonWriter out) throws IOException
+    {
+        if (!isSimple())
+        {
+            out.beginArray();
+            out.value(kind.toString());
+            if (kind == Kind.HASH)
+                out.value(((Hash)value).hash);
+            out.endArray();
+            return;
+        }
+        switch (kind)
+        {
+            default: throw new IllegalStateException();
+            case LONG: out.value((Long) value); break;
+            case DOUBLE: out.value((Double) value); break;
+            case STRING: out.value((String) value); break;
+        }
+    }
+
+    /**
+     * @return true if this datum has a non-null value that is not a {@link Hash}, i.e. it can be written as a
+     *         bare JSON scalar
+     */
+    public boolean isSimple()
+    {
+        return value != null && kind != Kind.HASH;
+    }
+
+    /**
+     * Reads the JSON form produced by {@link #write(JsonWriter)} and hands the decoded kind and value to
+     * {@code constructor}. A number is read as a long when possible and as a double otherwise.
+     *
+     * @throws IllegalStateException if the next token is not an array, string or number
+     */
+    protected static <V> V read(JsonReader in, BiFunction<Kind, Object, V> constructor) throws IOException
+    {
+        Datum.Kind type;
+        Object value;
+        switch (in.peek())
+        {
+            default:
+                throw new IllegalStateException();
+            case BEGIN_ARRAY:
+                in.beginArray();
+                type = Kind.valueOf(in.nextString());
+                if (type == Kind.HASH) value = new Hash(in.nextInt());
+                else value = null;
+                in.endArray();
+                break;
+            case STRING:
+                value = in.nextString();
+                type = Datum.Kind.STRING;
+                break;
+            case NUMBER:
+                try { value = in.nextLong(); type = Datum.Kind.LONG; }
+                catch (IllegalArgumentException iae) { value = in.nextDouble(); type = Datum.Kind.DOUBLE; }
+                break;
+        }
+        return constructor.apply(type, value);
+    }
+
+    public static final TypeAdapter<Datum> GSON_ADAPTER = new TypeAdapter<>()
+    {
+        @Override
+        public void write(JsonWriter out, Datum value) throws IOException
+        {
+            value.write(out);
+        }
+
+        @Override
+        public Datum read(JsonReader in) throws IOException
+        {
+            return Datum.read(in);
+        }
+    };
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Error.java b/accord-maelstrom/src/main/java/accord/maelstrom/Error.java
new file mode 100644
index 0000000..71a368b
--- /dev/null
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Error.java
@@ -0,0 +1,30 @@
+package accord.maelstrom;
+
+import java.io.IOException;
+
+import accord.maelstrom.Packet.Type;
+import com.google.gson.stream.JsonWriter;
+import accord.messages.Reply;
+
+/**
+ * An error reply body: carries a numeric code and a text description in response to an earlier message.
+ */
+public class Error extends Body implements Reply
+{
+    final int code;
+    final String text;
+
+    /**
+     * @param in_reply_to id of the message this error answers
+     * @param code        numeric error code, written under {@code "code"}
+     * @param text        description, written under {@code "text"}
+     */
+    public Error(long in_reply_to, int code, String text)
+    {
+        super(Type.error, SENTINEL_MSG_ID, in_reply_to);
+        this.code = code;
+        this.text = text;
+    }
+
+    /**
+     * Appends the {@code code} and {@code text} fields after whatever the superclass writes.
+     */
+    @Override
+    void writeBody(JsonWriter out) throws IOException
+    {
+        super.writeBody(out);
+        out.name("code").value(code);
+        out.name("text").value(text);
+    }
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
new file mode 100644
index 0000000..b4dae5e
--- /dev/null
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
@@ -0,0 +1,482 @@
+package accord.maelstrom;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import accord.local.Node;
+import accord.api.Result;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
+import accord.local.Node.Id;
+import accord.api.Key;
+import accord.txn.Dependencies;
+import accord.txn.Txn;
+import accord.txn.TxnId;
+import accord.txn.Writes;
+import accord.txn.Ballot;
+import accord.txn.Keys;
+import accord.txn.Timestamp;
+import accord.messages.ReadData.ReadOk;
+
+/**
+ * Gson configuration and type adapters for this package's JSON encoding.
+ *
+ * {@link #GSON} is assembled in the static initialiser at the bottom of the class; the adapters above it refer
+ * to it only from inside their methods, so they may be declared before it is assigned.
+ */
+public class Json
+{
+    public static final Gson GSON;
+    public static final TypeAdapter<Object> DEFAULT_ADAPTER = new TypeAdapter<>()
+    {
+        @Override
+        public void write(JsonWriter out, Object value)
+        {
+            GSON.toJson(value, Object.class, out);
+        }
+
+        @Override
+        public Object read(JsonReader in)
+        {
+            return GSON.fromJson(in, Object.class);
+        }
+    };
+
+    /**
+     * Encodes an id of zero as JSON null and any other id in the string form produced by
+     * {@link #toString(Id)}; decodes null as {@code Id.NONE}.
+     */
+    public static final TypeAdapter<Id> ID_ADAPTER = new TypeAdapter<>()
+    {
+        @Override
+        public void write(JsonWriter out, Id value) throws IOException
+        {
+            if (value.id == 0) out.nullValue();
+            else out.value(Json.toString(value));
+        }
+
+        @Override
+        public Id read(JsonReader in) throws IOException
+        {
+            if (in.peek() == JsonToken.NULL)
+            {
+                in.nextNull();
+                return Id.NONE;
+            }
+
+            return parseId(in.nextString());
+        }
+    };
+
+    /**
+     * Parses the string form of an id: {@code "cN"} yields the id {@code -N} and {@code "nN"} the id {@code N}.
+     *
+     * @throws IllegalStateException if the string does not start with {@code 'c'} or {@code 'n'}
+     */
+    public static Id parseId(String id)
+    {
+        switch (id.charAt(0))
+        {
+            case 'c': return new Id(-Long.parseLong(id.substring(1)));
+            case 'n': return new Id( Long.parseLong(id.substring(1)));
+            default: throw new IllegalStateException("Invalid id: " + id);
+        }
+    }
+
+    /**
+     * Inverse of {@link #parseId(String)}: a negative id {@code -N} is rendered as {@code "cN"}, any other id
+     * {@code N} as {@code "nN"}.
+     */
+    public static String toString(Id id)
+    {
+        // negate so that the magnitude, not the signed value, follows the 'c' prefix;
+        // otherwise parseId would see "c-N" and return +N
+        if (id.id < 0) return "c" + (-id.id);
+        else return "n" + id.id;
+    }
+
+    public static final TypeAdapter<Timestamp> TIMESTAMP_ADAPTER = new TypeAdapter<>()
+    {
+        @Override
+        public void write(JsonWriter out, Timestamp value) throws IOException
+        {
+            if (value == null) out.nullValue();
+            else writeTimestamp(out, value);
+        }
+
+        @Override
+        public Timestamp read(JsonReader in) throws IOException
+        {
+            if (in.peek() == JsonToken.NULL)
+            {
+                in.nextNull();
+                return null;
+            }
+            return readTimestamp(in, Timestamp::new);
+        }
+    };
+
+    // common constructor shape of Timestamp, TxnId and Ballot
+    private interface TimestampFactory<T>
+    {
+        T create(long real, int logical, Id node);
+    }
+
+    /** Reads the three-element array {@code [real, logical, node]} and builds a value with {@code factory}. */
+    private static <T> T readTimestamp(JsonReader in, TimestampFactory<T> factory) throws IOException
+    {
+        in.beginArray();
+        long real = in.nextLong();
+        int logical = in.nextInt();
+        Id node = ID_ADAPTER.read(in);
+        in.endArray();
+        return factory.create(real, logical, node);
+    }
+
+    /** Writes a timestamp as the three-element array {@code [real, logical, node]}. */
+    private static void writeTimestamp(JsonWriter out, Timestamp timestamp) throws IOException
+    {
+        out.beginArray();
+        out.value(timestamp.real);
+        out.value(timestamp.logical);
+        ID_ADAPTER.write(out, timestamp.node);
+        out.endArray();
+    }
+
+    public static final TypeAdapter<TxnId> TXNID_ADAPTER = new TypeAdapter<>()
+    {
+        @Override
+        public void write(JsonWriter out, TxnId value) throws IOException
+        {
+            writeTimestamp(out, value);
+        }
+
+        @Override
+        public TxnId read(JsonReader in) throws IOException
+        {
+            return readTimestamp(in, TxnId::new);
+        }
+    };
+
+    public static final TypeAdapter<Ballot> BALLOT_ADAPTER = new TypeAdapter<>()
+    {
+        @Override
+        public void write(JsonWriter out, Ballot value) throws IOException
+        {
+            writeTimestamp(out, value);
+        }
+
+        @Override
+        public Ballot read(JsonReader in) throws IOException
+        {
+            return readTimestamp(in, Ballot::new);
+        }
+    };
+
+
+    /** Encodes a key set as a JSON array of keys. */
+    public static final TypeAdapter<Keys> KEYS_ADAPTER = new TypeAdapter<>()
+    {
+        @Override
+        public void write(JsonWriter out, Keys value) throws IOException
+        {
+            out.beginArray();
+            for (Key key : value)
+                ((MaelstromKey)key).write(out);
+            out.endArray();
+        }
+
+        @Override
+        public Keys read(JsonReader in) throws IOException
+        {
+            List<MaelstromKey> keys = new ArrayList<>();
+            in.beginArray();
+            while (in.hasNext())
+                keys.add(MaelstromKey.read(in));
+            in.endArray();
+            return new Keys(keys.toArray(Key[]::new));
+        }
+    };
+
+    /**
+     * Encodes a transaction as an object with the fields {@code r} (array of keys), {@code append} (array of
+     * {@code [key, value]} pairs), {@code client} and {@code requestId}. A null transaction is encoded as
+     * JSON null.
+     */
+    public static final TypeAdapter<Txn> TXN_ADAPTER = new TypeAdapter<>()
+    {
+        @Override
+        public void write(JsonWriter out, Txn txn) throws IOException
+        {
+            if (txn == null)
+            {
+                out.nullValue();
+                return;
+            }
+
+            Keys keys = txn.keys;
+            MaelstromRead read = (MaelstromRead) txn.read;
+            MaelstromUpdate update = (MaelstromUpdate) txn.update;
+
+            out.beginObject();
+            out.name("r");
+            out.beginArray();
+            for (int i = 0 ; i < keys.size() ; ++i)
+            {
+                MaelstromKey key = (MaelstromKey) keys.get(i);
+                if (read.keys.indexOf(key) >= 0)
+                {
+                    key.write(out);
+                }
+            }
+            out.endArray();
+            out.name("append");
+            out.beginArray();
+            for (int i = 0 ; i < keys.size() ; ++i)
+            {
+                MaelstromKey key = (MaelstromKey) keys.get(i);
+                if (update != null && update.containsKey(key))
+                {
+                    out.beginArray();
+                    key.write(out);
+                    update.get(key).write(out);
+                    out.endArray();
+                }
+            }
+            out.endArray();
+            out.name("client");
+            out.value(((MaelstromQuery)txn.query).client.id);
+            out.name("requestId");
+            out.value(((MaelstromQuery)txn.query).requestId);
+            out.endObject();
+        }
+
+        @Override
+        public Txn read(JsonReader in) throws IOException
+        {
+            if (in.peek() == JsonToken.NULL)
+            {
+                // the null token must be consumed, otherwise the caller's next read sees it again
+                in.nextNull();
+                return null;
+            }
+
+            NavigableSet<Key> buildReadKeys = new TreeSet<>();
+            NavigableSet<Key> buildKeys = new TreeSet<>();
+            MaelstromUpdate update = new MaelstromUpdate();
+
+            Node.Id client = null;
+            long requestId = Long.MIN_VALUE;
+            in.beginObject();
+            while (in.hasNext())
+            {
+                String kind = in.nextName();
+                switch (kind)
+                {
+                    default: throw new IllegalStateException("Invalid kind: " + kind);
+                    case "r":
+                        in.beginArray();
+                        while (in.hasNext())
+                            buildReadKeys.add(MaelstromKey.read(in));
+                        in.endArray();
+                        break;
+                    case "append":
+                        in.beginArray();
+                        while (in.hasNext())
+                        {
+                            in.beginArray();
+                            Key key = MaelstromKey.read(in);
+                            buildKeys.add(key);
+                            Value append = Value.read(in);
+                            update.put(key, append);
+                            in.endArray();
+                        }
+                        in.endArray();
+                        break;
+                    case "client":
+                        // write() emits the raw numeric id, so accept that as well as the "cN"/"nN" string form
+                        if (in.peek() == JsonToken.NUMBER) client = new Id(in.nextLong());
+                        else client = ID_ADAPTER.read(in);
+                        break;
+                    case "requestId":
+                        requestId = in.nextLong();
+                        break;
+                }
+            }
+            in.endObject();
+
+            if (client == null)
+                throw new IllegalStateException();
+
+            buildKeys.addAll(buildReadKeys);
+            Keys readKeys = new Keys(buildReadKeys);
+            Keys keys = new Keys(buildKeys);
+            MaelstromRead read = new MaelstromRead(keys);
+            MaelstromQuery query = new MaelstromQuery(client, requestId, readKeys, update);
+
+            return new Txn(keys, read, query, update);
+        }
+    };
+
+    /** Encodes dependencies as an array of {@code [txnId, txn]} pairs. */
+    public static final TypeAdapter<Dependencies> DEPS_ADAPTER = new TypeAdapter<>()
+    {
+        @Override
+        public void write(JsonWriter out, Dependencies value) throws IOException
+        {
+            out.beginArray();
+            for (Map.Entry<TxnId, Txn> e : value.deps.entrySet())
+            {
+                out.beginArray();
+                GSON.toJson(e.getKey(), TxnId.class, out);
+                TXN_ADAPTER.write(out, e.getValue());
+                out.endArray();
+            }
+            out.endArray();
+        }
+
+        @Override
+        public Dependencies read(JsonReader in) throws IOException
+        {
+            NavigableMap<TxnId, Txn> deps = new TreeMap<>();
+            in.beginArray();
+            if (!in.hasNext())
+            {
+                in.endArray();
+                return new Dependencies();
+            }
+
+            while (in.hasNext())
+            {
+                in.beginArray();
+                TxnId txnId = GSON.fromJson(in, TxnId.class);
+                Txn txn = TXN_ADAPTER.read(in);
+                deps.put(txnId, txn);
+                in.endArray();
+            }
+            in.endArray();
+            return new Dependencies(deps);
+        }
+    };
+
+    /**
+     * Encodes writes as an object with the fields {@code executeAt}, {@code keys} and {@code writes}, where
+     * {@code writes} is an array parallel to {@code keys} holding the value for each key or null.
+     */
+    public static final TypeAdapter<Writes> TXN_WRITES_ADAPTER = new TypeAdapter<>()
+    {
+        @Override
+        public void write(JsonWriter out, Writes value) throws IOException
+        {
+            if (value == null)
+            {
+                out.nullValue();
+                return;
+            }
+            out.beginObject();
+            out.name("executeAt");
+            GSON.toJson(value.executeAt, Timestamp.class, out);
+            out.name("keys");
+            Keys keys = value.keys;
+            KEYS_ADAPTER.write(out, keys);
+            out.name("writes");
+            MaelstromWrite write = (MaelstromWrite) value.write;
+            out.beginArray();
+            for (int i = 0 ; i < keys.size() ; ++i)
+            {
+                Value append = write.get(keys.get(i));
+                if (append == null) out.nullValue();
+                else append.write(out);
+            }
+            out.endArray();
+            out.endObject();
+        }
+
+        @Override
+        public Writes read(JsonReader in) throws IOException
+        {
+            if (in.peek() == JsonToken.NULL)
+            {
+                // the null token must be consumed, otherwise the caller's next read sees it again
+                in.nextNull();
+                return null;
+            }
+
+            in.beginObject();
+            Timestamp executeAt = null;
+            Keys keys = null;
+            List<Value> writes = null;
+            while (in.hasNext())
+            {
+                switch (in.nextName())
+                {
+                    default: throw new IllegalStateException();
+                    case "executeAt":
+                        executeAt = GSON.fromJson(in, Timestamp.class);
+                        break;
+                    case "keys":
+                        keys = KEYS_ADAPTER.read(in);
+                        break;
+                    case "writes":
+                        writes = new ArrayList<>();
+                        in.beginArray();
+                        while (in.hasNext())
+                            writes.add(Value.read(in));
+                        in.endArray();
+                        break;
+                }
+            }
+            in.endObject();
+
+            MaelstromWrite write = new MaelstromWrite();
+            if (writes != null)
+            {
+                for (int i = 0 ; i < writes.size() ; ++i)
+                {
+                    if (writes.get(i) != null)
+                        write.put(keys.get(i), writes.get(i));
+                }
+            }
+            return new Writes(executeAt, keys, write);
+        }
+    };
+
+    /** Encodes the data of a read response as an array of {@code [key, value]} pairs. */
+    public static final TypeAdapter<ReadOk> READ_OK_ADAPTER = new TypeAdapter<>()
+    {
+        @Override
+        public void write(JsonWriter out, ReadOk value) throws IOException
+        {
+            out.beginArray();
+            for (Map.Entry<Key, Value> e : ((MaelstromData)value.data).entrySet())
+            {
+                out.beginArray();
+                ((MaelstromKey)e.getKey()).write(out);
+                e.getValue().write(out);
+                out.endArray();
+            }
+            out.endArray();
+        }
+
+        @Override
+        public ReadOk read(JsonReader in) throws IOException
+        {
+            MaelstromData result = new MaelstromData();
+            in.beginArray();
+            while (in.hasNext())
+            {
+                in.beginArray();
+                MaelstromKey key = MaelstromKey.read(in);
+                Value value = Value.read(in);
+                result.put(key, value);
+                in.endArray();
+            }
+            in.endArray();
+            return new ReadOk(result);
+        }
+    };
+
+    /** An adapter that rejects both reading and writing. */
+    static final TypeAdapter FAIL = new TypeAdapter()
+    {
+        @Override
+        public void write(JsonWriter out, Object value)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Object read(JsonReader in)
+        {
+            throw new UnsupportedOperationException();
+        }
+    };
+
+    static
+    {
+        // TODO: Maelstrom hooks should be registered at run-time to permit separate tree
+        GSON = new GsonBuilder().registerTypeAdapter(Packet.class, Packet.GSON_ADAPTER)
+                                .registerTypeAdapter(Id.class, ID_ADAPTER)
+                                .registerTypeAdapter(Txn.class, TXN_ADAPTER)
+                                .registerTypeAdapter(Ballot.class, BALLOT_ADAPTER)
+                                .registerTypeAdapter(TxnId.class, TXNID_ADAPTER)
+                                .registerTypeAdapter(Timestamp.class, TIMESTAMP_ADAPTER)
+                                .registerTypeAdapter(Datum.class, Datum.GSON_ADAPTER)
+                                .registerTypeAdapter(MaelstromKey.class, MaelstromKey.GSON_ADAPTER)
+                                .registerTypeAdapter(Value.class, Value.GSON_ADAPTER)
+                                .registerTypeAdapter(Writes.class, TXN_WRITES_ADAPTER)
+                                .registerTypeAdapter(MaelstromResult.class, MaelstromResult.GSON_ADAPTER)
+                                .registerTypeAdapter(ReadOk.class, READ_OK_ADAPTER)
+                                .registerTypeAdapter(Dependencies.class, Json.DEPS_ADAPTER)
+                                .registerTypeAdapter(Keys.class, KEYS_ADAPTER)
+                                .registerTypeAdapter(Body.class, Body.FAIL_READ)
+                                .registerTypeAdapter(Result.class, MaelstromResult.GSON_ADAPTER)
+                                .registerTypeAdapter(MaelstromRequest.class, Body.FAIL_READ)
+                                .registerTypeAdapter(MaelstromReply.class, Body.FAIL_READ)
+                                .create();
+    }
+
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
new file mode 100644
index 0000000..e25e617
--- /dev/null
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
@@ -0,0 +1,28 @@
+package accord.maelstrom;
+
+import accord.local.Node;
+import accord.api.Agent;
+import accord.api.Result;
+import accord.local.Command;
+import accord.txn.Timestamp;
+
+/**
+ * The {@link Agent} used by this package; a single shared {@link #INSTANCE} is exposed.
+ */
+public class MaelstromAgent implements Agent
+{
+    static final MaelstromAgent INSTANCE = new MaelstromAgent();
+
+    /**
+     * Replies to the client recorded in a recovered result. Nothing is done when {@code success} is null;
+     * {@code fail} is not inspected.
+     */
+    @Override
+    public void onRecover(Node node, Result success, Throwable fail)
+    {
+        if (success == null)
+            return;
+
+        MaelstromResult result = (MaelstromResult) success;
+        MaelstromReply reply = new MaelstromReply(result.requestId, result);
+        node.reply(result.client, result.requestId, reply);
+    }
+
+    /**
+     * Always fails with an {@link AssertionError}.
+     */
+    @Override
+    public void onInconsistentTimestamp(Command command, Timestamp prev, Timestamp next)
+    {
+        throw new AssertionError();
+    }
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromData.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromData.java
new file mode 100644
index 0000000..ef2cb62
--- /dev/null
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromData.java
@@ -0,0 +1,16 @@
+package accord.maelstrom;
+
+import java.util.TreeMap;
+
+import accord.api.Data;
+import accord.api.Key;
+
+/**
+ * Read data held as a sorted map from key to value.
+ */
+public class MaelstromData extends TreeMap<Key, Value> implements Data
+{
+    /**
+     * Copies every entry of {@code data} into this map, overwriting entries with the same key, and returns
+     * this instance.
+     *
+     * @param data the data to absorb; must be a {@code MaelstromData}
+     */
+    @Override
+    public Data merge(Data data)
+    {
+        MaelstromData other = (MaelstromData) data;
+        putAll(other);
+        return this;
+    }
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromInit.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromInit.java
new file mode 100644
index 0000000..feea0c6
--- /dev/null
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromInit.java
@@ -0,0 +1,33 @@
+package accord.maelstrom;
+
+import java.io.IOException;
+
+import accord.maelstrom.Packet.Type;
+import com.google.gson.stream.JsonWriter;
+import accord.local.Node.Id;
+
+/**
+ * The body of an {@code init} message: the receiving node's own id and the ids of every node in the cluster.
+ */
+public class MaelstromInit extends Body
+{
+    final Id self;
+    final Id[] cluster;
+
+    /**
+     * @param msg_id  id of this message
+     * @param self    written under {@code "node_id"}
+     * @param cluster written under {@code "node_ids"}
+     */
+    public MaelstromInit(long msg_id, Id self, Id[] cluster)
+    {
+        super(Type.init, msg_id, SENTINEL_MSG_ID);
+        this.self = self;
+        this.cluster = cluster;
+    }
+
+    /**
+     * Appends {@code node_id} and the {@code node_ids} array after whatever the superclass writes.
+     */
+    @Override
+    void writeBody(JsonWriter out) throws IOException
+    {
+        super.writeBody(out);
+        out.name("node_id").value(self.id);
+        out.name("node_ids").beginArray();
+        for (int i = 0 ; i < cluster.length ; ++i)
+            out.value(cluster[i].id);
+        out.endArray();
+    }
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
new file mode 100644
index 0000000..a16ccd2
--- /dev/null
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
@@ -0,0 +1,57 @@
+package accord.maelstrom;
+
+import java.io.IOException;
+
+import accord.api.Key;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+
+/**
+ * A {@link Datum} usable as a {@link Key}. Ordering and JSON encoding are inherited from {@link Datum}.
+ */
+public class MaelstromKey extends Datum<MaelstromKey> implements Key<MaelstromKey>
+{
+    public MaelstromKey(Kind kind, Object value)
+    {
+        super(kind, value);
+    }
+
+    public MaelstromKey(String value)
+    {
+        super(value);
+    }
+
+    public MaelstromKey(Long value)
+    {
+        super(value);
+    }
+
+    public MaelstromKey(Double value)
+    {
+        super(value);
+    }
+
+    /**
+     * Delegates to the {@link Datum} overload, so keys order exactly as plain datums do.
+     */
+    @Override
+    public int compareTo(MaelstromKey that)
+    {
+        // the cast selects the compareTo(Datum) overload rather than recursing into this method
+        return compareTo((Datum) that);
+    }
+
+    /**
+     * Reads a key in the JSON form understood by {@link Datum}.
+     */
+    public static MaelstromKey read(JsonReader in) throws IOException
+    {
+        return read(in, MaelstromKey::new);
+    }
+
+    public static final TypeAdapter<MaelstromKey> GSON_ADAPTER = new TypeAdapter<>()
+    {
+        @Override
+        public void write(JsonWriter out, MaelstromKey key) throws IOException
+        {
+            key.write(out);
+        }
+
+        @Override
+        public MaelstromKey read(JsonReader in) throws IOException
+        {
+            return MaelstromKey.read(in);
+        }
+    };
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java
new file mode 100644
index 0000000..868f51d
--- /dev/null
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromQuery.java
@@ -0,0 +1,36 @@
+package accord.maelstrom;
+
+import java.util.Map;
+
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.api.Data;
+import accord.api.Key;
+import accord.api.Query;
+import accord.api.Result;
+import accord.txn.Keys;
+
+/**
+ * Computes the result of a transaction from the data read for it, tagging the result with the client and
+ * request that issued the transaction.
+ */
+public class MaelstromQuery implements Query
+{
+    final Node.Id client;
+    final long requestId;
+    final Keys read;
+    final MaelstromUpdate update; // we have to return the writes as well for some reason
+
+    /**
+     * @param client    the client the result is addressed to
+     * @param requestId the client's request id
+     * @param read      the keys whose values are reported in the result
+     * @param update    the updates carried along into the result
+     */
+    public MaelstromQuery(Id client, long requestId, Keys read, MaelstromUpdate update)
+    {
+        this.client = client;
+        this.requestId = requestId;
+        this.read = read;
+        this.update = update;
+    }
+
+    /**
+     * Builds a result whose values array is parallel to {@link #read}: slot {@code i} holds the value found in
+     * {@code data} for {@code read.get(i)}, or null if the data has no entry for it. Entries of {@code data}
+     * for keys outside {@link #read} are ignored.
+     *
+     * @param data the data read for the transaction; must be a {@code MaelstromData}
+     */
+    @Override
+    public Result compute(Data data)
+    {
+        Value[] values = new Value[read.size()];
+        for (Map.Entry<Key, Value> e : ((MaelstromData)data).entrySet())
+        {
+            // the data may cover keys that are only written, which have no slot in the read set
+            int index = read.indexOf(e.getKey());
+            if (index >= 0)
+                values[index] = e.getValue();
+        }
+        return new MaelstromResult(client, requestId, read, values, update);
+    }
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
new file mode 100644
index 0000000..5a2630f
--- /dev/null
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
@@ -0,0 +1,27 @@
+package accord.maelstrom;
+
+import accord.api.Data;
+import accord.api.Key;
+import accord.api.Store;
+import accord.api.Read;
+import accord.txn.Keys;
+
+public class MaelstromRead implements Read
+{
+ final Keys keys;
+
+ public MaelstromRead(Keys keys)
+ {
+ this.keys = keys;
+ }
+
+ @Override
+ public Data read(Key start, Key end, Store store)
+ {
+ MaelstromStore s = (MaelstromStore)store;
+ MaelstromData result = new MaelstromData();
+ for (int i = keys.ceilIndex(start), limit = keys.ceilIndex(end) ; i < limit ; ++i)
+ result.put(keys.get(i), s.get(keys.get(i)));
+ return result;
+ }
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReply.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReply.java
new file mode 100644
index 0000000..593e6ff
--- /dev/null
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReply.java
@@ -0,0 +1,100 @@
+package accord.maelstrom;
+
+import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import accord.local.Node;
+import accord.api.Key;
+import accord.txn.Keys;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
+import accord.maelstrom.Packet.Type;
+import accord.messages.Reply;
+
+/**
+ * The body of a {@code txn_ok} reply: the outcome of a transaction in the external list-of-operations form.
+ */
+public class MaelstromReply extends Body implements Reply
+{
+    final MaelstromResult result;
+
+    /**
+     * @param in_reply_to id of the request being answered
+     * @param result      the outcome to report
+     */
+    public MaelstromReply(long in_reply_to, MaelstromResult result)
+    {
+        super(Type.txn_ok, SENTINEL_MSG_ID, in_reply_to);
+        this.result = result;
+    }
+
+    /**
+     * Appends a {@code txn} array after whatever the superclass writes. For each key of the result, in order,
+     * it holds one {@code ["r", key, value]} entry if a value was read, followed by one
+     * {@code ["append", key, datum]} entry per datum appended to that key.
+     */
+    @Override
+    void writeBody(JsonWriter out) throws IOException
+    {
+        super.writeBody(out);
+        out.name("txn");
+        Keys keys = result.keys;
+        Value[] reads = result.read;
+        MaelstromUpdate update = result.update;
+        out.beginArray();
+        for (int i = 0 ; i < keys.size() ; ++i)
+        {
+            MaelstromKey key = (MaelstromKey) keys.get(i);
+            if (reads[i] != null)
+            {
+                out.beginArray();
+                out.value("r");
+                key.write(out);
+                reads[i].writeVerbose(out);
+                out.endArray();
+            }
+            if (update != null && update.containsKey(key))
+            {
+                for (Datum append : update.get(key).contents)
+                {
+                    out.beginArray();
+                    out.value("append");
+                    key.write(out);
+                    append.write(out);
+                    out.endArray();
+                }
+            }
+        }
+        out.endArray();
+    }
+
+    /**
+     * Reads a result from the external form: an array of {@code ["r", key, value]} and
+     * {@code ["append", key, datum]} operations. Keys that are only appended to appear in the result with a
+     * null read value.
+     *
+     * @return the decoded result, or null if the next token is JSON null (which is consumed)
+     * @throws IllegalStateException if an operation is neither {@code "r"} nor {@code "append"}
+     */
+    public static MaelstromResult readResultExternal(JsonReader in, Node.Id client, long requestId) throws IOException
+    {
+        if (in.peek() == JsonToken.NULL)
+        {
+            // the null token must be consumed, otherwise the caller's next read sees it again
+            in.nextNull();
+            return null;
+        }
+
+        NavigableMap<Key, Value> reads = new TreeMap<>();
+        MaelstromUpdate update = new MaelstromUpdate();
+        in.beginArray();
+        while (in.hasNext())
+        {
+            in.beginArray();
+            String op = in.nextString();
+            Key key = MaelstromKey.read(in);
+            switch (op)
+            {
+                default: throw new IllegalStateException("Invalid op: " + op);
+                case "r":
+                {
+                    Value value = Value.read(in);
+                    reads.put(key, value);
+                    break;
+                }
+                case "append":
+                {
+                    Datum value = Datum.read(in);
+                    update.merge(key, new Value(value), Value::append);
+                    break;
+                }
+            }
+            in.endArray();
+        }
+        in.endArray();
+
+        // make sure every appended key has a slot in the parallel keys/values arrays
+        for (Key key : update.keySet())
+            reads.putIfAbsent(key, null);
+
+        Keys keys = new Keys(reads.keySet());
+        Value[] values = reads.values().toArray(new Value[0]);
+
+        return new MaelstromResult(client, requestId, keys, values, update);
+    }
+
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
new file mode 100644
index 0000000..9ba99eb
--- /dev/null
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
@@ -0,0 +1,119 @@
+package accord.maelstrom;
+
+import java.io.IOException;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import accord.api.Key;
+import accord.txn.Keys;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
+import accord.local.Node;
+import accord.local.Node.Id;
+import accord.txn.Txn;
+import accord.maelstrom.Packet.Type;
+import accord.messages.Request;
+
+public class MaelstromRequest extends Body implements Request
+{
+ final Txn txn;
+
+ public MaelstromRequest(long msg_id, Txn txn)
+ {
+ super(Type.txn, msg_id, SENTINEL_MSG_ID);
+ this.txn = txn;
+ }
+
+ public void process(Node node, Id client, long messageId)
+ {
+ node.coordinate(txn).handle((success, fail) -> {
+ if (success != null) node.reply(client, messageId, new MaelstromReply(messageId, (MaelstromResult) success));
+// else node.reply(client, messageId, new Error(messageId, 13, fail.getMessage()));
+ return null;
+ });
+ }
+
+ @Override
+ void writeBody(JsonWriter out) throws IOException
+ {
+ super.writeBody(out);
+ out.name("txn");
+ writeTxnExternal(out, txn);
+ }
+
+ static void writeTxnExternal(JsonWriter out, Txn txn) throws IOException
+ {
+ if (txn == null)
+ {
+ out.nullValue();
+ return;
+ }
+
+ out.beginArray();
+ Keys keys = txn.keys;
+ MaelstromQuery query = (MaelstromQuery) txn.query;
+ MaelstromUpdate update = (MaelstromUpdate) txn.update;
+ for (int i = 0 ; i < keys.size() ; ++i)
+ {
+ MaelstromKey key = (MaelstromKey) keys.get(i);
+ if (query.read.indexOf(key) >= 0)
+ {
+ out.beginArray();
+ out.value("r");
+ key.write(out);
+ out.nullValue();
+ out.endArray();
+ }
+ if (update.containsKey(key))
+ {
+ out.beginArray();
+ out.value("append");
+ key.write(out);
+ update.get(key).write(out);
+ out.endArray();
+ }
+ }
+ out.endArray();
+ }
+
+ public static Txn readTxnExternal(JsonReader in, Node.Id client, long requestId) throws IOException
+ {
+ if (in.peek() == JsonToken.NULL)
+ return null;
+
+ NavigableSet<Key> buildReadKeys = new TreeSet<>();
+ NavigableSet<Key> buildKeys = new TreeSet<>();
+ MaelstromUpdate update = new MaelstromUpdate();
+ in.beginArray();
+ while (in.hasNext())
+ {
+ in.beginArray();
+ String op = in.nextString();
+ Key key = MaelstromKey.read(in);
+ switch (op)
+ {
+ default: throw new IllegalStateException("Invalid op: " + op);
+ case "r":
+ in.nextNull();
+ buildReadKeys.add(key);
+ break;
+ case "append":
+ Datum value = Datum.read(in);
+ buildKeys.add(key);
+ update.merge(key, new Value(value), Value::append);
+ }
+ in.endArray();
+ }
+ in.endArray();
+
+ buildKeys.addAll(buildReadKeys);
+ Keys readKeys = new Keys(buildReadKeys);
+ Keys keys = new Keys(buildKeys);
+ MaelstromRead read = new MaelstromRead(keys);
+ MaelstromQuery query = new MaelstromQuery(client, requestId, readKeys, update);
+
+ return new Txn(keys, read, query, update);
+ }
+
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromResult.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromResult.java
new file mode 100644
index 0000000..cf5f06c
--- /dev/null
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromResult.java
@@ -0,0 +1,147 @@
+package accord.maelstrom;
+
+import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import accord.local.Node;
+import accord.local.Node.Id;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
+import accord.api.Key;
+import accord.api.Result;
+import accord.txn.Keys;
+
+/**
+ * A {@link Result} carrying the client and request it answers, the keys involved,
+ * the values read for those keys, and the update that was applied.
+ *
+ * {@link #GSON_ADAPTER} serialises it as
+ * {@code {"r": [[key, value]...], "append": [[key, value]...], "client": id, "requestId": n}}.
+ */
+public class MaelstromResult implements Result
+{
+    final Node.Id client;
+    final long requestId;
+    final Keys keys;
+    // indexed in parallel with keys; the adapter treats a null slot as "nothing read for this key"
+    final Value[] read;
+    final MaelstromUpdate update;
+
+    public MaelstromResult(Id client, long requestId, Keys keys, Value[] read, MaelstromUpdate update)
+    {
+        this.client = client;
+        this.requestId = requestId;
+        this.keys = keys;
+        this.read = read;
+        this.update = update;
+    }
+
+    public static final TypeAdapter<Result> GSON_ADAPTER = new TypeAdapter<>()
+    {
+        /**
+         * Writes {@code value} (which must be a {@link MaelstromResult}) as a JSON object,
+         * or JSON null when {@code value} is null.
+         */
+        @Override
+        public void write(JsonWriter out, Result value) throws IOException
+        {
+            if (value == null)
+            {
+                out.nullValue();
+                return;
+            }
+
+            MaelstromResult result = (MaelstromResult) value;
+            Keys keys = result.keys;
+            Value[] reads = result.read;
+            MaelstromUpdate update = result.update;
+            out.beginObject();
+
+            // "r": one [key, value] pair per key that has a non-null read value
+            out.name("r");
+            out.beginArray();
+            for (int i = 0 ; i < keys.size() ; ++i)
+            {
+                MaelstromKey key = (MaelstromKey) keys.get(i);
+                if (reads[i] != null)
+                {
+                    out.beginArray();
+                    key.write(out);
+                    reads[i].write(out);
+                    out.endArray();
+                }
+            }
+            out.endArray();
+
+            // "append": one [key, value] pair per key present in the update (none if there is no update)
+            out.name("append");
+            out.beginArray();
+            for (int i = 0 ; i < keys.size() ; ++i)
+            {
+                MaelstromKey key = (MaelstromKey) keys.get(i);
+                if (update != null && update.containsKey(key))
+                {
+                    out.beginArray();
+                    key.write(out);
+                    update.get(key).write(out);
+                    out.endArray();
+                }
+            }
+            out.endArray();
+
+            out.name("client");
+            out.value(result.client.id);
+            out.name("requestId");
+            out.value(result.requestId);
+            out.endObject();
+        }
+
+        /**
+         * Reads the object form produced by {@link #write}. Fields may appear in any order.
+         *
+         * @return the parsed result, or null if the next JSON value is null
+         * @throws IllegalStateException if an unknown field is present or "client" is absent
+         */
+        @Override
+        public Result read(JsonReader in) throws IOException
+        {
+            if (in.peek() == JsonToken.NULL)
+            {
+                // peek() does not advance the reader; a TypeAdapter must consume the null it reports
+                in.nextNull();
+                return null;
+            }
+
+            Node.Id client = null;
+            long requestId = Long.MIN_VALUE;
+            NavigableMap<Key, Value> reads = new TreeMap<>();
+            MaelstromUpdate update = new MaelstromUpdate();
+            in.beginObject();
+            while (in.hasNext())
+            {
+                String kind = in.nextName();
+                switch (kind)
+                {
+                    default: throw new IllegalStateException("Invalid kind: " + kind);
+                    case "r":
+                        in.beginArray();
+                        while (in.hasNext())
+                        {
+                            in.beginArray();
+                            Key key = MaelstromKey.read(in);
+                            Value value = Value.read(in);
+                            reads.put(key, value);
+                            in.endArray();
+                        }
+                        in.endArray();
+                        break;
+                    case "append":
+                        in.beginArray();
+                        while (in.hasNext())
+                        {
+                            in.beginArray();
+                            Key key = MaelstromKey.read(in);
+                            Value append = Value.read(in);
+                            update.put(key, append);
+                            in.endArray();
+                        }
+                        in.endArray();
+                        break;
+                    case "client":
+                        client = Json.ID_ADAPTER.read(in);
+                        break;
+                    case "requestId":
+                        requestId = in.nextLong();
+                        break;
+                }
+            }
+            in.endObject();
+
+            if (client == null)
+                throw new IllegalStateException("Missing client");
+
+            // keys that were only appended to still belong to the key set; give them a null read slot
+            for (Key key : update.keySet())
+                reads.putIfAbsent(key, null);
+
+            // keySet() and values() iterate in the same sorted order, keeping keys and values aligned by index
+            Keys keys = new Keys(reads.keySet());
+            Value[] values = reads.values().toArray(new Value[0]);
+            return new MaelstromResult(client, requestId, keys, values, update);
+        }
+    };
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java
new file mode 100644
index 0000000..23051e5
--- /dev/null
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromStore.java
@@ -0,0 +1,25 @@
+package accord.maelstrom;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import accord.api.Key;
+import accord.api.Store;
+import accord.utils.Timestamped;
+
+/**
+ * A {@link Store} holding one timestamped {@link Value} per {@link Key} in a concurrent map.
+ */
+public class MaelstromStore implements Store
+{
+    final Map<Key, Timestamped<Value>> data = new ConcurrentHashMap<>();
+
+    /**
+     * @return the value stored for {@code key}, or {@code Value.EMPTY} if there is no entry
+     */
+    public Value read(Key key)
+    {
+        return valueOrEmpty(data.get(key));
+    }
+
+    /**
+     * @return the value stored for {@code key}, or {@code Value.EMPTY} if there is no entry
+     */
+    public Value get(Key key)
+    {
+        return valueOrEmpty(data.get(key));
+    }
+
+    // unwraps a looked-up entry, substituting Value.EMPTY for a missing one
+    private static Value valueOrEmpty(Timestamped<Value> entry)
+    {
+        if (entry != null)
+            return entry.data;
+        return Value.EMPTY;
+    }
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromUpdate.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromUpdate.java
new file mode 100644
index 0000000..483797b
--- /dev/null
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromUpdate.java
@@ -0,0 +1,21 @@
+package accord.maelstrom;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import accord.api.Key;
+import accord.api.Data;
+import accord.api.Update;
+
+/**
+ * An {@link Update} expressed as a sorted map from each key to the value to append to it.
+ */
+public class MaelstromUpdate extends TreeMap<Key, Value> implements Update
+{
+    /**
+     * Builds the write for this update: for every key in this map, the value found in
+     * {@code read} for that key with this map's value appended.
+     *
+     * @param read the previously read data; must be a {@link MaelstromData}
+     * @return a write containing one appended value per key of this update
+     */
+    @Override
+    public MaelstromWrite apply(Data read)
+    {
+        MaelstromWrite result = new MaelstromWrite();
+        Map<Key, Value> current = (MaelstromData) read;
+        forEach((key, appended) -> result.put(key, current.get(key).append(appended)));
+        return result;
+    }
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
new file mode 100644
index 0000000..5adbd9d
--- /dev/null
... 1297 lines suppressed ...
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org