You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2023/10/26 10:33:59 UTC

[ignite-3] branch main updated: IGNITE-20741 Fix incorrect timestamps when notifying watches on MetaStorage recovery (#2754)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b4971dade IGNITE-20741 Fix incorrect timestamps when notifying watches on MetaStorage recovery (#2754)
6b4971dade is described below

commit 6b4971dadef93ca40042899223c136af0489d37b
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Thu Oct 26 14:33:53 2023 +0400

    IGNITE-20741 Fix incorrect timestamps when notifying watches on MetaStorage recovery (#2754)
---
 .../internal/catalog/CatalogManagerImpl.java       |  1 +
 .../testframework/BaseIgniteAbstractTest.java      |  2 +-
 .../metastorage/impl/ItMetaStorageWatchTest.java   | 99 +++++++++++++++++++++-
 .../server/persistence/RocksDbKeyValueStorage.java |  5 +-
 4 files changed, 102 insertions(+), 5 deletions(-)

diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index 638d119ab7..87bfafb5f1 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -529,6 +529,7 @@ public class CatalogManagerImpl extends AbstractEventProducer<CatalogEvent, Cata
 
         return new Catalog(
                 update.version(),
+                // Remove this maxing when https://issues.apache.org/jira/browse/IGNITE-20499 is fixed and DelayDuration is truly constant.
                 Math.max(activationTimestamp, prevVersionActivationTimestamp),
                 catalog.objectIdGenState(),
                 catalog.zones(),
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
index 7ab1133e35..90e68ca971 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
@@ -39,7 +39,7 @@ import org.mockito.Mockito;
  * Ignite base test class.
  */
 @ExtendWith(SystemPropertiesExtension.class)
-@WithSystemProperty(key = IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY, value = "25")
+@WithSystemProperty(key = IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY, value = "100")
 public abstract class BaseIgniteAbstractTest {
     /** Logger. */
     protected final IgniteLogger log = Loggers.forClass(getClass());
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index df7017cef2..e8e501ebc3 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -18,11 +18,15 @@
 package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -31,12 +35,16 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.cluster.management.ClusterInitializer;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -52,9 +60,11 @@ import org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
@@ -363,11 +373,11 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest {
                 public CompletableFuture<Void> onUpdate(WatchEvent event) {
                     List<String> keys = event.entryEvents().stream()
                             .map(e -> new String(e.newEntry().key(), StandardCharsets.UTF_8))
-                            .collect(Collectors.toList());
+                            .collect(toList());
 
                     List<String> values = event.entryEvents().stream()
                             .map(e -> new String(e.newEntry().value(), StandardCharsets.UTF_8))
-                            .collect(Collectors.toList());
+                            .collect(toList());
 
                     assertThat(keys, containsInAnyOrder("bar", "baz"));
                     assertThat(values, containsInAnyOrder("one", "two"));
@@ -408,4 +418,87 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest {
         assertTrue(exactLatch.await(10, TimeUnit.SECONDS));
         assertTrue(prefixLatch.await(10, TimeUnit.SECONDS));
     }
+
+    /**
+     * Tests that missed metastorage events are replayed with correct timestamps.
+     */
+    @Test
+    void updatesAreReplayedWithCorrectTimestamps() throws Exception {
+        int numNodes = 3;
+
+        startCluster(numNodes);
+
+        List<RevisionAndTimestamp> seenRevisionsAndTimestamps = new CopyOnWriteArrayList<>();
+
+        for (Node node : nodes) {
+            node.metaStorageManager.registerPrefixWatch(new ByteArray("prefix"), new WatchListener() {
+                @Override
+                public CompletableFuture<Void> onUpdate(WatchEvent event) {
+                    seenRevisionsAndTimestamps.add(new RevisionAndTimestamp(event.revision(), event.timestamp()));
+
+                    return completedFuture(null);
+                }
+
+                @Override
+                public void onError(Throwable e) {
+                    fail();
+                }
+            });
+        }
+
+        MetaStorageManager metaStorageManager0 = nodes.get(0).metaStorageManager;
+
+        ByteArray key1 = new ByteArray("prefix.1");
+        ByteArray key2 = new ByteArray("prefix.2");
+
+        assertThat(metaStorageManager0.put(key1, new byte[0]), willCompleteSuccessfully());
+        assertThat(metaStorageManager0.put(key2, new byte[0]), willCompleteSuccessfully());
+
+        nodes.forEach(node -> assertThat("Watches were not deployed", node.metaStorageManager.deployWatches(), willCompleteSuccessfully()));
+
+        waitForCondition(() -> seenRevisionsAndTimestamps.size() == numNodes * 2, TimeUnit.SECONDS.toMillis(10));
+
+        // Each revision must be accompanied with the same timestamp on each node.
+        Set<RevisionAndTimestamp> revsAndTssSet = new HashSet<>(seenRevisionsAndTimestamps);
+        assertThat(revsAndTssSet, hasSize(2));
+
+        Map<Long, HybridTimestamp> revToTs = revsAndTssSet.stream()
+                .collect(toMap(rvAndTs -> rvAndTs.revision, rvAndTs -> rvAndTs.timestamp));
+
+        assertThat(revToTs.values().stream().distinct().count(), is(2L));
+
+        // Make sure that timestamps from WatchEvents are same as in the storage.
+        Entry entry1 = metaStorageManager0.getLocally(key1, Long.MAX_VALUE);
+        Entry entry2 = metaStorageManager0.getLocally(key2, Long.MAX_VALUE);
+
+        assertThat(revToTs.get(entry1.revision()), is(metaStorageManager0.timestampByRevision(entry1.revision())));
+        assertThat(revToTs.get(entry2.revision()), is(metaStorageManager0.timestampByRevision(entry2.revision())));
+    }
+
+    private static class RevisionAndTimestamp {
+        private final long revision;
+        private final HybridTimestamp timestamp;
+
+        private RevisionAndTimestamp(long revision, HybridTimestamp timestamp) {
+            this.revision = revision;
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            RevisionAndTimestamp that = (RevisionAndTimestamp) o;
+            return revision == that.revision && Objects.equals(timestamp, that.timestamp);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(revision, timestamp);
+        }
+    }
 }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index d691d73c14..e08fccc7f8 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -1534,12 +1534,15 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
                         watchProcessor.notifyWatches(updatedEntriesCopy, ts);
 
                         updatedEntries.clear();
+
+                        ts = timestampByRevision(revision);
                     }
 
                     lastSeenRevision = revision;
                 }
 
                 if (ts == null) {
+                    // This will only execute on first iteration.
                     ts = timestampByRevision(revision);
                 }
 
@@ -1548,7 +1551,7 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
 
             RocksUtils.checkIterator(it);
 
-            // Notify about the events left after finishing the cycle above.
+            // Notify about the events left after finishing the loop above.
             if (!updatedEntries.isEmpty()) {
                 assert ts != null;