Posted to notifications@ignite.apache.org by "sashapolo (via GitHub)" <gi...@apache.org> on 2023/05/24 13:41:51 UTC

[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2098: IGNITE-19532 Happens-before for safe time propagation

sashapolo commented on code in PR #2098:
URL: https://github.com/apache/ignite-3/pull/2098#discussion_r1204110681


##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java:
##########
@@ -31,5 +32,5 @@ public interface OnRevisionAppliedCallback {
      * @param watchEvent Event with modified Meta Storage entries processed at least one Watch.
      * @return Future that represents the state of the execution of the callback.
      */
-    CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent);
+    CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent, HybridTimestamp time);

Review Comment:
   Please update the Javadoc to describe the new `time` parameter.



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -1329,7 +1345,7 @@ private void queueWatchEvent() {
                     eventCache = new ArrayList<>();
                 }
 
-                eventCache.add(List.copyOf(updatedEntries));
+                eventCache.add(updatedEntries.copy());

Review Comment:
   Since we now have a separate class, we can unify `copy` and `clear` into a single call (e.g. `transfer`). I believe they are always used together.
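
   A minimal sketch of what such a combined call could look like. `UpdatedEntriesSketch` is a hypothetical stand-in: the actual class introduced by the PR is not shown in this hunk, so its fields, element type, and the return type of `copy` are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the PR's entries holder; not the actual class. */
class UpdatedEntriesSketch<T> {
    private final List<T> entries = new ArrayList<>();

    void add(T entry) {
        entries.add(entry);
    }

    boolean isEmpty() {
        return entries.isEmpty();
    }

    /** Returns an immutable snapshot of the accumulated entries and resets the buffer. */
    List<T> transfer() {
        List<T> snapshot = List.copyOf(entries);

        entries.clear();

        return snapshot;
    }
}
```

   With something like this, the call site would become `eventCache.add(updatedEntries.transfer());`, and the caller could no longer forget the follow-up `clear`.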



##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Safe time propagation tests. */
+public abstract class ItMetaStorageSafeTimePropagationAbstractTest {
+
+    private KeyValueStorage storage;
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    private final ClusterTimeImpl time = new ClusterTimeImpl(new IgniteSpinBusyLock(), clock);
+
+    @BeforeEach
+    public void setUp() {
+        storage = createStorage();
+
+        storage.start();
+
+        storage.startWatches((e, t) -> {
+            time.updateSafeTime(t);
+
+            return CompletableFuture.completedFuture(null);
+        });
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        storage.close();
+    }
+
+    @Test
+    public void testTimePropagated() throws Exception {
+        CompletableFuture<Void> f = new CompletableFuture<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        storage.watchExact(key(0), 1, new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent event) {
+                latch.countDown();

Review Comment:
   Why do we need both a latch and a future here? I think a single future is enough; you can simply complete it here.
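
   A rough, self-contained sketch of the idea, covering only the signalling role of the latch. `SingleFutureSketch` and its `Listener` interface are hypothetical stand-ins for the test and `WatchListener`; the rest of the test body is not visible in this hunk, so whether the returned future must stay incomplete for a while is an open assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration; Listener is a stand-in for WatchListener. */
class SingleFutureSketch {
    /** Stand-in for a watch callback that returns a future, as onUpdate does. */
    interface Listener {
        CompletableFuture<Void> onUpdate(String event);
    }

    static boolean listenerInvoked() {
        CompletableFuture<Void> invoked = new CompletableFuture<>();

        Listener listener = event -> {
            // Completing the future takes the place of latch.countDown().
            invoked.complete(null);

            return CompletableFuture.completedFuture(null);
        };

        // Simulate the storage firing the watch on another thread.
        CompletableFuture.runAsync(() -> listener.onUpdate("test-event"));

        // Takes the place of latch.await(1, TimeUnit.SECONDS);
        // join() throws if the listener is not invoked within the timeout.
        invoked.orTimeout(1, TimeUnit.SECONDS).join();

        return invoked.isDone();
    }
}
```

   In the real test, the same wait could be expressed as `assertThat(invoked, willCompleteSuccessfully())`, which is already imported in this file.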



##########
modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java:
##########
@@ -64,6 +64,9 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     /** Timestamp to revision mapping. */
     private final NavigableMap<Long, Long> tsToRevMap = new TreeMap<>();
 
+    /** Revision to timestamp mapping. */
+    private final NavigableMap<Long, Long> revToTsMap = new TreeMap<>();

Review Comment:
   Also, why doesn't it store `HybridTimestamp`s directly?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageWriteHandler.java:
##########
@@ -88,13 +88,9 @@ void handleWriteCommand(CommandClosure<WriteCommand> clo) {
                 safeTime = cmdWithTime.safeTime();
 
                 handleWriteWithTime(clo, cmdWithTime, safeTime);
-
-                // Every MetaStorageWriteCommand holds safe time that we should set as the cluster time.
-                clusterTime.updateSafeTime(safeTime);
             } else if (command instanceof SyncTimeCommand) {
-                clusterTime.updateSafeTime(((SyncTimeCommand) command).safeTime());
-
-                clo.result(null);
+                // TODO: IGNITE-19199 WatchProcessor must be notified of the new safe time.
+                throw new UnsupportedOperationException("https://issues.apache.org/jira/browse/IGNITE-19199");

Review Comment:
   So, `SyncTimeCommand` is not used anywhere right now, correct?



##########
modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java:
##########
@@ -82,7 +82,12 @@ public HybridTimestamp(long physical, int logical) {
         }
     }
 
-    private HybridTimestamp(long time) {
+    /**
+     * The constructor.
+     *
+     * @param time Long time value.
+     */
+    public HybridTimestamp(long time) {

Review Comment:
   What is this change for?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java:
##########
@@ -31,7 +31,9 @@ enum StorageColumnFamilyType {
     INDEX("INDEX".getBytes(StandardCharsets.UTF_8)),
 
     /** Column family for the timestamp to revision mapping. */
-    TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8));
+    TS_TO_REVISION("TSTOREV".getBytes(StandardCharsets.UTF_8)),
+
+    REVISION_TO_TS("REVTOTTS".getBytes(StandardCharsets.UTF_8));

Review Comment:
   Shouldn't it be `REVTOTS`?



##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java:
##########
@@ -365,12 +356,31 @@ void testSafeTimePropagation(boolean useFollower, TestInfo testInfo) throws Exce
 
         assertThat(allOf(firstNode.cmgManager.onJoinReady(), secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
 
+        CompletableFuture<Void> f = new CompletableFuture<>();
+        CountDownLatch l = new CountDownLatch(1);
+
+        secondNode.metaStorageManager.registerExactWatch(ByteArray.fromString("test-key"), new WatchListener() {
+            @Override
+            public CompletableFuture<Void> onUpdate(WatchEvent event) {
+                l.countDown();
+                return f;
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                // No-op.
+            }
+        });
+
         // Try putting data from both nodes, because any of them can be a leader.
         assertThat(
                 firstNode.metaStorageManager.put(ByteArray.fromString("test-key"), new byte[]{0, 1, 2, 3}),
                 willCompleteSuccessfully()
         );
 
+        assertTrue(l.await(1, TimeUnit.SECONDS));

Review Comment:
   Please add a comment explaining why these lines are required.



##########
modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java:
##########
@@ -64,6 +64,9 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     /** Timestamp to revision mapping. */
     private final NavigableMap<Long, Long> tsToRevMap = new TreeMap<>();
 
+    /** Revision to timestamp mapping. */
+    private final NavigableMap<Long, Long> revToTsMap = new TreeMap<>();

Review Comment:
   Why does it need to be a TreeMap?



##########
modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationAbstractTest.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Safe time propagation tests. */
+public abstract class ItMetaStorageSafeTimePropagationAbstractTest {

Review Comment:
   Why don't we reuse `AbstractKeyValueStorageTest` here?



##########
modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java:
##########
@@ -1379,36 +1400,60 @@ private void replayUpdates(long upperRevision) {
                     if (!updatedEntries.isEmpty()) {
                         var updatedEntriesCopy = List.copyOf(updatedEntries);
 
-                        watchProcessor.notifyWatches(updatedEntriesCopy);
+                        assert ts != null;
+
+                        watchProcessor.notifyWatches(updatedEntriesCopy, ts);
 
                         updatedEntries.clear();
                     }
 
                     lastSeenRevision = revision;
                 }
 
+                if (ts == null) {
+                    ts = timestampByRevision(revision);
+                }
+
                 updatedEntries.add(entry(rocksKeyToBytes(rocksKey), revision, bytesToValue(rocksValue)));
             }
 
             RocksUtils.checkIterator(it);
 
             // Notify about the events left after finishing the cycle above.
             if (!updatedEntries.isEmpty()) {
-                watchProcessor.notifyWatches(updatedEntries);
+                assert ts != null;
+
+                watchProcessor.notifyWatches(updatedEntries, ts);
             }
         }
 
         finishReplay();
     }
 
+    private HybridTimestamp timestampByRevision(long revision) {
+        try {
+            byte[] tsBytes = revisionToTs.get(longToBytes(revision));
+
+            assert tsBytes != null;
+
+            return new HybridTimestamp(bytesToLong(tsBytes));
+        } catch (RocksDBException e) {
+            throw new RuntimeException(e);

Review Comment:
   Shouldn't it be a `StorageException`?
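
   A sketch of the wrapping pattern, written so that it compiles on its own. Every type here is a hypothetical stand-in: `BackendException` plays the role of the checked `RocksDBException`, `StorageExceptionSketch` plays the role of the storage-level exception, and `Reader` replaces the column family read. The real exception's package and constructors are not shown in this hunk and are not assumed.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-ins; none of these are the actual Ignite or RocksDB types. */
class TimestampLookupSketch {
    /** Stand-in for the checked RocksDBException. */
    static class BackendException extends Exception {
        BackendException(String msg) {
            super(msg);
        }
    }

    /** Stand-in for a storage-level unchecked exception. */
    static class StorageExceptionSketch extends RuntimeException {
        StorageExceptionSketch(String msg, Throwable cause) {
            super(msg, cause);
        }
    }

    /** Stand-in for a column family read keyed by revision. */
    interface Reader {
        byte[] get(long revision) throws BackendException;
    }

    static long timestampByRevision(Reader reader, long revision) {
        try {
            byte[] tsBytes = reader.get(revision);

            assert tsBytes != null;

            return ByteBuffer.wrap(tsBytes).getLong();
        } catch (BackendException e) {
            // Wrap in the storage-level exception, keeping the cause and the revision for diagnostics.
            throw new StorageExceptionSketch("Failed to read timestamp for revision " + revision, e);
        }
    }
}
```

   Compared with a bare `RuntimeException`, callers can catch a storage-specific type and the message says which revision failed.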


