You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/08/20 09:01:56 UTC

[ignite-3] branch main updated: IGNITE-17555 Fix AssertionError and NPE in configuration (#1023)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9767027226 IGNITE-17555 Fix AssertionError and NPE in configuration (#1023)
9767027226 is described below

commit 976702722640747f47d4f7583b0e056efd0e7163
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Sat Aug 20 13:01:52 2022 +0400

    IGNITE-17555 Fix AssertionError and NPE in configuration (#1023)
---
 .../configuration/ConfigurationChanger.java        |  28 ++-
 .../notifications/ConfigurationNotifier.java       |   7 +-
 .../storage/ConfigurationStorage.java              |  15 +-
 .../configuration/ConfigurationChangerTest.java    |   2 +-
 .../storage/TestConfigurationStorage.java          |   8 +-
 .../configuration/tree/NamedListNodeTest.java      |  14 +-
 .../org/apache/ignite/internal/util/ByteUtils.java |  10 +-
 .../internal/metastorage/client/EntryImpl.java     |   8 +-
 .../ItDistributedConfigurationStorageTest.java     |   2 +-
 .../storage/DistributedConfigurationStorage.java   | 113 ++++++----
 .../storage/LocalConfigurationStorage.java         |   9 +-
 .../DistributedConfigurationCatchUpTest.java       | 227 +++++++++++++++++++++
 12 files changed, 365 insertions(+), 78 deletions(-)

diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
index d68be24d60..4369df3640 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
@@ -199,7 +199,7 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange
         Data data;
 
         try {
-            data = storage.readAll().get();
+            data = storage.readDataOnRecovery().get();
         } catch (ExecutionException e) {
             throw new ConfigurationChangeException("Failed to initialize configuration: " + e.getCause().getMessage(), e.getCause());
         } catch (InterruptedException e) {
@@ -572,20 +572,26 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange
 
         storageRoots = new StorageRoots(newSuperRoot, newChangeId);
 
-        if (dataValuesPrefixMap.isEmpty()) {
-            oldStorageRoots.changeFuture.complete(null);
-
-            return CompletableFuture.completedFuture(null);
-        } else {
-            return notificator.notify(oldSuperRoot, newSuperRoot, newChangeId, notificationListenerCnt.incrementAndGet())
-                .whenComplete((v, t) -> {
-                    if (t == null) {
+        // Save revisions for recovery.
+        return storage.writeConfigurationRevision(oldStorageRoots.version, storageRoots.version)
+                .thenCompose(unused -> {
+                    if (dataValuesPrefixMap.isEmpty()) {
                         oldStorageRoots.changeFuture.complete(null);
+
+                        return CompletableFuture.completedFuture(null);
                     } else {
-                        oldStorageRoots.changeFuture.completeExceptionally(t);
+                        long notificationNumber = notificationListenerCnt.incrementAndGet();
+
+                        return notificator.notify(oldSuperRoot, newSuperRoot, newChangeId, notificationNumber)
+                                .whenComplete((v, t) -> {
+                                    if (t == null) {
+                                        oldStorageRoots.changeFuture.complete(null);
+                                    } else {
+                                        oldStorageRoots.changeFuture.completeExceptionally(t);
+                                    }
+                                });
                     }
                 });
-        }
     }
 
     /**
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotifier.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotifier.java
index e0791c9cad..017092e08c 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotifier.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotifier.java
@@ -425,9 +425,14 @@ public class ConfigurationNotifier {
                 // Lazy initialization.
                 Collection<DynamicConfiguration<InnerNode, ?>> newAnyConfigs = null;
 
+                NamedListConfiguration<?, InnerNode, ?> namedListCfg = namedDynamicConfig(config, key);
+
+                // Initialize named list configuration.
+                namedListCfg.touchMembers();
+
                 for (String name : newNamedList.namedListKeys()) {
                     DynamicConfiguration<InnerNode, ?> namedNodeConfig =
-                            (DynamicConfiguration<InnerNode, ?>) namedDynamicConfig(config, key).getConfig(name);
+                            (DynamicConfiguration<InnerNode, ?>) namedListCfg.getConfig(name);
 
                     ctx.addContainer(namedNodeConfig, name);
 
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
index 44d431c97d..2977168f1d 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
@@ -27,11 +27,11 @@ import org.apache.ignite.configuration.annotation.ConfigurationType;
  */
 public interface ConfigurationStorage extends AutoCloseable {
     /**
-     * Read all configuration values and current storage version.
+     * Reads all configuration values and current storage version during the recovery phase.
      *
      * @return Future that resolves into extracted values and version or a {@link StorageException} if the data could not be read.
      */
-    CompletableFuture<Data> readAll();
+    CompletableFuture<Data> readDataOnRecovery();
 
     /**
      * Retrieves the most recent values which keys start with the given prefix, regardless of the current storage version.
@@ -77,4 +77,15 @@ public interface ConfigurationStorage extends AutoCloseable {
      * Returns a future that will be completed when the latest revision of the storage is received.
      */
     CompletableFuture<Long> lastRevision();
+
+    /**
+     * Writes previous and current configuration's MetaStorage revision for recovery.
+     * We need previous and current for the fail-safety: in case if node fails before changing master key on configuration update,
+     * MetaStorage's applied revision will be lower than {@code currentRevision} and we will be using previous revision.
+     *
+     * @param prevRevision Previous revision.
+     * @param currentRevision Current revision.
+     * @return A future that will be completed when revisions are written to the storage.
+     */
+    CompletableFuture<Void> writeConfigurationRevision(long prevRevision, long currentRevision);
 }
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java
index 9cb4f2abd6..75d3faa88e 100644
--- a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java
@@ -251,7 +251,7 @@ public class ConfigurationChangerTest {
 
         storage.fail(false);
 
-        CompletableFuture<Map<String, ? extends Serializable>> dataFuture = storage.readAll().thenApply(Data::values);
+        CompletableFuture<Map<String, ? extends Serializable>> dataFuture = storage.readDataOnRecovery().thenApply(Data::values);
 
         assertThat(dataFuture, willBe(anEmptyMap()));
 
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
index 1d5265f711..74cd418109 100644
--- a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
@@ -103,7 +103,7 @@ public class TestConfigurationStorage implements ConfigurationStorage {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Data> readAll() {
+    public CompletableFuture<Data> readDataOnRecovery() {
         return supplyAsync(() -> {
             synchronized (this) {
                 if (fail) {
@@ -165,6 +165,12 @@ public class TestConfigurationStorage implements ConfigurationStorage {
         return CompletableFuture.completedFuture(version);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> writeConfigurationRevision(long prevRevision, long currentRevision) {
+        return CompletableFuture.completedFuture(null);
+    }
+
     /**
      * Increase the current revision of the storage.
      *
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/tree/NamedListNodeTest.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/tree/NamedListNodeTest.java
index 93b2b054ac..c84f43ff46 100644
--- a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/tree/NamedListNodeTest.java
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/tree/NamedListNodeTest.java
@@ -140,7 +140,7 @@ public class NamedListNodeTest {
         String x0Id = ((NamedListNode<?>) a.second().value()).internalId("X").toString();
         String z0Id = ((NamedListNode<?>) a.second().get("X").third().value()).internalId("Z0").toString();
 
-        CompletableFuture<Map<String, ? extends Serializable>> storageValues = storage.readAll().thenApply(Data::values);
+        CompletableFuture<Map<String, ? extends Serializable>> storageValues = storage.readDataOnRecovery().thenApply(Data::values);
 
         assertThat(
                 storageValues,
@@ -165,7 +165,7 @@ public class NamedListNodeTest {
 
         String z5Id = ((NamedListNode<?>) a.second().get("X").third().value()).internalId("Z5").toString();
 
-        storageValues = storage.readAll().thenApply(Data::values);
+        storageValues = storage.readDataOnRecovery().thenApply(Data::values);
 
         assertThat(
                 storageValues,
@@ -192,7 +192,7 @@ public class NamedListNodeTest {
 
         String z2Id = ((NamedListNode<?>) a.second().get("X").third().value()).internalId("Z2").toString();
 
-        storageValues = storage.readAll().thenApply(Data::values);
+        storageValues = storage.readDataOnRecovery().thenApply(Data::values);
 
         assertThat(
                 storageValues,
@@ -223,7 +223,7 @@ public class NamedListNodeTest {
 
         String z3Id = ((NamedListNode<?>) a.second().get("X").third().value()).internalId("Z3").toString();
 
-        storageValues = storage.readAll().thenApply(Data::values);
+        storageValues = storage.readDataOnRecovery().thenApply(Data::values);
 
         assertThat(
                 storageValues,
@@ -255,7 +255,7 @@ public class NamedListNodeTest {
         // Delete keys from the middle. Indexes of Z3 should be updated to 1.
         x.third().change(xb -> xb.delete("Z2").delete("Z5")).get();
 
-        storageValues = storage.readAll().thenApply(Data::values);
+        storageValues = storage.readDataOnRecovery().thenApply(Data::values);
 
         assertThat(
                 storageValues,
@@ -279,7 +279,7 @@ public class NamedListNodeTest {
         // Delete keys from the middle. Indexes of Z3 should be updated to 1.
         x.third().change(xb -> xb.rename("Z0", "Z1")).get();
 
-        storageValues = storage.readAll().thenApply(Data::values);
+        storageValues = storage.readDataOnRecovery().thenApply(Data::values);
 
         assertThat(
                 storageValues,
@@ -303,7 +303,7 @@ public class NamedListNodeTest {
         // Delete values on several layers simultaneously. Storage must be empty after that.
         a.second().change(b -> b.delete("X")).get();
 
-        assertThat(storage.readAll().thenApply(Data::values), willBe(anEmptyMap()));
+        assertThat(storage.readDataOnRecovery().thenApply(Data::values), willBe(anEmptyMap()));
     }
 
     /** Tests exceptions described in methods signatures. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
index 8fdd8e01ab..1c58b192fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
@@ -73,7 +73,7 @@ public class ByteUtils {
      * @return Array of bytes.
      */
     public static byte[] longToBytes(long l) {
-        return longToBytes(l, new byte[8], 0, 8);
+        return longToBytes(l, new byte[8], 0);
     }
 
     /**
@@ -83,15 +83,13 @@ public class ByteUtils {
      * @param l     Unsigned long value.
      * @param bytes Bytes array to write result to.
      * @param off   Offset in the target array to write result to.
-     * @param limit Limit of bytes to write into output.
      * @return Number of bytes overwritten in {@code bytes} array.
      */
-    private static byte[] longToBytes(long l, byte[] bytes, int off, int limit) {
+    public static byte[] longToBytes(long l, byte[] bytes, int off) {
         assert bytes != null;
-        assert limit <= Long.BYTES;
-        assert bytes.length >= off + limit;
+        assert bytes.length >= off + Long.BYTES;
 
-        for (int i = limit - 1; i >= 0; i--) {
+        for (int i = Long.BYTES - 1; i >= 0; i--) {
             bytes[off + i] = (byte) l;
             l >>>= 8;
         }
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/EntryImpl.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/EntryImpl.java
index 859ab9fba0..33a07f685e 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/EntryImpl.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/EntryImpl.java
@@ -31,8 +31,7 @@ public final class EntryImpl implements Entry {
     private final ByteArray key;
 
     /** Value. */
-    @Nullable
-    private final byte[] val;
+    private final byte @Nullable [] val;
 
     /** Revision. */
     private final long rev;
@@ -48,7 +47,7 @@ public final class EntryImpl implements Entry {
      * @param rev     Revision.
      * @param updCntr Update counter.
      */
-    EntryImpl(@NotNull ByteArray key, @Nullable byte[] val, long rev, long updCntr) {
+    public EntryImpl(@NotNull ByteArray key, byte @Nullable [] val, long rev, long updCntr) {
         this.key = key;
         this.val = val;
         this.rev = rev;
@@ -63,9 +62,8 @@ public final class EntryImpl implements Entry {
     }
 
     /** {@inheritDoc} */
-    @Nullable
     @Override
-    public byte[] value() {
+    public byte @Nullable [] value() {
         return val;
     }
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 6711a32042..beb6f479f8 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -168,7 +168,7 @@ public class ItDistributedConfigurationStorageTest {
         try {
             node2.start();
 
-            CompletableFuture<Data> storageData = node2.cfgStorage.readAll();
+            CompletableFuture<Data> storageData = node2.cfgStorage.readDataOnRecovery();
 
             assertThat(storageData.thenApply(Data::values), willBe(equalTo(data)));
         } finally {
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
index c19b55a66a..04899e69fb 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Distributed configuration storage.
@@ -67,6 +68,11 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
      */
     private static final ByteArray MASTER_KEY = new ByteArray(DISTRIBUTED_PREFIX + "$master$key");
 
+    /**
+     * Vault's key for a value of previous and current configuration's MetaStorage revision.
+     */
+    private static final ByteArray CONFIGURATION_REVISIONS_KEY = new ByteArray("$revisions");
+
     /**
      * Prefix for all keys in the distributed storage. This key is expected to be the first key in lexicographical order of distributed
      * configuration keys.
@@ -185,39 +191,68 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Data> readAll() throws StorageException {
-        return registerFuture(vaultMgr.get(MetaStorageManager.APPLIED_REV)
-                .thenApplyAsync(appliedRevEntry -> {
-                    long appliedRevision = appliedRevEntry == null ? 0L : ByteUtils.bytesToLong(appliedRevEntry.value());
+    public CompletableFuture<Data> readDataOnRecovery() throws StorageException {
+        CompletableFuture<Data> future = vaultMgr.get(MetaStorageManager.APPLIED_REV)
+                .thenCombine(vaultMgr.get(CONFIGURATION_REVISIONS_KEY), this::resolveRevision)
+                .thenApplyAsync(this::readDataOnRecovery0, threadPool);
 
-                    var data = new HashMap<String, Serializable>();
+        return registerFuture(future);
+    }
 
-                    try (Cursor<VaultEntry> entries = storedDistributedConfigKeys()) {
-                        for (VaultEntry entry : entries) {
-                            ByteArray key = entry.key();
-                            byte[] value = entry.value();
+    /**
+     * Resolves current configuration revision based on the saved in the Vault revision of the metastorage and also
+     * previous and current revisions of the configuration saved in the Vault.
+     *
+     * @param appliedRevEntry Applied revision entry.
+     * @param revisionsEntry Configuration revisions entry.
+     * @return Configuration revision.
+     */
+    private long resolveRevision(@Nullable VaultEntry appliedRevEntry, @Nullable VaultEntry revisionsEntry) {
+        long appliedRevision = appliedRevEntry == null ? 0L : ByteUtils.bytesToLong(appliedRevEntry.value());
 
-                            // vault iterator should not return nulls as values
-                            assert value != null;
+        long cfgRevision = appliedRevision;
 
-                            if (key.equals(MASTER_KEY)) {
-                                continue;
-                            }
+        if (revisionsEntry != null) {
+            byte[] value = revisionsEntry.value();
+            long prevMasterKeyRevision = ByteUtils.bytesToLong(value, 0);
+            long curMasterKeyRevision = ByteUtils.bytesToLong(value, Long.BYTES);
 
-                            String dataKey = key.toString().substring(DISTRIBUTED_PREFIX.length());
+            // If current master key revision is higher than applied revision, then node failed
+            // before applied revision changed, so we have to use previous master key revision
+            cfgRevision = curMasterKeyRevision <= appliedRevision ? curMasterKeyRevision : prevMasterKeyRevision;
+        }
 
-                            data.put(dataKey, ConfigurationSerializationUtil.fromBytes(value));
-                        }
-                    } catch (Exception e) {
-                        throw new StorageException("Exception when closing a Vault cursor", e);
-                    }
+        return cfgRevision;
+    }
+
+    private Data readDataOnRecovery0(long cfgRevision) {
+        var data = new HashMap<String, Serializable>();
+
+        try (Cursor<VaultEntry> entries = storedDistributedConfigKeys()) {
+            for (VaultEntry entry : entries) {
+                ByteArray key = entry.key();
+                byte[] value = entry.value();
 
-                    assert data.isEmpty() || appliedRevision > 0;
+                // vault iterator should not return nulls as values
+                assert value != null;
 
-                    changeId.set(data.isEmpty() ? 0 : appliedRevision);
+                if (key.equals(MASTER_KEY)) {
+                    continue;
+                }
+
+                String dataKey = key.toString().substring(DISTRIBUTED_PREFIX.length());
+
+                data.put(dataKey, ConfigurationSerializationUtil.fromBytes(value));
+            }
+        } catch (Exception e) {
+            throw new StorageException("Exception when closing a Vault cursor", e);
+        }
+
+        assert data.isEmpty() || cfgRevision > 0;
+
+        changeId.set(data.isEmpty() ? 0 : cfgRevision);
 
-                    return new Data(data, appliedRevision);
-                }, threadPool));
+        return new Data(data, cfgRevision);
     }
 
     /** {@inheritDoc} */
@@ -247,26 +282,9 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
 
         operations.add(Operations.put(MASTER_KEY, ByteUtils.longToBytes(curChangeId)));
 
-        // Condition for a valid MetaStorage data update. Several possibilities here:
-        //  - First update ever, MASTER_KEY property must be absent from MetaStorage.
-        //  - Current node has already performed some updates or received them from MetaStorage watch listener. In this
-        //    case "curChangeId" must match the MASTER_KEY revision exactly.
-        //  - Current node has been restarted and received updates from MetaStorage watch listeners after that. Same as
-        //    above, "curChangeId" must match the MASTER_KEY revision exactly.
-        //  - Current node has been restarted and have not received any updates from MetaStorage watch listeners yet.
-        //    In this case "curChangeId" matches APPLIED_REV, which may or may not match the MASTER_KEY revision. Two
-        //    options here:
-        //     - MASTER_KEY is missing in local MetaStorage copy. This means that current node have not performed nor
-        //       observed any configuration changes. Valid condition is "MASTER_KEY does not exist".
-        //     - MASTER_KEY is present in local MetaStorage copy. The MASTER_KEY revision is unknown but is less than or
-        //       equal to APPLIED_REV. Obviously, there have been no updates from the future yet. It's also guaranteed
-        //       that the next received configuration update will have the MASTER_KEY revision strictly greater than
-        //       current APPLIED_REV. This allows to conclude that "MASTER_KEY revision <= curChangeId" is a valid
-        //       condition for update.
-        // Combining all of the above, it's concluded that the following condition must be used:
         SimpleCondition condition = curChangeId == 0L
                 ? Conditions.notExists(MASTER_KEY)
-                : Conditions.revision(MASTER_KEY).le(curChangeId);
+                : Conditions.revision(MASTER_KEY).eq(curChangeId);
 
         return metaStorageMgr.invoke(condition, operations, Set.of(Operations.noop()));
     }
@@ -340,6 +358,17 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
         return metaStorageMgr.get(MASTER_KEY).thenApply(Entry::revision);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> writeConfigurationRevision(long prevRevision, long currentRevision) {
+        byte[] value = new byte[Long.BYTES * 2];
+
+        ByteUtils.longToBytes(prevRevision, value, 0);
+        ByteUtils.longToBytes(currentRevision, value, Long.BYTES);
+
+        return vaultMgr.put(CONFIGURATION_REVISIONS_KEY, value);
+    }
+
     /**
      * Method that returns all distributed configuration keys from the meta storage that were stored in the vault filtered out by the
      * current applied revision as an upper bound. Applied revision is a revision of the last successful vault update.
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
index 7dc189a6a3..2f7e12e17f 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
@@ -119,7 +119,7 @@ public class LocalConfigurationStorage implements ConfigurationStorage {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Data> readAll() {
+    public CompletableFuture<Data> readDataOnRecovery() {
         return readAll(LOC_KEYS_START_RANGE, LOC_KEYS_END_RANGE);
     }
 
@@ -223,6 +223,13 @@ public class LocalConfigurationStorage implements ConfigurationStorage {
                 .thenApply(entry -> entry == null ? 0 : (Long) fromBytes(entry.value()));
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> writeConfigurationRevision(long prevRevision, long currentRevision) {
+        // No-op.
+        return CompletableFuture.completedFuture(null);
+    }
+
     /**
      * Increments the last character of the given string.
      */
diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
new file mode 100644
index 0000000000..1acf996e87
--- /dev/null
+++ b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration.storage;
+
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import org.apache.ignite.configuration.RootKey;
+import org.apache.ignite.configuration.annotation.ConfigurationRoot;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.internal.configuration.TestConfigurationChanger;
+import org.apache.ignite.internal.configuration.asm.ConfigurationAsmGenerator;
+import org.apache.ignite.internal.configuration.tree.ConfigurationSource;
+import org.apache.ignite.internal.configuration.tree.ConstructableTreeNode;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.EntryEvent;
+import org.apache.ignite.internal.metastorage.client.EntryImpl;
+import org.apache.ignite.internal.metastorage.client.Operation;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for the {@link DistributedConfigurationStorage}.
+ */
+public class DistributedConfigurationCatchUpTest {
+    private final VaultManager vaultManager = new VaultManager(new InMemoryVaultService());
+
+    /**
+     * Before each.
+     */
+    @BeforeEach
+    void start() {
+        vaultManager.start();
+    }
+
+    /**
+     * After each.
+     */
+    @AfterEach
+    void stop() throws Exception {
+        vaultManager.stop();
+    }
+
+    /**
+     * Dummy configuration.
+     */
+    @ConfigurationRoot(rootName = "someKey", type = DISTRIBUTED)
+    public static class DistributedTestConfigurationSchema {
+        @Value(hasDefault = true)
+        public final int foobar = 0;
+    }
+
+    /**
+     * Tests that distributed configuration storage correctly picks up latest configuration MetaStorage revision
+     * during recovery process.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testMetaStorageRevisionDifferentFromConfigurationOnRestart() throws Exception {
+        RootKey<DistributedTestConfiguration, DistributedTestView> rootKey = DistributedTestConfiguration.KEY;
+
+        ConfigurationAsmGenerator cgen = new ConfigurationAsmGenerator();
+
+        MetaStorageMockWrapper wrapper = new MetaStorageMockWrapper();
+
+        try (var storage = new DistributedConfigurationStorage(wrapper.metaStorageManager(), vaultManager)) {
+            var changer = new TestConfigurationChanger(cgen, List.of(rootKey), Collections.emptyMap(),
+                    storage, Collections.emptyList(), Collections.emptyList());
+
+            try {
+                changer.start();
+
+                ConfigurationSource source = source(
+                        rootKey,
+                        (DistributedTestChange change) -> change.changeFoobar(1)
+                );
+
+                CompletableFuture<Void> change = changer.change(source);
+
+                change.get();
+            } finally {
+                changer.stop();
+            }
+        }
+
+        // Put a value to the configuration, so we start on non-empty vault.
+        vaultManager.put(MetaStorageMockWrapper.TEST_KEY, new byte[]{4, 1, 2, 3, 4}).get();
+
+        // This emulates a change in MetaStorage that is not related to the configuration.
+        vaultManager.put(MetaStorageManager.APPLIED_REV, ByteUtils.longToBytes(2)).get();
+
+        try (var storage = new DistributedConfigurationStorage(wrapper.metaStorageManager(), vaultManager)) {
+            var changer = new TestConfigurationChanger(cgen, List.of(rootKey), Collections.emptyMap(),
+                    storage, Collections.emptyList(), Collections.emptyList());
+
+            try {
+                changer.start();
+
+                // Should return last configuration change, not last MetaStorage change.
+                Long lastConfigurationChangeRevision = storage.lastRevision().get();
+
+                // Should be one, because we only changed configuration once.
+                assertEquals(1, lastConfigurationChangeRevision);
+            } finally {
+                changer.stop();
+            }
+        }
+    }
+
+    /**
+     * This class stores data for {@link MetaStorageManager}'s mock.
+     */
+    private static class MetaStorageMockWrapper {
+        private static final String DISTRIBUTED_PREFIX = "dst-cfg.";
+
+        /**
+         * This and previous field are copy-pasted intentionally, so in case if something changes,
+         * this test should fail and be reviewed and re-written.
+         */
+        private static final ByteArray MASTER_KEY = new ByteArray(DISTRIBUTED_PREFIX + "$master$key");
+
+        private static final ByteArray TEST_KEY = new ByteArray(DISTRIBUTED_PREFIX + "someKey.foobar");
+
+        /** MetaStorage mock. */
+        private final MetaStorageManager mock;
+
+        /** Captured MetaStorage listener. */
+        private WatchListener lsnr;
+
+        /** Current master key revision. */
+        private long masterKeyRevision;
+
+        private MetaStorageMockWrapper() {
+            mock = mock(MetaStorageManager.class);
+
+            setup();
+        }
+
+        private void setup() {
+            // Returns current master key revision.
+            when(mock.get(MASTER_KEY)).then(invocation -> {
+                return CompletableFuture.completedFuture(new EntryImpl(MASTER_KEY, null, masterKeyRevision, -1));
+            });
+
+            // On any invocation - trigger storage listener.
+            when(mock.invoke(any(), anyCollection(), any()))
+                    .then(invocation -> {
+                        triggerStorageListener();
+
+                        return CompletableFuture.completedFuture(true);
+                    });
+
+            when(mock.invoke(any(), any(Operation.class), any()))
+                    .then(invocation -> {
+                        triggerStorageListener();
+
+                        return CompletableFuture.completedFuture(true);
+                    });
+
+            // This captures the listener.
+            when(mock.registerWatchByPrefix(any(), any())).then(invocation -> {
+                lsnr = invocation.getArgument(1);
+
+                return CompletableFuture.completedFuture(null);
+            });
+        }
+
+        /**
+         * Triggers MetaStorage listener incrementing master key revision.
+         */
+        private void triggerStorageListener() {
+            EntryEvent entryEvent = new EntryEvent(null, new EntryImpl(MASTER_KEY, null, ++masterKeyRevision, -1));
+            lsnr.onUpdate(new WatchEvent(entryEvent));
+        }
+
+        private MetaStorageManager metaStorageManager() {
+            return mock;
+        }
+    }
+
+    private static <CHANGET> ConfigurationSource source(RootKey<?, ? super CHANGET> rootKey, Consumer<CHANGET> changer) {
+        return new ConfigurationSource() {
+            @Override
+            public void descend(ConstructableTreeNode node) {
+                ConfigurationSource changerSrc = new ConfigurationSource() {
+                    @Override
+                    public void descend(ConstructableTreeNode node) {
+                        changer.accept((CHANGET) node);
+                    }
+                };
+
+                node.construct(rootKey.key(), changerSrc, true);
+            }
+        };
+    }
+}