You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ap...@apache.org on 2023/08/29 11:59:40 UTC
[ignite-3] branch main updated: IGNITE-20290 Command reordering wrt safe time in MetaStorage (#2506)
This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e26fc8016d IGNITE-20290 Command reordering wrt safe time in MetaStorage (#2506)
e26fc8016d is described below
commit e26fc8016ddd4ebb99596909157f2f0cdb925ebf
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Tue Aug 29 15:59:32 2023 +0400
IGNITE-20290 Command reordering wrt safe time in MetaStorage (#2506)
* No mechanism existed to forbid reordering of idle safe time propagation commands with respect to other MetaStorage write commands; this commit adds one
* Idle safe time commands were triggered too early; this is fixed
---
.../impl/ItMetaStorageManagerImplTest.java | 52 +++++++++++++++++++++-
.../ItMetaStorageMultipleNodesAbstractTest.java | 2 +-
...MetaStorageSafeTimePropagationAbstractTest.java | 13 ++++--
.../metastorage/command/SyncTimeCommand.java | 14 +-----
.../metastorage/impl/MetaStorageManagerImpl.java | 35 ++++++++++++---
.../metastorage/impl/MetaStorageServiceImpl.java | 2 +-
.../metastorage/server/KeyValueStorage.java | 7 +++
.../server/OnRevisionAppliedCallback.java | 15 +++++--
.../metastorage/server/WatchProcessor.java | 21 ++++++++-
.../server/persistence/RocksDbKeyValueStorage.java | 13 ++++++
.../server/raft/MetaStorageWriteHandler.java | 30 ++++++++-----
.../server/BasicOperationsKeyValueStorageTest.java | 28 ++++++++++--
.../metastorage/server/WatchProcessorTest.java | 10 ++---
.../server/SimpleInMemoryKeyValueStorage.java | 11 +++++
14 files changed, 202 insertions(+), 51 deletions(-)
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index d2dbc13721..091ed4c1fe 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -26,6 +26,7 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
@@ -40,6 +41,8 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -47,6 +50,7 @@ import org.apache.ignite.internal.configuration.testframework.ConfigurationExten
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
@@ -57,6 +61,7 @@ import org.apache.ignite.internal.metastorage.dsl.Conditions;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -95,7 +100,7 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest {
void setUp(
TestInfo testInfo,
@InjectConfiguration RaftConfiguration raftConfiguration,
- @InjectConfiguration MetaStorageConfiguration metaStorageConfiguration
+ @InjectConfiguration("mock.idleSyncTimeInterval = 100") MetaStorageConfiguration metaStorageConfiguration
) {
var addr = new NetworkAddress("localhost", 10_000);
@@ -262,4 +267,49 @@ public class ItMetaStorageManagerImplTest extends IgniteAbstractTest {
assertThat(revisionCapture.getAllValues(), is(List.of(revision + 1)));
}
+
+ /**
+ * Tests that idle safe time propagation does not advance safe time while watches of a normal command are being executed.
+ */
+ @Test
+ void testIdleSafeTimePropagationAndNormalSafeTimePropagationInteraction(TestInfo testInfo) throws Exception {
+ var key = new ByteArray("foo");
+ byte[] value = "bar".getBytes(UTF_8);
+
+ AtomicBoolean watchCompleted = new AtomicBoolean(false);
+ CompletableFuture<HybridTimestamp> watchEventTsFuture = new CompletableFuture<>();
+
+ metaStorageManager.registerExactWatch(key, new WatchListener() {
+ @Override
+ public CompletableFuture<Void> onUpdate(WatchEvent event) {
+ watchEventTsFuture.complete(event.timestamp());
+
+ // The future will set the flag and complete after 300ms to allow idle safe time mechanism (which ticks each 100ms)
+ // to advance SafeTime (if there is still a bug for which this test is written).
+ return waitFor(300, TimeUnit.MILLISECONDS)
+ .whenComplete((res, ex) -> watchCompleted.set(true));
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ }
+ });
+
+ metaStorageManager.put(key, value);
+
+ ClusterTime clusterTime = metaStorageManager.clusterTime();
+
+ assertThat(watchEventTsFuture, willSucceedIn(5, TimeUnit.SECONDS));
+
+ HybridTimestamp watchEventTs = watchEventTsFuture.join();
+ assertThat(clusterTime.waitFor(watchEventTs), willCompleteSuccessfully());
+
+ assertThat("Safe time is advanced too early", watchCompleted.get(), is(true));
+ }
+
+ private static CompletableFuture<Void> waitFor(int timeout, TimeUnit unit) {
+ return new CompletableFuture<Void>()
+ .orTimeout(timeout, unit)
+ .exceptionally(ex -> null);
+ }
}
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index 1e039b2bf8..082c3b27f1 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -234,7 +234,7 @@ public abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
private final List<Node> nodes = new ArrayList<>();
- private Node startNode(TestInfo testInfo) throws NodeStoppingException {
+ private Node startNode(TestInfo testInfo) {
var nodeFinder = new StaticNodeFinder(List.of(new NetworkAddress("localhost", 10_000)));
ClusterService clusterService = ClusterServiceTestUtils.clusterService(testInfo, 10_000 + nodes.size(), nodeFinder);
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
index 47b8ad923a..1baafe9b59 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.server.AbstractKeyValueStorageTest;
+import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback;
import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.junit.jupiter.api.AfterEach;
@@ -45,10 +46,16 @@ public abstract class ItMetaStorageSafeTimePropagationAbstractTest extends Abstr
@BeforeEach
public void startWatches() {
- storage.startWatches(1, (e, t) -> {
- time.updateSafeTime(t);
+ storage.startWatches(1, new OnRevisionAppliedCallback() {
+ @Override
+ public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
+ time.updateSafeTime(newSafeTime);
+ }
- return CompletableFuture.completedFuture(null);
+ @Override
+ public CompletableFuture<Void> onRevisionApplied(WatchEvent e) {
+ return CompletableFuture.completedFuture(null);
+ }
});
}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java
index 4032f0f8f2..fadd2deee6 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/command/SyncTimeCommand.java
@@ -17,25 +17,13 @@
package org.apache.ignite.internal.metastorage.command;
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.network.annotations.Transferable;
/**
* Command that initiates idle safe time synchronization.
*/
@Transferable(MetastorageCommandsMessageGroup.SYNC_TIME)
-public interface SyncTimeCommand extends WriteCommand {
- /** New safe time. */
- long safeTimeLong();
-
+public interface SyncTimeCommand extends MetaStorageWriteCommand {
/** Term of the initiator. */
long initiatorTerm();
-
- /** New safe time. */
- default HybridTimestamp safeTime() {
- return hybridTimestamp(safeTimeLong());
- }
}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 8d810e6044..b3c41997cf 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
@@ -449,7 +450,17 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
return recoveryFinishedFuture
.thenAccept(revision -> inBusyLock(busyLock, () -> {
// Meta Storage contract states that all updated entries under a particular revision must be stored in the Vault.
- storage.startWatches(revision + 1, this::onRevisionApplied);
+ storage.startWatches(revision + 1, new OnRevisionAppliedCallback() {
+ @Override
+ public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
+ MetaStorageManagerImpl.this.onSafeTimeAdvanced(newSafeTime);
+ }
+
+ @Override
+ public CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent) {
+ return MetaStorageManagerImpl.this.onRevisionApplied(watchEvent);
+ }
+ });
}))
.whenComplete((v, e) -> {
if (e == null) {
@@ -817,20 +828,32 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
}
}
+ private void onSafeTimeAdvanced(HybridTimestamp time) {
+ assert time != null;
+
+ if (!busyLock.enterBusy()) {
+ LOG.info("Skipping advancing Safe Time because the node is stopping");
+
+ return;
+ }
+
+ try {
+ clusterTime.updateSafeTime(time);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
/**
* Saves processed Meta Storage revision and corresponding entries to the Vault.
*/
- private CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent, HybridTimestamp time) {
- assert time != null;
-
+ private CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent) {
if (!busyLock.enterBusy()) {
LOG.info("Skipping applying MetaStorage revision because the node is stopping");
return completedFuture(null);
}
- clusterTime.updateSafeTime(time);
-
try {
CompletableFuture<Void> saveToVaultFuture = vaultMgr.put(APPLIED_REV_KEY, longToBytes(watchEvent.revision()));
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
index 4f32decdff..5637cb274a 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java
@@ -279,7 +279,7 @@ public class MetaStorageServiceImpl implements MetaStorageService {
*/
public CompletableFuture<Void> syncTime(HybridTimestamp safeTime, long term) {
SyncTimeCommand syncTimeCommand = context.commandsFactory().syncTimeCommand()
- .safeTimeLong(safeTime.longValue())
+ .initiatorTimeLong(safeTime.longValue())
.initiatorTerm(term)
.build();
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 5b29bcab25..de625f477e 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -312,4 +312,11 @@ public interface KeyValueStorage extends ManuallyCloseable {
/** Explicitly notifies revision update listeners. */
CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long newRevision);
+
+ /**
+ * Advances MetaStorage Safe Time to a new value without creating a new revision.
+ *
+ * @param newSafeTime New Safe Time value.
+ */
+ void advanceSafeTime(HybridTimestamp newSafeTime);
}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
index 93a2fc1c37..b514048950 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
@@ -22,16 +22,23 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metastorage.WatchEvent;
/**
- * Interface for declaring callbacks that get called after all Meta Storage watches have been notified of a particular revision.
+ * Interface for declaring callbacks that get called after all Meta Storage watches have been notified of a particular revision
+ * and/or when SafeTime gets advanced.
*/
-@FunctionalInterface
public interface OnRevisionAppliedCallback {
+ /**
+ * Invoked whenever MetaStorage Safe Time gets advanced (either because a write command is applied,
+ * together with all watches that process it, or because idle safe time mechanism advanced Safe Time).
+ *
+ * @param newSafeTime New safe time value.
+ */
+ void onSafeTimeAdvanced(HybridTimestamp newSafeTime);
+
/**
* Notifies of completion of processing of Meta Storage watches for a particular revision.
*
* @param watchEvent Event with modified Meta Storage entries processed by at least one Watch.
- * @param newSafeTime Safe time of the applied revision.
* @return Future that represents the state of the execution of the callback.
*/
- CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent, HybridTimestamp newSafeTime);
+ CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent);
}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index f334dff57b..5a58278f48 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -122,7 +122,7 @@ public class WatchProcessor implements ManuallyCloseable {
}
/**
- * Notifies registered watch about an update event.
+ * Notifies registered watches about an update event.
*/
public void notifyWatches(List<Entry> updatedEntries, HybridTimestamp time) {
assert time != null;
@@ -244,7 +244,9 @@ public class WatchProcessor implements ManuallyCloseable {
var event = new WatchEvent(acceptedEntries, revision, time);
- return revisionCallback.onRevisionApplied(event, time)
+ revisionCallback.onSafeTimeAdvanced(time);
+
+ return revisionCallback.onRevisionApplied(event)
.whenComplete((ignored, e) -> {
if (e != null) {
LOG.error("Error occurred when notifying watches", e);
@@ -257,6 +259,21 @@ public class WatchProcessor implements ManuallyCloseable {
}
}
+ /**
+ * Advances safe time without notifying watches (as there is no new revision).
+ */
+ public void advanceSafeTime(HybridTimestamp time) {
+ assert time != null;
+
+ notificationFuture = notificationFuture
+ .thenRunAsync(() -> revisionCallback.onSafeTimeAdvanced(time), watchExecutor)
+ .whenComplete((ignored, e) -> {
+ if (e != null) {
+ LOG.error("Error occurred when notifying safe time advanced callback", e);
+ }
+ });
+ }
+
@Override
public void close() {
notificationFuture.cancel(true);
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index abb2d85848..d691d73c14 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -1664,4 +1664,17 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long newRevision) {
return watchProcessor.notifyUpdateRevisionListeners(newRevision);
}
+
+ @Override
+ public void advanceSafeTime(HybridTimestamp newSafeTime) {
+ rwLock.writeLock().lock();
+
+ try {
+ if (recoveryStatus.get() == RecoveryStatus.DONE) {
+ watchProcessor.advanceSafeTime(newSafeTime);
+ }
+ } finally {
+ rwLock.writeLock().unlock();
+ }
+ }
}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
index 8f836a1963..a8f98b58ba 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java
@@ -83,18 +83,18 @@ public class MetaStorageWriteHandler {
if (command instanceof MetaStorageWriteCommand) {
var cmdWithTime = (MetaStorageWriteCommand) command;
- HybridTimestamp safeTime = cmdWithTime.safeTime();
+ if (command instanceof SyncTimeCommand) {
+ var syncTimeCommand = (SyncTimeCommand) command;
- handleWriteWithTime(clo, cmdWithTime, safeTime);
- } else if (command instanceof SyncTimeCommand) {
- var syncTimeCommand = (SyncTimeCommand) command;
+ // Ignore the command if it has been sent by a stale leader.
+ if (clo.term() != syncTimeCommand.initiatorTerm()) {
+ clo.result(null);
- // Ignore the command if it has been sent by a stale leader.
- if (clo.term() == syncTimeCommand.initiatorTerm()) {
- clusterTime.updateSafeTime(syncTimeCommand.safeTime());
+ return;
+ }
}
- clo.result(null);
+ handleWriteWithTime(clo, cmdWithTime);
} else {
assert false : "Command was not found [cmd=" + command + ']';
}
@@ -118,9 +118,10 @@ public class MetaStorageWriteHandler {
*
* @param clo Command closure.
* @param command Command.
- * @param opTime Command's time.
*/
- private void handleWriteWithTime(CommandClosure<WriteCommand> clo, MetaStorageWriteCommand command, HybridTimestamp opTime) {
+ private void handleWriteWithTime(CommandClosure<WriteCommand> clo, MetaStorageWriteCommand command) {
+ HybridTimestamp opTime = command.safeTime();
+
if (command instanceof PutCommand) {
PutCommand putCmd = (PutCommand) command;
@@ -177,6 +178,10 @@ public class MetaStorageWriteHandler {
MultiInvokeCommand cmd = (MultiInvokeCommand) command;
clo.result(storage.invoke(toIf(cmd.iif()), opTime));
+ } else if (command instanceof SyncTimeCommand) {
+ storage.advanceSafeTime(command.safeTime());
+
+ clo.result(null);
}
}
@@ -286,7 +291,10 @@ public class MetaStorageWriteHandler {
}
}
- void beforeApply(Command command) {
+ // TODO: IGNITE-20290 - This is insufficient, we must do this in a single thread before saving the command to the RAFT log.
+ // Synchronized to make sure no reordering happens as RaftGroupListener#beforeApply() might be invoked in different threads
+ // for different commands.
+ synchronized void beforeApply(Command command) {
if (command instanceof MetaStorageWriteCommand) {
// Initiator sends us a timestamp to adjust to.
// Alter command by setting safe time based on the adjusted clock.
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
index 198ade0395..5486b2e81a 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/BasicOperationsKeyValueStorageTest.java
@@ -1969,7 +1969,17 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu
long appliedRevision = storage.revision();
- storage.startWatches(1, (event, ts) -> completedFuture(null));
+ storage.startWatches(1, new OnRevisionAppliedCallback() {
+ @Override
+ public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
+ // No-op.
+ }
+
+ @Override
+ public CompletableFuture<Void> onRevisionApplied(WatchEvent event) {
+ return completedFuture(null);
+ }
+ });
CompletableFuture<byte[]> fut = new CompletableFuture<>();
@@ -2308,7 +2318,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu
OnRevisionAppliedCallback mockCallback = mock(OnRevisionAppliedCallback.class);
- when(mockCallback.onRevisionApplied(any(), any())).thenReturn(completedFuture(null));
+ when(mockCallback.onRevisionApplied(any())).thenReturn(completedFuture(null));
storage.startWatches(1, mockCallback);
@@ -2320,7 +2330,7 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu
verify(mockListener3, timeout(10_000)).onUpdate(any());
- verify(mockCallback, never()).onRevisionApplied(any(), any());
+ verify(mockCallback, never()).onRevisionApplied(any());
}
@Test
@@ -2505,7 +2515,17 @@ public abstract class BasicOperationsKeyValueStorageTest extends AbstractKeyValu
}
});
- storage.startWatches(1, (event, ts) -> completedFuture(null));
+ storage.startWatches(1, new OnRevisionAppliedCallback() {
+ @Override
+ public void onSafeTimeAdvanced(HybridTimestamp newSafeTime) {
+ // No-op.
+ }
+
+ @Override
+ public CompletableFuture<Void> onRevisionApplied(WatchEvent event) {
+ return completedFuture(null);
+ }
+ });
return resultFuture;
}
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
index fcbf626497..745e0f2143 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
@@ -56,7 +56,7 @@ public class WatchProcessorTest extends BaseIgniteAbstractTest {
@BeforeEach
void setUp() {
- when(revisionCallback.onRevisionApplied(any(), any())).thenReturn(completedFuture(null));
+ when(revisionCallback.onRevisionApplied(any())).thenReturn(completedFuture(null));
watchProcessor.setRevisionCallback(revisionCallback);
}
@@ -90,7 +90,7 @@ public class WatchProcessorTest extends BaseIgniteAbstractTest {
var watchEventCaptor = ArgumentCaptor.forClass(WatchEvent.class);
- verify(revisionCallback, timeout(1_000)).onRevisionApplied(watchEventCaptor.capture(), any());
+ verify(revisionCallback, timeout(1_000)).onRevisionApplied(watchEventCaptor.capture());
WatchEvent event = watchEventCaptor.getValue();
@@ -120,7 +120,7 @@ public class WatchProcessorTest extends BaseIgniteAbstractTest {
verify(listener1, timeout(1_000)).onUpdate(event);
- verify(revisionCallback, timeout(1_000)).onRevisionApplied(event, ts);
+ verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
ts = new HybridTimestamp(2, 3);
@@ -130,7 +130,7 @@ public class WatchProcessorTest extends BaseIgniteAbstractTest {
verify(listener2, timeout(1_000)).onUpdate(event);
- verify(revisionCallback, timeout(1_000)).onRevisionApplied(event, ts);
+ verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
}
/**
@@ -156,7 +156,7 @@ public class WatchProcessorTest extends BaseIgniteAbstractTest {
verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new EntryEvent(oldEntry(entry2), entry2)));
verify(listener2, timeout(1_000)).onError(any(IllegalStateException.class));
- verify(revisionCallback, never()).onRevisionApplied(any(), any());
+ verify(revisionCallback, never()).onRevisionApplied(any());
}
/**
diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 6760682702..36980dc87e 100644
--- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -888,4 +888,15 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long newRevision) {
return watchProcessor.notifyUpdateRevisionListeners(newRevision);
}
+
+ @Override
+ public void advanceSafeTime(HybridTimestamp newSafeTime) {
+ synchronized (mux) {
+ if (!areWatchesEnabled) {
+ return;
+ }
+
+ watchProcessor.advanceSafeTime(newSafeTime);
+ }
+ }
}