You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2022/12/01 22:15:01 UTC

[cassandra] branch cep-15-accord updated: Invalidation fixes/improvements - Integrate accord-core changes for CASSANDRA-18057

This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new a5fd6b3bbb Invalidation fixes/improvements  - Integrate accord-core changes for CASSANDRA-18057
a5fd6b3bbb is described below

commit a5fd6b3bbb83661c12e9c08a16ba3601e2302c70
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Mon Nov 21 12:22:38 2022 +0000

    Invalidation fixes/improvements
     - Integrate accord-core changes for CASSANDRA-18057
    
    patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-18057
---
 .build/include-accord.sh                           |  4 +-
 .../cassandra/service/accord/AccordCommand.java    | 31 +++++++--
 .../service/accord/AccordCommandStore.java         |  6 ++
 .../cassandra/service/accord/AccordKeyspace.java   | 11 ++-
 .../service/accord/AccordPartialCommand.java       |  4 +-
 .../service/accord/async/AsyncWriter.java          |  5 +-
 .../accord/serializers/AcceptSerializers.java      | 32 +++++----
 .../serializers/BeginInvalidationSerializers.java  | 81 ++++++----------------
 .../service/accord/AccordCommandTest.java          |  3 +-
 .../service/accord/async/AsyncWriterTest.java      |  6 +-
 10 files changed, 96 insertions(+), 87 deletions(-)

diff --git a/.build/include-accord.sh b/.build/include-accord.sh
index 0dc01bf2ba..ea1e0544df 100755
--- a/.build/include-accord.sh
+++ b/.build/include-accord.sh
@@ -24,8 +24,8 @@ set -o nounset
 
 bin="$(cd "$(dirname "$0")" > /dev/null; pwd)"
 
-accord_repo='https://github.com/apache/cassandra-accord.git'
-accord_branch='trunk'
+accord_repo='https://github.com/belliottsmith/cassandra-accord.git'
+accord_branch='bugfix-invalidation'
 accord_src="$bin/cassandra-accord"
 
 checkout() {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommand.java b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
index e7e526495f..45885f657d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
@@ -18,11 +18,14 @@
 
 package org.apache.cassandra.service.accord;
 
+import java.util.Map;
 import java.util.Objects;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.annotation.Nullable;
+
 import com.google.common.base.Preconditions;
 
 import org.slf4j.Logger;
@@ -115,6 +118,7 @@ public class AccordCommand extends Command implements AccordState<TxnId>
     public final StoredValue<RoutingKey> homeKey;
     public final StoredValue<RoutingKey> progressKey;
     public final StoredValue<PartialTxn> partialTxn;
+    public final StoredValue<Txn.Kind> kind; // TODO: store this in TxnId
     public final StoredValue<Ballot> promised;
     public final StoredValue<Ballot> accepted;
     public final StoredValue<Timestamp> executeAt;
@@ -141,6 +145,7 @@ public class AccordCommand extends Command implements AccordState<TxnId>
         progressKey = new StoredValue<>(rw());
         route = new StoredValue<>(rw());
         partialTxn = new StoredValue<>(rw());
+        kind = new StoredValue<>(rw());
         promised = new StoredValue<>(rw());
         accepted = new StoredValue<>(rw());
         executeAt = new StoredValue<>(rw());
@@ -235,6 +240,7 @@ public class AccordCommand extends Command implements AccordState<TxnId>
         progressKey.set(null);
         route.set(null);
         partialTxn.set(null);
+        kind.set(null);
         executeAt.load(null);
         promised.set(Ballot.ZERO);
         accepted.set(Ballot.ZERO);
@@ -525,7 +531,13 @@ public class AccordCommand extends Command implements AccordState<TxnId>
     @Override
     public Txn.Kind kind()
     {
-        return partialTxn.get().kind();
+        return kind.get();
+    }
+
+    @Override
+    public void setKind(Txn.Kind kind)
+    {
+        this.kind.set(kind);
     }
 
     @Override
@@ -762,7 +774,6 @@ public class AccordCommand extends Command implements AccordState<TxnId>
         waitingOnCommit.blindAdd(txnId);
     }
 
-    @Override
     public boolean isWaitingOnCommit()
     {
         return !waitingOnCommit.getView().isEmpty();
@@ -788,7 +799,6 @@ public class AccordCommand extends Command implements AccordState<TxnId>
         waitingOnApply.blindPut(executeAt, txnId);
     }
 
-    @Override
     public boolean isWaitingOnApply()
     {
         return !waitingOnApply.getView().isEmpty();
@@ -802,10 +812,21 @@ public class AccordCommand extends Command implements AccordState<TxnId>
     }
 
     @Override
-    public TxnId firstWaitingOnApply()
+    public boolean isWaitingOnDependency()
+    {
+        return isWaitingOnCommit() || isWaitingOnApply();
+    }
+
+    @Override
+    public TxnId firstWaitingOnApply(@Nullable TxnId ifExecutesBefore)
     {
         if (!isWaitingOnApply())
             return null;
-        return waitingOnApply.getView().firstEntry().getValue();
+
+        Map.Entry<Timestamp, TxnId> first = waitingOnApply.getView().firstEntry();
+        if (ifExecutesBefore == null || first.getKey().compareTo(ifExecutesBefore) < 0)
+            return first.getValue();
+
+        return null;
     }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 12b84a40c4..c7348a149f 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -325,6 +325,12 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
         return time.uniqueNow(max);
     }
 
+    @Override
+    public NodeTimeService time()
+    {
+        return time;
+    }
+
     public Timestamp maxConflict(Keys keys)
     {
         // TODO: efficiency
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index e17e7c01e8..8677dd8fea 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -46,6 +46,7 @@ import accord.primitives.Ballot;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
 import accord.primitives.Timestamp;
+import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.utils.DeterministicIdentitySet;
@@ -142,6 +143,7 @@ public class AccordKeyspace
               + "route blob,"
               + "durability int,"
               + "txn blob,"
+              + "kind int,"
               + format("execute_at %s,", TIMESTAMP_TUPLE)
               + format("promised_ballot %s,", TIMESTAMP_TUPLE)
               + format("accepted_ballot %s,", TIMESTAMP_TUPLE)
@@ -189,6 +191,7 @@ public class AccordKeyspace
         static final ColumnMetadata route = getColumn(Commands, "route");
         static final ColumnMetadata durability = getColumn(Commands, "durability");
         static final ColumnMetadata txn = getColumn(Commands, "txn");
+        static final ColumnMetadata kind = getColumn(Commands, "kind");
         static final ColumnMetadata execute_at = getColumn(Commands, "execute_at");
         static final ColumnMetadata promised_ballot = getColumn(Commands, "promised_ballot");
         static final ColumnMetadata accepted_ballot = getColumn(Commands, "accepted_ballot");
@@ -447,6 +450,9 @@ public class AccordKeyspace
             if (command.partialTxn.hasModifications())
                 builder.addCell(live(CommandsColumns.txn, timestampMicros, serializeOrNull(command.partialTxn.get(), CommandsSerializers.partialTxn)));
 
+            if (command.kind.hasModifications())
+                builder.addCell(live(CommandsColumns.kind, timestampMicros, accessor.valueOf(command.kind.get().ordinal())));
+
             if (command.executeAt.hasModifications())
                 builder.addCell(live(CommandsColumns.execute_at, timestampMicros, serializeTimestamp(command.executeAt.get())));
 
@@ -457,7 +463,7 @@ public class AccordKeyspace
                 builder.addCell(live(CommandsColumns.accepted_ballot, timestampMicros, serializeTimestamp(command.accepted.get())));
 
             if (command.partialDeps.hasModifications())
-                builder.addCell(live(CommandsColumns.dependencies, timestampMicros, serialize(command.partialDeps.get(), CommandsSerializers.partialDeps)));
+                builder.addCell(live(CommandsColumns.dependencies, timestampMicros, serializeOrNull(command.partialDeps.get(), CommandsSerializers.partialDeps)));
 
             if (command.writes.hasModifications())
                 builder.addCell(live(CommandsColumns.writes, timestampMicros, serialize(command.writes.get(), CommandsSerializers.writes)));
@@ -593,10 +599,11 @@ public class AccordKeyspace
             // TODO: something less brittle than ordinal, more efficient than values()
             command.durability.load(Status.Durability.values()[row.getInt("durability", 0)]);
             command.partialTxn.load(deserializeOrNull(row.getBlob("txn"), CommandsSerializers.partialTxn));
+            command.kind.load(row.has("kind") ? Txn.Kind.values()[row.getInt("kind")] : null);
             command.executeAt.load(deserializeTimestampOrNull(row, "execute_at", Timestamp::new));
             command.promised.load(deserializeTimestampOrNull(row, "promised_ballot", Ballot::new));
             command.accepted.load(deserializeTimestampOrNull(row, "accepted_ballot", Ballot::new));
-            command.partialDeps.load(deserializeWithVersionOr(row, "dependencies", CommandsSerializers.partialDeps, () -> PartialDeps.NONE));
+            command.partialDeps.load(deserializeOrNull(row.getBlob("dependencies"), CommandsSerializers.partialDeps));
             command.writes.load(deserializeWithVersionOr(row, "writes", CommandsSerializers.writes, () -> null));
             command.result.load(deserializeWithVersionOr(row, "result", CommandsSerializers.result, () -> null));
             command.waitingOnCommit.load(deserializeTxnIdNavigableSet(row, "waiting_on_commit"));
diff --git a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
index e0384489c2..779ce66f19 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
@@ -64,7 +64,9 @@ public class AccordPartialCommand extends CommandsForKey.TxnIdWithExecuteAt
 
     public AccordPartialCommand(Key key, Command command)
     {
-        this(command.txnId(), command.executeAt(), command.partialDeps().txnIds(key), command.status(), command.kind());
+        this(command.txnId(), command.executeAt(),
+             command.partialDeps() == null ? Collections.emptyList() : command.partialDeps().txnIds(key),
+             command.status(), command.kind());
     }
 
     public TxnId txnId()
diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
index 8b20347d77..64f6bdeab0 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
@@ -254,7 +254,10 @@ public class AsyncWriter
         }
 
         // There won't be a txn to denormalize against until the command has been preaccepted
-        if (command.known().hasTxn && AccordPartialCommand.serializer.needsUpdate(command))
+        // TODO (now): this may be insufficient for correctness? On Accept we use the explicitly provided keys to register
+        //             the transaction here. It's possible that a sequence of two Accepts, with the second taking a higher
+        //             timestamp, might not reflect the updated timestamp in the map? Probably best addressed following Blake's refactor.
+        if (command.known().isDefinitionKnown() && AccordPartialCommand.serializer.needsUpdate(command))
         {
             for (Key key : command.partialTxn().keys())
             {
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
index c2d5d88928..a635ccc671 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
@@ -21,8 +21,6 @@ package org.apache.cassandra.service.accord.serializers;
 import java.io.IOException;
 
 import accord.messages.Accept;
-import accord.messages.Accept.AcceptNack;
-import accord.messages.Accept.AcceptOk;
 import accord.messages.Accept.AcceptReply;
 import accord.primitives.PartialRoute;
 import accord.primitives.TxnId;
@@ -108,15 +106,22 @@ public class AcceptSerializers
             {
                 default: throw new AssertionError();
                 case Success:
-                    out.writeByte(1);
-                    DepsSerializer.partialDeps.serialize(((AcceptOk)reply).deps, out, version);
+                    if (reply.deps != null)
+                    {
+                        out.writeByte(1);
+                        DepsSerializer.partialDeps.serialize(reply.deps, out, version);
+                    }
+                    else
+                    {
+                        out.writeByte(2);
+                    }
                     break;
                 case Redundant:
-                    out.writeByte(2);
+                    out.writeByte(3);
                     break;
                 case RejectedBallot:
-                    out.writeByte(3);
-                    CommandSerializers.ballot.serialize(((AcceptNack) reply).supersededBy, out, version);
+                    out.writeByte(4);
+                    CommandSerializers.ballot.serialize(reply.supersededBy, out, version);
             }
         }
 
@@ -128,11 +133,13 @@ public class AcceptSerializers
             {
                 default: throw new IllegalStateException("Unexpected AcceptNack type: " + type);
                 case 1:
-                    return new AcceptOk(DepsSerializer.partialDeps.deserialize(in, version));
+                    return new AcceptReply(DepsSerializer.partialDeps.deserialize(in, version));
                 case 2:
-                    return AcceptNack.REDUNDANT;
+                    return AcceptReply.ACCEPT_INVALIDATE;
                 case 3:
-                    return new AcceptNack(RejectedBallot, CommandSerializers.ballot.deserialize(in, version));
+                    return AcceptReply.REDUNDANT;
+                case 4:
+                    return new AcceptReply(CommandSerializers.ballot.deserialize(in, version));
             }
         }
 
@@ -144,12 +151,13 @@ public class AcceptSerializers
             {
                 default: throw new AssertionError();
                 case Success:
-                    size += DepsSerializer.partialDeps.serializedSize(((AcceptOk)reply).deps, version);
+                    if (reply.deps != null)
+                        size += DepsSerializer.partialDeps.serializedSize(reply.deps, version);
                     break;
                 case Redundant:
                     break;
                 case RejectedBallot:
-                    size += CommandSerializers.ballot.serializedSize(((AcceptNack) reply).supersededBy, version);
+                    size += CommandSerializers.ballot.serializedSize(reply.supersededBy, version);
             }
             return size;
         }
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
index d4ebe727f9..79d9b1b2b0 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
@@ -24,8 +24,6 @@ import accord.api.RoutingKey;
 import accord.local.SaveStatus;
 import accord.local.Status;
 import accord.messages.BeginInvalidation;
-import accord.messages.BeginInvalidation.InvalidateNack;
-import accord.messages.BeginInvalidation.InvalidateOk;
 import accord.messages.BeginInvalidation.InvalidateReply;
 import accord.primitives.AbstractRoute;
 import accord.primitives.Ballot;
@@ -46,7 +44,7 @@ public class BeginInvalidationSerializers
         public void serialize(BeginInvalidation begin, DataOutputPlus out, int version) throws IOException
         {
             CommandSerializers.txnId.serialize(begin.txnId, out, version);
-            KeySerializers.routingKey.serialize(begin.someKey, out, version);
+            KeySerializers.routingKeys.serialize(begin.someKeys, out, version);
             CommandSerializers.ballot.serialize(begin.ballot, out, version);
         }
 
@@ -54,7 +52,7 @@ public class BeginInvalidationSerializers
         public BeginInvalidation deserialize(DataInputPlus in, int version) throws IOException
         {
             return new BeginInvalidation(CommandSerializers.txnId.deserialize(in, version),
-                                       KeySerializers.routingKey.deserialize(in, version),
+                                       KeySerializers.routingKeys.deserialize(in, version),
                                        CommandSerializers.ballot.deserialize(in, version));
         }
 
@@ -62,80 +60,45 @@ public class BeginInvalidationSerializers
         public long serializedSize(BeginInvalidation begin, int version)
         {
             return CommandSerializers.txnId.serializedSize(begin.txnId, version)
-                   + KeySerializers.routingKey.serializedSize(begin.someKey, version)
+                   + KeySerializers.routingKeys.serializedSize(begin.someKeys, version)
                    + CommandSerializers.ballot.serializedSize(begin.ballot, version);
         }
     };
 
     public static final IVersionedSerializer<InvalidateReply> reply = new IVersionedSerializer<InvalidateReply>()
     {
-        void serializeOk(InvalidateOk ok, DataOutputPlus out, int version) throws IOException
-        {
-            CommandSerializers.saveStatus.serialize(ok.status, out, version);
-            serializeNullable(KeySerializers.abstractRoute, ok.route, out, version);
-            serializeNullable(KeySerializers.routingKey, ok.homeKey, out, version);
-        }
-
-        InvalidateOk deserializeOk(DataInputPlus in, int version) throws IOException
-        {
-            SaveStatus status = CommandSerializers.saveStatus.deserialize(in, version);
-            AbstractRoute route = deserializeNullable(KeySerializers.abstractRoute, in, version);
-            RoutingKey homeKey = deserializeNullable(KeySerializers.routingKey, in, version);
-            return new InvalidateOk(status, route, homeKey);
-        }
-
-        long serializedOkSize(InvalidateOk ok, int version)
-        {
-            return CommandSerializers.saveStatus.serializedSize(ok.status, version)
-                   + serializedSizeNullable(KeySerializers.abstractRoute, ok.route, version)
-                   + serializedSizeNullable(KeySerializers.routingKey, ok.homeKey, version);
-        }
-
-        void serializeNack(InvalidateNack nack, DataOutputPlus out, int version) throws IOException
-        {
-            CommandSerializers.ballot.serialize(nack.supersededBy, out, version);
-            serializeNullable(KeySerializers.routingKey, nack.homeKey, out, version);
-        }
-
-        InvalidateNack deserializeNack(DataInputPlus in, int version) throws IOException
-        {
-            Ballot supersededBy = CommandSerializers.ballot.deserialize(in, version);
-            RoutingKey homeKey = deserializeNullable(KeySerializers.routingKey, in, version);
-            return new InvalidateNack(supersededBy, homeKey);
-        }
-
-        long serializedNackSize(InvalidateNack nack, int version)
-        {
-            return CommandSerializers.ballot.serializedSize(nack.supersededBy, version)
-                   + serializedSizeNullable(KeySerializers.routingKey, nack.homeKey, version);
-        }
-
         @Override
         public void serialize(InvalidateReply reply, DataOutputPlus out, int version) throws IOException
         {
-            out.writeBoolean(reply.isOk());
-            if (!reply.isOk())
-                serializeNack((InvalidateNack) reply, out, version);
-            else
-                serializeOk((InvalidateOk) reply, out, version);
+            serializeNullable(CommandSerializers.ballot, reply.supersededBy, out, version);
+            CommandSerializers.ballot.serialize(reply.accepted, out, version);
+            CommandSerializers.status.serialize(reply.status, out, version);
+            out.writeBoolean(reply.acceptedFastPath);
+            serializeNullable(KeySerializers.abstractRoute, reply.route, out, version);
+            serializeNullable(KeySerializers.routingKey, reply.homeKey, out, version);
         }
 
         @Override
         public InvalidateReply deserialize(DataInputPlus in, int version) throws IOException
         {
-            boolean isOk = in.readBoolean();
-            if (!isOk)
-                return deserializeNack(in, version);
-
-            return deserializeOk(in, version);
+            Ballot supersededBy = deserializeNullable(CommandSerializers.ballot, in, version);
+            Ballot accepted = CommandSerializers.ballot.deserialize(in, version);
+            Status status = CommandSerializers.status.deserialize(in, version);
+            boolean acceptedFastPath = in.readBoolean();
+            AbstractRoute route = deserializeNullable(KeySerializers.abstractRoute, in, version);
+            RoutingKey homeKey = deserializeNullable(KeySerializers.routingKey, in, version);
+            return new InvalidateReply(supersededBy, accepted, status, acceptedFastPath, route, homeKey);
         }
 
         @Override
         public long serializedSize(InvalidateReply reply, int version)
         {
-            return TypeSizes.sizeof(reply.isOk())
-                   + (reply.isOk() ? serializedOkSize((InvalidateOk) reply, version)
-                                   : serializedNackSize((InvalidateNack) reply, version));
+            return serializedSizeNullable(CommandSerializers.ballot, reply.supersededBy, version)
+                    + CommandSerializers.ballot.serializedSize(reply.accepted, version)
+                    + CommandSerializers.status.serializedSize(reply.status, version)
+                    + TypeSizes.BOOL_SIZE
+                    + serializedSizeNullable(KeySerializers.abstractRoute, reply.route, version)
+                    + serializedSizeNullable(KeySerializers.routingKey, reply.homeKey, version);
         }
     };
 }
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index e48cda6e84..54ac4736ac 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -130,8 +130,7 @@ public class AccordCommandTest
         commandStore.execute(accept, instance -> {
             Accept.AcceptReply reply = accept.apply(instance);
             Assert.assertTrue(reply.isOk());
-            Accept.AcceptOk ok = (Accept.AcceptOk) reply;
-            Assert.assertTrue(ok.deps.isEmpty());
+            Assert.assertTrue(reply.deps.isEmpty());
         }).get();
 
         commandStore.execute(accept, instance -> {
diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
index 28c2508491..37d1b5f764 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
@@ -109,7 +109,7 @@ public class AsyncWriterTest
         blocking = AccordKeyspace.loadCommand(commandStore, blockingId);
         Assert.assertTrue(blocking.blockingApplyOn.getView().contains(waitingId));
 
-        // now change the blocking command and check it's changes are reflected in the waiting command
+        // now change the blocking command and check its changes are reflected in the waiting command
         context = new AsyncContext();
         blocking.setStatus(Status.ReadyToExecute);
         context.commands.add(blocking);
@@ -120,7 +120,7 @@ public class AsyncWriterTest
         execute(commandStore, () -> {
             AsyncContext ctx = new AsyncContext();
             commandStore.setContext(ctx);
-            TxnId blockingSummary = waitingFinal.firstWaitingOnApply();
+            TxnId blockingSummary = waitingFinal.firstWaitingOnApply(null);
             Assert.assertEquals(blockingId, blockingSummary);
             commandStore.unsetContext(ctx);
         });
@@ -231,7 +231,7 @@ public class AsyncWriterTest
         // remove listener from PartialCommand
         commandStore.execute(contextFor(waitingId), cs -> {
             Command waiting = cs.command(waitingId);
-            TxnId blocking = ((AccordCommand)waiting).firstWaitingOnApply();
+            TxnId blocking = ((AccordCommand)waiting).firstWaitingOnApply(null);
             Assert.assertNotNull(blocking);
             Assert.assertEquals(blockingId, blocking);
         });


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org