You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2022/12/01 22:15:01 UTC
[cassandra] branch cep-15-accord updated: Invalidation fixes/improvements - Integrate accord-core changes for CASSANDRA-18057
This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new a5fd6b3bbb Invalidation fixes/improvements - Integrate accord-core changes for CASSANDRA-18057
a5fd6b3bbb is described below
commit a5fd6b3bbb83661c12e9c08a16ba3601e2302c70
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Mon Nov 21 12:22:38 2022 +0000
Invalidation fixes/improvements
- Integrate accord-core changes for CASSANDRA-18057
patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-18057
---
.build/include-accord.sh | 4 +-
.../cassandra/service/accord/AccordCommand.java | 31 +++++++--
.../service/accord/AccordCommandStore.java | 6 ++
.../cassandra/service/accord/AccordKeyspace.java | 11 ++-
.../service/accord/AccordPartialCommand.java | 4 +-
.../service/accord/async/AsyncWriter.java | 5 +-
.../accord/serializers/AcceptSerializers.java | 32 +++++----
.../serializers/BeginInvalidationSerializers.java | 81 ++++++----------------
.../service/accord/AccordCommandTest.java | 3 +-
.../service/accord/async/AsyncWriterTest.java | 6 +-
10 files changed, 96 insertions(+), 87 deletions(-)
diff --git a/.build/include-accord.sh b/.build/include-accord.sh
index 0dc01bf2ba..ea1e0544df 100755
--- a/.build/include-accord.sh
+++ b/.build/include-accord.sh
@@ -24,8 +24,8 @@ set -o nounset
bin="$(cd "$(dirname "$0")" > /dev/null; pwd)"
-accord_repo='https://github.com/apache/cassandra-accord.git'
-accord_branch='trunk'
+accord_repo='https://github.com/belliottsmith/cassandra-accord.git'
+accord_branch='bugfix-invalidation'
accord_src="$bin/cassandra-accord"
checkout() {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommand.java b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
index e7e526495f..45885f657d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
@@ -18,11 +18,14 @@
package org.apache.cassandra.service.accord;
+import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
+
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
@@ -115,6 +118,7 @@ public class AccordCommand extends Command implements AccordState<TxnId>
public final StoredValue<RoutingKey> homeKey;
public final StoredValue<RoutingKey> progressKey;
public final StoredValue<PartialTxn> partialTxn;
+ public final StoredValue<Txn.Kind> kind; // TODO: store this in TxnId
public final StoredValue<Ballot> promised;
public final StoredValue<Ballot> accepted;
public final StoredValue<Timestamp> executeAt;
@@ -141,6 +145,7 @@ public class AccordCommand extends Command implements AccordState<TxnId>
progressKey = new StoredValue<>(rw());
route = new StoredValue<>(rw());
partialTxn = new StoredValue<>(rw());
+ kind = new StoredValue<>(rw());
promised = new StoredValue<>(rw());
accepted = new StoredValue<>(rw());
executeAt = new StoredValue<>(rw());
@@ -235,6 +240,7 @@ public class AccordCommand extends Command implements AccordState<TxnId>
progressKey.set(null);
route.set(null);
partialTxn.set(null);
+ kind.set(null);
executeAt.load(null);
promised.set(Ballot.ZERO);
accepted.set(Ballot.ZERO);
@@ -525,7 +531,13 @@ public class AccordCommand extends Command implements AccordState<TxnId>
@Override
public Txn.Kind kind()
{
- return partialTxn.get().kind();
+ return kind.get();
+ }
+
+ @Override
+ public void setKind(Txn.Kind kind)
+ {
+ this.kind.set(kind);
}
@Override
@@ -762,7 +774,6 @@ public class AccordCommand extends Command implements AccordState<TxnId>
waitingOnCommit.blindAdd(txnId);
}
- @Override
public boolean isWaitingOnCommit()
{
return !waitingOnCommit.getView().isEmpty();
@@ -788,7 +799,6 @@ public class AccordCommand extends Command implements AccordState<TxnId>
waitingOnApply.blindPut(executeAt, txnId);
}
- @Override
public boolean isWaitingOnApply()
{
return !waitingOnApply.getView().isEmpty();
@@ -802,10 +812,21 @@ public class AccordCommand extends Command implements AccordState<TxnId>
}
@Override
- public TxnId firstWaitingOnApply()
+ public boolean isWaitingOnDependency()
+ {
+ return isWaitingOnCommit() || isWaitingOnApply();
+ }
+
+ @Override
+ public TxnId firstWaitingOnApply(@Nullable TxnId ifExecutesBefore)
{
if (!isWaitingOnApply())
return null;
- return waitingOnApply.getView().firstEntry().getValue();
+
+ Map.Entry<Timestamp, TxnId> first = waitingOnApply.getView().firstEntry();
+ if (ifExecutesBefore == null || first.getKey().compareTo(ifExecutesBefore) < 0)
+ return first.getValue();
+
+ return null;
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 12b84a40c4..c7348a149f 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -325,6 +325,12 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
return time.uniqueNow(max);
}
+ @Override
+ public NodeTimeService time()
+ {
+ return time;
+ }
+
public Timestamp maxConflict(Keys keys)
{
// TODO: efficiency
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index e17e7c01e8..8677dd8fea 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -46,6 +46,7 @@ import accord.primitives.Ballot;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
import accord.primitives.Timestamp;
+import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Writes;
import accord.utils.DeterministicIdentitySet;
@@ -142,6 +143,7 @@ public class AccordKeyspace
+ "route blob,"
+ "durability int,"
+ "txn blob,"
+ + "kind int,"
+ format("execute_at %s,", TIMESTAMP_TUPLE)
+ format("promised_ballot %s,", TIMESTAMP_TUPLE)
+ format("accepted_ballot %s,", TIMESTAMP_TUPLE)
@@ -189,6 +191,7 @@ public class AccordKeyspace
static final ColumnMetadata route = getColumn(Commands, "route");
static final ColumnMetadata durability = getColumn(Commands, "durability");
static final ColumnMetadata txn = getColumn(Commands, "txn");
+ static final ColumnMetadata kind = getColumn(Commands, "kind");
static final ColumnMetadata execute_at = getColumn(Commands, "execute_at");
static final ColumnMetadata promised_ballot = getColumn(Commands, "promised_ballot");
static final ColumnMetadata accepted_ballot = getColumn(Commands, "accepted_ballot");
@@ -447,6 +450,9 @@ public class AccordKeyspace
if (command.partialTxn.hasModifications())
builder.addCell(live(CommandsColumns.txn, timestampMicros, serializeOrNull(command.partialTxn.get(), CommandsSerializers.partialTxn)));
+ if (command.kind.hasModifications())
+ builder.addCell(live(CommandsColumns.kind, timestampMicros, accessor.valueOf(command.kind.get().ordinal())));
+
if (command.executeAt.hasModifications())
builder.addCell(live(CommandsColumns.execute_at, timestampMicros, serializeTimestamp(command.executeAt.get())));
@@ -457,7 +463,7 @@ public class AccordKeyspace
builder.addCell(live(CommandsColumns.accepted_ballot, timestampMicros, serializeTimestamp(command.accepted.get())));
if (command.partialDeps.hasModifications())
- builder.addCell(live(CommandsColumns.dependencies, timestampMicros, serialize(command.partialDeps.get(), CommandsSerializers.partialDeps)));
+ builder.addCell(live(CommandsColumns.dependencies, timestampMicros, serializeOrNull(command.partialDeps.get(), CommandsSerializers.partialDeps)));
if (command.writes.hasModifications())
builder.addCell(live(CommandsColumns.writes, timestampMicros, serialize(command.writes.get(), CommandsSerializers.writes)));
@@ -593,10 +599,11 @@ public class AccordKeyspace
// TODO: something less brittle than ordinal, more efficient than values()
command.durability.load(Status.Durability.values()[row.getInt("durability", 0)]);
command.partialTxn.load(deserializeOrNull(row.getBlob("txn"), CommandsSerializers.partialTxn));
+ command.kind.load(row.has("kind") ? Txn.Kind.values()[row.getInt("kind")] : null);
command.executeAt.load(deserializeTimestampOrNull(row, "execute_at", Timestamp::new));
command.promised.load(deserializeTimestampOrNull(row, "promised_ballot", Ballot::new));
command.accepted.load(deserializeTimestampOrNull(row, "accepted_ballot", Ballot::new));
- command.partialDeps.load(deserializeWithVersionOr(row, "dependencies", CommandsSerializers.partialDeps, () -> PartialDeps.NONE));
+ command.partialDeps.load(deserializeOrNull(row.getBlob("dependencies"), CommandsSerializers.partialDeps));
command.writes.load(deserializeWithVersionOr(row, "writes", CommandsSerializers.writes, () -> null));
command.result.load(deserializeWithVersionOr(row, "result", CommandsSerializers.result, () -> null));
command.waitingOnCommit.load(deserializeTxnIdNavigableSet(row, "waiting_on_commit"));
diff --git a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
index e0384489c2..779ce66f19 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
@@ -64,7 +64,9 @@ public class AccordPartialCommand extends CommandsForKey.TxnIdWithExecuteAt
public AccordPartialCommand(Key key, Command command)
{
- this(command.txnId(), command.executeAt(), command.partialDeps().txnIds(key), command.status(), command.kind());
+ this(command.txnId(), command.executeAt(),
+ command.partialDeps() == null ? Collections.emptyList() : command.partialDeps().txnIds(key),
+ command.status(), command.kind());
}
public TxnId txnId()
diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
index 8b20347d77..64f6bdeab0 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
@@ -254,7 +254,10 @@ public class AsyncWriter
}
// There won't be a txn to denormalize against until the command has been preaccepted
- if (command.known().hasTxn && AccordPartialCommand.serializer.needsUpdate(command))
+ // TODO (now): this may be insufficient for correctness? on Accept we use the explicitly provided keys to register
+ // the transaction here. It's possible a sequence of two Accept, with second taking a higher timestamp
+ // might not reflect the update timestamp in the map? Probably best addressed following Blake's refactor.
+ if (command.known().isDefinitionKnown() && AccordPartialCommand.serializer.needsUpdate(command))
{
for (Key key : command.partialTxn().keys())
{
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
index c2d5d88928..a635ccc671 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
@@ -21,8 +21,6 @@ package org.apache.cassandra.service.accord.serializers;
import java.io.IOException;
import accord.messages.Accept;
-import accord.messages.Accept.AcceptNack;
-import accord.messages.Accept.AcceptOk;
import accord.messages.Accept.AcceptReply;
import accord.primitives.PartialRoute;
import accord.primitives.TxnId;
@@ -108,15 +106,22 @@ public class AcceptSerializers
{
default: throw new AssertionError();
case Success:
- out.writeByte(1);
- DepsSerializer.partialDeps.serialize(((AcceptOk)reply).deps, out, version);
+ if (reply.deps != null)
+ {
+ out.writeByte(1);
+ DepsSerializer.partialDeps.serialize(reply.deps, out, version);
+ }
+ else
+ {
+ out.writeByte(2);
+ }
break;
case Redundant:
- out.writeByte(2);
+ out.writeByte(3);
break;
case RejectedBallot:
- out.writeByte(3);
- CommandSerializers.ballot.serialize(((AcceptNack) reply).supersededBy, out, version);
+ out.writeByte(4);
+ CommandSerializers.ballot.serialize(reply.supersededBy, out, version);
}
}
@@ -128,11 +133,13 @@ public class AcceptSerializers
{
default: throw new IllegalStateException("Unexpected AcceptNack type: " + type);
case 1:
- return new AcceptOk(DepsSerializer.partialDeps.deserialize(in, version));
+ return new AcceptReply(DepsSerializer.partialDeps.deserialize(in, version));
case 2:
- return AcceptNack.REDUNDANT;
+ return AcceptReply.ACCEPT_INVALIDATE;
case 3:
- return new AcceptNack(RejectedBallot, CommandSerializers.ballot.deserialize(in, version));
+ return AcceptReply.REDUNDANT;
+ case 4:
+ return new AcceptReply(CommandSerializers.ballot.deserialize(in, version));
}
}
@@ -144,12 +151,13 @@ public class AcceptSerializers
{
default: throw new AssertionError();
case Success:
- size += DepsSerializer.partialDeps.serializedSize(((AcceptOk)reply).deps, version);
+ if (reply.deps != null)
+ size += DepsSerializer.partialDeps.serializedSize(reply.deps, version);
break;
case Redundant:
break;
case RejectedBallot:
- size += CommandSerializers.ballot.serializedSize(((AcceptNack) reply).supersededBy, version);
+ size += CommandSerializers.ballot.serializedSize(reply.supersededBy, version);
}
return size;
}
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
index d4ebe727f9..79d9b1b2b0 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
@@ -24,8 +24,6 @@ import accord.api.RoutingKey;
import accord.local.SaveStatus;
import accord.local.Status;
import accord.messages.BeginInvalidation;
-import accord.messages.BeginInvalidation.InvalidateNack;
-import accord.messages.BeginInvalidation.InvalidateOk;
import accord.messages.BeginInvalidation.InvalidateReply;
import accord.primitives.AbstractRoute;
import accord.primitives.Ballot;
@@ -46,7 +44,7 @@ public class BeginInvalidationSerializers
public void serialize(BeginInvalidation begin, DataOutputPlus out, int version) throws IOException
{
CommandSerializers.txnId.serialize(begin.txnId, out, version);
- KeySerializers.routingKey.serialize(begin.someKey, out, version);
+ KeySerializers.routingKeys.serialize(begin.someKeys, out, version);
CommandSerializers.ballot.serialize(begin.ballot, out, version);
}
@@ -54,7 +52,7 @@ public class BeginInvalidationSerializers
public BeginInvalidation deserialize(DataInputPlus in, int version) throws IOException
{
return new BeginInvalidation(CommandSerializers.txnId.deserialize(in, version),
- KeySerializers.routingKey.deserialize(in, version),
+ KeySerializers.routingKeys.deserialize(in, version),
CommandSerializers.ballot.deserialize(in, version));
}
@@ -62,80 +60,45 @@ public class BeginInvalidationSerializers
public long serializedSize(BeginInvalidation begin, int version)
{
return CommandSerializers.txnId.serializedSize(begin.txnId, version)
- + KeySerializers.routingKey.serializedSize(begin.someKey, version)
+ + KeySerializers.routingKeys.serializedSize(begin.someKeys, version)
+ CommandSerializers.ballot.serializedSize(begin.ballot, version);
}
};
public static final IVersionedSerializer<InvalidateReply> reply = new IVersionedSerializer<InvalidateReply>()
{
- void serializeOk(InvalidateOk ok, DataOutputPlus out, int version) throws IOException
- {
- CommandSerializers.saveStatus.serialize(ok.status, out, version);
- serializeNullable(KeySerializers.abstractRoute, ok.route, out, version);
- serializeNullable(KeySerializers.routingKey, ok.homeKey, out, version);
- }
-
- InvalidateOk deserializeOk(DataInputPlus in, int version) throws IOException
- {
- SaveStatus status = CommandSerializers.saveStatus.deserialize(in, version);
- AbstractRoute route = deserializeNullable(KeySerializers.abstractRoute, in, version);
- RoutingKey homeKey = deserializeNullable(KeySerializers.routingKey, in, version);
- return new InvalidateOk(status, route, homeKey);
- }
-
- long serializedOkSize(InvalidateOk ok, int version)
- {
- return CommandSerializers.saveStatus.serializedSize(ok.status, version)
- + serializedSizeNullable(KeySerializers.abstractRoute, ok.route, version)
- + serializedSizeNullable(KeySerializers.routingKey, ok.homeKey, version);
- }
-
- void serializeNack(InvalidateNack nack, DataOutputPlus out, int version) throws IOException
- {
- CommandSerializers.ballot.serialize(nack.supersededBy, out, version);
- serializeNullable(KeySerializers.routingKey, nack.homeKey, out, version);
- }
-
- InvalidateNack deserializeNack(DataInputPlus in, int version) throws IOException
- {
- Ballot supersededBy = CommandSerializers.ballot.deserialize(in, version);
- RoutingKey homeKey = deserializeNullable(KeySerializers.routingKey, in, version);
- return new InvalidateNack(supersededBy, homeKey);
- }
-
- long serializedNackSize(InvalidateNack nack, int version)
- {
- return CommandSerializers.ballot.serializedSize(nack.supersededBy, version)
- + serializedSizeNullable(KeySerializers.routingKey, nack.homeKey, version);
- }
-
@Override
public void serialize(InvalidateReply reply, DataOutputPlus out, int version) throws IOException
{
- out.writeBoolean(reply.isOk());
- if (!reply.isOk())
- serializeNack((InvalidateNack) reply, out, version);
- else
- serializeOk((InvalidateOk) reply, out, version);
+ serializeNullable(CommandSerializers.ballot, reply.supersededBy, out, version);
+ CommandSerializers.ballot.serialize(reply.accepted, out, version);
+ CommandSerializers.status.serialize(reply.status, out, version);
+ out.writeBoolean(reply.acceptedFastPath);
+ serializeNullable(KeySerializers.abstractRoute, reply.route, out, version);
+ serializeNullable(KeySerializers.routingKey, reply.homeKey, out, version);
}
@Override
public InvalidateReply deserialize(DataInputPlus in, int version) throws IOException
{
- boolean isOk = in.readBoolean();
- if (!isOk)
- return deserializeNack(in, version);
-
- return deserializeOk(in, version);
+ Ballot supersededBy = deserializeNullable(CommandSerializers.ballot, in, version);
+ Ballot accepted = CommandSerializers.ballot.deserialize(in, version);
+ Status status = CommandSerializers.status.deserialize(in, version);
+ boolean acceptedFastPath = in.readBoolean();
+ AbstractRoute route = deserializeNullable(KeySerializers.abstractRoute, in, version);
+ RoutingKey homeKey = deserializeNullable(KeySerializers.routingKey, in, version);
+ return new InvalidateReply(supersededBy, accepted, status, acceptedFastPath, route, homeKey);
}
@Override
public long serializedSize(InvalidateReply reply, int version)
{
- return TypeSizes.sizeof(reply.isOk())
- + (reply.isOk() ? serializedOkSize((InvalidateOk) reply, version)
- : serializedNackSize((InvalidateNack) reply, version));
+ return serializedSizeNullable(CommandSerializers.ballot, reply.supersededBy, version)
+ + CommandSerializers.ballot.serializedSize(reply.accepted, version)
+ + CommandSerializers.status.serializedSize(reply.status, version)
+ + TypeSizes.BOOL_SIZE
+ + serializedSizeNullable(KeySerializers.abstractRoute, reply.route, version)
+ + serializedSizeNullable(KeySerializers.routingKey, reply.homeKey, version);
}
};
}
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index e48cda6e84..54ac4736ac 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -130,8 +130,7 @@ public class AccordCommandTest
commandStore.execute(accept, instance -> {
Accept.AcceptReply reply = accept.apply(instance);
Assert.assertTrue(reply.isOk());
- Accept.AcceptOk ok = (Accept.AcceptOk) reply;
- Assert.assertTrue(ok.deps.isEmpty());
+ Assert.assertTrue(reply.deps.isEmpty());
}).get();
commandStore.execute(accept, instance -> {
diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
index 28c2508491..37d1b5f764 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
@@ -109,7 +109,7 @@ public class AsyncWriterTest
blocking = AccordKeyspace.loadCommand(commandStore, blockingId);
Assert.assertTrue(blocking.blockingApplyOn.getView().contains(waitingId));
- // now change the blocking command and check it's changes are reflected in the waiting command
+ // now change the blocking command and check its changes are reflected in the waiting command
context = new AsyncContext();
blocking.setStatus(Status.ReadyToExecute);
context.commands.add(blocking);
@@ -120,7 +120,7 @@ public class AsyncWriterTest
execute(commandStore, () -> {
AsyncContext ctx = new AsyncContext();
commandStore.setContext(ctx);
- TxnId blockingSummary = waitingFinal.firstWaitingOnApply();
+ TxnId blockingSummary = waitingFinal.firstWaitingOnApply(null);
Assert.assertEquals(blockingId, blockingSummary);
commandStore.unsetContext(ctx);
});
@@ -231,7 +231,7 @@ public class AsyncWriterTest
// remove listener from PartialCommand
commandStore.execute(contextFor(waitingId), cs -> {
Command waiting = cs.command(waitingId);
- TxnId blocking = ((AccordCommand)waiting).firstWaitingOnApply();
+ TxnId blocking = ((AccordCommand)waiting).firstWaitingOnApply(null);
Assert.assertNotNull(blocking);
Assert.assertEquals(blockingId, blocking);
});
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org