You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by "aweisberg (via GitHub)" <gi...@apache.org> on 2023/02/01 20:53:04 UTC

[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093644771


##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -374,9 +528,9 @@ public void run()
             }
         }
 
-        class AsyncState extends State implements SafeCommandStore
+        class AsyncState extends InMemoryCommandStore.State implements SafeCommandStore

Review Comment:
   Is removing the import an improvement?



##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())

Review Comment:
   Should we expire? 



##########
accord-core/src/main/java/accord/coordinate/Propose.java:
##########
@@ -128,12 +131,7 @@ public void onCallbackFailure(Id from, Throwable failure)
         callback.accept(null, failure);
     }
 
-    private void onAccepted()
-    {
-        isDone = true;
-        Deps deps = Deps.merge(acceptOks, ok -> ok.deps);
-        Execute.execute(node, txnId, txn, route, executeAt, deps, callback);
-    }
+    abstract void onAccepted();
 
     // A special version for proposing the invalidation of a transaction; only needs to succeed on one shard
     static class Invalidate implements Callback<AcceptReply>

Review Comment:
   Should invalidate be pulled out into a separate class now?



##########
accord-core/src/main/java/accord/messages/BeginRecovery.java:
##########
@@ -152,16 +164,18 @@ public RecoverReply reduce(RecoverReply r1, RecoverReply r2)
         if (!ok1.status.hasBeen(PreAccepted)) throw new IllegalStateException();
 
         PartialDeps deps = ok1.deps.with(ok2.deps);
+        PartialDeps acceptedDeps = ok1.deps == ok1.acceptedDeps && ok2.deps == ok2.acceptedDeps ? deps : ok1.acceptedDeps.with(ok2.acceptedDeps);

Review Comment:
   If one or more of them is `PartialDeps.NONE` why are we doing `with`? To make sure we pick whichever one might not be `NONE` or get double `NONE` merged?



##########
accord-core/src/main/java/accord/primitives/Timestamp.java:
##########
@@ -19,12 +19,16 @@
 package accord.primitives;
 
 import accord.local.Node.Id;
-import accord.utils.Invariants;
+
+import static accord.utils.Invariants.checkArgument;
 
 import javax.annotation.Nonnull;
 
 public class Timestamp implements Comparable<Timestamp>
 {
+    private static final int REJECTED_FLAG = 0x8000;

Review Comment:
   Should `REJECTED_FLAG` be the same as `MERGE_FLAGS`?



##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())
+        {
+            proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
+        }
+        else
+        {
+            // SyncPoint transactions always propose their own txnId as their executeAt, as they are not really executed.
+            // They only create happens-after relationships wrt their dependencies, which represent all transactions
+            // that *may* execute before their txnId, so once these dependencies apply we can say that any action that
+            // awaits these dependencies applies after them. In the case of ExclusiveSyncPoint, we additionally guarantee
+            // that no lower TxnId can later apply.
+            new Propose<SyncPoint>(node, tracker.topologies(), Ballot.ZERO, txnId, txn, route, deps, txnId, this)

Review Comment:
   Anonymously subclassing `Propose` to override `onAccepted` is moderately weird, but sure. For `ProposeAndExecute` there is a class.



##########
accord-core/src/main/java/accord/local/CommandStore.java:
##########
@@ -54,6 +63,36 @@ public int id()
         return id;
     }
 
+    protected void setRejectBefore(ReducingRangeMap<Timestamp> newRejectBefore)
+    {
+        this.rejectBefore = newRejectBefore;
+    }
+
+    Timestamp preaccept(TxnId txnId, Seekables<?, ?> keys, SafeCommandStore safeStore)
+    {
+        NodeTimeService time = safeStore.time();
+        boolean isExpired = agent().isExpired(txnId, safeStore.time().now());

Review Comment:
   Maybe rename `isExpired` to `shouldReject`?



##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())
+        {
+            proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
+        }
+        else
+        {
+            // SyncPoint transactions always propose their own txnId as their executeAt, as they are not really executed.
+            // They only create happens-after relationships wrt their dependencies, which represent all transactions
+            // that *may* execute before their txnId, so once these dependencies apply we can say that any action that
+            // awaits these dependencies applies after them. In the case of ExclusiveSyncPoint, we additionally guarantee
+            // that no lower TxnId can later apply.
+            new Propose<SyncPoint>(node, tracker.topologies(), Ballot.ZERO, txnId, txn, route, deps, txnId, this)
+            {
+                @Override
+                void onAccepted()
+                {
+                    node.send(tracker.nodes(), id -> new Apply(id, tracker.topologies(), tracker.topologies(), txnId.epoch(), txnId, route, txnId, deps, txn.execute(txnId, null), txn.result(txnId, null)));

Review Comment:
   We send the apply to avoid having this transaction block others, and we don't need a response because it's fine if this transaction needs recovery?



##########
accord-core/src/main/java/accord/coordinate/Recover.java:
##########
@@ -212,56 +213,63 @@ private void recover()
             {
                 default: throw new IllegalStateException();
                 case Invalidated:
+                {
                     commitInvalidate();
                     return;
+                }
 
                 case Applied:
                 case PreApplied:
-                    // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, acceptOrCommit.executeAt, (deps, fail) -> {
-                            if (fail != null)
-                            {
-                                accept(null, fail);
-                            }
-                            else
-                            {
-                                // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
-                                Persist.persistAndCommit(node, txnId, route, txn, executeAt, deps, acceptOrCommit.writes, acceptOrCommit.result);
-                                accept(acceptOrCommit.result, null);
-                            }
-                        });
-                    });
+                {
+                    // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Deps acceptedDeps = tryMergeAcceptedDeps();

Review Comment:
   We don't need to collect deps anymore? Does this mean we always enter `Recover` after reaching a quorum from each shard?



##########
accord-core/src/main/java/accord/coordinate/Propose.java:
##########
@@ -74,18 +73,19 @@ class Propose implements Callback<AcceptReply>
         this.acceptTracker = new QuorumTracker(topologies);
     }
 
-    public static void propose(Node node, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route,
-                               Timestamp executeAt, Deps deps, BiConsumer<Result, Throwable> callback)
+    public static void propose(Node node, Topologies topologies, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route,

Review Comment:
   Unused now 



##########
accord-core/src/main/java/accord/coordinate/Recover.java:
##########
@@ -212,56 +213,63 @@ private void recover()
             {
                 default: throw new IllegalStateException();
                 case Invalidated:
+                {
                     commitInvalidate();
                     return;
+                }
 
                 case Applied:
                 case PreApplied:
-                    // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, acceptOrCommit.executeAt, (deps, fail) -> {
-                            if (fail != null)
-                            {
-                                accept(null, fail);
-                            }
-                            else
-                            {
-                                // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
-                                Persist.persistAndCommit(node, txnId, route, txn, executeAt, deps, acceptOrCommit.writes, acceptOrCommit.result);
-                                accept(acceptOrCommit.result, null);
-                            }
-                        });
-                    });
+                {
+                    // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Deps acceptedDeps = tryMergeAcceptedDeps();
+                    Invariants.checkState(acceptedDeps != null);
+                    // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
+                    Persist.persistAndCommit(node, txnId, route, txn, executeAt, acceptedDeps, acceptOrCommit.writes, acceptOrCommit.result);
+                    accept(acceptOrCommit.result, null);
                     return;
+                }
 
                 case ReadyToExecute:
                 case PreCommitted:
                 case Committed:
+                {
+                    Deps acceptedDeps = tryMergeAcceptedDeps(); // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Invariants.checkState(acceptedDeps != null);
                     // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, executeAt, (deps, fail) -> {
-                            if (fail != null) accept(null, fail);
-                            else Execute.execute(node, txnId, txn, route, acceptOrCommit.executeAt, deps, this);
-                        });
-                    });
+                    Execute.execute(node, txnId, txn, route, acceptOrCommit.executeAt, acceptedDeps, this);
                     return;
+                }
 
                 case Accepted:
-                    // no need to preaccept the later round, as future operations always include every old epoch (until it is fully migrated)
-                    propose(acceptOrCommit.executeAt, mergeDeps());
-                    return;
+                {
+                    Deps deps;
+                    if (txnId.rw().proposesDeps()) deps = tryMergeAcceptedDeps();

Review Comment:
   Why do we have this distinction here for this transaction type? (needs comment)
   
   Seems like this is the one case where we are ok with accepted deps being null, but for a regular transaction it's an invariant that it not be null?
   
   So a `proposeDeps` transaction must arrive here with less information, but still want to proceed to invalidation? Why is that?



##########
accord-core/src/main/java/accord/local/SafeCommandStore.java:
##########
@@ -66,7 +70,37 @@
         EXECUTES_AFTER
     }
     enum TestDep { WITH, WITHOUT, ANY_DEPS }
-    enum TestKind { Ws, RorWs }
+    enum TestKind implements Predicate<Txn.Kind>

Review Comment:
   So with this do sync points only conflict in one direction and basically not block other transactions? Nice.



##########
accord-core/src/main/java/accord/messages/BeginRecovery.java:
##########
@@ -216,19 +230,21 @@ public static class RecoverOk extends RecoverReply
         public final Ballot accepted;
         public final Timestamp executeAt;
         public final PartialDeps deps;
+        public final PartialDeps acceptedDeps; // only those deps that have previously been proposed

Review Comment:
   Good place to comment on when/why the distinction matters, but it if it can/should be covered where this is used that is also fine



##########
accord-core/src/main/java/accord/primitives/Txn.java:
##########
@@ -53,6 +66,16 @@ public boolean isRead()
             return this == Read;
         }
 
+        /**
+         * Note that it is only possible to do this for transactions whose execution time is not dependent on
+         * others, i.e. where we may safely propose executeAt = txnId regardless of when it is witnessed by

Review Comment:
   How does a transaction end up being that way? What transaction could safely propose a txnId of 0 at any time?
   
   Also what is "this"? :-)
   
   It's proposing invalidations, don't those need to properly order with what already occurred?



##########
accord-core/src/main/java/accord/messages/BeginRecovery.java:
##########
@@ -297,7 +313,7 @@ private static Deps acceptedStartedBeforeWithoutWitnessing(SafeCommandStore comm
     {
         try (Deps.Builder builder = Deps.builder())
         {
-            commandStore.mapReduce(keys, ranges, RorWs, STARTED_BEFORE, startedBefore, WITHOUT, startedBefore, Accepted, PreCommitted,
+            commandStore.mapReduce(keys, ranges, Any, STARTED_BEFORE, startedBefore, WITHOUT, startedBefore, Accepted, PreCommitted,

Review Comment:
   Why use `Any` for recovery? What is the interaction you are looking for with sync transactions?
   
   Does recovery force them to order with sync transactions, but during regular execution we don't?



##########
accord-core/src/main/java/accord/coordinate/Recover.java:
##########
@@ -212,56 +213,63 @@ private void recover()
             {
                 default: throw new IllegalStateException();
                 case Invalidated:
+                {
                     commitInvalidate();
                     return;
+                }
 
                 case Applied:
                 case PreApplied:
-                    // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, acceptOrCommit.executeAt, (deps, fail) -> {
-                            if (fail != null)
-                            {
-                                accept(null, fail);
-                            }
-                            else
-                            {
-                                // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
-                                Persist.persistAndCommit(node, txnId, route, txn, executeAt, deps, acceptOrCommit.writes, acceptOrCommit.result);
-                                accept(acceptOrCommit.result, null);
-                            }
-                        });
-                    });
+                {
+                    // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Deps acceptedDeps = tryMergeAcceptedDeps();
+                    Invariants.checkState(acceptedDeps != null);
+                    // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
+                    Persist.persistAndCommit(node, txnId, route, txn, executeAt, acceptedDeps, acceptOrCommit.writes, acceptOrCommit.result);
+                    accept(acceptOrCommit.result, null);
                     return;
+                }
 
                 case ReadyToExecute:
                 case PreCommitted:
                 case Committed:
+                {
+                    Deps acceptedDeps = tryMergeAcceptedDeps(); // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Invariants.checkState(acceptedDeps != null);
                     // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, executeAt, (deps, fail) -> {
-                            if (fail != null) accept(null, fail);
-                            else Execute.execute(node, txnId, txn, route, acceptOrCommit.executeAt, deps, this);
-                        });
-                    });
+                    Execute.execute(node, txnId, txn, route, acceptOrCommit.executeAt, acceptedDeps, this);
                     return;
+                }
 
                 case Accepted:
-                    // no need to preaccept the later round, as future operations always include every old epoch (until it is fully migrated)
-                    propose(acceptOrCommit.executeAt, mergeDeps());
-                    return;
+                {
+                    Deps deps;
+                    if (txnId.rw().proposesDeps()) deps = tryMergeAcceptedDeps();
+                    else deps = mergeDeps();
+
+                    if (deps != null)
+                    {
+                        // TODO (desired, behaviour): if we didn't find Accepted in *every* shard, consider invalidating for consistency of behaviour
+                        propose(acceptOrCommit.executeAt, deps);
+                        return;
+                    }
+
+                    // if we propose deps, and cannot assemble a complete set, then we never reached consensus and can be invalidated

Review Comment:
   What does it mean to propose deps?



##########
accord-core/src/main/java/accord/primitives/Txn.java:
##########
@@ -32,13 +32,26 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import static accord.primitives.Routables.Slice.Overlapping;
-
 public interface Txn
 {
     enum Kind
     {
-        Read, Write;
+        Read,
+        Write,
+
+        /**
+         * A pseudo-transaction whose deps represent the complete set of transactions that may execute before it,
+         * without interfering with their execution.
+         *
+         * A SyncPoint is unique in that it agrees not only an executeAt but also a precise collection of dependencies,

Review Comment:
   Precise collection needs elaboration, what you mean is that after `Preaccept` we have a precise set because we are fine with extra dependencies?
   
   Don't all transactions determine a precise set of dependencies in order to determine their `executeAt`?



##########
accord-core/src/main/java/accord/primitives/Txn.java:
##########
@@ -32,13 +32,26 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import static accord.primitives.Routables.Slice.Overlapping;
-
 public interface Txn
 {
     enum Kind
     {
-        Read, Write;
+        Read,
+        Write,
+
+        /**
+         * A pseudo-transaction whose deps represent the complete set of transactions that may execute before it,
+         * without interfering with their execution.
+         *
+         * A SyncPoint is unique in that it agrees not only an executeAt but also a precise collection of dependencies,
+         * so that is effect on invalidation of earlier transactions is durable. This is most useful for ExclusiveSyncPoint.

Review Comment:
   This last half really confuses me, `SyncPoint` shouldn't invalidate? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org