Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2022/11/18 22:15:53 UTC

[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #14: Invalidation fixes/improvements

aweisberg commented on code in PR #14:
URL: https://github.com/apache/cassandra-accord/pull/14#discussion_r1025765346


##########
accord-core/src/main/java/accord/coordinate/CheckOn.java:
##########
@@ -161,25 +171,39 @@ void start()
         public Void apply(SafeCommandStore safeStore)
         {
             Command command = safeStore.command(txnId);
-            switch (sufficientTo)
+            switch (sufficientFor.propagate())
             {
                 default: throw new IllegalStateException();
-                case Invalidation:
+                case Invalidated:
                     command.commitInvalidate(safeStore);
                     break;
-                case Outcome:
+
+                case Applied:

Review Comment:
   `Applied` is not a return value of `Status.Known.propagate()`.



##########
accord-core/src/main/java/accord/coordinate/CheckOn.java:
##########
@@ -161,25 +171,39 @@ void start()
         public Void apply(SafeCommandStore safeStore)
         {
             Command command = safeStore.command(txnId);
-            switch (sufficientTo)
+            switch (sufficientFor.propagate())
             {
                 default: throw new IllegalStateException();
-                case Invalidation:
+                case Invalidated:
                     command.commitInvalidate(safeStore);
                     break;
-                case Outcome:
+
+                case Applied:
+                case PreApplied:
                     if (untilLocalEpoch >= full.executeAt.epoch)
                     {
                         confirm(command.commit(safeStore, maxRoute, progressKey, partialTxn, full.executeAt, partialDeps));
                         confirm(command.apply(safeStore, untilLocalEpoch, maxRoute, full.executeAt, partialDeps, full.writes, full.result));
                         break;
                     }
-                case ExecutionOrder:
+
+                case Committed:
+                case ReadyToExecute:

Review Comment:
   `ReadyToExecute` is not a return value of `propagate()`.



##########
accord-core/src/main/java/accord/coordinate/Coordinate.java:
##########
@@ -289,14 +168,22 @@ private void onPreAccepted()
                     }
                     else
                     {
-                        commitInvalidate(node, txnId, route, executeAt);
-                        accept(null, new Timeout(txnId, route.homeKey));
+                        node.withEpoch(executeAt.epoch, () -> {

Review Comment:
   Is it possible to assert somehow that we are not missing these `withEpoch` calls?
   
   For example, we know the executeAtEpoch from inside `commitInvalidate`.
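
   One way to picture the assertion asked about above is a guard that compares the epoch the node already knows with the epoch the operation needs. This is a minimal sketch, not the accord-core API: `EpochGuard`, `checkEpochKnown`, and both parameters are hypothetical names, and it assumes the locally known epoch is available as a plain `long` at the point where `commitInvalidate` runs.

```java
// Hypothetical stand-in; none of these names come from accord-core.
final class EpochGuard
{
    /**
     * Throws if the locally known epoch is behind the epoch the operation requires,
     * which would suggest the caller skipped a withEpoch-style wait.
     */
    static void checkEpochKnown(long localEpoch, long requiredEpoch)
    {
        if (localEpoch < requiredEpoch)
            throw new IllegalStateException("epoch " + requiredEpoch + " required, but only epoch "
                                            + localEpoch + " is known locally");
    }

    public static void main(String[] args)
    {
        checkEpochKnown(5, 5); // the required epoch is already known, so this passes
        try
        {
            checkEpochKnown(4, 5); // the required epoch is ahead of the local one
        }
        catch (IllegalStateException e)
        {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

   A check like this would only catch a missing wait at runtime, on the paths that tests actually exercise.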



##########
accord-core/src/main/java/accord/coordinate/CheckOn.java:
##########
@@ -161,25 +171,39 @@ void start()
         public Void apply(SafeCommandStore safeStore)
         {
             Command command = safeStore.command(txnId);
-            switch (sufficientTo)
+            switch (sufficientFor.propagate())
             {
                 default: throw new IllegalStateException();
-                case Invalidation:
+                case Invalidated:
                     command.commitInvalidate(safeStore);
                     break;
-                case Outcome:
+
+                case Applied:
+                case PreApplied:
                     if (untilLocalEpoch >= full.executeAt.epoch)
                     {
                         confirm(command.commit(safeStore, maxRoute, progressKey, partialTxn, full.executeAt, partialDeps));
                         confirm(command.apply(safeStore, untilLocalEpoch, maxRoute, full.executeAt, partialDeps, full.writes, full.result));
                         break;
                     }
-                case ExecutionOrder:
+
+                case Committed:
+                case ReadyToExecute:
                     confirm(command.commit(safeStore, maxRoute, progressKey, partialTxn, full.executeAt, partialDeps));
                     break;
-                case Definition:
+
+                case PreCommitted:
+                    command.precommit(safeStore, full.executeAt);
+                    if (!sufficientFor.definition.isKnown())
+                        break;
+
+                case Accepted:

Review Comment:
   `Accepted` is not a return value of `propagate()`.



##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -88,14 +76,24 @@ public boolean hasBeen(Status status)
         return status().hasBeen(status);
     }
 
-    public boolean hasBeen(Known phase)
+    public boolean has(Known known)

Review Comment:
   It's not clear from the enum values which enum is in play, especially now that there are quite a few. Should `has` be renamed to something more descriptive?



##########
accord-core/src/main/java/accord/coordinate/CheckOn.java:
##########
@@ -161,25 +171,39 @@ void start()
         public Void apply(SafeCommandStore safeStore)
         {
             Command command = safeStore.command(txnId);
-            switch (sufficientTo)
+            switch (sufficientFor.propagate())
             {
                 default: throw new IllegalStateException();
-                case Invalidation:
+                case Invalidated:
                     command.commitInvalidate(safeStore);
                     break;
-                case Outcome:
+
+                case Applied:
+                case PreApplied:
                     if (untilLocalEpoch >= full.executeAt.epoch)
                     {
                         confirm(command.commit(safeStore, maxRoute, progressKey, partialTxn, full.executeAt, partialDeps));
                         confirm(command.apply(safeStore, untilLocalEpoch, maxRoute, full.executeAt, partialDeps, full.writes, full.result));
                         break;
                     }
-                case ExecutionOrder:
+
+                case Committed:
+                case ReadyToExecute:
                     confirm(command.commit(safeStore, maxRoute, progressKey, partialTxn, full.executeAt, partialDeps));
                     break;
-                case Definition:
+
+                case PreCommitted:
+                    command.precommit(safeStore, full.executeAt);
+                    if (!sufficientFor.definition.isKnown())
+                        break;
+
+                case Accepted:
+                case PreAccepted:
                     command.preaccept(safeStore, partialTxn, maxRoute, progressKey);
-                case Nothing:
+                    break;
+
+                case AcceptedInvalidate:
+                case NotWitnessed:

Review Comment:
   These two aren't return values of `propagate()` either. Should this use a more specific enum for the concept of what information should be propagated?
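
   A rough sketch of what a dedicated enum could look like follows. Everything in it is an assumption made for illustration: `PropagateSketch`, `Propagate`, and `actionFor` are hypothetical names, the first five values are simply the case labels in the quoted hunk that the comments above did not flag, `Nothing` is an invented placeholder for "no information to propagate", and the returned strings are simplified labels rather than the real control flow (the quoted switch falls through in places).

```java
// Hypothetical sketch; not the accord-core Status or Known types.
final class PropagateSketch
{
    // One value per distinct propagation action, so the switch has no unreachable labels.
    enum Propagate { Invalidated, PreApplied, Committed, PreCommitted, PreAccepted, Nothing }

    /** Maps each value to a simplified label for the action the quoted switch would take. */
    static String actionFor(Propagate propagate)
    {
        switch (propagate)
        {
            case Invalidated:  return "commitInvalidate";
            case PreApplied:   return "commit+apply";
            case Committed:    return "commit";
            case PreCommitted: return "precommit";
            case PreAccepted:  return "preaccept";
            case Nothing:      return "none";
            default: throw new IllegalStateException();
        }
    }

    public static void main(String[] args)
    {
        for (Propagate p : Propagate.values())
            System.out.println(p + " -> " + actionFor(p));
    }
}
```

   With a narrower type, a case label that can never be returned becomes a compile error instead of dead code.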



##########
accord-core/src/main/java/accord/coordinate/FetchData.java:
##########
@@ -30,16 +33,16 @@ public static Object fetch(Known phase, Node node, TxnId txnId, RoutingKeys some
         else return fetchWithSomeKeys(phase, node, txnId, someKeys, untilLocalEpoch, callback);
     }
 
-    public static Object fetch(Known phase, Node node, TxnId txnId, AbstractRoute route, @Nullable Timestamp executeAt, long untilLocalEpoch, BiConsumer<Known, Throwable> callback)
+    public static Object fetch(Known fetch, Node node, TxnId txnId, AbstractRoute route, @Nullable Timestamp executeAt, long untilLocalEpoch, BiConsumer<Known, Throwable> callback)

Review Comment:
   The rename from `phase` to `fetch` isn't applied consistently; the other `fetch` overload above still takes `phase`.



##########
accord-core/src/main/java/accord/coordinate/FindRoute.java:
##########
@@ -32,7 +32,7 @@ public Result(Route route, Timestamp executeAt)
         public Result(CheckStatusOk ok)
         {
             this.route = (Route)ok.route;
-            this.executeAt = ok.saveStatus.status.compareTo(Status.Committed) >= 0 ? ok.executeAt : null;
+            this.executeAt = ok.saveStatus.status.compareTo(Status.PreCommitted) >= 0 ? ok.executeAt : null;

Review Comment:
   What exactly is this new `PreCommitted` state, and what are we using it for?



##########
accord-core/src/main/java/accord/coordinate/tracking/InvalidationTracker.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate.tracking;
+
+import accord.api.RoutingKey;
+import accord.local.Node;
+import accord.topology.Shard;
+import accord.topology.Topologies;
+
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
+
+public class InvalidationTracker extends AbstractTracker<InvalidationTracker.InvalidationShardTracker, Node.Id>
+{
+    public static class InvalidationShardTracker extends ShardTracker
+    {
+        protected int fastPathRejects;
+        protected int promises;
+        protected int successes;
+        protected int inflight;
+
+        public InvalidationShardTracker(Shard shard)
+        {
+            super(shard);
+            inflight = shard.rf();
+        }
+
+        public ShardOutcome<? super InvalidationTracker> onPromisedNoFastPath(Node.Id from)
+        {
+            if (shard.fastPathElectorate.contains(from))
+                ++fastPathRejects;
+
+            return onPromised(from);
+        }
+
+        public ShardOutcome<? super InvalidationTracker> onPromised(Node.Id from)
+        {
+            ++promises;
+            ++successes;
+            --inflight;
+
+            if (isNewPromiseAcceptance())
+                return promised(rejectsFastPath());
+
+            if (isNewQuorum() && isPromiseRejected())
+                return rejected(rejectsFastPath());
+
+            return noChangeToQuorum(rejectsFastPath());
+        }
+
+        public ShardOutcome<? super InvalidationTracker> onPromiseRejectedNoFastPath(Node.Id from)
+        {
+            if (shard.fastPathElectorate.contains(from))
+                ++fastPathRejects;
+
+            return onPromiseRejected(from);
+        }
+
+        public ShardOutcome<? super InvalidationTracker> onPromiseRejected(Node.Id from)
+        {
+            ++successes;
+            --inflight;
+
+            if (isNewPromiseRejection() && hasReachedQuorum())
+                return rejected(rejectsFastPath());
+
+            if (isNewQuorum() && isPromiseRejected())
+                return rejected(rejectsFastPath());
+
+            return noChangeToQuorum(rejectsFastPath());
+        }
+
+        public ShardOutcome<? super InvalidationTracker> onFailure(Node.Id from)
+        {
+            --inflight;
+
+            if (isNewFailure())
+                return failed(rejectsFastPath());
+
+            if (isNewPromiseRejection() && hasReachedQuorum())
+                return rejected(rejectsFastPath());
+
+            return noChangeToQuorum(rejectsFastPath());
+        }
+
+        private boolean isNewPromiseAcceptance()
+        {
+            return promises == shard.slowPathQuorumSize;
+        }
+
+        private boolean isNewQuorum()
+        {
+            return successes == shard.slowPathQuorumSize;
+        }
+
+        private boolean isNewPromiseRejection()
+        {
+            return promises + inflight == shard.slowPathQuorumSize - 1;
+        }
+
+        private boolean isPromiseRejected()
+        {
+            return promises + inflight < shard.slowPathQuorumSize;
+        }
+
+        private boolean isNewFailure()
+        {
+            return successes + inflight == shard.slowPathQuorumSize - 1;
+        }
+
+        public boolean rejectsFastPath()
+        {
+            return fastPathRejects > shard.fastPathElectorate.size() - shard.fastPathQuorumSize;
+        }
+
+        public boolean isPromised()
+        {
+            return promises >= shard.slowPathQuorumSize;
+        }
+
+        public boolean canPromise()

Review Comment:
   Unused



##########
accord-core/src/main/java/accord/coordinate/Invalidate.java:
##########
@@ -22,126 +22,114 @@
 import java.util.List;
 import java.util.function.BiConsumer;
 
+import accord.coordinate.tracking.InvalidationTracker;
+import accord.messages.Commit;
 import accord.primitives.*;
-import com.google.common.base.Preconditions;
+import accord.topology.Topologies;
 
 import accord.api.RoutingKey;
-import accord.coordinate.tracking.AbstractQuorumTracker.QuorumShardTracker;
 import accord.local.Node;
 import accord.local.Node.Id;
-import accord.local.Status;
 import accord.messages.BeginInvalidation;
-import accord.messages.BeginInvalidation.InvalidateNack;
-import accord.messages.BeginInvalidation.InvalidateOk;
 import accord.messages.BeginInvalidation.InvalidateReply;
 import accord.messages.Callback;
-import accord.topology.Shard;
+import com.google.common.base.Preconditions;
+
+import javax.annotation.Nullable;
 
 import static accord.coordinate.Propose.Invalidate.proposeInvalidate;
+import static accord.coordinate.tracking.RequestStatus.Failed;
 import static accord.local.PreLoadContext.contextFor;
-import static accord.local.Status.Accepted;
-import static accord.local.Status.PreAccepted;
-import static accord.messages.Commit.Invalidate.commitInvalidate;
 import static accord.primitives.ProgressToken.INVALIDATED;
 
 public class Invalidate implements Callback<InvalidateReply>
 {
     final Node node;
     final Ballot ballot;
     final TxnId txnId;
-    final RoutingKeys informKeys;
-    final RoutingKey invalidateWithKey;
-    final Status recoverIfAtLeast;
+    final RoutingKeys invalidateWithKeys;
     final BiConsumer<Outcome, Throwable> callback;
 
     boolean isDone;
-    final List<InvalidateOk> invalidateOks = new ArrayList<>();
-    final QuorumShardTracker preacceptTracker;
+    boolean isPrepareDone;
+    final List<InvalidateReply> replies = new ArrayList<>();
+    final InvalidationTracker tracker;
+    Throwable failure;
 
-    private Invalidate(Node node, Shard shard, Ballot ballot, TxnId txnId, RoutingKeys informKeys, RoutingKey invalidateWithKey, Status recoverIfAtLeast, BiConsumer<Outcome, Throwable> callback)
+    private Invalidate(Node node, Ballot ballot, TxnId txnId, RoutingKeys invalidateWithKeys, BiConsumer<Outcome, Throwable> callback)
     {
         this.callback = callback;
-        Preconditions.checkArgument(informKeys.contains(invalidateWithKey));
         this.node = node;
         this.ballot = ballot;
         this.txnId = txnId;
-        this.informKeys = informKeys;
-        this.invalidateWithKey = invalidateWithKey;
-        this.recoverIfAtLeast = recoverIfAtLeast;
-        this.preacceptTracker = new QuorumShardTracker(shard);
-    }
-
-    public static Invalidate invalidateIfNotWitnessed(Node node, TxnId txnId, RoutingKeys informKeys, RoutingKey invalidateWithKey, BiConsumer<Outcome, Throwable> callback)
-    {
-        return invalidate(node, txnId, informKeys, invalidateWithKey, PreAccepted, callback);
+        this.invalidateWithKeys = invalidateWithKeys;
+        Topologies topologies = node.topology().forEpoch(invalidateWithKeys, txnId.epoch);
+        this.tracker = new InvalidationTracker(topologies);
     }
 
-    public static Invalidate invalidate(Node node, TxnId txnId, RoutingKeys informKeys, RoutingKey invalidateWithKey, BiConsumer<Outcome, Throwable> callback)
+    public static Invalidate invalidate(Node node, TxnId txnId, RoutingKeys invalidateWithKeys, BiConsumer<Outcome, Throwable> callback)
     {
-        return invalidate(node, txnId, informKeys, invalidateWithKey, Accepted, callback);
+        Ballot ballot = new Ballot(node.uniqueNow());
+        Invalidate invalidate = new Invalidate(node, ballot, txnId, invalidateWithKeys, callback);
+        invalidate.start();
+        return invalidate;
     }
 
-    private static Invalidate invalidate(Node node, TxnId txnId, RoutingKeys informKeys, RoutingKey invalidateWithKey, Status recoverIfAtLeast, BiConsumer<Outcome, Throwable> callback)
+    private void start()
     {
-        Ballot ballot = new Ballot(node.uniqueNow());
-        Shard shard = node.topology().forEpochIfKnown(invalidateWithKey, txnId.epoch);
-        Invalidate invalidate = new Invalidate(node, shard, ballot, txnId, informKeys, invalidateWithKey, recoverIfAtLeast, callback);
-        node.send(shard.nodes, to -> new BeginInvalidation(txnId, invalidateWithKey, ballot), invalidate);
-        return invalidate;
+        node.send(tracker.nodes(), to -> new BeginInvalidation(to, tracker.topologies(), txnId, invalidateWithKeys, ballot), this);
     }
 
     @Override
     public synchronized void onSuccess(Id from, InvalidateReply reply)
     {
-        if (isDone || preacceptTracker.hasReachedQuorum())
+        if (isDone || isPrepareDone)
             return;
 
-        if (!reply.isOk())
+        replies.add(reply);
+        switch (tracker.recordSuccess(from, reply.isPromised(), reply.acceptedFastPath))
         {
-            InvalidateNack nack = (InvalidateNack) reply;
-            if (nack.homeKey != null)
-            {
-                node.ifLocalSince(contextFor(txnId), invalidateWithKey, txnId, safeStore -> {
-                    safeStore.command(txnId).updateHomeKey(safeStore, nack.homeKey);
-                }).addCallback(node.agent());
-            }
+            default: throw new AssertionError();
+            case Success:
+                invalidate();
+                break;
 
-            isDone = true;
-            callback.accept(null, new Preempted(txnId, null));
-            return;
-        }
+            case Failed:

Review Comment:
   This isn't a failure so much as the transaction not being ready to invalidate; `Failed` sounds a bit exceptional. Maybe add a comment?
   // This transaction is not able to be invalidated because reasons



##########
accord-core/src/main/java/accord/coordinate/tracking/InvalidationTracker.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate.tracking;
+
+import accord.api.RoutingKey;
+import accord.local.Node;
+import accord.topology.Shard;
+import accord.topology.Topologies;
+
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
+
+public class InvalidationTracker extends AbstractTracker<InvalidationTracker.InvalidationShardTracker, Node.Id>

Review Comment:
   I reviewed this and I think it does what it is supposed to do, which is to figure out whether there is a shard, any shard, that will reject the fast path and has given us sufficient promises to propose invalidation.
   
   Also, apparently collecting a quorum of promises from all the shards is good enough, which I didn't understand.
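
   To make the per-shard arithmetic concrete, here is a standalone mirror of the two predicates from the quoted `InvalidationShardTracker`, `rejectsFastPath()` and `isPromised()`. `ShardCounts` is a hypothetical class, not part of the PR, and the sizes used in `main` (electorate of 5, fast path quorum of 4, slow path quorum of 3) are illustrative assumptions.

```java
// Hypothetical standalone mirror of two predicates from the quoted diff.
final class ShardCounts
{
    final int electorateSize;
    final int fastPathQuorumSize;
    final int slowPathQuorumSize;
    int fastPathRejects;
    int promises;

    ShardCounts(int electorateSize, int fastPathQuorumSize, int slowPathQuorumSize)
    {
        this.electorateSize = electorateSize;
        this.fastPathQuorumSize = fastPathQuorumSize;
        this.slowPathQuorumSize = slowPathQuorumSize;
    }

    /** True once so many electorate members have rejected that a fast path quorum can no longer form. */
    boolean rejectsFastPath()
    {
        return fastPathRejects > electorateSize - fastPathQuorumSize;
    }

    /** True once a slow path quorum of promises has been collected. */
    boolean isPromised()
    {
        return promises >= slowPathQuorumSize;
    }

    public static void main(String[] args)
    {
        ShardCounts shard = new ShardCounts(5, 4, 3);
        shard.fastPathRejects = 1; // 1 > 5 - 4 is false: four members could still form a fast path quorum
        System.out.println(shard.rejectsFastPath());
        shard.fastPathRejects = 2; // 2 > 5 - 4 is true: at most three members remain
        System.out.println(shard.rejectsFastPath());
        shard.promises = 3;        // 3 >= 3: slow path quorum of promises reached
        System.out.println(shard.isPromised());
    }
}
```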



##########
accord-core/src/main/java/accord/coordinate/Invalidate.java:
##########
@@ -151,12 +139,12 @@ private void invalidate()
                     {
                         RecoverWithRoute.recover(node, ballot, txnId, route, callback);

Review Comment:
   If recovery shouldn't loop back, can we have some kind of state check that the callback going into recovery isn't already a recovery callback?
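
   One possible shape for such a check is a marker wrapper around the callback, so that a second entry into recovery with an already wrapped callback fails fast. This is a sketch under assumptions: `RecoveryGuard`, `RecoveryCallback`, and `enterRecovery` are hypothetical names, and the only thing taken from the PR is that callbacks are `BiConsumer`s of a result and a `Throwable`.

```java
import java.util.function.BiConsumer;

// Hypothetical sketch; not the accord-core recovery classes.
final class RecoveryGuard
{
    /** Marker wrapper: tags a callback as already belonging to a recovery attempt. */
    static final class RecoveryCallback<T> implements BiConsumer<T, Throwable>
    {
        final BiConsumer<T, Throwable> delegate;

        RecoveryCallback(BiConsumer<T, Throwable> delegate)
        {
            this.delegate = delegate;
        }

        @Override
        public void accept(T result, Throwable failure)
        {
            delegate.accept(result, failure);
        }
    }

    /** Wraps the callback for recovery, rejecting one that has already been through recovery. */
    static <T> RecoveryCallback<T> enterRecovery(BiConsumer<T, Throwable> callback)
    {
        if (callback instanceof RecoveryCallback)
            throw new IllegalStateException("recovery must not loop back into recovery");
        return new RecoveryCallback<>(callback);
    }

    public static void main(String[] args)
    {
        BiConsumer<String, Throwable> callback = (result, failure) -> System.out.println("done: " + result);
        RecoveryCallback<String> wrapped = enterRecovery(callback);
        wrapped.accept("ok", null);
        try
        {
            enterRecovery(wrapped); // second entry with the same callback is rejected
        }
        catch (IllegalStateException e)
        {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

   The wrapper only detects loops that reuse the same callback object; a loop that builds a fresh callback would need a different signal.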



##########
accord-core/src/main/java/accord/coordinate/tracking/InvalidationTracker.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate.tracking;
+
+import accord.api.RoutingKey;
+import accord.local.Node;
+import accord.topology.Shard;
+import accord.topology.Topologies;
+
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
+
+public class InvalidationTracker extends AbstractTracker<InvalidationTracker.InvalidationShardTracker, Node.Id>
+{
+    public static class InvalidationShardTracker extends ShardTracker
+    {
+        protected int fastPathRejects;
+        protected int promises;
+        protected int successes;
+        protected int inflight;
+
+        public InvalidationShardTracker(Shard shard)
+        {
+            super(shard);
+            inflight = shard.rf();
+        }
+
+        public ShardOutcome<? super InvalidationTracker> onPromisedNoFastPath(Node.Id from)
+        {
+            if (shard.fastPathElectorate.contains(from))
+                ++fastPathRejects;
+
+            return onPromised(from);
+        }
+
+        public ShardOutcome<? super InvalidationTracker> onPromised(Node.Id from)
+        {
+            ++promises;
+            ++successes;
+            --inflight;
+
+            if (isNewPromiseAcceptance())
+                return promised(rejectsFastPath());
+
+            if (isNewQuorum() && isPromiseRejected())
+                return rejected(rejectsFastPath());
+
+            return noChangeToQuorum(rejectsFastPath());
+        }
+
+        public ShardOutcome<? super InvalidationTracker> onPromiseRejectedNoFastPath(Node.Id from)
+        {
+            if (shard.fastPathElectorate.contains(from))
+                ++fastPathRejects;
+
+            return onPromiseRejected(from);
+        }
+
+        public ShardOutcome<? super InvalidationTracker> onPromiseRejected(Node.Id from)
+        {
+            ++successes;
+            --inflight;
+
+            if (isNewPromiseRejection() && hasReachedQuorum())
+                return rejected(rejectsFastPath());
+
+            if (isNewQuorum() && isPromiseRejected())
+                return rejected(rejectsFastPath());
+
+            return noChangeToQuorum(rejectsFastPath());
+        }
+
+        public ShardOutcome<? super InvalidationTracker> onFailure(Node.Id from)
+        {
+            --inflight;
+
+            if (isNewFailure())
+                return failed(rejectsFastPath());
+
+            if (isNewPromiseRejection() && hasReachedQuorum())
+                return rejected(rejectsFastPath());
+
+            return noChangeToQuorum(rejectsFastPath());
+        }
+
+        private boolean isNewPromiseAcceptance()
+        {
+            return promises == shard.slowPathQuorumSize;
+        }
+
+        private boolean isNewQuorum()
+        {
+            return successes == shard.slowPathQuorumSize;
+        }
+
+        private boolean isNewPromiseRejection()
+        {
+            return promises + inflight == shard.slowPathQuorumSize - 1;
+        }
+
+        private boolean isPromiseRejected()
+        {
+            return promises + inflight < shard.slowPathQuorumSize;
+        }
+
+        private boolean isNewFailure()
+        {
+            return successes + inflight == shard.slowPathQuorumSize - 1;
+        }
+
+        public boolean rejectsFastPath()
+        {
+            return fastPathRejects > shard.fastPathElectorate.size() - shard.fastPathQuorumSize;
+        }
+
+        public boolean isPromised()
+        {
+            return promises >= shard.slowPathQuorumSize;
+        }
+
+        public boolean canPromise()
+        {
+            return promises + inflight >= shard.slowPathQuorumSize;
+        }
+
+        public boolean isDecided()
+        {
+            return hasFailed() || (hasReachedQuorum() && (isPromised() || isPromiseRejected()));
+        }
+
+        @Override
+        boolean hasFailed()
+        {
+            // we don't want any single shard to declare failure; it's only a failure if ALL have not reached a quorum
+            // and NONE have rejected the fast path
+            return successes + inflight < shard.slowPathQuorumSize;
+        }
+
+        @Override
+        boolean hasReachedQuorum()
+        {
+            return successes >= shard.slowPathQuorumSize;
+        }
+
+        @Override
+        boolean hasInFlight()
+        {
+            return inflight > 0;
+        }
+    }
+
+    private int promisedShard = -1;
+    private boolean rejectsFastPath;
+    private boolean hasFailedShard;
+    public InvalidationTracker(Topologies topologies)
+    {
+        super(topologies, InvalidationShardTracker[]::new, InvalidationShardTracker::new);
+    }
+
+    public int promisedShardIndex()
+    {
+        return promisedShard;
+    }
+
+    public Shard promisedShard()
+    {
+        return get(promisedShard).shard;
+    }
+
+    public boolean isPromised()
+    {
+        return promisedShard >= 0;
+    }
+
+    public boolean isSafeToInvalidate(boolean contactedAllShards)
+    {
+        return rejectsFastPath || (contactedAllShards && !hasFailedShard);

Review Comment:
   If we contacted all shards and they all promised to invalidate, how do we know there wasn't a fast path preaccept, in which case we can't actually invalidate?
   
   `preacceptInvalidate` appears to check nothing but the `Ballot` we provided?



##########
accord-core/src/main/java/accord/coordinate/tracking/InvalidationTracker.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate.tracking;
+
+import accord.api.RoutingKey;
+import accord.local.Node;
+import accord.topology.Shard;
+import accord.topology.Topologies;
+
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
+
+public class InvalidationTracker extends AbstractTracker<InvalidationTracker.InvalidationShardTracker, Node.Id>
+{
+    public static class InvalidationShardTracker extends ShardTracker
+    {
+        protected int fastPathRejects;
+        protected int promises;
+        protected int successes;
+        protected int inflight;
+
+        public InvalidationShardTracker(Shard shard)
+        {
+            super(shard);
+            inflight = shard.rf();
+        }
+
+        public ShardOutcome<? super InvalidationTracker> onPromisedNoFastPath(Node.Id from)
+        {
+            if (shard.fastPathElectorate.contains(from))
+                ++fastPathRejects;
+
+            return onPromised(from);
+        }
+
+        public ShardOutcome<? super InvalidationTracker> onPromised(Node.Id from)
+        {
+            ++promises;
+            ++successes;
+            --inflight;
+
+            if (isNewPromiseAcceptance())
+                return promised(rejectsFastPath());
+
+            if (isNewQuorum() && isPromiseRejected())
+                return rejected(rejectsFastPath());
+
+            return noChangeToQuorum(rejectsFastPath());
+        }
+
+        public ShardOutcome<? super InvalidationTracker> onPromiseRejectedNoFastPath(Node.Id from)
+        {
+            if (shard.fastPathElectorate.contains(from))
+                ++fastPathRejects;
+
+            return onPromiseRejected(from);
+        }
+
+        public ShardOutcome<? super InvalidationTracker> onPromiseRejected(Node.Id from)
+        {
+            ++successes;
+            --inflight;
+
+            if (isNewPromiseRejection() && hasReachedQuorum())
+                return rejected(rejectsFastPath());
+
+            if (isNewQuorum() && isPromiseRejected())
+                return rejected(rejectsFastPath());
+
+            return noChangeToQuorum(rejectsFastPath());
+        }
+
+        public ShardOutcome<? super InvalidationTracker> onFailure(Node.Id from)
+        {
+            --inflight;
+
+            if (isNewFailure())
+                return failed(rejectsFastPath());
+
+            if (isNewPromiseRejection() && hasReachedQuorum())
+                return rejected(rejectsFastPath());
+
+            return noChangeToQuorum(rejectsFastPath());
+        }
+
+        private boolean isNewPromiseAcceptance()
+        {
+            return promises == shard.slowPathQuorumSize;
+        }
+
+        private boolean isNewQuorum()
+        {
+            return successes == shard.slowPathQuorumSize;
+        }
+
+        private boolean isNewPromiseRejection()
+        {
+            return promises + inflight == shard.slowPathQuorumSize - 1;
+        }
+
+        private boolean isPromiseRejected()
+        {
+            return promises + inflight < shard.slowPathQuorumSize;
+        }
+
+        private boolean isNewFailure()
+        {
+            return successes + inflight == shard.slowPathQuorumSize - 1;
+        }
+
+        public boolean rejectsFastPath()
+        {
+            return fastPathRejects > shard.fastPathElectorate.size() - shard.fastPathQuorumSize;
+        }
+
+        public boolean isPromised()
+        {
+            return promises >= shard.slowPathQuorumSize;
+        }
+
+        public boolean canPromise()
+        {
+            return promises + inflight >= shard.slowPathQuorumSize;
+        }
+
+        public boolean isDecided()
+        {
+            return hasFailed() || (hasReachedQuorum() && (isPromised() || isPromiseRejected()));
+        }
+
+        @Override
+        boolean hasFailed()
+        {
+            // we don't want any single shard to declare failure; it's only a failure if ALL have not reached a quorum
+            // and NONE have rejected the fast path
+            return successes + inflight < shard.slowPathQuorumSize;
+        }
+
+        @Override
+        boolean hasReachedQuorum()
+        {
+            return successes >= shard.slowPathQuorumSize;
+        }
+
+        @Override
+        boolean hasInFlight()
+        {
+            return inflight > 0;
+        }
+    }
+
+    private int promisedShard = -1;
+    private boolean rejectsFastPath;
+    private boolean hasFailedShard;
+    public InvalidationTracker(Topologies topologies)
+    {
+        super(topologies, InvalidationShardTracker[]::new, InvalidationShardTracker::new);
+    }
+
+    public int promisedShardIndex()

Review Comment:
   Is `promisedShardIndex()` unused?



##########
accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java:
##########
@@ -91,43 +101,47 @@ protected void onDone(Done done, Throwable failure)
         }
 
         CheckStatusOkFull merged = (CheckStatusOkFull) this.merged;
-        switch (merged.sufficientFor(route))
+        Known known = merged.sufficientFor(route);
+        switch (known.outcome)
         {
-            case Nothing:
-            {
-                Invalidate.invalidate(node, txnId, route, route.homeKey, callback);
-                break;
-            }
-            case Definition:
-            case ExecutionOrder:
-            {
-                Txn txn = merged.partialTxn.reconstitute(route);
-                Recover.recover(node, txnId, txn, route, callback);
+            default: throw new AssertionError();
+            case OutcomeUnknown:
+                if (known.definition.isKnown())
+                {
+                    Txn txn = merged.partialTxn.reconstitute(route);
+                    Recover.recover(node, txnId, txn, route, callback);
+                }
+                else
+                {
+                    Invalidate.invalidate(node, txnId, route, callback);
+                }
                 break;
-            }
-            case Outcome:
-            {
-                assert merged.executeAt != null;
+
+            case OutcomeApplied:
+            case OutcomeKnown:
+                Preconditions.checkState(known.definition.isKnown());
+                Preconditions.checkState(known.order.hasExecuteAt());
                 // TODO: we might not be able to reconstitute Txn if we have GC'd on some shards
                 Txn txn = merged.partialTxn.reconstitute(route);
-                if (merged.committedDeps.covers(route))
+                if (known.order.hasDeps())

Review Comment:
   Is it really better to check `known.order.hasDeps()` here than to assert this on the actual deps, as `merged.committedDeps.covers(route)` did?



##########
accord-core/src/main/java/accord/local/Status.java:
##########
@@ -131,8 +290,45 @@ public boolean hasBeen(Status equalOrGreaterThan)
         return compareTo(equalOrGreaterThan) >= 0;
     }
 
-    public boolean isAtLeast(Phase equalOrGreaterThan)
+    public static <T> T max(List<T> list, Function<T, Status> getStatus, Function<T, Ballot> getAccepted, Predicate<T> filter)
+    {
+        T max = null;
+        Status maxStatus = null;
+        Ballot maxAccepted = null;
+        for (T item : list)
+        {
+            if (!filter.test(item))
+                continue;
+
+            Status status = getStatus.apply(item);
+            Ballot accepted = getAccepted.apply(item);
+            boolean update = max == null
+                          || maxStatus.phase.compareTo(status.phase) < 0
+                          || (status.phase.equals(Phase.Accept) && maxAccepted.compareTo(accepted) < 0);

Review Comment:
   Why is the `Ballot` from the `Accept` phase important, versus a ballot from a phase later than `Accept`?



##########
accord-core/src/main/java/accord/coordinate/tracking/RecoveryTracker.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate.tracking;
+
+import accord.coordinate.tracking.QuorumTracker.QuorumShardTracker;
+import accord.local.Node;
+import accord.topology.Shard;
+import accord.topology.Topologies;
+
+import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
+
+public class RecoveryTracker extends AbstractTracker<RecoveryTracker.RecoveryShardTracker, Node.Id>
+{
+    public static class RecoveryShardTracker extends QuorumShardTracker
+    {
+        protected int fastPathRejects = 0;
+
+        public RecoveryShardTracker(Shard shard)

Review Comment:
   Why did these 3 need to become public?



##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -351,13 +369,14 @@ public enum CommitOutcome { Success, Redundant, Insufficient }
     // relies on mutual exclusion for each key
     public CommitOutcome commit(SafeCommandStore safeStore, AbstractRoute route, @Nullable RoutingKey progressKey, @Nullable PartialTxn partialTxn, Timestamp executeAt, PartialDeps partialDeps)
     {
-        if (hasBeen(Committed))
+        if (hasBeen(PreCommitted))

Review Comment:
   What is going on here with `PreCommitted` vs `Committed`?
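
For reference, a minimal sketch of what the changed guard implies, assuming `PreCommitted` is ordered before `Committed` in the enum (an assumption, not confirmed by this hunk). The `Status` enum below is a hypothetical, trimmed-down stand-in; only `hasBeen` mirrors the `compareTo(...) >= 0` body shown in the `Status.java` hunk.

```java
// Hypothetical, trimmed-down status enum; the constant order is an assumption,
// not copied from accord.local.Status.
final class HasBeenSketch
{
    enum Status
    {
        PreAccepted, Accepted, PreCommitted, Committed, Applied;

        // Same comparison as the hasBeen shown in the Status.java hunk.
        boolean hasBeen(Status equalOrGreaterThan)
        {
            return compareTo(equalOrGreaterThan) >= 0;
        }
    }

    public static void main(String[] args)
    {
        Status current = Status.PreCommitted;
        // Old guard: a PreCommitted command is not yet treated as committed.
        System.out.println(current.hasBeen(Status.Committed));    // false
        // New guard: a PreCommitted command already takes the early branch.
        System.out.println(current.hasBeen(Status.PreCommitted)); // true
    }
}
```

Under that assumed ordering, the new guard fires for strictly more states than the old one, which is the behaviour the question is asking about.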



##########
accord-core/src/main/java/accord/local/Status.java:
##########
@@ -131,8 +290,45 @@ public boolean hasBeen(Status equalOrGreaterThan)
         return compareTo(equalOrGreaterThan) >= 0;
     }
 
-    public boolean isAtLeast(Phase equalOrGreaterThan)
+    public static <T> T max(List<T> list, Function<T, Status> getStatus, Function<T, Ballot> getAccepted, Predicate<T> filter)
+    {
+        T max = null;
+        Status maxStatus = null;
+        Ballot maxAccepted = null;
+        for (T item : list)
+        {
+            if (!filter.test(item))
+                continue;
+
+            Status status = getStatus.apply(item);
+            Ballot accepted = getAccepted.apply(item);
+            boolean update = max == null
+                          || maxStatus.phase.compareTo(status.phase) < 0
+                          || (status.phase.equals(Phase.Accept) && maxAccepted.compareTo(accepted) < 0);
+
+            if (!update)
+                continue;
+
+            max = item;
+            maxStatus = status;
+            maxAccepted = accepted;
+        }
+
+        return max;
+    }
+
+    public static <T> T max(T a, Status statusA, Ballot acceptedA, T b, Status statusB, Ballot acceptedB)
+    {
+        int c = statusA.phase.compareTo(statusB.phase);
+        if (c > 0) return a;
+        if (c < 0) return b;
+        if (statusA.phase != Phase.Accept || acceptedA.compareTo(acceptedB) >= 0)

Review Comment:
   Why does it matter here whether A's phase is `Accept`? Don't A and B have the same phase at this point?
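
To make the question concrete, here is a minimal sketch of the same tie-break with hypothetical stand-in types: `Phase` is a three-value enum and ballots are plain ints, neither taken from Accord. It also assumes the truncated `if` returns `a` and otherwise falls through to `b`, which the hunk does not show.

```java
// Hypothetical stand-ins for Status.Phase and Ballot; not the real Accord types.
final class MaxByPhaseSketch
{
    enum Phase { PreAccept, Accept, Commit }

    // Same shape as the six-argument max in the diff, with ballots modelled as ints.
    static <T> T max(T a, Phase phaseA, int acceptedA, T b, Phase phaseB, int acceptedB)
    {
        int c = phaseA.compareTo(phaseB);
        if (c > 0) return a;
        if (c < 0) return b;
        // From here on phaseA == phaseB, so testing phaseA alone covers both arguments.
        // Outside the Accept phase the ballot is ignored and a wins the tie.
        if (phaseA != Phase.Accept || acceptedA >= acceptedB)
            return a;
        return b; // assumed fall-through: same Accept phase, b has the higher ballot
    }

    public static void main(String[] args)
    {
        System.out.println(max("a", Phase.Accept, 1, "b", Phase.Accept, 2));    // b
        System.out.println(max("a", Phase.Commit, 1, "b", Phase.Commit, 2));    // a
        System.out.println(max("a", Phase.PreAccept, 5, "b", Phase.Accept, 0)); // b
    }
}
```

In this sketch the phases are equal once the first two returns are passed, so the `phaseA != Phase.Accept` test only decides whether the ballot comparison applies at all.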



##########
accord-core/src/main/java/accord/topology/Topologies.java:
##########
@@ -298,10 +308,18 @@ public boolean contains(Id to)
             return false;
         }
 
+        @Override
+        public int estimateUniqueNodes()

Review Comment:
   Since `nodes()` calculates this precisely, should we remember that value and then keep returning it?
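
A rough sketch of what that could look like, using a hypothetical stand-in class rather than the real `Topologies` implementation. The per-shard list of integer node ids and the sum-of-shard-sizes fallback are both assumptions made for illustration, and the sketch is not thread-safe.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a multi-shard topology view; not accord.topology.Topologies.
final class UniqueNodesCacheSketch
{
    private final List<List<Integer>> shardNodes; // node ids per shard (assumed shape)
    private int cachedUniqueNodes = -1;           // -1 means nodes() has not run yet

    UniqueNodesCacheSketch(List<List<Integer>> shardNodes)
    {
        this.shardNodes = shardNodes;
    }

    // Precise computation; remembers the exact count as a side effect.
    Set<Integer> nodes()
    {
        Set<Integer> result = new HashSet<>();
        for (List<Integer> shard : shardNodes)
            result.addAll(shard);
        cachedUniqueNodes = result.size();
        return result;
    }

    // Returns the exact count once it is known, otherwise a cheap upper bound.
    int estimateUniqueNodes()
    {
        if (cachedUniqueNodes >= 0)
            return cachedUniqueNodes;
        int upperBound = 0;
        for (List<Integer> shard : shardNodes)
            upperBound += shard.size();
        return upperBound;
    }
}
```

The trade-off is one extra int field in exchange for an exact answer on every call made after `nodes()` has run.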



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

