You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/09/11 14:18:18 UTC

[ignite-3] branch main updated: IGNITE-20391 Return future metastore event processing from UpdateListener#onUpdate (#2570)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 407419d900 IGNITE-20391 Return future metastore event processing from UpdateListener#onUpdate (#2570)
407419d900 is described below

commit 407419d900014d272df0755884d6b0c884975b9f
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Mon Sep 11 17:18:12 2023 +0300

    IGNITE-20391 Return future metastore event processing from UpdateListener#onUpdate (#2570)
---
 .../internal/catalog/CatalogManagerImpl.java       |  5 +--
 .../ignite/internal/catalog/storage/UpdateLog.java |  8 ++---
 .../internal/catalog/storage/UpdateLogImpl.java    | 24 ++++++++++----
 .../catalog/storage/UpdateLogImplTest.java         | 37 ++++++++++++++++++++--
 4 files changed, 57 insertions(+), 17 deletions(-)

diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index ad709ccf39..eaf0cf8df9 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.catalog;
 
+import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateAlterZoneParams;
@@ -511,7 +512,7 @@ public class CatalogManagerImpl extends Producer<CatalogEvent, CatalogEventParam
 
     class OnUpdateHandlerImpl implements OnUpdateHandler {
         @Override
-        public void handle(VersionedUpdate update, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) {
+        public CompletableFuture<Void> handle(VersionedUpdate update, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken) {
             int version = update.version();
             Catalog catalog = catalogByVer.get(version - 1);
 
@@ -538,7 +539,7 @@ public class CatalogManagerImpl extends Producer<CatalogEvent, CatalogEventParam
                 }
             }
 
-            CompletableFuture.allOf(eventFutures.toArray(CompletableFuture[]::new))
+            return allOf(eventFutures.toArray(CompletableFuture[]::new))
                     .whenComplete((ignore, err) -> {
                         if (err != null) {
                             LOG.warn("Failed to apply catalog update.", err);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
index eb17046103..c2d24086e3 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
@@ -58,10 +58,7 @@ public interface UpdateLog extends IgniteComponent {
      */
     @Override void start() throws IgniteInternalException;
 
-    /**
-     * An interface describing a handler that will receive notification
-     * when a new update is added to the log.
-     */
+    /** An interface describing a handler that will receive notification when a new update is added to the log. */
     @FunctionalInterface
     interface OnUpdateHandler {
         /**
@@ -70,7 +67,8 @@ public interface UpdateLog extends IgniteComponent {
          * @param update A new update.
          * @param metaStorageUpdateTimestamp Timestamp assigned to the update by the Metastorage.
          * @param causalityToken Causality token.
+         * @return Future that completes when the handler has finished processing the update.
          */
-        void handle(VersionedUpdate update, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken);
+        CompletableFuture<Void> handle(VersionedUpdate update, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken);
     }
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index 4d0e00198c..062e5f07f4 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
+import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
@@ -27,9 +28,13 @@ import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
 import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.EntryEvent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -52,12 +57,16 @@ import org.jetbrains.annotations.Nullable;
  * Metastore-based implementation of UpdateLog.
  */
 public class UpdateLogImpl implements UpdateLog {
+    private static final IgniteLogger LOG = Loggers.forClass(UpdateLogImpl.class);
+
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
     private final AtomicBoolean stopGuard = new AtomicBoolean();
 
     private final MetaStorageManager metastore;
 
     private volatile OnUpdateHandler onUpdateHandler;
+
     private volatile @Nullable UpdateListener listener;
 
     /**
@@ -197,25 +206,26 @@ public class UpdateLogImpl implements UpdateLog {
 
         @Override
         public CompletableFuture<Void> onUpdate(WatchEvent event) {
-            for (EntryEvent eventEntry : event.entryEvents()) {
-                assert eventEntry.newEntry() != null;
-                assert !eventEntry.newEntry().empty();
+            Collection<EntryEvent> entryEvents = event.entryEvents();
+
+            var handleFutures = new ArrayList<CompletableFuture<Void>>(entryEvents.size());
 
+            for (EntryEvent eventEntry : entryEvents) {
                 byte[] payload = eventEntry.newEntry().value();
 
-                assert payload != null;
+                assert payload != null : eventEntry;
 
                 VersionedUpdate update = fromBytes(payload);
 
-                onUpdateHandler.handle(update, event.timestamp(), event.revision());
+                handleFutures.add(onUpdateHandler.handle(update, event.timestamp(), event.revision()));
             }
 
-            return CompletableFuture.completedFuture(null);
+            return allOf(handleFutures.toArray(CompletableFuture[]::new));
         }
 
         @Override
         public void onError(Throwable e) {
-            assert false;
+            LOG.warn("Unable to process catalog event", e);
         }
     }
 }
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
index 27c1b524c2..414974e159 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
@@ -17,18 +17,21 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler;
@@ -49,7 +52,6 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 /** Tests to verify {@link UpdateLogImpl}. */
-@SuppressWarnings("ConstantConditions")
 class UpdateLogImplTest extends BaseIgniteAbstractTest {
     private KeyValueStorage keyValueStorage;
 
@@ -66,6 +68,7 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest {
         metastore = StandaloneMetaStorageManager.create(vault, keyValueStorage);
 
         vault.start();
+        keyValueStorage.start();
         metastore.start();
     }
 
@@ -73,6 +76,7 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest {
     public void tearDown() throws Exception {
         IgniteUtils.closeAll(
                 metastore == null ? null : metastore::stop,
+                keyValueStorage == null ? null : keyValueStorage::close,
                 vault == null ? null : vault::stop
         );
     }
@@ -80,7 +84,7 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest {
     @Test
     void logReplayedOnStart() throws Exception {
         // First, let's append a few entries to the update log.
-        UpdateLogImpl updateLogImpl = createAndStartUpdateLogImpl((update, ts, causalityToken) -> {/* no-op */});
+        UpdateLogImpl updateLogImpl = createAndStartUpdateLogImpl((update, ts, causalityToken) -> completedFuture(null));
 
         assertThat(metastore.deployWatches(), willCompleteSuccessfully());
 
@@ -95,7 +99,11 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest {
 
         var actualUpdates = new ArrayList<VersionedUpdate>();
 
-        createAndStartUpdateLogImpl((update, ts, causalityToken) -> actualUpdates.add(update));
+        createAndStartUpdateLogImpl((update, ts, causalityToken) -> {
+            actualUpdates.add(update);
+
+            return completedFuture(null);
+        });
 
         // Let's check that we have recovered to the latest version.
         assertThat(actualUpdates, equalTo(expectedUpdates));
@@ -162,6 +170,8 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest {
         updateLog.registerUpdateHandler((update, ts, causalityToken) -> {
             appliedVersions.add(update.version());
             causalityTokens.add(causalityToken);
+
+            return completedFuture(null);
         });
 
         long revisionBefore = metastore.appliedRevision();
@@ -203,6 +213,27 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest {
         assertThat(causalityTokens, equalTo(expectedTokens));
     }
 
+    @Test
+    void testUpdateMetastoreRevisionAfterUpdateHandlerComplete() throws Exception {
+        CompletableFuture<Void> onUpdateHandlerFuture = new CompletableFuture<>();
+
+        UpdateLog updateLog = createAndStartUpdateLogImpl((update, metaStorageUpdateTimestamp, causalityToken) -> onUpdateHandlerFuture);
+
+        assertThat(metastore.deployWatches(), willCompleteSuccessfully());
+
+        long metastoreRevision = metastore.appliedRevision();
+
+        assertThat(updateLog.append(singleEntryUpdateOfVersion(1)), willCompleteSuccessfully());
+
+        // Let's make sure that the metastore revision will not increase until onUpdateHandlerFuture is completed.
+        assertFalse(waitForCondition(() -> metastore.appliedRevision() > metastoreRevision, 200));
+
+        // Let's make sure that the metastore revision increases after completing onUpdateHandlerFuture.
+        onUpdateHandlerFuture.complete(null);
+
+        assertTrue(waitForCondition(() -> metastore.appliedRevision() > metastoreRevision, 200));
+    }
+
     private static VersionedUpdate singleEntryUpdateOfVersion(int version) {
         return new VersionedUpdate(version, 1, List.of(new TestUpdateEntry("foo_" + version)));
     }