Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2022/07/22 20:58:55 UTC

[GitHub] [cassandra-accord] bdeggleston commented on a diff in pull request #5: CASSANDRA-17718: Transaction Invalidation

bdeggleston commented on code in PR #5:
URL: https://github.com/apache/cassandra-accord/pull/5#discussion_r927973637


##########
accord-core/src/main/java/accord/coordinate/Coordinate.java:
##########
@@ -1,46 +1,283 @@
 package accord.coordinate;
 
-import accord.api.ConfigurationService;
-
-import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
 
 import accord.api.Key;
-import accord.local.Node;
 import accord.api.Result;
-import accord.local.Node;
+import accord.coordinate.tracking.FastPathTracker;
+import accord.topology.Shard;
+import accord.topology.Topologies;
 import accord.txn.Ballot;
+import accord.messages.Callback;
+import accord.local.Node;
+import accord.txn.Dependencies;
+import accord.local.Node.Id;
+import accord.txn.Timestamp;
+import accord.messages.PreAccept;
+import accord.messages.PreAccept.PreAcceptOk;
 import accord.txn.Txn;
 import accord.txn.TxnId;
+import accord.messages.PreAccept.PreAcceptReply;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.utils.concurrent.AsyncFuture;
 import org.apache.cassandra.utils.concurrent.Future;
 
-public class Coordinate
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ */
+public class Coordinate extends AsyncFuture<Result> implements Callback<PreAcceptReply>, BiConsumer<Result, Throwable>

Review Comment:
   What do you think about pulling the preaccept phase into its own class and composing preaccept and the other phases in the Coordinate class? On one hand, the top-level operations (coordinate, recover, etc.) each have a unique first stage, so I understand the reasoning for bundling that stage into its respective class. On the other hand, each operation is a composition of several phases, and it would make more sense to me if they were modeled that way.
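
   A rough sketch of the composition idea, to make the suggestion concrete. Everything here is hypothetical: `Phase`, `PreAcceptPhase`, `AgreePhase`, and `PreAcceptOutcome` are illustrative names that do not exist in the PR, plain `CompletableFuture` stands in for the project's own future type, and the phases complete immediately instead of tracking replies.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: none of these types exist in accord-core.
public class PhaseCompositionSketch
{
    /** One protocol phase: consumes the previous phase's outcome and asynchronously produces its own. */
    interface Phase<I, O>
    {
        CompletableFuture<O> run(I input);
    }

    /** Stand-in for what the PreAccept round hands to the next phase. */
    static final class PreAcceptOutcome
    {
        final String txnId;
        final boolean fastPath;

        PreAcceptOutcome(String txnId, boolean fastPath)
        {
            this.txnId = txnId;
            this.fastPath = fastPath;
        }
    }

    /** The first stage, pulled out of the top-level operation into its own class. */
    static final class PreAcceptPhase implements Phase<String, PreAcceptOutcome>
    {
        private final boolean fastPathQuorum; // assumption: stand-in for real reply tracking

        PreAcceptPhase(boolean fastPathQuorum)
        {
            this.fastPathQuorum = fastPathQuorum;
        }

        @Override
        public CompletableFuture<PreAcceptOutcome> run(String txnId)
        {
            return CompletableFuture.completedFuture(new PreAcceptOutcome(txnId, fastPathQuorum));
        }
    }

    /** Placeholder for the later phases (Accept, execute, and so on). */
    static final class AgreePhase implements Phase<PreAcceptOutcome, String>
    {
        @Override
        public CompletableFuture<String> run(PreAcceptOutcome outcome)
        {
            String path = outcome.fastPath ? "fast path" : "slow path";
            return CompletableFuture.completedFuture(outcome.txnId + ": " + path);
        }
    }

    /** The top-level operation only wires phases together. */
    static final class Coordinate
    {
        private final Phase<String, PreAcceptOutcome> preAccept;
        private final Phase<PreAcceptOutcome, String> agree;

        Coordinate(Phase<String, PreAcceptOutcome> preAccept, Phase<PreAcceptOutcome, String> agree)
        {
            this.preAccept = preAccept;
            this.agree = agree;
        }

        CompletableFuture<String> coordinate(String txnId)
        {
            // Each phase starts only once the previous one has completed.
            return preAccept.run(txnId).thenCompose(agree::run);
        }
    }

    public static void main(String[] args)
    {
        Coordinate coordinate = new Coordinate(new PreAcceptPhase(true), new AgreePhase());
        System.out.println(coordinate.coordinate("txn-1").join());
    }
}
```

   With this shape, a recovery operation could presumably reuse the later phases and supply only its own first stage, which is the main appeal of modeling operations as compositions.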



##########
accord-core/src/main/java/accord/impl/SimpleProgressLog.java:
##########
@@ -129,6 +131,7 @@ boolean isAtLeast(GlobalStatus equalOrGreaterThan)
         Set<Id> globalNotPersisted;
         GlobalPendingDurable globalPendingDurable;
 
+        Object debugInvestigating;

Review Comment:
   I'm assuming this can be removed?



##########
accord-core/src/test/java/accord/burn/BurnTest.java:
##########
@@ -229,6 +233,7 @@ static void burn(Random random, TopologyFactory topologyFactory, List<Id> client
     public static void main(String[] args) throws Exception
     {
         Long overrideSeed = null;
+//        Long overrideSeed = -7320078316311161123L;

Review Comment:
   Leftover debug seed; this commented-out override can be removed.



##########
accord-core/src/main/java/accord/coordinate/Coordinate.java:
##########
@@ -1,46 +1,283 @@
 package accord.coordinate;
 
-import accord.api.ConfigurationService;
-
-import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
 
 import accord.api.Key;
-import accord.local.Node;
 import accord.api.Result;
-import accord.local.Node;
+import accord.coordinate.tracking.FastPathTracker;
+import accord.topology.Shard;
+import accord.topology.Topologies;
 import accord.txn.Ballot;
+import accord.messages.Callback;
+import accord.local.Node;
+import accord.txn.Dependencies;
+import accord.local.Node.Id;
+import accord.txn.Timestamp;
+import accord.messages.PreAccept;
+import accord.messages.PreAccept.PreAcceptOk;
 import accord.txn.Txn;
 import accord.txn.TxnId;
+import accord.messages.PreAccept.PreAcceptReply;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.utils.concurrent.AsyncFuture;
 import org.apache.cassandra.utils.concurrent.Future;
 
-public class Coordinate
+/**
+ * Perform initial rounds of PreAccept and Accept until we have reached agreement about when we should execute.
+ * If we are preempted by a recovery coordinator, we abort and let them complete (and notify us about the execution result)
+ */
+public class Coordinate extends AsyncFuture<Result> implements Callback<PreAcceptReply>, BiConsumer<Result, Throwable>
 {
-    private static Future<Result> fetchEpochOrExecute(Node node, Agreed agreed)
+    static class ShardTracker extends FastPathTracker.FastPathShardTracker
+    {
+        public ShardTracker(Shard shard)
+        {
+            super(shard);
+        }
+
+        @Override
+        public boolean includeInFastPath(Node.Id node, boolean withFastPathTimestamp)
+        {
+            return withFastPathTimestamp && shard.fastPathElectorate.contains(node);
+        }
+
+        @Override
+        public boolean hasMetFastPathCriteria()
+        {
+            return fastPathAccepts >= shard.fastPathQuorumSize;
+        }
+    }
+
+    static class PreacceptTracker extends FastPathTracker<ShardTracker>
+    {
+        volatile long supersedingEpoch = -1;
+        private final boolean fastPathPermitted;
+        private final Set<Id> successes = new HashSet<>();
+        private Set<Id> failures;
+
+        public PreacceptTracker(Topologies topologies, boolean fastPathPermitted)
+        {
+            super(topologies, Coordinate.ShardTracker[]::new, Coordinate.ShardTracker::new);
+            this.fastPathPermitted = fastPathPermitted;
+        }
+
+        public PreacceptTracker(Topologies topologies)
+        {
+            this(topologies, topologies.fastPathPermitted());
+        }
+
+        @Override
+        public boolean failure(Id node)
+        {
+            if (failures == null)
+                failures = new HashSet<>();
+            failures.add(node);
+            return super.failure(node);
+        }
+
+        @Override
+        public void recordSuccess(Id node, boolean withFastPathTimestamp)
+        {
+            successes.add(node);
+            super.recordSuccess(node, withFastPathTimestamp);
+        }
+
+        public void recordSuccess(Id node)
+        {
+            recordSuccess(node, false);
+        }
+
+        public synchronized boolean recordSupersedingEpoch(long epoch)
+        {
+            if (epoch <= supersedingEpoch)
+                return false;
+            supersedingEpoch = epoch;
+            return true;
+        }
+
+        public boolean hasSupersedingEpoch()
+        {
+            return supersedingEpoch > 0;
+        }
+
+        public PreacceptTracker withUpdatedTopologies(Topologies topologies)
+        {
+            PreacceptTracker tracker = new PreacceptTracker(topologies, false);
+            successes.forEach(tracker::recordSuccess);
+            if (failures != null)
+                failures.forEach(tracker::failure);
+            return tracker;
+        }
+
+        @Override
+        public boolean hasMetFastPathCriteria()
+        {
+            return fastPathPermitted && super.hasMetFastPathCriteria();
+        }
+
+        boolean shouldSlowPathAccept()
+        {
+            return (!fastPathPermitted || !hasInFlight()) && hasReachedQuorum();
+        }
+    }
+
+    final Node node;
+    final TxnId txnId;
+    final Txn txn;
+    final Key homeKey;
+
+    private PreacceptTracker tracker;
+    private final List<PreAcceptOk> preAcceptOks = new ArrayList<>();
+    private boolean isPreAccepted;

Review Comment:
   If we decide not to separate preaccept from coordinate: `isDone` might be too generic for the Coordinate class, but `isPreAccepted` implies (to me) that preaccept was successful, so it is odd to see it set to true in `onFailure`. Maybe `preacceptDone` or something similar?
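
   To illustrate the naming concern, here is a minimal sketch assuming a simplified coordinator that counts replies against a quorum. The `Coordinator` class, its methods, and the quorum counting are made up for illustration and are not the PR's code; the point is only that the flag is set on both the success and the failure path, so a name meaning "the round has finished" fits better than one implying success.

```java
// Hypothetical sketch: a simplified stand-in, not the Coordinate class from the PR.
public class PreacceptDoneSketch
{
    static final class Coordinator
    {
        private final int quorum;
        private int successes;
        private boolean preacceptDone; // the round has finished, successfully or not
        private Throwable failure;     // non-null only if the round finished by failing

        Coordinator(int quorum)
        {
            this.quorum = quorum;
        }

        synchronized void onSuccess()
        {
            if (preacceptDone)
                return; // ignore late replies
            if (++successes >= quorum)
                preacceptDone = true;
        }

        synchronized void onFailure(Throwable cause)
        {
            if (preacceptDone)
                return; // ignore late replies
            preacceptDone = true; // finished, but NOT successfully
            failure = cause;
        }

        synchronized boolean isPreacceptDone()
        {
            return preacceptDone;
        }

        synchronized boolean preacceptSucceeded()
        {
            return preacceptDone && failure == null;
        }
    }

    public static void main(String[] args)
    {
        Coordinator failed = new Coordinator(2);
        failed.onFailure(new RuntimeException("timeout"));
        System.out.println("done=" + failed.isPreacceptDone() + " succeeded=" + failed.preacceptSucceeded());
    }
}
```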



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

