Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2023/01/17 10:04:21 UTC

[GitHub] [cassandra-accord] belliottsmith opened a new pull request, #26: ExclusiveSyncPoint

belliottsmith opened a new pull request, #26:
URL: https://github.com/apache/cassandra-accord/pull/26

   Introduce a special kind of `ExclusiveSyncPoint` transaction that invalidates all `TxnId` lower than it that are not witnessed by it. This permits bootstrapping nodes to ignore all `TxnId` older than this when starting their log.
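
A minimal sketch of the invalidation rule described above, using a toy model in which a `TxnId` is a plain `long`. `ToyExclusiveSyncPoint` and its members are hypothetical names chosen for illustration; they are not part of the Accord code base, and the real rule operates on Accord's own `TxnId` and `Deps` types.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: real Accord TxnIds are not plain longs, and every name here is hypothetical.
final class ToyExclusiveSyncPoint
{
    final long txnId;
    final Set<Long> witnessed = new HashSet<>();

    ToyExclusiveSyncPoint(long txnId, long... witnessedTxnIds)
    {
        this.txnId = txnId;
        for (long id : witnessedTxnIds)
            witnessed.add(id);
    }

    // A lower TxnId that the sync point did not witness (is not in its deps) is invalidated.
    boolean invalidates(long otherTxnId)
    {
        return otherTxnId < txnId && !witnessed.contains(otherTxnId);
    }

    public static void main(String[] args)
    {
        ToyExclusiveSyncPoint syncPoint = new ToyExclusiveSyncPoint(10L, 3L, 7L);
        System.out.println(syncPoint.invalidates(5L));  // lower and unwitnessed
        System.out.println(syncPoint.invalidates(7L));  // lower but witnessed
        System.out.println(syncPoint.invalidates(12L)); // higher than the sync point
    }
}
```

Under this model, a bootstrapping node would only need to consider the witnessed ids and any ids above the sync point's own.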


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093765145


##########
accord-core/src/main/java/accord/primitives/Txn.java:
##########
@@ -32,13 +32,26 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import static accord.primitives.Routables.Slice.Overlapping;
-
 public interface Txn
 {
     enum Kind
     {
-        Read, Write;
+        Read,
+        Write,
+
+        /**
+         * A pseudo-transaction whose deps represent the complete set of transactions that may execute before it,
+         * without interfering with their execution.
+         *
+         * A SyncPoint is unique in that it agrees not only an executeAt but also a precise collection of dependencies,
+         * so that its effect on invalidation of earlier transactions is durable. This is most useful for ExclusiveSyncPoint.

Review Comment:
   An `ExclusiveSyncPoint` invalidates all `TxnId` lower than it that aren't witnessed by it.





[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "aweisberg (via GitHub)" <gi...@apache.org>.
aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094942728


##########
accord-core/src/main/java/accord/primitives/Txn.java:
##########
@@ -32,13 +32,26 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import static accord.primitives.Routables.Slice.Overlapping;
-
 public interface Txn
 {
     enum Kind
     {
-        Read, Write;
+        Read,
+        Write,
+
+        /**
+         * A pseudo-transaction whose deps represent the complete set of transactions that may execute before it,
+         * without interfering with their execution.
+         *
+         * A SyncPoint is unique in that it agrees not only an executeAt but also a precise collection of dependencies,

Review Comment:
   I am working through how regular transactions don't agree on their deps; with 1RT, yeah, that makes sense.
   
   There are no changes to how `Accept` handles deps, so I was trying to understand what it means for these transactions to agree on deps when regular transactions don't. So one difference is that we always run accept, and I guess you mean that in the future accept won't persist dependencies for regular transactions (recovery will go find them again?), but we will continue to persist them for barriers, so that when they recover the precise set of dependencies is returned.





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093760064


##########
accord-core/src/main/java/accord/messages/BeginRecovery.java:
##########
@@ -152,16 +164,18 @@ public RecoverReply reduce(RecoverReply r1, RecoverReply r2)
         if (!ok1.status.hasBeen(PreAccepted)) throw new IllegalStateException();
 
         PartialDeps deps = ok1.deps.with(ok2.deps);
+        PartialDeps acceptedDeps = ok1.deps == ok1.acceptedDeps && ok2.deps == ok2.acceptedDeps ? deps : ok1.acceptedDeps.with(ok2.acceptedDeps);

Review Comment:
   If either is `NONE` then the operation bails out cheaply. We don't know whether either of them is `NONE` though; they might both be full deps. Perhaps I'm misunderstanding. All I'm doing with the ternary is avoiding calling `with` twice if I can reuse the prior result.
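
A rough sketch of the reuse described in this comment, with toy stand-ins for `PartialDeps` and `RecoverOk`. `ToyDeps`, `ToyRecoverOk` and the `merges` counter are hypothetical and exist only to make the saved call visible; the ternary itself mirrors the line under review.

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Toy stand-in for PartialDeps; the merge counter is for illustration only.
final class ToyDeps
{
    static int merges = 0;
    final SortedSet<Long> ids;

    private ToyDeps(SortedSet<Long> ids) { this.ids = ids; }

    static ToyDeps of(long... ids)
    {
        SortedSet<Long> set = new TreeSet<>();
        for (long id : ids)
            set.add(id);
        return new ToyDeps(set);
    }

    // Union of both dependency sets; counted so the caller can see how often it ran.
    ToyDeps with(ToyDeps that)
    {
        merges++;
        SortedSet<Long> union = new TreeSet<>(ids);
        union.addAll(that.ids);
        return new ToyDeps(union);
    }
}

// Toy stand-in for RecoverOk, carrying only the two fields relevant here.
final class ToyRecoverOk
{
    final ToyDeps deps;
    final ToyDeps acceptedDeps;

    ToyRecoverOk(ToyDeps deps, ToyDeps acceptedDeps)
    {
        this.deps = deps;
        this.acceptedDeps = acceptedDeps;
    }

    static ToyRecoverOk reduce(ToyRecoverOk ok1, ToyRecoverOk ok2)
    {
        ToyDeps deps = ok1.deps.with(ok2.deps);
        // If both replies use the same object for deps and acceptedDeps, the merge
        // above is already the answer, so the second call to with() is skipped.
        ToyDeps acceptedDeps = ok1.deps == ok1.acceptedDeps && ok2.deps == ok2.acceptedDeps
                               ? deps
                               : ok1.acceptedDeps.with(ok2.acceptedDeps);
        return new ToyRecoverOk(deps, acceptedDeps);
    }
}
```

The check is by reference (`==`), so it only fires when a reply literally shares one object between the two fields; equal but distinct objects fall through to the second merge.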



##########
accord-core/src/main/java/accord/messages/BeginRecovery.java:
##########
@@ -216,19 +230,21 @@ public static class RecoverOk extends RecoverReply
         public final Ballot accepted;
         public final Timestamp executeAt;
         public final PartialDeps deps;
+        public final PartialDeps acceptedDeps; // only those deps that have previously been proposed

Review Comment:
   Yep, agreed





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093759222


##########
accord-core/src/main/java/accord/local/SafeCommandStore.java:
##########
@@ -66,7 +70,37 @@
         EXECUTES_AFTER
     }
     enum TestDep { WITH, WITHOUT, ANY_DEPS }
-    enum TestKind { Ws, RorWs }
+    enum TestKind implements Predicate<Txn.Kind>

Review Comment:
   Yep, that's the idea
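
For readers without the rest of the thread, the shape of an enum that implements `Predicate` over transaction kinds can be sketched as below. The constants `Ws`, `RorWs` and `Any` appear in the quoted diffs, but the matching rules given to them here are assumptions, and `ToyKind` / `ToyTestKind` are hypothetical stand-ins for `Txn.Kind` / `TestKind`.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for Txn.Kind.
enum ToyKind { Read, Write, SyncPoint, ExclusiveSyncPoint }

// Hypothetical stand-in for TestKind; which kinds each constant matches is assumed.
enum ToyTestKind implements Predicate<ToyKind>
{
    Ws, RorWs, Any;

    @Override
    public boolean test(ToyKind kind)
    {
        switch (this)
        {
            case Ws:    return kind == ToyKind.Write;
            case RorWs: return kind == ToyKind.Read || kind == ToyKind.Write;
            case Any:   return true;
            default:    throw new AssertionError("unhandled: " + this);
        }
    }
}
```

Because each constant is itself a `Predicate`, it can be passed directly wherever a filter over kinds is expected, for example `kinds.stream().filter(ToyTestKind.RorWs)`.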





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094928867


##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())
+        {
+            proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
+        }
+        else
+        {
+            // SyncPoint transactions always propose their own txnId as their executeAt, as they are not really executed.
+            // They only create happens-after relationships wrt their dependencies, which represent all transactions
+            // that *may* execute before their txnId, so once these dependencies apply we can say that any action that
+            // awaits these dependencies applies after them. In the case of ExclusiveSyncPoint, we additionally guarantee
+            // that no lower TxnId can later apply.
+            new Propose<SyncPoint>(node, tracker.topologies(), Ballot.ZERO, txnId, txn, route, deps, txnId, this)
+            {
+                @Override
+                void onAccepted()
+                {
+                    node.send(tracker.nodes(), id -> new Apply(id, tracker.topologies(), tracker.topologies(), txnId.epoch(), txnId, route, txnId, deps, txn.execute(txnId, null), txn.result(txnId, null)));

Review Comment:
   Yep, my expectation is that the coordinator would register a listener for the application of the transaction locally. The apply at remote replicas is just a matter of cleaning up really. We do not need any remote replica to apply in order to achieve our goal, so we can return immediately upon reaching consensus and use our dependencies for our purposes.





[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "aweisberg (via GitHub)" <gi...@apache.org>.
aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094859003


##########
accord-core/src/main/java/accord/messages/BeginRecovery.java:
##########
@@ -152,16 +164,18 @@ public RecoverReply reduce(RecoverReply r1, RecoverReply r2)
         if (!ok1.status.hasBeen(PreAccepted)) throw new IllegalStateException();
 
         PartialDeps deps = ok1.deps.with(ok2.deps);
+        PartialDeps acceptedDeps = ok1.deps == ok1.acceptedDeps && ok2.deps == ok2.acceptedDeps ? deps : ok1.acceptedDeps.with(ok2.acceptedDeps);

Review Comment:
   OK, it took me a minute. This creates the accepted deps only if it has to, because they are not the same as `deps` when one or more replies has returned `NONE` for accepted deps.





[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "aweisberg (via GitHub)" <gi...@apache.org>.
aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094944399


##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())
+        {
+            proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
+        }
+        else
+        {
+            // SyncPoint transactions always propose their own txnId as their executeAt, as they are not really executed.
+            // They only create happens-after relationships wrt their dependencies, which represent all transactions
+            // that *may* execute before their txnId, so once these dependencies apply we can say that any action that
+            // awaits these dependencies applies after them. In the case of ExclusiveSyncPoint, we additionally guarantee
+            // that no lower TxnId can later apply.
+            new Propose<SyncPoint>(node, tracker.topologies(), Ballot.ZERO, txnId, txn, route, deps, txnId, this)
+            {
+                @Override
+                void onAccepted()
+                {
+                    node.send(tracker.nodes(), id -> new Apply(id, tracker.topologies(), tracker.topologies(), txnId.epoch(), txnId, route, txnId, deps, txn.execute(txnId, null), txn.result(txnId, null)));

Review Comment:
   OK, there are two versions of this though. One is locally listening (key migration), but in the other we actually want to know it was applied at a quorum remotely (range migration)?
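
To make the second variant in this question concrete, here is a sketch of waiting for a quorum of remote applies instead of a local listener. It assumes a simple-majority quorum and string node ids; `ToyApplyTracker` is a hypothetical name, and nothing here reflects how Accord tracks `Apply` responses.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical tracker: completes a future once a simple majority of replicas report applied.
final class ToyApplyTracker
{
    private final Set<String> replicas = new HashSet<>();
    private final Set<String> applied = new HashSet<>();
    final CompletableFuture<Void> appliedAtQuorum = new CompletableFuture<>();

    ToyApplyTracker(String... replicaIds)
    {
        for (String id : replicaIds)
            replicas.add(id);
    }

    // Record one replica's acknowledgement; unknown and duplicate acks do not count.
    synchronized void onApplied(String replicaId)
    {
        if (!replicas.contains(replicaId))
            return;
        applied.add(replicaId);
        if (applied.size() > replicas.size() / 2)
            appliedAtQuorum.complete(null);
    }
}
```

A caller that only needs the local variant would ignore `appliedAtQuorum` and register its own listener; a caller that needs the remote guarantee would chain onto the future.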





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093761245


##########
accord-core/src/main/java/accord/messages/BeginRecovery.java:
##########
@@ -297,7 +313,7 @@ private static Deps acceptedStartedBeforeWithoutWitnessing(SafeCommandStore comm
     {
         try (Deps.Builder builder = Deps.builder())
         {
-            commandStore.mapReduce(keys, ranges, RorWs, STARTED_BEFORE, startedBefore, WITHOUT, startedBefore, Accepted, PreCommitted,
+            commandStore.mapReduce(keys, ranges, Any, STARTED_BEFORE, startedBefore, WITHOUT, startedBefore, Accepted, PreCommitted,

Review Comment:
   These collections are used for _invalidation_, not for deps or execution. We're looking here for the transactions that _invalidate_ us by _not witnessing us_. For sync points (most importantly exclusive sync points) this is the whole purpose of durably agreeing Deps - so that we know for sure we have seen all `TxnId` that can _possibly_ be agreed to execute.
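
The query discussed here can be illustrated with a toy filter: among accepted commands, pick those that started before us and whose deps do not include us. `ToyCommand` and `ToyRecovery` are hypothetical, `TxnId` is modelled as a `long`, and the status filtering (`Accepted`, `PreCommitted`) from the real call is left out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical accepted command: an id plus the ids it witnessed as dependencies.
final class ToyCommand
{
    final long txnId;
    final Set<Long> deps = new HashSet<>();

    ToyCommand(long txnId, long... depIds)
    {
        this.txnId = txnId;
        for (long id : depIds)
            deps.add(id);
    }
}

final class ToyRecovery
{
    // Returns ids of commands with a lower id than ours that did not witness us.
    static List<Long> startedBeforeWithoutWitnessing(long us, ToyCommand... accepted)
    {
        List<Long> result = new ArrayList<>();
        for (ToyCommand command : accepted)
        {
            if (command.txnId < us && !command.deps.contains(us))
                result.add(command.txnId);
        }
        return result;
    }
}
```

In this model the kind of the other command plays no role in the filter, which is consistent with the diff widening the kind test from `RorWs` to `Any`.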





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093737046


##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())
+        {
+            proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
+        }
+        else
+        {
+            // SyncPoint transactions always propose their own txnId as their executeAt, as they are not really executed.
+            // They only create happens-after relationships wrt their dependencies, which represent all transactions
+            // that *may* execute before their txnId, so once these dependencies apply we can say that any action that
+            // awaits these dependencies applies after them. In the case of ExclusiveSyncPoint, we additionally guarantee
+            // that no lower TxnId can later apply.
+            new Propose<SyncPoint>(node, tracker.topologies(), Ballot.ZERO, txnId, txn, route, deps, txnId, this)

Review Comment:
   Happy to introduce an inner class. I did previously have a static method I could pass a callback to that would create an anonymous class to wrap the callback but that felt wrong.





[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "aweisberg (via GitHub)" <gi...@apache.org>.
aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094827526


##########
accord-core/src/main/java/accord/primitives/Txn.java:
##########
@@ -32,13 +32,26 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import static accord.primitives.Routables.Slice.Overlapping;
-
 public interface Txn
 {
     enum Kind
     {
-        Read, Write;
+        Read,
+        Write,
+
+        /**
+         * A pseudo-transaction whose deps represent the complete set of transactions that may execute before it,
+         * without interfering with their execution.
+         *
+         * A SyncPoint is unique in that it agrees not only an executeAt but also a precise collection of dependencies,
+         * so that its effect on invalidation of earlier transactions is durable. This is most useful for ExclusiveSyncPoint.

Review Comment:
   Ah, got it; I had missed that `SyncPoint` includes both.





[GitHub] [cassandra-accord] belliottsmith commented on pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#issuecomment-1425762120

   Ok, have addressed the last nits. Merge on hold until immutable state lands.




[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "aweisberg (via GitHub)" <gi...@apache.org>.
aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1101739479


##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())
+        {
+            proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
+        }
+        else
+        {
+            // SyncPoint transactions always propose their own txnId as their executeAt, as they are not really executed.
+            // They only create happens-after relationships wrt their dependencies, which represent all transactions
+            // that *may* execute before their txnId, so once these dependencies apply we can say that any action that
+            // awaits these dependencies applies after them. In the case of ExclusiveSyncPoint, we additionally guarantee
+            // that no lower TxnId can later apply.
+            new Propose<SyncPoint>(node, tracker.topologies(), Ballot.ZERO, txnId, txn, route, deps, txnId, this)
+            {
+                @Override
+                void onAccepted()
+                {
+                    node.send(tracker.nodes(), id -> new Apply(id, tracker.topologies(), tracker.topologies(), txnId.epoch(), txnId, route, txnId, deps, txn.execute(txnId, null), txn.result(txnId, null)));

Review Comment:
   I can do it later. I think I will also need to add the local-metadata-only behavior of the sync point, for the case where we already know it has been satisfied.





[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "aweisberg (via GitHub)" <gi...@apache.org>.
aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094839344


##########
accord-core/src/main/java/accord/primitives/Txn.java:
##########
@@ -32,13 +32,26 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import static accord.primitives.Routables.Slice.Overlapping;
-
 public interface Txn
 {
     enum Kind
     {
-        Read, Write;
+        Read,
+        Write,
+
+        /**
+         * A pseudo-transaction whose deps represent the complete set of transactions that may execute before it,
+         * without interfering with their execution.
+         *
+         * A SyncPoint is unique in that it agrees not only an executeAt but also a precise collection of dependencies,

Review Comment:
   I still don't follow something basic. What does "agree" mean here?
   
   Maybe we need to step up a level and explain how a sync point achieves its objective: when a result is returned, all the dependencies of this txn have reached applied.
   
   Maybe I am crazy, but it looks like the callback fires after `Propose` completes, and doesn't wait for the dependencies to actually apply?
   
   I am going to reread again today and see if I can get to a better understanding.
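
To make the distinction in this comment concrete, here is a minimal Java sketch, not code from the PR. It contrasts a result that completes as soon as the deps are agreed with one that completes only after every agreed dependency has applied. `SyncPointAwaitSketch`, `awaitApplied` and the per-transaction "applied" futures are hypothetical names, and the sketch uses `java.util.concurrent.CompletableFuture` rather than the `Future` type imported by the patch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

class SyncPointAwaitSketch
{
    /**
     * Returns a future that completes with the agreed deps only once each of
     * them has applied. appliedByTxn is a hypothetical registry mapping a
     * transaction id to a future that completes when that transaction applies;
     * ids missing from the registry are treated as already applied.
     */
    static CompletableFuture<Set<String>> awaitApplied(CompletableFuture<Set<String>> agreedDeps,
                                                       Map<String, CompletableFuture<Void>> appliedByTxn)
    {
        return agreedDeps.thenCompose(deps -> {
            CompletableFuture<?>[] waits = deps.stream()
                .map(id -> appliedByTxn.getOrDefault(id, CompletableFuture.completedFuture(null)))
                .toArray(CompletableFuture[]::new);
            // allOf completes only when every dependency's future has completed
            return CompletableFuture.allOf(waits).thenApply(ignore -> deps);
        });
    }

    public static void main(String[] args)
    {
        CompletableFuture<Void> t1Applied = new CompletableFuture<>();
        Map<String, CompletableFuture<Void>> registry = new HashMap<>();
        registry.put("t1", t1Applied);

        // The deps are already agreed, but t1 has not applied yet
        CompletableFuture<Set<String>> agreed = CompletableFuture.completedFuture(Collections.singleton("t1"));
        CompletableFuture<Set<String>> result = awaitApplied(agreed, registry);

        System.out.println("done before t1 applies: " + result.isDone());
        t1Applied.complete(null);
        System.out.println("done after t1 applies: " + result.isDone());
    }
}
```

In this sketch `agreed` corresponds to what a callback firing after `Propose` would observe, while `result` corresponds to the stronger guarantee the comment asks about.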





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094929690


##########
accord-core/src/main/java/accord/local/CommandStore.java:
##########
@@ -54,6 +63,36 @@ public int id()
         return id;
     }
 
+    protected void setRejectBefore(ReducingRangeMap<Timestamp> newRejectBefore)
+    {
+        this.rejectBefore = newRejectBefore;
+    }
+
+    Timestamp preaccept(TxnId txnId, Seekables<?, ?> keys, SafeCommandStore safeStore)
+    {
+        NodeTimeService time = safeStore.time();
+        boolean isExpired = agent().isExpired(txnId, safeStore.time().now());

Review Comment:
   Oh, I see. I thought you meant the method, but you mean the local variable. Makes perfect sense.





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094951133


##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())
+        {
+            proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
+        }
+        else
+        {
+            // SyncPoint transactions always propose their own txnId as their executeAt, as they are not really executed.
+            // They only create happens-after relationships wrt their dependencies, which represent all transactions
+            // that *may* execute before their txnId, so once these dependencies apply we can say that any action that
+            // awaits these dependencies applies after them. In the case of ExclusiveSyncPoint, we additionally guarantee
+            // that no lower TxnId can later apply.
+            new Propose<SyncPoint>(node, tracker.topologies(), Ballot.ZERO, txnId, txn, route, deps, txnId, this)
+            {
+                @Override
+                void onAccepted()
+                {
+                    node.send(tracker.nodes(), id -> new Apply(id, tracker.topologies(), tracker.topologies(), txnId.epoch(), txnId, route, txnId, deps, txn.execute(txnId, null), txn.result(txnId, null)));

Review Comment:
   True, for the global range migration version we might want to tweak this to support waiting for a certain level of persistence at all replicas. Is it better to add that at point of use, or would you like to add it here in this patch?





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093737785


##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())
+        {
+            proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
+        }
+        else
+        {
+            // SyncPoint transactions always propose their own txnId as their executeAt, as they are not really executed.
+            // They only create happens-after relationships wrt their dependencies, which represent all transactions
+            // that *may* execute before their txnId, so once these dependencies apply we can say that any action that
+            // awaits these dependencies applies after them. In the case of ExclusiveSyncPoint, we additionally guarantee
+            // that no lower TxnId can later apply.
+            new Propose<SyncPoint>(node, tracker.topologies(), Ballot.ZERO, txnId, txn, route, deps, txnId, this)
+            {
+                @Override
+                void onAccepted()
+                {
+                    node.send(tracker.nodes(), id -> new Apply(id, tracker.topologies(), tracker.topologies(), txnId.epoch(), txnId, route, txnId, deps, txn.execute(txnId, null), txn.result(txnId, null)));

Review Comment:
   Basically, though in practice this is a stub for whatever our implementations that use this may want to do, or for some distributed cleanup on those nodes that don't need it. This is probably the one part of this patch that isn't "done" (but it is close enough that we could probably commit as things are, so that we have a shared base to operate on).





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093742649


##########
accord-core/src/main/java/accord/coordinate/Recover.java:
##########
@@ -212,56 +213,63 @@ private void recover()
             {
                 default: throw new IllegalStateException();
                 case Invalidated:
+                {
                     commitInvalidate();
                     return;
+                }
 
                 case Applied:
                 case PreApplied:
-                    // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, acceptOrCommit.executeAt, (deps, fail) -> {
-                            if (fail != null)
-                            {
-                                accept(null, fail);
-                            }
-                            else
-                            {
-                                // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
-                                Persist.persistAndCommit(node, txnId, route, txn, executeAt, deps, acceptOrCommit.writes, acceptOrCommit.result);
-                                accept(acceptOrCommit.result, null);
-                            }
-                        });
-                    });
+                {
+                    // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Deps acceptedDeps = tryMergeAcceptedDeps();
+                    Invariants.checkState(acceptedDeps != null);
+                    // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
+                    Persist.persistAndCommit(node, txnId, route, txn, executeAt, acceptedDeps, acceptOrCommit.writes, acceptOrCommit.result);
+                    accept(acceptOrCommit.result, null);
                     return;
+                }
 
                 case ReadyToExecute:
                 case PreCommitted:
                 case Committed:
+                {
+                    Deps acceptedDeps = tryMergeAcceptedDeps(); // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Invariants.checkState(acceptedDeps != null);
                     // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, executeAt, (deps, fail) -> {
-                            if (fail != null) accept(null, fail);
-                            else Execute.execute(node, txnId, txn, route, acceptOrCommit.executeAt, deps, this);
-                        });
-                    });
+                    Execute.execute(node, txnId, txn, route, acceptOrCommit.executeAt, acceptedDeps, this);
                     return;
+                }
 
                 case Accepted:
-                    // no need to preaccept the later round, as future operations always include every old epoch (until it is fully migrated)
-                    propose(acceptOrCommit.executeAt, mergeDeps());
-                    return;
+                {
+                    Deps deps;
+                    if (txnId.rw().proposesDeps()) deps = tryMergeAcceptedDeps();

Review Comment:
   So, this is something that is probably going to evolve. In practice we *don't* need to treat them differently today, because we do actually send deps with `Accept`, *and* because we are able to invalidate any transaction that hasn't reached a quorum of accepts (we don't currently, but we probably *should*, as it is consistent with our behaviour elsewhere of invalidating transactions we know were not executed by their coordinator).
   
   However (and this is documented, but evidently not well), standard transactions are happy to get deps that are a superset of those they need, and then filter them. Exclusive sync points, by contrast, want to _precisely_ exclude any `TxnId` they did not witness. We don't want an extra `TxnId` sneaking in next time around, as it wouldn't have been witnessed by the replicas that wanted to know for sure they had a complete set of `TxnId` lower than that sync point. So for these transactions we _require_ that we either re-propose precisely the same deps as the original coordinator _or_ invalidate the transaction if they cannot be found.
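
The rule described above can be sketched in Java as follows. This is a simplified illustration, not the patch's implementation. The names `mergeDeps`, `tryMergeAcceptedDeps` and `proposesDeps` come from the quoted diff, but their bodies here are assumptions. `RecoverDepsSketch` and `ShardReply` are hypothetical, transaction ids are plain strings, and each shard is reduced to a single reply.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class RecoverDepsSketch
{
    /** Hypothetical stand-in for what recovery learned from one shard. */
    static final class ShardReply
    {
        final boolean accepted;  // true if the shard recorded deps in an Accept round
        final Set<String> deps;  // transaction ids the shard reported

        ShardReply(boolean accepted, Set<String> deps)
        {
            this.accepted = accepted;
            this.deps = deps;
        }
    }

    /** Standard transactions tolerate a superset, so simply union everything. */
    static Set<String> mergeDeps(List<ShardReply> replies)
    {
        Set<String> merged = new TreeSet<>();
        for (ShardReply reply : replies) merged.addAll(reply.deps);
        return merged;
    }

    /** Deps-proposing transactions: only accepted deps count; null if any shard lacks them. */
    static Set<String> tryMergeAcceptedDeps(List<ShardReply> replies)
    {
        Set<String> merged = new TreeSet<>();
        for (ShardReply reply : replies)
        {
            if (!reply.accepted) return null; // cannot reconstruct the original proposal
            merged.addAll(reply.deps);
        }
        return merged;
    }

    /** Re-propose when a usable set of deps exists, otherwise invalidate. */
    static String recover(boolean proposesDeps, List<ShardReply> replies)
    {
        Set<String> deps = proposesDeps ? tryMergeAcceptedDeps(replies) : mergeDeps(replies);
        return deps == null ? "invalidate" : "propose " + deps;
    }

    public static void main(String[] args)
    {
        List<ShardReply> replies = Arrays.asList(
            new ShardReply(true, new TreeSet<>(Arrays.asList("t1"))),
            new ShardReply(false, new TreeSet<>(Arrays.asList("t2"))));
        System.out.println(recover(false, replies)); // prints "propose [t1, t2]"
        System.out.println(recover(true, replies));  // prints "invalidate"
    }
}
```

With the same replies, the standard transaction re-proposes the union, while the deps-proposing transaction is invalidated because one shard never recorded accepted deps.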





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093736559


##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())

Review Comment:
   Do you mean: should we check `agent.isExpired`? Since this has no user-visible impact, and we presumably want it for some process reason, it didn't seem valuable.





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093757114


##########
accord-core/src/main/java/accord/coordinate/Recover.java:
##########
@@ -212,56 +213,63 @@ private void recover()
             {
                 default: throw new IllegalStateException();
                 case Invalidated:
+                {
                     commitInvalidate();
                     return;
+                }
 
                 case Applied:
                 case PreApplied:
-                    // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, acceptOrCommit.executeAt, (deps, fail) -> {
-                            if (fail != null)
-                            {
-                                accept(null, fail);
-                            }
-                            else
-                            {
-                                // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
-                                Persist.persistAndCommit(node, txnId, route, txn, executeAt, deps, acceptOrCommit.writes, acceptOrCommit.result);
-                                accept(acceptOrCommit.result, null);
-                            }
-                        });
-                    });
+                {
+                    // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Deps acceptedDeps = tryMergeAcceptedDeps();
+                    Invariants.checkState(acceptedDeps != null);
+                    // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
+                    Persist.persistAndCommit(node, txnId, route, txn, executeAt, acceptedDeps, acceptOrCommit.writes, acceptOrCommit.result);
+                    accept(acceptOrCommit.result, null);
                     return;
+                }
 
                 case ReadyToExecute:
                 case PreCommitted:
                 case Committed:
+                {
+                    Deps acceptedDeps = tryMergeAcceptedDeps(); // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Invariants.checkState(acceptedDeps != null);
                     // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, executeAt, (deps, fail) -> {
-                            if (fail != null) accept(null, fail);
-                            else Execute.execute(node, txnId, txn, route, acceptOrCommit.executeAt, deps, this);
-                        });
-                    });
+                    Execute.execute(node, txnId, txn, route, acceptOrCommit.executeAt, acceptedDeps, this);
                     return;
+                }
 
                 case Accepted:
-                    // no need to preaccept the later round, as future operations always include every old epoch (until it is fully migrated)
-                    propose(acceptOrCommit.executeAt, mergeDeps());
-                    return;
+                {
+                    Deps deps;
+                    if (txnId.rw().proposesDeps()) deps = tryMergeAcceptedDeps();
+                    else deps = mergeDeps();
+
+                    if (deps != null)
+                    {
+                        // TODO (desired, behaviour): if we didn't find Accepted in *every* shard, consider invalidating for consistency of behaviour
+                        propose(acceptOrCommit.executeAt, deps);
+                        return;
+                    }
+
+                    // if we propose deps, and cannot assemble a complete set, then we never reached consensus and can be invalidated

Review Comment:
   Send `Accept` with a specific set of `Deps` that we want to be durably recoverable before we use them. So for these transactions we're really agreeing `Deps` rather than `executeAt` (since they always use `txnId` for that, though this choice is arbitrary).
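
As a rough illustration of "agreeing `Deps` rather than `executeAt`", the Java sketch below models what each kind of transaction puts into its proposal. It is an assumption-laden toy, not the patch's `Propose` class. `SyncPointProposalSketch`, `Proposal` and `depsAreBinding` are hypothetical, timestamps are plain `long` values, and which kinds return true from `proposesDeps()` is assumed, since the diff does not show that method's body.

```java
import java.util.Collections;
import java.util.Set;

class SyncPointProposalSketch
{
    enum Kind
    {
        Read, Write, SyncPoint, ExclusiveSyncPoint;

        // Assumption: both sync point kinds agree their deps
        boolean proposesDeps() { return this == SyncPoint || this == ExclusiveSyncPoint; }
    }

    /** Hypothetical summary of what an Accept round would carry. */
    static final class Proposal
    {
        final long executeAt;
        final Set<Long> deps;
        final boolean depsAreBinding; // true when recovery must re-propose exactly these deps

        Proposal(long executeAt, Set<Long> deps, boolean depsAreBinding)
        {
            this.executeAt = executeAt;
            this.deps = deps;
            this.depsAreBinding = depsAreBinding;
        }
    }

    static Proposal propose(Kind kind, long txnId, long maxWitnessedAt, Set<Long> deps)
    {
        // Sync points reuse their own txnId as executeAt; the deps are what gets agreed
        if (kind.proposesDeps())
            return new Proposal(txnId, deps, true);

        // Other transactions agree an executeAt; their deps may later be a filtered superset
        return new Proposal(Math.max(txnId, maxWitnessedAt), deps, false);
    }

    public static void main(String[] args)
    {
        Set<Long> deps = Collections.singleton(3L);
        Proposal syncPoint = propose(Kind.ExclusiveSyncPoint, 10L, 15L, deps);
        Proposal write = propose(Kind.Write, 10L, 15L, deps);
        System.out.println(syncPoint.executeAt + " " + syncPoint.depsAreBinding); // prints "10 true"
        System.out.println(write.executeAt + " " + write.depsAreBinding);         // prints "15 false"
    }
}
```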





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093742910


##########
accord-core/src/main/java/accord/coordinate/Recover.java:
##########
@@ -212,56 +213,63 @@ private void recover()
             {
                 default: throw new IllegalStateException();
                 case Invalidated:
+                {
                     commitInvalidate();
                     return;
+                }
 
                 case Applied:
                 case PreApplied:
-                    // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, acceptOrCommit.executeAt, (deps, fail) -> {
-                            if (fail != null)
-                            {
-                                accept(null, fail);
-                            }
-                            else
-                            {
-                                // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
-                                Persist.persistAndCommit(node, txnId, route, txn, executeAt, deps, acceptOrCommit.writes, acceptOrCommit.result);
-                                accept(acceptOrCommit.result, null);
-                            }
-                        });
-                    });
+                {
+                    // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Deps acceptedDeps = tryMergeAcceptedDeps();
+                    Invariants.checkState(acceptedDeps != null);
+                    // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
+                    Persist.persistAndCommit(node, txnId, route, txn, executeAt, acceptedDeps, acceptOrCommit.writes, acceptOrCommit.result);
+                    accept(acceptOrCommit.result, null);
                     return;
+                }
 
                 case ReadyToExecute:
                 case PreCommitted:
                 case Committed:
+                {
+                    Deps acceptedDeps = tryMergeAcceptedDeps(); // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Invariants.checkState(acceptedDeps != null);
                     // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, executeAt, (deps, fail) -> {
-                            if (fail != null) accept(null, fail);
-                            else Execute.execute(node, txnId, txn, route, acceptOrCommit.executeAt, deps, this);
-                        });
-                    });
+                    Execute.execute(node, txnId, txn, route, acceptOrCommit.executeAt, acceptedDeps, this);
                     return;
+                }
 
                 case Accepted:
-                    // no need to preaccept the later round, as future operations always include every old epoch (until it is fully migrated)
-                    propose(acceptOrCommit.executeAt, mergeDeps());
-                    return;
+                {
+                    Deps deps;
+                    if (txnId.rw().proposesDeps()) deps = tryMergeAcceptedDeps();

Review Comment:
   I will of course try to express this better than I have, and in more places.





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094866059


##########
accord-core/src/main/java/accord/primitives/Txn.java:
##########
@@ -32,13 +32,26 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import static accord.primitives.Routables.Slice.Overlapping;
-
 public interface Txn
 {
     enum Kind
     {
-        Read, Write;
+        Read,
+        Write,
+
+        /**
+         * A pseudo-transaction whose deps represent the complete set of transactions that may execute before it,
+         * without interfering with their execution.
+         *
+         * A SyncPoint is unique in that it agrees not only an executeAt but also a precise collection of dependencies,

Review Comment:
   "Agree" means we have a quorum of accepted proposals, i.e. we have reached a durable decision. I think this is quite normal terminology? At least that's how I talk about consensus.
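
   To make the terminology concrete, here is a minimal sketch of "agreed" as "every shard has a quorum of accepts". The class and method names are hypothetical (they are not Accord classes), and a simple majority quorum is assumed; the real quorum sizes depend on the topology.

```java
import java.util.List;

// Illustrative only: these names are hypothetical and are not Accord classes.
final class AgreementSketch
{
    // Accept votes observed so far for one shard.
    static final class ShardVotes
    {
        final int replicas;
        final int accepts;

        ShardVotes(int replicas, int accepts)
        {
            this.replicas = replicas;
            this.accepts = accepts;
        }

        // Assumption: a simple majority quorum.
        boolean hasQuorum()
        {
            return accepts >= replicas / 2 + 1;
        }
    }

    // The proposal is agreed (durably decided) once every shard has a quorum of accepts.
    static boolean isAgreed(List<ShardVotes> shards)
    {
        for (ShardVotes shard : shards)
            if (!shard.hasQuorum())
                return false;
        return true;
    }
}
```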





[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "aweisberg (via GitHub)" <gi...@apache.org>.
aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094884582


##########
accord-core/src/main/java/accord/local/CommandStore.java:
##########
@@ -54,6 +63,36 @@ public int id()
         return id;
     }
 
+    protected void setRejectBefore(ReducingRangeMap<Timestamp> newRejectBefore)
+    {
+        this.rejectBefore = newRejectBefore;
+    }
+
+    Timestamp preaccept(TxnId txnId, Seekables<?, ?> keys, SafeCommandStore safeStore)
+    {
+        NodeTimeService time = safeStore.time();
+        boolean isExpired = agent().isExpired(txnId, safeStore.time().now());

Review Comment:
   I am just reacting to `isExpired` being reused for the result of `rejectBefore`.
   If you want to call both of them expiration, that's pretty reasonable.
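
   For illustration, one way to keep the two reasons distinct is sketched below. Everything here is hypothetical: timestamps are reduced to `long`s, `timeout` stands in for whatever the agent's expiry policy is, and a single `rejectBefore` bound replaces the per-range map in the diff.

```java
// Illustrative only: hypothetical names, with timestamps simplified to longs.
final class PreacceptRejectSketch
{
    enum Reason { NONE, EXPIRED, REJECT_BEFORE }

    // Reports why a preaccept would be rejected, keeping the two causes separate.
    static Reason rejectReason(long txnId, long now, long timeout, long rejectBefore)
    {
        // Too old relative to the local clock (assumed expiry rule).
        if (now - txnId > timeout)
            return Reason.EXPIRED;
        // Below the bound below which new transactions are refused.
        if (txnId < rejectBefore)
            return Reason.REJECT_BEFORE;
        return Reason.NONE;
    }
}
```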





[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "aweisberg (via GitHub)" <gi...@apache.org>.
aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1101739479


##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())
+        {
+            proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
+        }
+        else
+        {
+            // SyncPoint transactions always propose their own txnId as their executeAt, as they are not really executed.
+            // They only create happens-after relationships wrt their dependencies, which represent all transactions
+            // that *may* execute before their txnId, so once these dependencies apply we can say that any action that
+            // awaits these dependencies applies after them. In the case of ExclusiveSyncPoint, we additionally guarantee
+            // that no lower TxnId can later apply.
+            new Propose<SyncPoint>(node, tracker.topologies(), Ballot.ZERO, txnId, txn, route, deps, txnId, this)
+            {
+                @Override
+                void onAccepted()
+                {
+                    node.send(tracker.nodes(), id -> new Apply(id, tracker.topologies(), tracker.topologies(), txnId.epoch(), txnId, route, txnId, deps, txn.execute(txnId, null), txn.result(txnId, null)));

Review Comment:
   I can do it later. I think I will also need to add a local, metadata-only version of the sync point for cases where we already know it has been satisfied.





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093763768


##########
accord-core/src/main/java/accord/primitives/Txn.java:
##########
@@ -53,6 +66,16 @@ public boolean isRead()
             return this == Read;
         }
 
+        /**
+         * Note that it is only possible to do this for transactions whose execution time is not dependent on
+         * others, i.e. where we may safely propose executeAt = txnId regardless of when it is witnessed by

Review Comment:
   It's only possible to "propose deps". I will clean up this comment so that it contains similar text to the other proposed places that explain this distinction. In this case we're essentially saying that, because this transaction doesn't actually execute, it doesn't need to agree an `executeAt`, permitting it the option of instead agreeing some `deps` from the single `PreAccept` round (since the deps after `PreAccept` are valid for `executeAt=txnId`, but not for `executeAt>txnId`).
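
   A simplified sketch of why the `PreAccept` deps are only valid for `executeAt=txnId`. It assumes timestamps are plain `long`s, a single replica, and that deps are simply "every known transaction below the bound"; the names are hypothetical and this is not how Accord computes deps internally.

```java
import java.util.SortedSet;
import java.util.TreeSet;

// Illustrative only: hypothetical names, single replica, timestamps as longs.
final class ProposeDepsSketch
{
    // Deps reported during PreAccept: every known transaction below txnId.
    static SortedSet<Long> preAcceptDeps(TreeSet<Long> known, long txnId)
    {
        return known.headSet(txnId);
    }

    // Deps needed to execute at executeAt: every known transaction below executeAt.
    static SortedSet<Long> requiredDeps(TreeSet<Long> known, long executeAt)
    {
        return known.headSet(executeAt);
    }

    // True when the PreAccept deps already cover everything needed at executeAt.
    static boolean preAcceptDepsSuffice(TreeSet<Long> known, long txnId, long executeAt)
    {
        return preAcceptDeps(known, txnId).containsAll(requiredDeps(known, executeAt));
    }
}
```

   With known transactions {1, 3, 5, 8} and `txnId=6`, the `PreAccept` deps {1, 3, 5} suffice for `executeAt=6`, but for `executeAt=9` they miss transaction 8.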





[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "aweisberg (via GitHub)" <gi...@apache.org>.
aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094871497


##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())
+        {
+            proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
+        }
+        else
+        {
+            // SyncPoint transactions always propose their own txnId as their executeAt, as they are not really executed.
+            // They only create happens-after relationships wrt their dependencies, which represent all transactions
+            // that *may* execute before their txnId, so once these dependencies apply we can say that any action that
+            // awaits these dependencies applies after them. In the case of ExclusiveSyncPoint, we additionally guarantee
+            // that no lower TxnId can later apply.
+            new Propose<SyncPoint>(node, tracker.topologies(), Ballot.ZERO, txnId, txn, route, deps, txnId, this)
+            {
+                @Override
+                void onAccepted()
+                {
+                    node.send(tracker.nodes(), id -> new Apply(id, tracker.topologies(), tracker.topologies(), txnId.epoch(), txnId, route, txnId, deps, txn.execute(txnId, null), txn.result(txnId, null)));

Review Comment:
   I mentioned this earlier, but for an inclusive sync point I think we need to know when it has been applied (because apply waits for its dependencies, right?), since we want the side effects of the dependencies.
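
   A rough sketch of the behaviour being asked for: a future that completes only once every dependency has applied. It uses `java.util.concurrent.CompletableFuture` rather than the `Future` type in the diff, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative only: hypothetical names, using the JDK's CompletableFuture.
final class AwaitDepsSketch
{
    // The returned future completes only once every dependency has applied,
    // so a caller that waits on it observes the dependencies' side effects.
    static CompletableFuture<Void> whenApplied(List<CompletableFuture<Void>> deps)
    {
        return CompletableFuture.allOf(deps.toArray(new CompletableFuture[0]));
    }
}
```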





[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "aweisberg (via GitHub)" <gi...@apache.org>.
aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094870089


##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())
+        {
+            proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
+        }
+        else
+        {
+            // SyncPoint transactions always propose their own txnId as their executeAt, as they are not really executed.
+            // They only create happens-after relationships wrt their dependencies, which represent all transactions
+            // that *may* execute before their txnId, so once these dependencies apply we can say that any action that
+            // awaits these dependencies applies after them. In the case of ExclusiveSyncPoint, we additionally guarantee
+            // that no lower TxnId can later apply.
+            new Propose<SyncPoint>(node, tracker.topologies(), Ballot.ZERO, txnId, txn, route, deps, txnId, this)

Review Comment:
   I don't really care. It's not hard to understand; I just noted that we have three ways to get at different types of proposals (Invalidate, ProposeAndExecute, and this).





[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "aweisberg (via GitHub)" <gi...@apache.org>.
aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094847379


##########
accord-core/src/main/java/accord/primitives/Txn.java:
##########
@@ -32,13 +32,26 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import static accord.primitives.Routables.Slice.Overlapping;
-
 public interface Txn
 {
     enum Kind
     {
-        Read, Write;
+        Read,
+        Write,
+
+        /**
+         * A pseudo-transaction whose deps represent the complete set of transactions that may execute before it,
+         * without interfering with their execution.
+         *
+         * A SyncPoint is unique in that it agrees not only an executeAt but also a precise collection of dependencies,

Review Comment:
   The reason the dependencies need to be applied is that the migration use case needs the side effects of the dependencies to have already occurred.





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093738412


##########
accord-core/src/main/java/accord/coordinate/Propose.java:
##########
@@ -128,12 +131,7 @@ public void onCallbackFailure(Id from, Throwable failure)
         callback.accept(null, failure);
     }
 
-    private void onAccepted()
-    {
-        isDone = true;
-        Deps deps = Deps.merge(acceptOks, ok -> ok.deps);
-        Execute.execute(node, txnId, txn, route, executeAt, deps, callback);
-    }
+    abstract void onAccepted();
 
     // A special version for proposing the invalidation of a transaction; only needs to succeed on one shard
     static class Invalidate implements Callback<AcceptReply>

Review Comment:
   I don't think so, but happy to be told otherwise. We have a pattern of ActionX and ActionX.Invalidate - though perhaps this isn't a good pattern anyway 🤷 





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093735288


##########
accord-core/src/main/java/accord/local/CommandStore.java:
##########
@@ -54,6 +63,36 @@ public int id()
         return id;
     }
 
+    protected void setRejectBefore(ReducingRangeMap<Timestamp> newRejectBefore)
+    {
+        this.rejectBefore = newRejectBefore;
+    }
+
+    Timestamp preaccept(TxnId txnId, Seekables<?, ?> keys, SafeCommandStore safeStore)
+    {
+        NodeTimeService time = safeStore.time();
+        boolean isExpired = agent().isExpired(txnId, safeStore.time().now());

Review Comment:
   We use it on both the replica and the coordinator, and on the coordinator that perhaps isn't quite right. We could have different criteria for each, or we could stop checking it on the coordinator and just check whether any of the votes were to reject it.
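
   A sketch of the second option, where the coordinator only folds the replica votes and rejects if any of them did. `Vote` is a hypothetical stand-in for a `PreAcceptOk` carrying `witnessedAt`, and the merge rule (max timestamp, rejection is sticky) is an assumption about how `mergeMax` treats the rejected flag.

```java
import java.util.List;

// Illustrative only: hypothetical names, with timestamps simplified to longs.
final class RejectVotesSketch
{
    static final class Vote
    {
        final long witnessedAt;
        final boolean rejected;

        Vote(long witnessedAt, boolean rejected)
        {
            this.witnessedAt = witnessedAt;
            this.rejected = rejected;
        }

        // Keep the larger timestamp; the result is rejected if either input was.
        static Vote mergeMax(Vote a, Vote b)
        {
            return new Vote(Math.max(a.witnessedAt, b.witnessedAt), a.rejected || b.rejected);
        }
    }

    // Fold all replica votes into one; no local expiry check on the coordinator.
    static Vote fold(List<Vote> votes)
    {
        Vote acc = new Vote(Long.MIN_VALUE, false);
        for (Vote vote : votes)
            acc = Vote.mergeMax(acc, vote);
        return acc;
    }
}
```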





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093735718


##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -374,9 +528,9 @@ public void run()
             }
         }
 
-        class AsyncState extends State implements SafeCommandStore
+        class AsyncState extends InMemoryCommandStore.State implements SafeCommandStore

Review Comment:
   I think this is just Java 8's compilation woes, sometimes we have to rejig imports until it decides things are fine again. Just noise. 





[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "aweisberg (via GitHub)" <gi...@apache.org>.
aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093644771


##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -374,9 +528,9 @@ public void run()
             }
         }
 
-        class AsyncState extends State implements SafeCommandStore
+        class AsyncState extends InMemoryCommandStore.State implements SafeCommandStore

Review Comment:
   Is removing the import an improvement?



##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())

Review Comment:
   Should we expire? 



##########
accord-core/src/main/java/accord/coordinate/Propose.java:
##########
@@ -128,12 +131,7 @@ public void onCallbackFailure(Id from, Throwable failure)
         callback.accept(null, failure);
     }
 
-    private void onAccepted()
-    {
-        isDone = true;
-        Deps deps = Deps.merge(acceptOks, ok -> ok.deps);
-        Execute.execute(node, txnId, txn, route, executeAt, deps, callback);
-    }
+    abstract void onAccepted();
 
     // A special version for proposing the invalidation of a transaction; only needs to succeed on one shard
     static class Invalidate implements Callback<AcceptReply>

Review Comment:
   Should invalidate be pulled out into a separate class now?



##########
accord-core/src/main/java/accord/messages/BeginRecovery.java:
##########
@@ -152,16 +164,18 @@ public RecoverReply reduce(RecoverReply r1, RecoverReply r2)
         if (!ok1.status.hasBeen(PreAccepted)) throw new IllegalStateException();
 
         PartialDeps deps = ok1.deps.with(ok2.deps);
+        PartialDeps acceptedDeps = ok1.deps == ok1.acceptedDeps && ok2.deps == ok2.acceptedDeps ? deps : ok1.acceptedDeps.with(ok2.acceptedDeps);

Review Comment:
   If one or more of them is `PartialDeps.NONE` why are we doing `with`? To make sure we pick whichever one might not be `NONE` or get double `NONE` merged?



##########
accord-core/src/main/java/accord/primitives/Timestamp.java:
##########
@@ -19,12 +19,16 @@
 package accord.primitives;
 
 import accord.local.Node.Id;
-import accord.utils.Invariants;
+
+import static accord.utils.Invariants.checkArgument;
 
 import javax.annotation.Nonnull;
 
 public class Timestamp implements Comparable<Timestamp>
 {
+    private static final int REJECTED_FLAG = 0x8000;

Review Comment:
   Should `REJECTED_FLAG` be the same as `MERGE_FLAGS`?



##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())
+        {
+            proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
+        }
+        else
+        {
+            // SyncPoint transactions always propose their own txnId as their executeAt, as they are not really executed.
+            // They only create happens-after relationships wrt their dependencies, which represent all transactions
+            // that *may* execute before their txnId, so once these dependencies apply we can say that any action that
+            // awaits these dependencies applies after them. In the case of ExclusiveSyncPoint, we additionally guarantee
+            // that no lower TxnId can later apply.
+            new Propose<SyncPoint>(node, tracker.topologies(), Ballot.ZERO, txnId, txn, route, deps, txnId, this)

Review Comment:
   Anonymously subclassing `Propose` to override `onAccepted` is moderately weird, but sure. For `ProposeAndExecute` there is a class.
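
   For readers comparing the two shapes, here is a minimal sketch. The names `ProposeSketch` and `ProposeAndExecuteSketch` are hypothetical, and an integer quorum stands in for the real tracker, so this illustrates the pattern only and is not the PR's code.

```java
/** Hypothetical stand-in for Propose: counts accept replies and fires onAccepted() once at quorum. */
abstract class ProposeSketch
{
    private final int quorum;
    private int accepts;
    private boolean isDone;

    ProposeSketch(int quorum)
    {
        this.quorum = quorum;
    }

    /** Called once per successful accept reply. */
    final void onSuccess()
    {
        if (isDone) return;
        if (++accepts >= quorum)
        {
            isDone = true;
            onAccepted();
        }
    }

    /** What to do once a quorum has accepted; each subclass decides. */
    abstract void onAccepted();
}

/** Named subclass, analogous in shape to a dedicated propose-and-execute class. */
final class ProposeAndExecuteSketch extends ProposeSketch
{
    private final StringBuilder log;

    ProposeAndExecuteSketch(int quorum, StringBuilder log)
    {
        super(quorum);
        this.log = log;
    }

    @Override
    void onAccepted()
    {
        log.append("execute");
    }
}
```

   The anonymous form is the same thing without the name: `new ProposeSketch(2) { @Override void onAccepted() { ... } }`. It keeps a one-off behaviour next to its only call site, at the cost of being harder to find and test.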



##########
accord-core/src/main/java/accord/local/CommandStore.java:
##########
@@ -54,6 +63,36 @@ public int id()
         return id;
     }
 
+    protected void setRejectBefore(ReducingRangeMap<Timestamp> newRejectBefore)
+    {
+        this.rejectBefore = newRejectBefore;
+    }
+
+    Timestamp preaccept(TxnId txnId, Seekables<?, ?> keys, SafeCommandStore safeStore)
+    {
+        NodeTimeService time = safeStore.time();
+        boolean isExpired = agent().isExpired(txnId, safeStore.time().now());

Review Comment:
   Maybe rename `isExpired` to `shouldReject`?



##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())
+        {
+            proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
+        }
+        else
+        {
+            // SyncPoint transactions always propose their own txnId as their executeAt, as they are not really executed.
+            // They only create happens-after relationships wrt their dependencies, which represent all transactions
+            // that *may* execute before their txnId, so once these dependencies apply we can say that any action that
+            // awaits these dependencies applies after them. In the case of ExclusiveSyncPoint, we additionally guarantee
+            // that no lower TxnId can later apply.
+            new Propose<SyncPoint>(node, tracker.topologies(), Ballot.ZERO, txnId, txn, route, deps, txnId, this)
+            {
+                @Override
+                void onAccepted()
+                {
+                    node.send(tracker.nodes(), id -> new Apply(id, tracker.topologies(), tracker.topologies(), txnId.epoch(), txnId, route, txnId, deps, txn.execute(txnId, null), txn.result(txnId, null)));

Review Comment:
   We send the apply to avoid having this transaction block others, and we don't need a response because it's fine if this transaction needs recovery?



##########
accord-core/src/main/java/accord/coordinate/Recover.java:
##########
@@ -212,56 +213,63 @@ private void recover()
             {
                 default: throw new IllegalStateException();
                 case Invalidated:
+                {
                     commitInvalidate();
                     return;
+                }
 
                 case Applied:
                 case PreApplied:
-                    // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, acceptOrCommit.executeAt, (deps, fail) -> {
-                            if (fail != null)
-                            {
-                                accept(null, fail);
-                            }
-                            else
-                            {
-                                // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
-                                Persist.persistAndCommit(node, txnId, route, txn, executeAt, deps, acceptOrCommit.writes, acceptOrCommit.result);
-                                accept(acceptOrCommit.result, null);
-                            }
-                        });
-                    });
+                {
+                    // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Deps acceptedDeps = tryMergeAcceptedDeps();

Review Comment:
   We don't need to collect deps anymore? Does this mean we always enter `Recover` after reaching a quorum from each shard?



##########
accord-core/src/main/java/accord/coordinate/Propose.java:
##########
@@ -74,18 +73,19 @@ class Propose implements Callback<AcceptReply>
         this.acceptTracker = new QuorumTracker(topologies);
     }
 
-    public static void propose(Node node, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route,
-                               Timestamp executeAt, Deps deps, BiConsumer<Result, Throwable> callback)
+    public static void propose(Node node, Topologies topologies, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route,

Review Comment:
   Unused now 



##########
accord-core/src/main/java/accord/coordinate/Recover.java:
##########
@@ -212,56 +213,63 @@ private void recover()
             {
                 default: throw new IllegalStateException();
                 case Invalidated:
+                {
                     commitInvalidate();
                     return;
+                }
 
                 case Applied:
                 case PreApplied:
-                    // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, acceptOrCommit.executeAt, (deps, fail) -> {
-                            if (fail != null)
-                            {
-                                accept(null, fail);
-                            }
-                            else
-                            {
-                                // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
-                                Persist.persistAndCommit(node, txnId, route, txn, executeAt, deps, acceptOrCommit.writes, acceptOrCommit.result);
-                                accept(acceptOrCommit.result, null);
-                            }
-                        });
-                    });
+                {
+                    // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Deps acceptedDeps = tryMergeAcceptedDeps();
+                    Invariants.checkState(acceptedDeps != null);
+                    // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
+                    Persist.persistAndCommit(node, txnId, route, txn, executeAt, acceptedDeps, acceptOrCommit.writes, acceptOrCommit.result);
+                    accept(acceptOrCommit.result, null);
                     return;
+                }
 
                 case ReadyToExecute:
                 case PreCommitted:
                 case Committed:
+                {
+                    Deps acceptedDeps = tryMergeAcceptedDeps(); // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Invariants.checkState(acceptedDeps != null);
                     // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, executeAt, (deps, fail) -> {
-                            if (fail != null) accept(null, fail);
-                            else Execute.execute(node, txnId, txn, route, acceptOrCommit.executeAt, deps, this);
-                        });
-                    });
+                    Execute.execute(node, txnId, txn, route, acceptOrCommit.executeAt, acceptedDeps, this);
                     return;
+                }
 
                 case Accepted:
-                    // no need to preaccept the later round, as future operations always include every old epoch (until it is fully migrated)
-                    propose(acceptOrCommit.executeAt, mergeDeps());
-                    return;
+                {
+                    Deps deps;
+                    if (txnId.rw().proposesDeps()) deps = tryMergeAcceptedDeps();

Review Comment:
   Why do we have this distinction here for this transaction type? (needs comment)
   
   Seems like this is the one case where we are ok with accepted deps being null, but for a regular transaction it's an invariant that it not be null?
   
   So a `proposesDeps()` transaction must arrive here with less information, but we still want to proceed to invalidation? Why is that?



##########
accord-core/src/main/java/accord/local/SafeCommandStore.java:
##########
@@ -66,7 +70,37 @@
         EXECUTES_AFTER
     }
     enum TestDep { WITH, WITHOUT, ANY_DEPS }
-    enum TestKind { Ws, RorWs }
+    enum TestKind implements Predicate<Txn.Kind>

Review Comment:
   So with this do sync points only conflict in one direction and basically not block other transactions? Nice.
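
   To make the one-directional reading concrete, here is a minimal sketch of an enum that implements `Predicate` over transaction kinds. Only `Ws`, `RorWs` and `Any` appear in the hunks quoted in this thread; the `conflictsFor` helper and the exact mapping are assumptions for illustration, not the PR's code.

```java
import java.util.function.Predicate;

final class KindFilterSketch
{
    enum Kind { Read, Write, SyncPoint, ExclusiveSyncPoint }

    /** A filter over transaction kinds, usable anywhere a Predicate<Kind> is expected. */
    enum TestKind implements Predicate<Kind>
    {
        Ws, RorWs, Any;

        @Override
        public boolean test(Kind kind)
        {
            switch (this)
            {
                case Ws: return kind == Kind.Write;
                case RorWs: return kind == Kind.Read || kind == Kind.Write;
                case Any: return true;
                default: throw new AssertionError();
            }
        }

        /** Hypothetical helper: which kinds a transaction of the given kind takes as dependencies. */
        static TestKind conflictsFor(Kind kind)
        {
            switch (kind)
            {
                case Read: return Ws;
                case Write: return RorWs;
                case SyncPoint:
                case ExclusiveSyncPoint: return Any;
                default: throw new AssertionError();
            }
        }
    }
}
```

   Under this assumed mapping a sync point's filter matches every kind, while the filters used by reads and writes never match a sync point, so the dependency only ever points from the sync point to the ordinary transaction.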



##########
accord-core/src/main/java/accord/messages/BeginRecovery.java:
##########
@@ -216,19 +230,21 @@ public static class RecoverOk extends RecoverReply
         public final Ballot accepted;
         public final Timestamp executeAt;
         public final PartialDeps deps;
+        public final PartialDeps acceptedDeps; // only those deps that have previously been proposed

Review Comment:
   Good place to comment on when/why the distinction matters, but if it can/should be covered where this is used that is also fine.



##########
accord-core/src/main/java/accord/primitives/Txn.java:
##########
@@ -53,6 +66,16 @@ public boolean isRead()
             return this == Read;
         }
 
+        /**
+         * Note that it is only possible to do this for transactions whose execution time is not dependent on
+         * others, i.e. where we may safely propose executeAt = txnId regardless of when it is witnessed by

Review Comment:
   How does a transaction end up being that way? What transaction could safely propose a txnId of 0 at any time?
   
   Also what is "this"? :-)
   
   It's proposing invalidations, don't those need to properly order with what already occurred?



##########
accord-core/src/main/java/accord/messages/BeginRecovery.java:
##########
@@ -297,7 +313,7 @@ private static Deps acceptedStartedBeforeWithoutWitnessing(SafeCommandStore comm
     {
         try (Deps.Builder builder = Deps.builder())
         {
-            commandStore.mapReduce(keys, ranges, RorWs, STARTED_BEFORE, startedBefore, WITHOUT, startedBefore, Accepted, PreCommitted,
+            commandStore.mapReduce(keys, ranges, Any, STARTED_BEFORE, startedBefore, WITHOUT, startedBefore, Accepted, PreCommitted,

Review Comment:
   Why use `Any` for recovery? What is the interaction you are looking for with sync transactions?
   
   Does recovery force them to order with sync transactions, but during regular execution we don't?



##########
accord-core/src/main/java/accord/coordinate/Recover.java:
##########
@@ -212,56 +213,63 @@ private void recover()
             {
                 default: throw new IllegalStateException();
                 case Invalidated:
+                {
                     commitInvalidate();
                     return;
+                }
 
                 case Applied:
                 case PreApplied:
-                    // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, acceptOrCommit.executeAt, (deps, fail) -> {
-                            if (fail != null)
-                            {
-                                accept(null, fail);
-                            }
-                            else
-                            {
-                                // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
-                                Persist.persistAndCommit(node, txnId, route, txn, executeAt, deps, acceptOrCommit.writes, acceptOrCommit.result);
-                                accept(acceptOrCommit.result, null);
-                            }
-                        });
-                    });
+                {
+                    // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Deps acceptedDeps = tryMergeAcceptedDeps();
+                    Invariants.checkState(acceptedDeps != null);
+                    // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
+                    Persist.persistAndCommit(node, txnId, route, txn, executeAt, acceptedDeps, acceptOrCommit.writes, acceptOrCommit.result);
+                    accept(acceptOrCommit.result, null);
                     return;
+                }
 
                 case ReadyToExecute:
                 case PreCommitted:
                 case Committed:
+                {
+                    Deps acceptedDeps = tryMergeAcceptedDeps(); // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Invariants.checkState(acceptedDeps != null);
                     // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, executeAt, (deps, fail) -> {
-                            if (fail != null) accept(null, fail);
-                            else Execute.execute(node, txnId, txn, route, acceptOrCommit.executeAt, deps, this);
-                        });
-                    });
+                    Execute.execute(node, txnId, txn, route, acceptOrCommit.executeAt, acceptedDeps, this);
                     return;
+                }
 
                 case Accepted:
-                    // no need to preaccept the later round, as future operations always include every old epoch (until it is fully migrated)
-                    propose(acceptOrCommit.executeAt, mergeDeps());
-                    return;
+                {
+                    Deps deps;
+                    if (txnId.rw().proposesDeps()) deps = tryMergeAcceptedDeps();
+                    else deps = mergeDeps();
+
+                    if (deps != null)
+                    {
+                        // TODO (desired, behaviour): if we didn't find Accepted in *every* shard, consider invalidating for consistency of behaviour
+                        propose(acceptOrCommit.executeAt, deps);
+                        return;
+                    }
+
+                    // if we propose deps, and cannot assemble a complete set, then we never reached consensus and can be invalidated

Review Comment:
   What does it mean to propose deps?



##########
accord-core/src/main/java/accord/primitives/Txn.java:
##########
@@ -32,13 +32,26 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import static accord.primitives.Routables.Slice.Overlapping;
-
 public interface Txn
 {
     enum Kind
     {
-        Read, Write;
+        Read,
+        Write,
+
+        /**
+         * A pseudo-transaction whose deps represent the complete set of transactions that may execute before it,
+         * without interfering with their execution.
+         *
+         * A SyncPoint is unique in that it agrees not only an executeAt but also a precise collection of dependencies,

Review Comment:
   "Precise collection" needs elaboration. Do you mean that after `PreAccept` we have a precise set because we are fine with extra dependencies?
   
   Don't all transactions determine a precise set of dependencies in order to determine their `executeAt`?



##########
accord-core/src/main/java/accord/primitives/Txn.java:
##########
@@ -32,13 +32,26 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import static accord.primitives.Routables.Slice.Overlapping;
-
 public interface Txn
 {
     enum Kind
     {
-        Read, Write;
+        Read,
+        Write,
+
+        /**
+         * A pseudo-transaction whose deps represent the complete set of transactions that may execute before it,
+         * without interfering with their execution.
+         *
+         * A SyncPoint is unique in that it agrees not only an executeAt but also a precise collection of dependencies,
+         * so that its effect on invalidation of earlier transactions is durable. This is most useful for ExclusiveSyncPoint.

Review Comment:
   This last half really confuses me: `SyncPoint` shouldn't invalidate?




[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "aweisberg (via GitHub)" <gi...@apache.org>.
aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094868548


##########
accord-core/src/main/java/accord/coordinate/Propose.java:
##########
@@ -74,18 +73,19 @@ class Propose implements Callback<AcceptReply>
         this.acceptTracker = new QuorumTracker(topologies);
     }
 
-    public static void propose(Node node, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route,
-                               Timestamp executeAt, Deps deps, BiConsumer<Result, Throwable> callback)
+    public static void propose(Node node, Topologies topologies, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route,

Review Comment:
   FYI still unused




[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093764809


##########
accord-core/src/main/java/accord/primitives/Txn.java:
##########
@@ -32,13 +32,26 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import static accord.primitives.Routables.Slice.Overlapping;
-
 public interface Txn
 {
     enum Kind
     {
-        Read, Write;
+        Read,
+        Write,
+
+        /**
+         * A pseudo-transaction whose deps represent the complete set of transactions that may execute before it,
+         * without interfering with their execution.
+         *
+         * A SyncPoint is unique in that it agrees not only an executeAt but also a precise collection of dependencies,

Review Comment:
   Nope, most transactions do not _agree_ their deps, and indeed for those that agree with 1RT consensus there are no durable deps to recover (and we intend to stop supplying them for accept too). But for SyncPoint, for the reasons stated elsewhere, agreeing deps is fine because we always have `executeAt=txnId`, which in turn is fine because we don't "execute" them.
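
   A minimal sketch of that decision, under loud assumptions: transaction ids are plain `long`s, a replica's rejection is a boolean on the reply rather than a flag on a merged timestamp, and `Reply`, `Proposal` and `onPreAccepted` are hypothetical names that only mirror the shape of the coordinator hunk quoted earlier in this thread.

```java
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;

final class SyncPointSketch
{
    /** Hypothetical PreAccept reply: whether the replica rejected us, plus the deps it reported. */
    static final class Reply
    {
        final boolean rejected;
        final List<Long> deps;

        Reply(boolean rejected, List<Long> deps)
        {
            this.rejected = rejected;
            this.deps = deps;
        }
    }

    /** What goes to the Accept round: an executeAt and an explicit, agreed dependency set. */
    static final class Proposal
    {
        final long executeAt;
        final TreeSet<Long> deps;

        Proposal(long executeAt, TreeSet<Long> deps)
        {
            this.executeAt = executeAt;
            this.deps = deps;
        }
    }

    /** Empty means "invalidate instead"; otherwise propose executeAt = txnId with the merged deps. */
    static Optional<Proposal> onPreAccepted(long txnId, List<Reply> replies)
    {
        TreeSet<Long> deps = new TreeSet<>();
        for (Reply reply : replies)
        {
            if (reply.rejected) return Optional.empty();
            deps.addAll(reply.deps);
        }
        // The sync point is never really executed, so its own id is a safe executeAt
        // no matter what timestamps the replicas witnessed.
        return Optional.of(new Proposal(txnId, deps));
    }
}
```

   The point of the sketch is that the proposal carries the dependency set itself, so it is the set that becomes durable once accepted, not only the timestamp.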




[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093761748


##########
accord-core/src/main/java/accord/primitives/Timestamp.java:
##########
@@ -19,12 +19,16 @@
 package accord.primitives;
 
 import accord.local.Node.Id;
-import accord.utils.Invariants;
+
+import static accord.utils.Invariants.checkArgument;
 
 import javax.annotation.Nonnull;
 
 public class Timestamp implements Comparable<Timestamp>
 {
+    private static final int REJECTED_FLAG = 0x8000;

Review Comment:
   I will comment. The idea here is that we probably don't want all flags to be mergeable, though today we only have one flag, and we do want it to propagate.
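
   A minimal sketch of the distinction, assuming a simplified timestamp with a single `long` time and an `int` flag field. `FlaggedTimestamp` and `LOCAL_ONLY_FLAG` are hypothetical; only `REJECTED_FLAG`, `MERGE_FLAGS`, `mergeMax` and `isRejected` are names taken from this thread, and the bodies here are illustrative rather than the PR's implementation.

```java
/** Hypothetical, simplified stand-in for a timestamp that carries a small flag field. */
final class FlaggedTimestamp implements Comparable<FlaggedTimestamp>
{
    static final int REJECTED_FLAG = 0x8000;
    // Hypothetical second flag that should NOT survive a merge.
    static final int LOCAL_ONLY_FLAG = 0x4000;
    // Only flags in this mask propagate from the losing side of a merge.
    static final int MERGE_FLAGS = REJECTED_FLAG;

    final long time;
    final int flags;

    FlaggedTimestamp(long time, int flags)
    {
        this.time = time;
        this.flags = flags;
    }

    boolean isRejected()
    {
        return (flags & REJECTED_FLAG) != 0;
    }

    @Override
    public int compareTo(FlaggedTimestamp that)
    {
        return Long.compare(this.time, that.time);
    }

    /** Returns the later timestamp, additionally carrying any mergeable flags set on the earlier one. */
    static FlaggedTimestamp mergeMax(FlaggedTimestamp a, FlaggedTimestamp b)
    {
        FlaggedTimestamp max = a.compareTo(b) >= 0 ? a : b;
        FlaggedTimestamp min = max == a ? b : a;
        int merged = max.flags | (min.flags & MERGE_FLAGS);
        return merged == max.flags ? max : new FlaggedTimestamp(max.time, merged);
    }
}
```

   With this shape a rejection reported by any one replica survives a fold of `mergeMax` over all replies, even when that replica's timestamp is not the maximum, while a non-mergeable flag on the losing side is dropped.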




[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "aweisberg (via GitHub)" <gi...@apache.org>.
aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093723225


##########
accord-core/src/main/java/accord/utils/ReducingRangeMap.java:
##########
@@ -0,0 +1,131 @@
+package accord.utils;
+
+import accord.api.RoutingKey;
+import accord.primitives.*;
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+
+import static accord.utils.SortedArrays.Search.FAST;
+
+public class ReducingRangeMap<V> extends ReducingIntervalMap<RoutingKey, V>
+{
+    final RoutingKeys endKeys;
+
+    public ReducingRangeMap(V value)
+    {
+        super(value);
+        this.endKeys = RoutingKeys.EMPTY;
+    }
+
+    ReducingRangeMap(boolean inclusiveEnds, RoutingKey[] ends, V[] values)
+    {
+        super(inclusiveEnds, ends, values);
+        this.endKeys = RoutingKeys.ofSortedUnique(ends);
+    }
+
+    public <V2> V2 foldl(Routables<?, ?> routables, BiFunction<V, V2, V2> fold, V2 initialValue, Predicate<V2> terminate)
+    {
+        switch (routables.domain())
+        {
+            default: throw new AssertionError();
+            case Key: return foldl((AbstractKeys<?, ?>) routables, fold, initialValue, terminate);
+            case Range: return Routables.foldl(endKeys, (AbstractRanges<?>) routables, (f, vs, routingKey, v, index) -> f.apply(vs[index], v), fold, values, initialValue, terminate);
+        }
+    }
+
+    public <V2> V2 foldl(AbstractKeys<?, ?> keys, BiFunction<V, V2, V2> reduce, V2 accumulator, Predicate<V2> terminate)
+    {
+        int i = 0, j = 0;
+        while (j < keys.size())
+        {
+            i = endKeys.findNext(i, keys.get(j), FAST);
+            if (i < 0) i = -1 - i;
+            else if (inclusiveEnds) ++i;
+
+            accumulator = reduce.apply(values[i], accumulator);
+            if (i == endKeys.size() || terminate.test(accumulator))
+                return accumulator;
+
+            j = keys.findNext(j + 1, endKeys.get(i), FAST);
+            if (j < 0) j = -1 - j;
+            else if (inclusiveEnds) ++j;
+        }
+        return accumulator;
+    }
+
+    /**
+     * returns a copy of this LinearRangeMap limited to the ranges supplied, with all other ranges reporting Ballot.none()

Review Comment:
   It's not a ballot anymore





[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "aweisberg (via GitHub)" <gi...@apache.org>.
aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094877230


##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())

Review Comment:
   Looks like we expire requests, so if the transaction is stuck it will eventually generate an error. So yeah, it seems fine not to align with a specific timeout, just lack of progress.
   
   I do wonder if it could get wedged in, say, the transaction logic, but then a retry wouldn't help either.





[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "aweisberg (via GitHub)" <gi...@apache.org>.
aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094944399


##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())
+        {
+            proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
+        }
+        else
+        {
+            // SyncPoint transactions always propose their own txnId as their executeAt, as they are not really executed.
+            // They only create happens-after relationships wrt their dependencies, which represent all transactions
+            // that *may* execute before their txnId, so once these dependencies apply we can say that any action that
+            // awaits these dependencies applies after them. In the case of ExclusiveSyncPoint, we additionally guarantee
+            // that no lower TxnId can later apply.
+            new Propose<SyncPoint>(node, tracker.topologies(), Ballot.ZERO, txnId, txn, route, deps, txnId, this)
+            {
+                @Override
+                void onAccepted()
+                {
+                    node.send(tracker.nodes(), id -> new Apply(id, tracker.topologies(), tracker.topologies(), txnId.epoch(), txnId, route, txnId, deps, txn.execute(txnId, null), txn.result(txnId, null)));

Review Comment:
   OK, there are two versions of this though. One is locally listening (key migration), but in the other we actually want to know it was applied at a quorum remotely (range migration)?
   
   Or can we locally listen for the state change to globally durable?
   
   I wonder if we will have trouble with lost notifications here. We may not be involved in the dependency transactions at all (e.g. a range we don't replicate), so why would we receive notification of them?





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094927869


##########
accord-core/src/main/java/accord/coordinate/CoordinateSyncPoint.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import accord.local.Node;
+import accord.messages.Apply;
+import accord.messages.PreAccept.PreAcceptOk;
+import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import java.util.List;
+
+import static accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
+import static accord.primitives.Timestamp.mergeMax;
+import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
+import static accord.utils.Functions.foldl;
+
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ *
+ * TODO (desired, testing): dedicated burn test to validate outcomes
+ */
+public class CoordinateSyncPoint extends CoordinatePreAccept<SyncPoint>
+{
+    private CoordinateSyncPoint(Node node, TxnId txnId, Txn txn, FullRoute<?> route)
+    {
+        super(node, txnId, txn, route);
+    }
+
+    public static Future<SyncPoint> exclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(ExclusiveSyncPoint, node, keysOrRanges);
+    }
+
+    public static Future<SyncPoint> inclusive(Node node, Seekables<?, ?> keysOrRanges)
+    {
+        return coordinate(Kind.SyncPoint, node, keysOrRanges);
+    }
+
+    private static Future<SyncPoint> coordinate(Kind kind, Node node, Seekables<?, ?> keysOrRanges)
+    {
+        TxnId txnId = node.nextTxnId(kind, keysOrRanges.domain());
+        FullRoute<?> route = node.computeRoute(txnId, keysOrRanges);
+        CoordinateSyncPoint coordinate = new CoordinateSyncPoint(node, txnId, node.agent().emptyTxn(kind, keysOrRanges), route);
+        coordinate.start();
+        return coordinate;
+    }
+
+    void onPreAccepted(List<PreAcceptOk> successes)
+    {
+        Deps deps = Deps.merge(successes, ok -> ok.deps);
+        Timestamp executeAt = foldl(successes, (ok, prev) -> mergeMax(ok.witnessedAt, prev), Timestamp.NONE);
+        if (executeAt.isRejected())
+        {
+            proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, route.homeKey(), route, executeAt, this);
+        }
+        else
+        {
+            // SyncPoint transactions always propose their own txnId as their executeAt, as they are not really executed.
+            // They only create happens-after relationships wrt their dependencies, which represent all transactions
+            // that *may* execute before their txnId, so once these dependencies apply we can say that any action that
+            // awaits these dependencies applies after them. In the case of ExclusiveSyncPoint, we additionally guarantee
+            // that no lower TxnId can later apply.
+            new Propose<SyncPoint>(node, tracker.topologies(), Ballot.ZERO, txnId, txn, route, deps, txnId, this)

Review Comment:
   I guess we could put `ProposeAndExecute` as an inner class. I don't think of this as three different kinds of Propose though, but as two different behaviours conducted after `Propose`, and one special kind of propose (invalidation), as this is behaviourally different at the replica, rather than in the action that follows.
   
   We could chain a layer of indirection with callbacks, but it seemed clearer to me this way. I don't have a super strong feeling, but would prefer to retain the symmetry of `Invalidate` with other classes.
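
As a rough illustration of the shape being described (one shared proposal step, with the follow-up behaviour supplied by overriding a hook), consider the sketch below. `ProposeSketch`, `ProposeVariants`, and the string events are hypothetical stand-ins, not the real `Propose` class, and no networking is modelled.

```java
import java.util.List;

// Hypothetical stand-in for Propose: one proposal round with an overridable follow-up hook.
abstract class ProposeSketch
{
    final List<String> events;

    ProposeSketch(List<String> events)
    {
        this.events = events;
    }

    // The shared part: the proposal itself (here only recorded).
    final void start()
    {
        events.add("propose");
        onAccepted();
    }

    // The part that differs: what the coordinator does once the proposal is accepted.
    abstract void onAccepted();
}

final class ProposeVariants
{
    // Follow-up assumed for an ordinary transaction: go on to execute.
    static ProposeSketch thenExecute(List<String> out)
    {
        return new ProposeSketch(out)
        {
            @Override
            void onAccepted()
            {
                events.add("execute");
            }
        };
    }

    // Follow-up for a sync point: nothing to execute, so send Apply directly.
    static ProposeSketch thenApply(List<String> out)
    {
        return new ProposeSketch(out)
        {
            @Override
            void onAccepted()
            {
                events.add("apply");
            }
        };
    }
}
```

The alternative mentioned above would pass the follow-up in as a callback instead of overriding a method; the override keeps each variant next to the call site that needs it.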





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093739655


##########
accord-core/src/main/java/accord/coordinate/Recover.java:
##########
@@ -212,56 +213,63 @@ private void recover()
             {
                 default: throw new IllegalStateException();
                 case Invalidated:
+                {
                     commitInvalidate();
                     return;
+                }
 
                 case Applied:
                 case PreApplied:
-                    // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, acceptOrCommit.executeAt, (deps, fail) -> {
-                            if (fail != null)
-                            {
-                                accept(null, fail);
-                            }
-                            else
-                            {
-                                // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
-                                Persist.persistAndCommit(node, txnId, route, txn, executeAt, deps, acceptOrCommit.writes, acceptOrCommit.result);
-                                accept(acceptOrCommit.result, null);
-                            }
-                        });
-                    });
+                {
+                    // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Deps acceptedDeps = tryMergeAcceptedDeps();

Review Comment:
   Yes, we require a quorum from each side for recovery. There were TODOs about using the deps we have where possible, and for this work it made sense to just do that since we need it for the exclusive sync point - which *proposes* deps, i.e. expects *precisely* those deps to be re-proposed should it fail to durably commit/apply/whathaveyou.
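
A minimal sketch of the idea, assuming a simplified reply shape: reuse the deps already reported by replicas that reached at least Accepted, and give up (return null) if any shard has no such reply. `AcceptedDepsSketch`, `Reply`, the three-value `Status` enum, and string shard/txn names are all hypothetical; only the method name `tryMergeAcceptedDeps` and its null-on-incomplete behaviour are taken from the diff.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class AcceptedDepsSketch
{
    enum Status { PreAccepted, Accepted, Committed }

    // Hypothetical reply shape: the status a replica reached and the deps it has recorded.
    static final class Reply
    {
        final Status status;
        final Set<String> deps;

        Reply(Status status, Set<String> deps)
        {
            this.status = status;
            this.deps = deps;
        }
    }

    /**
     * Union the deps recorded by replicas that reached at least Accepted, requiring that
     * every shard contributes at least one such reply; otherwise return null.
     */
    static Set<String> tryMergeAcceptedDeps(Map<String, List<Reply>> repliesByShard)
    {
        Set<String> merged = new TreeSet<>();
        for (List<Reply> replies : repliesByShard.values())
        {
            boolean found = false;
            for (Reply reply : replies)
            {
                if (reply.status.compareTo(Status.Accepted) < 0)
                    continue; // PreAccepted deps were never proposed, so they are skipped here
                merged.addAll(reply.deps);
                found = true;
            }
            if (!found)
                return null; // this shard saw no proposed deps: no complete set can be assembled
        }
        return merged;
    }
}
```

A null result corresponds to the branch in the diff where a transaction that proposes deps cannot assemble a complete set and so can be invalidated.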





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1093758974


##########
accord-core/src/main/java/accord/coordinate/Recover.java:
##########
@@ -212,56 +213,63 @@ private void recover()
             {
                 default: throw new IllegalStateException();
                 case Invalidated:
+                {
                     commitInvalidate();
                     return;
+                }
 
                 case Applied:
                 case PreApplied:
-                    // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, acceptOrCommit.executeAt, (deps, fail) -> {
-                            if (fail != null)
-                            {
-                                accept(null, fail);
-                            }
-                            else
-                            {
-                                // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
-                                Persist.persistAndCommit(node, txnId, route, txn, executeAt, deps, acceptOrCommit.writes, acceptOrCommit.result);
-                                accept(acceptOrCommit.result, null);
-                            }
-                        });
-                    });
+                {
+                    // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Deps acceptedDeps = tryMergeAcceptedDeps();
+                    Invariants.checkState(acceptedDeps != null);
+                    // TODO (required, consider): when writes/result are partially replicated, need to confirm we have quorum of these
+                    Persist.persistAndCommit(node, txnId, route, txn, executeAt, acceptedDeps, acceptOrCommit.writes, acceptOrCommit.result);
+                    accept(acceptOrCommit.result, null);
                     return;
+                }
 
                 case ReadyToExecute:
                 case PreCommitted:
                 case Committed:
+                {
+                    Deps acceptedDeps = tryMergeAcceptedDeps(); // must have gone through Accepted, so we must have witnessed >= Accepted for each shard
+                    Invariants.checkState(acceptedDeps != null);
                     // TODO (desired, efficiency): in some cases we can use the deps we already have (e.g. if we have a quorum of Committed responses)
-                    node.withEpoch(executeAt.epoch(), () -> {
-                        CollectDeps.withDeps(node, txnId, route, txn, executeAt, (deps, fail) -> {
-                            if (fail != null) accept(null, fail);
-                            else Execute.execute(node, txnId, txn, route, acceptOrCommit.executeAt, deps, this);
-                        });
-                    });
+                    Execute.execute(node, txnId, txn, route, acceptOrCommit.executeAt, acceptedDeps, this);
                     return;
+                }
 
                 case Accepted:
-                    // no need to preaccept the later round, as future operations always include every old epoch (until it is fully migrated)
-                    propose(acceptOrCommit.executeAt, mergeDeps());
-                    return;
+                {
+                    Deps deps;
+                    if (txnId.rw().proposesDeps()) deps = tryMergeAcceptedDeps();
+                    else deps = mergeDeps();
+
+                    if (deps != null)
+                    {
+                        // TODO (desired, behaviour): if we didn't find Accepted in *every* shard, consider invalidating for consistency of behaviour
+                        propose(acceptOrCommit.executeAt, deps);
+                        return;
+                    }
+
+                    // if we propose deps, and cannot assemble a complete set, then we never reached consensus and can be invalidated

Review Comment:
   I think this will become more confusing when we permit regular transactions to _not_ send deps, but it is already the case that we do not treat the `deps` register on replicas as durable/consistent for regular transactions. I will think about where best to document this - I will at least mention it in `Command`, and perhaps also in `RecoverOk` and `Accept`





[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "aweisberg (via GitHub)" <gi...@apache.org>.
aweisberg commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094885024


##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -374,9 +528,9 @@ public void run()
             }
         }
 
-        class AsyncState extends State implements SafeCommandStore
+        class AsyncState extends InMemoryCommandStore.State implements SafeCommandStore

Review Comment:
   Yikes!





[GitHub] [cassandra-accord] belliottsmith commented on a diff in pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on code in PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#discussion_r1094949442


##########
accord-core/src/main/java/accord/primitives/Txn.java:
##########
@@ -32,13 +32,26 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import static accord.primitives.Routables.Slice.Overlapping;
-
 public interface Txn
 {
     enum Kind
     {
-        Read, Write;
+        Read,
+        Write,
+
+        /**
+         * A pseudo-transaction whose deps represent the complete set of transactions that may execute before it,
+         * without interfering with their execution.
+         *
+         * A SyncPoint is unique in that it agrees not only an executeAt but also a precise collection of dependencies,

Review Comment:
   Yes, but it’s slightly more than that: since an Accept round for a normal transaction means it has (likely) taken a later executeAt, the dependencies aren’t enough for execution, as they don’t include the full set of deps for the higher executeAt. For that we use the dependencies returned in the Accept response.
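
A toy sketch of why the originally collected deps fall short once executeAt moves later, under heavy simplification: TxnIds and timestamps are modelled as plain longs, every transaction is assumed to conflict, and `AcceptDepsSketch` and both method names are hypothetical rather than Accord's API.

```java
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// TxnIds and timestamps are modelled as plain longs; this is a simplification, not Accord's types.
final class AcceptDepsSketch
{
    // Replica side (assumed behaviour): report every known TxnId ordered before the given bound.
    static SortedSet<Long> acceptReplyDeps(SortedSet<Long> knownTxnIds, long bound)
    {
        return new TreeSet<>(knownTxnIds.headSet(bound));
    }

    // Coordinator side: the deps used for execution are the union of what the Accept replies returned.
    static SortedSet<Long> depsForExecution(List<SortedSet<Long>> acceptReplies)
    {
        SortedSet<Long> merged = new TreeSet<>();
        for (SortedSet<Long> reply : acceptReplies)
            merged.addAll(reply);
        return merged;
    }
}
```

For a transaction with TxnId 10 that ends up with executeAt 20, a replica knowing ids 3, 7, 12, 15, and 25 reports only 3 and 7 when bounded by the TxnId, but also 12 and 15 when bounded by executeAt; those extra ids are the ones that only the Accept responses can supply.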





Re: [PR] ExclusiveSyncPoint [cassandra-accord]

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith closed pull request #26: ExclusiveSyncPoint
URL: https://github.com/apache/cassandra-accord/pull/26




[GitHub] [cassandra-accord] belliottsmith commented on pull request #26: ExclusiveSyncPoint

Posted by "belliottsmith (via GitHub)" <gi...@apache.org>.
belliottsmith commented on PR #26:
URL: https://github.com/apache/cassandra-accord/pull/26#issuecomment-1529004917

   I've rebased and backported some relevant changes from the bootstrap PR (plus some changes not super tied to either). I've also brought in some changes to listener handling. I plan to merge this to cep-15-accord alongside bootstrap, to simplify things, but it would be good to get this review signed off once you're ready.

