You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by "aweisberg (via GitHub)" <gi...@apache.org> on 2023/04/10 21:13:22 UTC

[GitHub] [cassandra-accord] aweisberg commented on a diff in pull request #38: CASSANDRA-18364: CEP-15: (C*) Accord message processing should avoid being passed on to a Stage and run directly in the messageing handler

aweisberg commented on code in PR #38:
URL: https://github.com/apache/cassandra-accord/pull/38#discussion_r1156293727


##########
accord-core/src/main/java/accord/messages/Commit.java:
##########
@@ -162,7 +162,8 @@ public synchronized ReadNack apply(SafeCommandStore safeStore)
             case Insufficient:
                 SafeCommand safeCommand = safeStore.command(txnId);
                 Invariants.checkState(!safeCommand.current().known().isDefinitionKnown());
-                if (defer == null)
+                // When isDone=true this.process() is called and all CommandStores run apply again; so safe to create a new one
+                if (defer == null || defer.isDone)

Review Comment:
   I need to look closer, but why would we need to defer multiple times, shouldn't defer wait until the read is possible?
   
   Is this a case where defer is created, and completed before all command stores were added to the defer?



##########
accord-core/src/main/java/accord/coordinate/Persist.java:
##########
@@ -94,13 +94,13 @@ public void onSuccess(Id from, ApplyReply reply)
                     {
                         // TODO (low priority, consider, efficiency): send to non-home replicas also, so they may clear their log more easily?
                         Shard homeShard = node.topology().forEpochIfKnown(route.homeKey(), txnId.epoch());
-                        node.send(homeShard, new InformHomeDurable(txnId, route.homeKey(), executeAt, Durable, persistedOn));
+                        node.send(homeShard, new InformHomeDurable(txnId, route.homeKey(), executeAt, Durable, new HashSet<>(persistedOn)));

Review Comment:
   I get this fixes a potential concurrent modification, but on an unrelated note I am a little surprised to notice that we send an `InformHomeDurable` from every single `Persist`. Seems like in most cases it's already the original coordinator running this and it's just sending a message to itself. I guess that is basically free and it works for recovery coordinators as well?



##########
accord-core/src/main/java/accord/local/CommandStores.java:
##########
@@ -416,6 +422,35 @@ public synchronized void shutdown()
             shard.store.shutdown();
     }
 
+    public CommandStore from(Routables<?, ?> routables)
+    {
+        return from(routables::intersects);
+    }
+
+    public CommandStore from(RoutingKey key)
+    {
+        return  from(ranges -> ranges.contains(key));
+    }
+
+    private CommandStore from(Predicate<Ranges> fn)
+    {
+        ShardHolder[] shards = current.shards;
+        for (ShardHolder holder : shards)
+        {
+            if (fn.test(holder.ranges().currentRanges()))
+                return holder.store;
+        }
+        return any();
+    }
+
+    @VisibleForTesting
+    public CommandStore any()

Review Comment:
   I low key wish that this method didn't have to exist because. Even having the lookup by key/range/routable is somewhat undesirable because in so many cases you already know what the correct command store and should use that.
   
   What are the cases where we should be picking a random one vs just continuing to operate in the Netty thread?



##########
accord-core/src/main/java/accord/utils/async/AsyncChains.java:
##########
@@ -331,6 +331,62 @@ public static <V> AsyncChain<V> failure(Throwable failure)
         return new Immediate<>(failure);
     }
 
+    public static <V, T> AsyncChain<T> map(AsyncChain<V> chain, Function<? super V, ? extends T> mapper, Executor executor)
+    {
+        return chain.flatMap(v -> new Head<T>()

Review Comment:
   This calls `flatMap` not map?



##########
accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java:
##########
@@ -269,10 +272,15 @@ public synchronized void acknowledgeEpoch(long epoch)
     {
         epochs.acknowledge(epoch);
         Topology topology = getTopologyForEpoch(epoch);
-        Node originator = lookup.apply(node);
+        Node originator = originator();
         topologyUpdates.syncEpoch(originator, epoch - 1, topology.nodes());
     }
 
+    private Node originator()
+    {
+        return lookup.apply(node);

Review Comment:
   `MessageSink` is unused now



##########
accord-core/src/main/java/accord/utils/async/AsyncChains.java:
##########
@@ -331,6 +331,62 @@ public static <V> AsyncChain<V> failure(Throwable failure)
         return new Immediate<>(failure);
     }
 
+    public static <V, T> AsyncChain<T> map(AsyncChain<V> chain, Function<? super V, ? extends T> mapper, Executor executor)
+    {
+        return chain.flatMap(v -> new Head<T>()

Review Comment:
   This calls `flatMap` not `map`?



##########
accord-core/src/main/java/accord/local/Node.java:
##########
@@ -203,7 +203,7 @@ public void withEpoch(long epoch, Runnable runnable)
         else
         {
             configService.fetchTopologyForEpoch(epoch);
-            topology.awaitEpoch(epoch).addCallback(runnable);
+            topology.awaitEpoch(epoch).addCallback(runnable).begin(agent());

Review Comment:
   Yikes, so without this fix `runnable` never runs?



##########
accord-core/src/main/java/accord/topology/TopologyManager.java:
##########
@@ -259,9 +260,14 @@ public synchronized void onTopologyUpdate(Topology topology)
             toComplete.trySuccess(null);
     }
 
-    public synchronized AsyncResult<Void> awaitEpoch(long epoch)
+    public synchronized AsyncChain<Void> awaitEpoch(long epoch)
     {
-        return epochs.awaitEpoch(epoch);
+        AsyncResult<Void> result = epochs.awaitEpoch(epoch);

Review Comment:
   This doesn't preserve the immediate success behavior, of `awaitEpoch`, but I don't think it matters because all the callers are first checking if the `Epoch` is already present.



##########
accord-core/src/main/java/accord/utils/async/AsyncChains.java:
##########
@@ -331,6 +331,62 @@ public static <V> AsyncChain<V> failure(Throwable failure)
         return new Immediate<>(failure);
     }
 
+    public static <V, T> AsyncChain<T> map(AsyncChain<V> chain, Function<? super V, ? extends T> mapper, Executor executor)
+    {
+        return chain.flatMap(v -> new Head<T>()
+        {
+            @Override
+            protected void start(BiConsumer<? super T, Throwable> callback)
+            {
+                try
+                {
+                    executor.execute(() -> {
+                        try
+                        {
+                            callback.accept(mapper.apply(v), null);
+                        }
+                        catch (Throwable t)
+                        {
+                            callback.accept(null, t);
+                        }
+                    });
+                }
+                catch (Throwable t)
+                {
+                    callback.accept(null, t);
+                }
+            }
+        });
+    }
+
+    public static <V, T> AsyncChain<T> flatMap(AsyncChain<V> chain, Function<? super V, ? extends AsyncChain<T>> mapper, Executor executor)
+    {
+        return chain.flatMap(v -> new Head<T>()
+        {
+            @Override
+            protected void start(BiConsumer<? super T, Throwable> callback)
+            {
+                try
+                {
+                    executor.execute(() -> {
+                        try
+                        {
+                            mapper.apply(v).addCallback(callback);
+                        }
+                        catch (Throwable t)
+                        {
+                            callback.accept(null, t);
+                        }
+                    });
+                }
+                catch (Throwable t)
+                {
+                    callback.accept(null, t);

Review Comment:
   This doesn't run in the expected executor, maybe it should go to the agent? And same above.
   
   Could be surprising cleanup code doesn't run in the executor requested.



##########
accord-core/src/main/java/accord/api/MessageSink.java:
##########
@@ -27,6 +28,6 @@
 public interface MessageSink
 {
     void send(Id to, Request request);
-    void send(Id to, Request request, Callback callback);
+    void send(Id to, Request request, CommandStore commandStore, Callback callback);

Review Comment:
   Maybe make the storage for `Callback` intrusive so it doesn't need to be a separate allocation?
   
   It would take a little jiggering for it to be a base class since it is currently an interface.
   
   I don't feel strongly about it.



##########
accord-core/src/test/java/accord/burn/TopologyUpdates.java:
##########
@@ -142,7 +140,7 @@ else if (outcome == Nothing)
                         dieExceptionally(invalidate.addCallback(((unused, failure) -> onDone.accept(failure == null))).beginAsResult());
                 }
                 return null;
-            }).beginAsResult();
+            }, node.commandStores().any()).beginAsResult();

Review Comment:
   Why is a random command store acceptable/necessary here? Because the contract for send eventually requires there to be a command store?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org