You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2023/10/26 10:33:59 UTC
[ignite-3] branch main updated: IGNITE-20741 Fix incorrect timestamps when notifying watches on MetaStorage recovery (#2754)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6b4971dade IGNITE-20741 Fix incorrect timestamps when notifying watches on MetaStorage recovery (#2754)
6b4971dade is described below
commit 6b4971dadef93ca40042899223c136af0489d37b
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Thu Oct 26 14:33:53 2023 +0400
IGNITE-20741 Fix incorrect timestamps when notifying watches on MetaStorage recovery (#2754)
---
.../internal/catalog/CatalogManagerImpl.java | 1 +
.../testframework/BaseIgniteAbstractTest.java | 2 +-
.../metastorage/impl/ItMetaStorageWatchTest.java | 99 +++++++++++++++++++++-
.../server/persistence/RocksDbKeyValueStorage.java | 5 +-
4 files changed, 102 insertions(+), 5 deletions(-)
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index 638d119ab7..87bfafb5f1 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -529,6 +529,7 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata
return new Catalog(
update.version(),
+ // Remove this maxing when https://issues.apache.org/jira/browse/IGNITE-20499 is fixed and DelayDuration is truly constant.
Math.max(activationTimestamp, prevVersionActivationTimestamp),
catalog.objectIdGenState(),
catalog.zones(),
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
index 7ab1133e35..90e68ca971 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
@@ -39,7 +39,7 @@ import org.mockito.Mockito;
* Ignite base test class.
*/
@ExtendWith(SystemPropertiesExtension.class)
-@WithSystemProperty(key = IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY, value = "25")
+@WithSystemProperty(key = IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY, value = "100")
public abstract class BaseIgniteAbstractTest {
/** Logger. */
protected final IgniteLogger log = Loggers.forClass(getClass());
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index df7017cef2..e8e501ebc3 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -18,11 +18,15 @@
package org.apache.ignite.internal.metastorage.impl;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -31,12 +35,16 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -52,9 +60,11 @@ import org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
@@ -363,11 +373,11 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest {
public CompletableFuture<Void> onUpdate(WatchEvent event) {
List<String> keys = event.entryEvents().stream()
.map(e -> new String(e.newEntry().key(), StandardCharsets.UTF_8))
- .collect(Collectors.toList());
+ .collect(toList());
List<String> values = event.entryEvents().stream()
.map(e -> new String(e.newEntry().value(), StandardCharsets.UTF_8))
- .collect(Collectors.toList());
+ .collect(toList());
assertThat(keys, containsInAnyOrder("bar", "baz"));
assertThat(values, containsInAnyOrder("one", "two"));
@@ -408,4 +418,87 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest {
assertTrue(exactLatch.await(10, TimeUnit.SECONDS));
assertTrue(prefixLatch.await(10, TimeUnit.SECONDS));
}
+
+ /**
+ * Tests that missed metastorage events are replayed with correct timestamps.
+ */
+ @Test
+ void updatesAreReplayedWithCorrectTimestamps() throws Exception {
+ int numNodes = 3;
+
+ startCluster(numNodes);
+
+ List<RevisionAndTimestamp> seenRevisionsAndTimestamps = new CopyOnWriteArrayList<>();
+
+ for (Node node : nodes) {
+ node.metaStorageManager.registerPrefixWatch(new ByteArray("prefix"), new WatchListener() {
+ @Override
+ public CompletableFuture<Void> onUpdate(WatchEvent event) {
+ seenRevisionsAndTimestamps.add(new RevisionAndTimestamp(event.revision(), event.timestamp()));
+
+ return completedFuture(null);
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ fail();
+ }
+ });
+ }
+
+ MetaStorageManager metaStorageManager0 = nodes.get(0).metaStorageManager;
+
+ ByteArray key1 = new ByteArray("prefix.1");
+ ByteArray key2 = new ByteArray("prefix.2");
+
+ assertThat(metaStorageManager0.put(key1, new byte[0]), willCompleteSuccessfully());
+ assertThat(metaStorageManager0.put(key2, new byte[0]), willCompleteSuccessfully());
+
+ nodes.forEach(node -> assertThat("Watches were not deployed", node.metaStorageManager.deployWatches(), willCompleteSuccessfully()));
+
+ waitForCondition(() -> seenRevisionsAndTimestamps.size() == numNodes * 2, TimeUnit.SECONDS.toMillis(10));
+
+ // Each revision must be accompanied with the same timestamp on each node.
+ Set<RevisionAndTimestamp> revsAndTssSet = new HashSet<>(seenRevisionsAndTimestamps);
+ assertThat(revsAndTssSet, hasSize(2));
+
+ Map<Long, HybridTimestamp> revToTs = revsAndTssSet.stream()
+ .collect(toMap(rvAndTs -> rvAndTs.revision, rvAndTs -> rvAndTs.timestamp));
+
+ assertThat(revToTs.values().stream().distinct().count(), is(2L));
+
+ // Make sure that timestamps from WatchEvents are same as in the storage.
+ Entry entry1 = metaStorageManager0.getLocally(key1, Long.MAX_VALUE);
+ Entry entry2 = metaStorageManager0.getLocally(key2, Long.MAX_VALUE);
+
+ assertThat(revToTs.get(entry1.revision()), is(metaStorageManager0.timestampByRevision(entry1.revision())));
+ assertThat(revToTs.get(entry2.revision()), is(metaStorageManager0.timestampByRevision(entry2.revision())));
+ }
+
+ private static class RevisionAndTimestamp {
+ private final long revision;
+ private final HybridTimestamp timestamp;
+
+ private RevisionAndTimestamp(long revision, HybridTimestamp timestamp) {
+ this.revision = revision;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RevisionAndTimestamp that = (RevisionAndTimestamp) o;
+ return revision == that.revision && Objects.equals(timestamp, that.timestamp);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(revision, timestamp);
+ }
+ }
}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index d691d73c14..e08fccc7f8 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -1534,12 +1534,15 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
watchProcessor.notifyWatches(updatedEntriesCopy, ts);
updatedEntries.clear();
+
+ ts = timestampByRevision(revision);
}
lastSeenRevision = revision;
}
if (ts == null) {
+ // This will only execute on first iteration.
ts = timestampByRevision(revision);
}
@@ -1548,7 +1551,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
RocksUtils.checkIterator(it);
- // Notify about the events left after finishing the cycle above.
+ // Notify about the events left after finishing the loop above.
if (!updatedEntries.isEmpty()) {
assert ts != null;