You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2022/03/14 09:08:25 UTC

[GitHub] [cassandra-accord] belliottsmith commented on a change in pull request #1: Topology change

belliottsmith commented on a change in pull request #1:
URL: https://github.com/apache/cassandra-accord/pull/1#discussion_r824661636



##########
File path: accord-core/src/main/java/accord/txn/Keys.java
##########
@@ -115,4 +132,52 @@ public static Keys of(Key k0, Key... kn)
 
         return new Keys(keys);
     }
+
+    public Keys intersection(KeyRanges ranges)
+    {
+        Key[] result = null;
+        int resultSize = 0;
+
+        int keyLB = 0;
+        int keyHB = size();
+        int rangeLB = 0;
+        int rangeHB = ranges.rangeIndexForKey(keys[keyHB-1]);
+        rangeHB = rangeHB < 0 ? -1 - rangeHB : rangeHB + 1;
+
+        for (;rangeLB<rangeHB && keyLB<keyHB;)
+        {
+            Key key = keys[keyLB];
+            rangeLB = ranges.rangeIndexForKey(rangeLB, ranges.size(), key);

Review comment:
       Should `ranges.size()` be `rangeHB` here?

##########
File path: accord-core/src/main/java/accord/topology/Topology.java
##########
@@ -61,25 +58,93 @@ public Topology(Shard... shards)
         {
             int[] supersetIndexes = e.getValue().stream().mapToInt(i -> i).toArray();
             KeyRanges ranges = this.ranges.select(supersetIndexes);
-            nodeLookup.put(e.getKey(), new Shards.NodeInfo(ranges, supersetIndexes));
+            nodeLookup.put(e.getKey(), new NodeInfo(ranges, supersetIndexes));
         }
     }
 
-    public Topology(Shard[] shards, KeyRanges ranges, Map<Id, Shards.NodeInfo> nodeLookup, KeyRanges subsetOfRanges, int[] supersetIndexes)
+    public Topology(long epoch, Shard[] shards, KeyRanges ranges, Map<Id, NodeInfo> nodeLookup, KeyRanges subsetOfRanges, int[] supersetIndexes)
     {
+        this.epoch = epoch;
         this.shards = shards;
         this.ranges = ranges;
         this.nodeLookup = nodeLookup;
         this.subsetOfRanges = subsetOfRanges;
         this.supersetRangeIndexes = supersetIndexes;
     }
 
-    public Shards forNode(Id node)
+    @Override
+    public String toString()
+    {
+        return "Topology{" + "epoch=" + epoch + ", " + super.toString() + '}';
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Topology that = (Topology) o;
+        if (this.epoch != that.epoch || this.size() != that.size() || !this.subsetOfRanges.equals(that.subsetOfRanges))
+            return false;
+
+        for (int i=0, mi=this.size(); i<mi; i++)
+        {
+            if (!this.get(i).equals(that.get(i)))
+                return false;
+        }
+        return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = Objects.hash(epoch, ranges, nodeLookup, subsetOfRanges);
+        result = 31 * result + Arrays.hashCode(shards);
+        result = 31 * result + Arrays.hashCode(supersetRangeIndexes);
+        return result;
+    }
+
+    public static Topology select(long epoch, Shard[] shards, int[] indexes)
+    {
+        Shard[] subset = new Shard[indexes.length];
+        for (int i=0; i<indexes.length; i++)
+            subset[i] = shards[indexes[i]];
+
+        return new Topology(epoch, subset);
+    }
+
+    public boolean isSubset()
+    {
+        return supersetRangeIndexes.length < shards.length;
+    }
+
+    public boolean isSubsetOf(Topology topology)

Review comment:
       This method appears to be unused.

##########
File path: accord-core/src/main/java/accord/topology/TopologyManager.java
##########
@@ -0,0 +1,324 @@
+package accord.topology;
+
+import accord.api.ConfigurationService;
+import accord.coordinate.tracking.QuorumTracker;
+import accord.local.Node;
+import accord.messages.Request;
+import accord.messages.TxnRequest;
+import accord.txn.Keys;
+import accord.txn.Txn;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.util.*;
+import java.util.function.LongConsumer;
+
+/**
+ * Manages topology state changes and update bookkeeping
+ *
+ * Each time the topology changes we need to:
+ * * confirm previous owners of ranges we replicate are aware of the new config
+ * * learn of any outstanding operations for ranges we replicate
+ * * clean up obsolete data
+ *
+ * Assumes a topology service that won't report epoch n without having n-1 etc also available
+ */
+public class TopologyManager implements ConfigurationService.Listener
+{
+    class EpochState
+    {
+        private final Topology global;
+        private final Topology local;
+        private final QuorumTracker syncTracker;
+        private boolean syncComplete = false;
+        private boolean prevSynced = false;

Review comment:
       This is always provided in the constructor, so perhaps simply assign it from the parameter's value rather than giving it a default here?

##########
File path: accord-core/src/main/java/accord/messages/WaitOnCommit.java
##########
@@ -49,46 +53,55 @@ public synchronized void onChange(Command command)
 
         private void ack()
         {
-            if (--waitingOn == 0)
+            if (waitingOn.decrementAndGet() == 0)
                 node.reply(replyToNode, replyToMessage, new WaitOnCommitOk());
         }
 
-        synchronized void setup(TxnId txnId, Keys keys)
+        void process(CommandStore instance)

Review comment:
       `process` is an ambiguous name - ideally this would also be called `setup` or something equivalent, since it is only used on setup.

##########
File path: accord-core/src/main/java/accord/messages/BeginRecovery.java
##########
@@ -118,32 +132,46 @@ public void process(Node node, Id replyToNode, long replyToMessage)
             ok1.earlierAcceptedNoWitness.addAll(ok2.earlierAcceptedNoWitness);
             ok1.earlierAcceptedNoWitness.removeAll(ok1.earlierCommittedWitness);
             return new RecoverOk(
-            ok1.status,
-            Ballot.max(ok1.accepted, ok2.accepted),
-            Timestamp.max(ok1.executeAt, ok2.executeAt),
-            deps,
-            ok1.earlierCommittedWitness,
-            ok1.earlierAcceptedNoWitness,
-                ok1.rejectsFastPath | ok2.rejectsFastPath,
-            ok1.writes, ok1.result);
+                    txnId, ok1.status,
+                    Ballot.max(ok1.accepted, ok2.accepted),
+                    Timestamp.max(ok1.executeAt, ok2.executeAt),
+                    deps,
+                    ok1.earlierCommittedWitness,
+                    ok1.earlierAcceptedNoWitness,
+                    ok1.rejectsFastPath | ok2.rejectsFastPath,
+                    ok1.writes, ok1.result);
         }).orElseThrow();
 
         node.reply(replyToNode, replyToMessage, reply);
         if (reply instanceof RecoverOk && ((RecoverOk) reply).status == Applied)
         {
             // disseminate directly
             RecoverOk ok = (RecoverOk) reply;
-            node.send(node.cluster().forKeys(txn.keys), new Apply(txnId, txn, ok.executeAt, ok.deps, ok.writes, ok.result));
+            ConfigurationService configService = node.configService();
+            if (ok.executeAt.epoch > configService.currentEpoch())
+            {
+                configService.fetchTopologyForEpoch(ok.executeAt.epoch, () -> disseminateApply(node, ok));
+                return;
+            }
+            disseminateApply(node, ok);
         }
     }
 
+    private void disseminateApply(Node node, RecoverOk ok)

Review comment:
       Are we confident it is safe to only send to the latest epoch, and not the possibly earlier `executeAt` epoch, whose members might be waiting on the result to e.g. finish transactions they must sync to the new members?
   
   I can always assess this as part of the liveness work when I rebase onto this patch.

##########
File path: accord-core/src/main/java/accord/coordinate/Agree.java
##########
@@ -137,13 +232,13 @@ private void onPreAccepted()
             //       but by sending accept we rule out hybrid fast-path
             permitHybridFastPath = executeAt.compareTo(txnId) == 0;
 
-            startAccept(executeAt, deps);
+            startAccept(executeAt, deps, tracker.topologies());
         }
     }
 
     private boolean shouldSlowPathAccept()
     {
-        return !tracker.hasInFlight() && tracker.hasReachedQuorum();
+        return (!tracker.fastPathPermitted || !tracker.hasInFlight()) && tracker.hasReachedQuorum();

Review comment:
       Perhaps encapsulate some of this in `tracker`?

##########
File path: accord-core/src/main/java/accord/local/CommandStore.java
##########
@@ -283,13 +260,35 @@ void processInternal(Consumer<? super CommandStore> consumer, CompletableFuture<
 
     public abstract CompletionStage<Void> process(Consumer<? super CommandStore> consumer);
 
+    public void processBlocking(Consumer<? super CommandStore> consumer)
+    {
+        try
+        {
+            process(consumer).toCompletableFuture().get();
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();

Review comment:
       Should we propagate some kind of exception so this doesn't effectively report success?

##########
File path: accord-core/src/main/java/accord/local/Node.java
##########
@@ -91,19 +86,69 @@ public static int numCommandShards()
     private final Map<TxnId, CompletionStage<Result>> coordinating = new ConcurrentHashMap<>();
     private final Set<TxnId> pendingRecovery = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
-    public Node(Id id, Topology cluster, MessageSink messageSink, Random random, LongSupplier nowSupplier,
+    public Node(Id id, MessageSink messageSink, ConfigurationService configService, LongSupplier nowSupplier,
                 Supplier<Store> dataSupplier, Agent agent, Scheduler scheduler, CommandStore.Factory commandStoreFactory)
     {
         this.id = id;
-        this.cluster = cluster;
-        this.random = random;
         this.agent = agent;
-        this.now = new AtomicReference<>(new Timestamp(nowSupplier.getAsLong(), 0, id));
         this.messageSink = messageSink;
+        this.configService = configService;
+        this.topology = new TopologyManager(id, configService::reportEpoch);
+        Topology topology = configService.currentTopology();
+        this.now = new AtomicReference<>(new Timestamp(topology.epoch(), nowSupplier.getAsLong(), 0, id));
         this.nowSupplier = nowSupplier;
         this.scheduler = scheduler;
-        this.commandStores = new CommandStores(numCommandShards(), id, this::uniqueNow, agent, dataSupplier.get(), commandStoreFactory);
-        this.commandStores.updateTopology(cluster.forNode(id));
+        this.commandStores = new CommandStores(numCommandShards(),
+                                               id,

Review comment:
       I know this is technically the project's formatting spec, but it's a pet peeve of mine, as I think it really harms legibility. I haven't really wanted to bring up revisions to it, but maybe I will soon. In this case we only slightly exceed the line-length limit, so a single line would be fine, but two or three lines of grouped, related parameters would be much more legible IMO.
   
   e.g.
   
   ```
           this.commandStores = new CommandStores(numCommandShards(), id, this::uniqueNow, 
                                                  agent, dataSupplier.get(), commandStoreFactory);
   ```

##########
File path: accord-core/src/main/java/accord/local/Node.java
##########
@@ -116,84 +161,74 @@ public Timestamp uniqueNow()
         return now.updateAndGet(cur -> {
             // TODO: this diverges from proof; either show isomorphism or make consistent
             long now = nowSupplier.getAsLong();
-            if (now > cur.real) return new Timestamp(now, 0, id);
-            else return new Timestamp(cur.real, cur.logical + 1, id);
+            long epoch = Math.max(cur.epoch, topology.epoch());
+            return (now > cur.real)
+                 ? new Timestamp(epoch, now, 0, id)
+                 : new Timestamp(epoch, cur.real, cur.logical + 1, id);
         });
     }
 
     public Timestamp uniqueNow(Timestamp atLeast)
     {
         if (now.get().compareTo(atLeast) < 0)
-            now.accumulateAndGet(atLeast, (a, b) -> a.compareTo(b) < 0 ? new Timestamp(b.real, b.logical + 1, id) : a);
-
-        return now.updateAndGet(cur -> {
-            // TODO: this diverges from proof; either show isomorphism or make consistent
-            long now = nowSupplier.getAsLong();
-            if (now > cur.real) return new Timestamp(now, 0, id);
-            else return new Timestamp(cur.real, cur.logical + 1, id);
-        });
+            now.accumulateAndGet(atLeast, (current, proposed) -> {

Review comment:
       I think this has changed the semantics of this step? It seems to have been flipped, to take `atLeast` itself if it is larger (which might have the wrong `node`, but fortunately this is fixed in `uniqueNow`) but to increment current unnecessarily otherwise.
   
   I suspect there was also a missing `return` previously, as the prior increment to `proposed` (and overwriting of `node`) makes it unique immediately.
   
   We could keep the basic flow of this approach and simply remove the `logicalNext(id)` application to `current`, but I would probably prefer at least overwriting the `id` of `proposed` to ensure the global counter always has the local node `id`.

##########
File path: accord-core/src/main/java/accord/coordinate/Agree.java
##########
@@ -45,11 +49,76 @@ public boolean hasMetFastPathCriteria()
         }
     }
 
+    static class PreacceptTracker extends FastPathTracker<ShardTracker>
+    {
+        volatile long supersedingEpoch = -1;
+        private final boolean fastPathPermitted;
+        private final Set<Id> successes = new HashSet<>();
+        private final Set<Id> failures = new HashSet<>();

Review comment:
       Initialise lazily?

##########
File path: accord-core/src/main/java/accord/messages/WaitOnCommit.java
##########
@@ -1,27 +1,31 @@
 package accord.messages;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import accord.local.*;
 import accord.local.Node.Id;
+import accord.topology.Topologies;
 import accord.txn.TxnId;
 import accord.txn.Keys;
 
-public class WaitOnCommit implements Request
+public class WaitOnCommit extends TxnRequest
 {
     static class LocalWait implements Listener
     {
         final Node node;
         final Id replyToNode;
+        final TxnId txnId;
         final long replyToMessage;
 
-        int waitingOn;
+        final AtomicInteger waitingOn = new AtomicInteger();
 
-        LocalWait(Node node, Id replyToNode, long replyToMessage)
+        LocalWait(Node node, Id replyToNode, TxnId txnId, long replyToMessage)
         {
             this.node = node;
             this.replyToNode = replyToNode;
+            this.txnId = txnId;
             this.replyToMessage = replyToMessage;
         }
 

Review comment:
       Should `onChange` no longer be synchronised?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org