You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2022/10/12 20:29:35 UTC

[GitHub] [cassandra-accord] dcapwell commented on a diff in pull request #10: Metadata persistence

dcapwell commented on code in PR #10:
URL: https://github.com/apache/cassandra-accord/pull/10#discussion_r993818406


##########
accord-core/src/main/java/accord/local/Command.java:
##########
@@ -380,119 +402,162 @@ public void onChange(Command command)
             case Executed:
             case Applied:
             case Invalidated:
-                if (waitingOnApply != null)
+                if (isUnableToApply())
                 {
                     updatePredecessor(command);
-                    if (waitingOnCommit != null)
+                    if (isWaitingOnCommit())
                     {
-                        if (waitingOnCommit.remove(command.txnId) != null && waitingOnCommit.isEmpty())
-                            waitingOnCommit = null;
+                        removeWaitingOnCommit(command);
                     }
-                    if (waitingOnCommit == null && waitingOnApply.isEmpty())
-                        waitingOnApply = null;
                 }
                 else
                 {
                     command.removeListener(this);
                 }
-                maybeExecute(true);
+                maybeExecute(false);
                 break;
         }
     }
 
-    private void maybeExecute(boolean notifyListeners)
+    @Override
+    public void onChange(Command command)
     {
-        if (status != Committed && status != Executed)
-            return;
+        onChangeInternal(command);
+    }
+
+    protected void postApply()
+    {
+        logger.trace("{} applied, setting status to Applied and notifying listeners", txnId());
+        status(Applied);
+        notifyListeners();
+    }
+
+    private static Function<CommandStore, Void> callPostApply(TxnId txnId)
+    {
+        return commandStore -> {
+            commandStore.command(txnId).postApply();
+            return null;
+        };
+    }
+
+    protected Future<Void> apply()
+    {
+        // important: we can't include a reference to *this* in the lambda, since the C* implementation may evict
+        // the command instance from memory between now and the write completing (and post apply being called)
+        return writes().apply(commandStore()).flatMap(unused ->
+            commandStore().process(this, callPostApply(txnId()))
+        );
+    }
+
+    public Txn.ReadFuture read(Keys scope)
+    {
+        return txn().read(this, scope);
+    }
 
-        if (waitingOnApply != null)
+    private Future<Void> maybeExecute(boolean notifyListenersOnNoop)
+    {
+        if (logger.isTraceEnabled())
+            logger.trace("{}: Maybe executing with status {}. Will notify listeners on noop: {}", txnId(), status(), notifyListenersOnNoop);
+
+        if (status() != Committed && status() != Executed)
+        {
+            if (notifyListenersOnNoop) notifyListeners();
+            return Writes.SUCCESS;
+        }
+
+        if (isUnableToApply())
         {
             BlockedBy blockedBy = blockedBy();
             if (blockedBy != null)
             {
-                commandStore.progressLog().waiting(blockedBy.txnId, blockedBy.someKeys);
-                return;
+                logger.trace("{}: not executing, blocked on {}", txnId(), blockedBy.command.txnId());
+                commandStore().progressLog().waiting(blockedBy.command, blockedBy.someKeys);
+                if (notifyListenersOnNoop) notifyListeners();
+                return Writes.SUCCESS;
             }
-            assert waitingOnApply == null;
+            assert !isWaitingOnApply();
         }
 
-        switch (status)
+        switch (status())
         {
             case Committed:
                 // TODO: maintain distinct ReadyToRead and ReadyToWrite states
-                status = ReadyToExecute;
-                boolean isProgressShard = progressKey != null && handles(txnId.epoch, progressKey);
-                commandStore.progressLog().readyToExecute(txnId, isProgressShard, isProgressShard && progressKey.equals(homeKey));
-                if (notifyListeners)
-                    listeners.forEach(this);
+                status(ReadyToExecute);
+                logger.trace("{}: set to ReadyToExecute", txnId());
+                boolean isProgressShard = progressKey() != null && handles(txnId().epoch, progressKey());
+                commandStore().progressLog().readyToExecute(this, isProgressShard, isProgressShard && progressKey().equals(homeKey()));
+                notifyListeners();
                 break;
             case Executed:
-                writes.apply(commandStore);
-                status = Applied;
-                if (notifyListeners)
-                    listeners.forEach(this);
+                logger.trace("{}: applying", txnId());
+                if (notifyListenersOnNoop) notifyListeners();
+                return apply();
         }
+        return Writes.SUCCESS;
     }
 
     /**
      * @param dependency is either committed or invalidated
      */
-    private void updatePredecessor(Command dependency)
+    private void updatePredecessor(PartialCommand dependency)
     {
         Preconditions.checkState(dependency.hasBeen(Committed));
         if (dependency.hasBeen(Invalidated))
         {
+            logger.trace("{}: {} is invalidated. Stop listening and removing from waiting on commit set.", txnId(), dependency.txnId());
             dependency.removeListener(this);
-            if (waitingOnCommit.remove(dependency.txnId) != null && waitingOnCommit.isEmpty())
-                waitingOnCommit = null;
+            removeWaitingOnCommit(dependency);
         }
-        else if (dependency.executeAt.compareTo(executeAt) > 0)
+        else if (dependency.executeAt().compareTo(executeAt()) > 0)
         {
             // cannot be a predecessor if we execute later
+            logger.trace("{}: {} executes after us. Stop listening.", txnId(), dependency.txnId());
             dependency.removeListener(this);
         }
         else if (dependency.hasBeen(Applied))
         {
-            waitingOnApply.remove(dependency.executeAt);
+            logger.trace("{}: {} has been applied. Stop listening and removing from waiting on apply set.", txnId(), dependency.txnId());
+            removeWaitingOnApply(dependency);
             dependency.removeListener(this);
         }
         else
         {
-            waitingOnApply.putIfAbsent(dependency.executeAt, dependency);
+            logger.trace("{}: adding {} to waiting on apply set.", txnId(), dependency.txnId());
+            addWaitingOnApplyIfAbsent(dependency);
         }
     }
 
     // TEMPORARY: once we can invalidate commands that have not been witnessed on any shard, we do not need to know the home shard
     static class BlockedBy
     {
-        final TxnId txnId;
+        final PartialCommand command;
         final Keys someKeys;
 
-        BlockedBy(TxnId txnId, Keys someKeys)
+        BlockedBy(PartialCommand command, Keys someKeys)
         {
-            this.txnId = txnId;
+            this.command = command;
             this.someKeys = someKeys;
         }
     }
 
     public BlockedBy blockedBy()
     {
         Command prev = this;
-        Command cur = directlyBlockedBy();
+        PartialCommand cur = directlyBlockedBy();
         if (cur == null)
             return null;
 
-        Command next;
-        while (null != (next = cur.directlyBlockedBy()))
-        {
-            prev = cur;
-            cur = next;
-        }
+//        Command next;

Review Comment:
   can you remove?  no need to keep the code commented out.



##########
accord-core/src/main/java/accord/utils/ReducingFuture.java:
##########
@@ -0,0 +1,57 @@
+package accord.utils;
+
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.BiFunction;
+
+public class ReducingFuture<V> extends AsyncPromise<V>
+{
+    private static final AtomicIntegerFieldUpdater<ReducingFuture> PENDING_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ReducingFuture.class, "pending");
+    private final List<? extends Future<V>> futures;
+    private final BiFunction<V, V, V> reducer;
+    private volatile int pending;
+
+    protected ReducingFuture(List<? extends Future<V>> futures, BiFunction<V, V, V> reducer)
+    {
+        this.futures = futures;
+        this.reducer = reducer;
+        this.pending = futures.size();
+        if (futures.size() == 0)
+            trySuccess(null);

Review Comment:
   This also gets back to the `zero` point I made in `reduce`, `org.apache.cassandra.service.accord.db.AccordData#merge(accord.api.Data)` doesn't handle `null` so its not clear from the API that `null` will be "empty".  In this context we would call `Txn.read` and not actually read anything (no command store intersects the key), so this would be a bug... by changing to `null` we push this bug into the user of the future who then has to figure out that this happened, which I think will NPE in `accord.messages.ReadData.LocalRead#readComplete` or `org.apache.cassandra.io.IVersionedSerializer#serialize` depending on ordering...



##########
accord-core/src/main/java/accord/utils/ReducingFuture.java:
##########
@@ -0,0 +1,57 @@
+package accord.utils;
+
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.BiFunction;
+
+public class ReducingFuture<V> extends AsyncPromise<V>
+{
+    private static final AtomicIntegerFieldUpdater<ReducingFuture> PENDING_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ReducingFuture.class, "pending");
+    private final List<? extends Future<V>> futures;
+    private final BiFunction<V, V, V> reducer;
+    private volatile int pending;
+
+    protected ReducingFuture(List<? extends Future<V>> futures, BiFunction<V, V, V> reducer)
+    {
+        this.futures = futures;
+        this.reducer = reducer;
+        this.pending = futures.size();
+        if (futures.size() == 0)
+            trySuccess(null);
+        futures.forEach(f -> f.addListener(this::operationComplete));
+    }
+
+    private <F extends io.netty.util.concurrent.Future<?>> void operationComplete(F future) throws Exception
+    {
+        if (isDone())
+            return;
+
+        if (!future.isSuccess())
+        {
+            tryFailure(future.cause());
+        }
+        else if (PENDING_UPDATER.decrementAndGet(this) == 0)
+        {
+            V result = futures.get(0).getNow();
+            for (int i=1, mi=futures.size(); i<mi; i++)
+                result = reducer.apply(result, futures.get(i).getNow());
+
+            trySuccess(result);
+        }
+    }
+
+    public static <T> Future<T> reduce(List<? extends Future<T>> futures, BiFunction<T, T, T> reducer)
+    {
+        if (futures.isEmpty())
+            return ImmediateFuture.success(null);

Review Comment:
   `reduce` should take a `zero` (called `identity` in `Stream` API) argument if we want to allow empty, else we should reject as invalid... The 3 call paths ATM just happen to return `null` but that isn't universal (and this class is trying to be so).



##########
accord-core/src/main/java/accord/messages/Apply.java:
##########
@@ -54,13 +62,42 @@ public Apply(Node.Id to, Topologies topologies, TxnId txnId, Txn txn, Key homeKe
         this.result = result;
     }
 
+    @VisibleForImplementation
+    public Apply(Keys scope, long waitForEpoch, TxnId txnId, Txn txn, Key homeKey, Timestamp executeAt, Deps deps, Writes writes, Result result)
+    {
+        super(scope, waitForEpoch);
+        this.txnId = txnId;
+        this.txn = txn;
+        this.homeKey = homeKey;
+        this.executeAt = executeAt;
+        this.deps = deps;
+        this.writes = writes;
+        this.result = result;
+    }
+
     public void process(Node node, Id replyToNode, ReplyContext replyContext)
     {
-        Key progressKey = node.trySelectProgressKey(txnId, txn.keys, homeKey);
-        node.forEachLocalSince(scope(), executeAt,
-                               instance -> instance.command(txnId).apply(txn, homeKey, progressKey, executeAt, deps, writes, result));
-        // note, we do not also commit here if txnId.epoch != executeAt.epoch, as the scope() for a commit would be different
-        node.reply(replyToNode, replyContext, ApplyOk.INSTANCE);
+        Key progressKey = node.trySelectProgressKey(txnId, txn.keys(), homeKey);
+        List<Future<Void>> futures = node.mapLocalSince(this, scope(), executeAt, instance -> instance.command(txnId).apply(txn, homeKey, progressKey, executeAt, deps, writes, result));
+
+        ReducingFuture.reduce(futures, (l, r) -> null).addCallback((unused, failure) -> {
+            if (failure == null)

Review Comment:
   I am questioning what we can do about the failure other than log...  If `org.apache.cassandra.service.accord.db.AccordWrite#apply` or `accord.local.CommandStore#process(accord.local.PreLoadContext, java.util.function.Function<? super accord.local.CommandStore,T>)` fails we would get a failure log...



##########
accord-core/src/main/java/accord/primitives/Deps.java:
##########
@@ -388,7 +390,8 @@ public static <T> Deps merge(List<T> merge, Function<T, Deps> getter)
     // Lazy loaded in ensureTxnIdToKey()
     int[] txnIdToKey; // TxnId -> [Key]
 
-    Deps(Keys keys, TxnId[] txnIds, int[] keyToTxnId)
+    @VisibleForImplementation

Review Comment:
   I am 100% cool with `unsafe` static method as it makes the code clear.



##########
accord-core/src/main/java/accord/api/Read.java:
##########
@@ -29,5 +32,5 @@
 public interface Read
 {
     Keys keys();
-    Data read(Key key, Timestamp executeAt, DataStore store);
+    Future<Data> read(Key key, boolean forWriteTxn, CommandStore commandStore, Timestamp executeAt, DataStore store);

Review Comment:
   why was `forWriteTxn` added?



##########
accord-core/src/main/java/accord/utils/ReducingFuture.java:
##########
@@ -0,0 +1,57 @@
+package accord.utils;
+
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.BiFunction;
+
+public class ReducingFuture<V> extends AsyncPromise<V>
+{
+    private static final AtomicIntegerFieldUpdater<ReducingFuture> PENDING_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ReducingFuture.class, "pending");
+    private final List<? extends Future<V>> futures;
+    private final BiFunction<V, V, V> reducer;
+    private volatile int pending;
+
+    protected ReducingFuture(List<? extends Future<V>> futures, BiFunction<V, V, V> reducer)
+    {
+        this.futures = futures;
+        this.reducer = reducer;
+        this.pending = futures.size();
+        if (futures.size() == 0)
+            trySuccess(null);

Review Comment:
   rather than duplicate the logic, can `accord.txn.Txn.ReadFuture#ReadFuture` be rewritten to use `accord.utils.ReducingFuture#reduce`?  It looks like the whole reason the `ReadFuture` exists is because `org.apache.cassandra.service.accord.AccordCommand#read` has a cache and wants to check the `scope`... but we know the `scope` when when we are putting into the cache, so we could avoid having `ReadFuture` and move this logic to the cache in C*.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org