You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/08/20 09:01:56 UTC
[ignite-3] branch main updated: IGNITE-17555 Fix AssertionError and NPE in configuration (#1023)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9767027226 IGNITE-17555 Fix AssertionError and NPE in configuration (#1023)
9767027226 is described below
commit 976702722640747f47d4f7583b0e056efd0e7163
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Sat Aug 20 13:01:52 2022 +0400
IGNITE-17555 Fix AssertionError and NPE in configuration (#1023)
---
.../configuration/ConfigurationChanger.java | 28 ++-
.../notifications/ConfigurationNotifier.java | 7 +-
.../storage/ConfigurationStorage.java | 15 +-
.../configuration/ConfigurationChangerTest.java | 2 +-
.../storage/TestConfigurationStorage.java | 8 +-
.../configuration/tree/NamedListNodeTest.java | 14 +-
.../org/apache/ignite/internal/util/ByteUtils.java | 10 +-
.../internal/metastorage/client/EntryImpl.java | 8 +-
.../ItDistributedConfigurationStorageTest.java | 2 +-
.../storage/DistributedConfigurationStorage.java | 113 ++++++----
.../storage/LocalConfigurationStorage.java | 9 +-
.../DistributedConfigurationCatchUpTest.java | 227 +++++++++++++++++++++
12 files changed, 365 insertions(+), 78 deletions(-)
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
index d68be24d60..4369df3640 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
@@ -199,7 +199,7 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange
Data data;
try {
- data = storage.readAll().get();
+ data = storage.readDataOnRecovery().get();
} catch (ExecutionException e) {
throw new ConfigurationChangeException("Failed to initialize configuration: " + e.getCause().getMessage(), e.getCause());
} catch (InterruptedException e) {
@@ -572,20 +572,26 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange
storageRoots = new StorageRoots(newSuperRoot, newChangeId);
- if (dataValuesPrefixMap.isEmpty()) {
- oldStorageRoots.changeFuture.complete(null);
-
- return CompletableFuture.completedFuture(null);
- } else {
- return notificator.notify(oldSuperRoot, newSuperRoot, newChangeId, notificationListenerCnt.incrementAndGet())
- .whenComplete((v, t) -> {
- if (t == null) {
+ // Save revisions for recovery.
+ return storage.writeConfigurationRevision(oldStorageRoots.version, storageRoots.version)
+ .thenCompose(unused -> {
+ if (dataValuesPrefixMap.isEmpty()) {
oldStorageRoots.changeFuture.complete(null);
+
+ return CompletableFuture.completedFuture(null);
} else {
- oldStorageRoots.changeFuture.completeExceptionally(t);
+ long notificationNumber = notificationListenerCnt.incrementAndGet();
+
+ return notificator.notify(oldSuperRoot, newSuperRoot, newChangeId, notificationNumber)
+ .whenComplete((v, t) -> {
+ if (t == null) {
+ oldStorageRoots.changeFuture.complete(null);
+ } else {
+ oldStorageRoots.changeFuture.completeExceptionally(t);
+ }
+ });
}
});
- }
}
/**
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotifier.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotifier.java
index e0791c9cad..017092e08c 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotifier.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotifier.java
@@ -425,9 +425,14 @@ public class ConfigurationNotifier {
// Lazy initialization.
Collection<DynamicConfiguration<InnerNode, ?>> newAnyConfigs = null;
+ NamedListConfiguration<?, InnerNode, ?> namedListCfg = namedDynamicConfig(config, key);
+
+ // Initialize named list configuration.
+ namedListCfg.touchMembers();
+
for (String name : newNamedList.namedListKeys()) {
DynamicConfiguration<InnerNode, ?> namedNodeConfig =
- (DynamicConfiguration<InnerNode, ?>) namedDynamicConfig(config, key).getConfig(name);
+ (DynamicConfiguration<InnerNode, ?>) namedListCfg.getConfig(name);
ctx.addContainer(namedNodeConfig, name);
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
index 44d431c97d..2977168f1d 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
@@ -27,11 +27,11 @@ import org.apache.ignite.configuration.annotation.ConfigurationType;
*/
public interface ConfigurationStorage extends AutoCloseable {
/**
- * Read all configuration values and current storage version.
+ * Reads all configuration values and current storage version during the recovery phase.
*
* @return Future that resolves into extracted values and version or a {@link StorageException} if the data could not be read.
*/
- CompletableFuture<Data> readAll();
+ CompletableFuture<Data> readDataOnRecovery();
/**
* Retrieves the most recent values which keys start with the given prefix, regardless of the current storage version.
@@ -77,4 +77,15 @@ public interface ConfigurationStorage extends AutoCloseable {
* Returns a future that will be completed when the latest revision of the storage is received.
*/
CompletableFuture<Long> lastRevision();
+
+ /**
+ * Writes the previous and current MetaStorage revisions of the configuration for recovery.
+ * Both revisions are needed for fail-safety: if the node fails before the master key is changed on a configuration update,
+ * MetaStorage's applied revision will be lower than {@code currentRevision}, and the previous revision will be used instead.
+ *
+ * @param prevRevision Previous revision.
+ * @param currentRevision Current revision.
+ * @return A future that will be completed when revisions are written to the storage.
+ */
+ CompletableFuture<Void> writeConfigurationRevision(long prevRevision, long currentRevision);
}
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java
index 9cb4f2abd6..75d3faa88e 100644
--- a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java
@@ -251,7 +251,7 @@ public class ConfigurationChangerTest {
storage.fail(false);
- CompletableFuture<Map<String, ? extends Serializable>> dataFuture = storage.readAll().thenApply(Data::values);
+ CompletableFuture<Map<String, ? extends Serializable>> dataFuture = storage.readDataOnRecovery().thenApply(Data::values);
assertThat(dataFuture, willBe(anEmptyMap()));
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
index 1d5265f711..74cd418109 100644
--- a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
@@ -103,7 +103,7 @@ public class TestConfigurationStorage implements ConfigurationStorage {
/** {@inheritDoc} */
@Override
- public CompletableFuture<Data> readAll() {
+ public CompletableFuture<Data> readDataOnRecovery() {
return supplyAsync(() -> {
synchronized (this) {
if (fail) {
@@ -165,6 +165,12 @@ public class TestConfigurationStorage implements ConfigurationStorage {
return CompletableFuture.completedFuture(version);
}
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> writeConfigurationRevision(long prevRevision, long currentRevision) {
+ return CompletableFuture.completedFuture(null);
+ }
+
/**
* Increase the current revision of the storage.
*
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/tree/NamedListNodeTest.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/tree/NamedListNodeTest.java
index 93b2b054ac..c84f43ff46 100644
--- a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/tree/NamedListNodeTest.java
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/tree/NamedListNodeTest.java
@@ -140,7 +140,7 @@ public class NamedListNodeTest {
String x0Id = ((NamedListNode<?>) a.second().value()).internalId("X").toString();
String z0Id = ((NamedListNode<?>) a.second().get("X").third().value()).internalId("Z0").toString();
- CompletableFuture<Map<String, ? extends Serializable>> storageValues = storage.readAll().thenApply(Data::values);
+ CompletableFuture<Map<String, ? extends Serializable>> storageValues = storage.readDataOnRecovery().thenApply(Data::values);
assertThat(
storageValues,
@@ -165,7 +165,7 @@ public class NamedListNodeTest {
String z5Id = ((NamedListNode<?>) a.second().get("X").third().value()).internalId("Z5").toString();
- storageValues = storage.readAll().thenApply(Data::values);
+ storageValues = storage.readDataOnRecovery().thenApply(Data::values);
assertThat(
storageValues,
@@ -192,7 +192,7 @@ public class NamedListNodeTest {
String z2Id = ((NamedListNode<?>) a.second().get("X").third().value()).internalId("Z2").toString();
- storageValues = storage.readAll().thenApply(Data::values);
+ storageValues = storage.readDataOnRecovery().thenApply(Data::values);
assertThat(
storageValues,
@@ -223,7 +223,7 @@ public class NamedListNodeTest {
String z3Id = ((NamedListNode<?>) a.second().get("X").third().value()).internalId("Z3").toString();
- storageValues = storage.readAll().thenApply(Data::values);
+ storageValues = storage.readDataOnRecovery().thenApply(Data::values);
assertThat(
storageValues,
@@ -255,7 +255,7 @@ public class NamedListNodeTest {
// Delete keys from the middle. Indexes of Z3 should be updated to 1.
x.third().change(xb -> xb.delete("Z2").delete("Z5")).get();
- storageValues = storage.readAll().thenApply(Data::values);
+ storageValues = storage.readDataOnRecovery().thenApply(Data::values);
assertThat(
storageValues,
@@ -279,7 +279,7 @@ public class NamedListNodeTest {
// Delete keys from the middle. Indexes of Z3 should be updated to 1.
x.third().change(xb -> xb.rename("Z0", "Z1")).get();
- storageValues = storage.readAll().thenApply(Data::values);
+ storageValues = storage.readDataOnRecovery().thenApply(Data::values);
assertThat(
storageValues,
@@ -303,7 +303,7 @@ public class NamedListNodeTest {
// Delete values on several layers simultaneously. Storage must be empty after that.
a.second().change(b -> b.delete("X")).get();
- assertThat(storage.readAll().thenApply(Data::values), willBe(anEmptyMap()));
+ assertThat(storage.readDataOnRecovery().thenApply(Data::values), willBe(anEmptyMap()));
}
/** Tests exceptions described in methods signatures. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
index 8fdd8e01ab..1c58b192fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
@@ -73,7 +73,7 @@ public class ByteUtils {
* @return Array of bytes.
*/
public static byte[] longToBytes(long l) {
- return longToBytes(l, new byte[8], 0, 8);
+ return longToBytes(l, new byte[8], 0);
}
/**
@@ -83,15 +83,13 @@ public class ByteUtils {
* @param l Unsigned long value.
* @param bytes Bytes array to write result to.
* @param off Offset in the target array to write result to.
- * @param limit Limit of bytes to write into output.
* @return Number of bytes overwritten in {@code bytes} array.
*/
- private static byte[] longToBytes(long l, byte[] bytes, int off, int limit) {
+ public static byte[] longToBytes(long l, byte[] bytes, int off) {
assert bytes != null;
- assert limit <= Long.BYTES;
- assert bytes.length >= off + limit;
+ assert bytes.length >= off + Long.BYTES;
- for (int i = limit - 1; i >= 0; i--) {
+ for (int i = Long.BYTES - 1; i >= 0; i--) {
bytes[off + i] = (byte) l;
l >>>= 8;
}
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/EntryImpl.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/EntryImpl.java
index 859ab9fba0..33a07f685e 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/EntryImpl.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/EntryImpl.java
@@ -31,8 +31,7 @@ public final class EntryImpl implements Entry {
private final ByteArray key;
/** Value. */
- @Nullable
- private final byte[] val;
+ private final byte @Nullable [] val;
/** Revision. */
private final long rev;
@@ -48,7 +47,7 @@ public final class EntryImpl implements Entry {
* @param rev Revision.
* @param updCntr Update counter.
*/
- EntryImpl(@NotNull ByteArray key, @Nullable byte[] val, long rev, long updCntr) {
+ public EntryImpl(@NotNull ByteArray key, byte @Nullable [] val, long rev, long updCntr) {
this.key = key;
this.val = val;
this.rev = rev;
@@ -63,9 +62,8 @@ public final class EntryImpl implements Entry {
}
/** {@inheritDoc} */
- @Nullable
@Override
- public byte[] value() {
+ public byte @Nullable [] value() {
return val;
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 6711a32042..beb6f479f8 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -168,7 +168,7 @@ public class ItDistributedConfigurationStorageTest {
try {
node2.start();
- CompletableFuture<Data> storageData = node2.cfgStorage.readAll();
+ CompletableFuture<Data> storageData = node2.cfgStorage.readDataOnRecovery();
assertThat(storageData.thenApply(Data::values), willBe(equalTo(data)));
} finally {
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
index c19b55a66a..04899e69fb 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Distributed configuration storage.
@@ -67,6 +68,11 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
*/
private static final ByteArray MASTER_KEY = new ByteArray(DISTRIBUTED_PREFIX + "$master$key");
+ /**
+ * Vault's key for a value of previous and current configuration's MetaStorage revision.
+ */
+ private static final ByteArray CONFIGURATION_REVISIONS_KEY = new ByteArray("$revisions");
+
/**
* Prefix for all keys in the distributed storage. This key is expected to be the first key in lexicographical order of distributed
* configuration keys.
@@ -185,39 +191,68 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
/** {@inheritDoc} */
@Override
- public CompletableFuture<Data> readAll() throws StorageException {
- return registerFuture(vaultMgr.get(MetaStorageManager.APPLIED_REV)
- .thenApplyAsync(appliedRevEntry -> {
- long appliedRevision = appliedRevEntry == null ? 0L : ByteUtils.bytesToLong(appliedRevEntry.value());
+ public CompletableFuture<Data> readDataOnRecovery() throws StorageException {
+ CompletableFuture<Data> future = vaultMgr.get(MetaStorageManager.APPLIED_REV)
+ .thenCombine(vaultMgr.get(CONFIGURATION_REVISIONS_KEY), this::resolveRevision)
+ .thenApplyAsync(this::readDataOnRecovery0, threadPool);
- var data = new HashMap<String, Serializable>();
+ return registerFuture(future);
+ }
- try (Cursor<VaultEntry> entries = storedDistributedConfigKeys()) {
- for (VaultEntry entry : entries) {
- ByteArray key = entry.key();
- byte[] value = entry.value();
+ /**
+ * Resolves the current configuration revision from the MetaStorage applied revision saved in the Vault and from
+ * the previous and current configuration revisions, which are also saved in the Vault.
+ *
+ * @param appliedRevEntry Applied revision entry.
+ * @param revisionsEntry Configuration revisions entry.
+ * @return Configuration revision.
+ */
+ private long resolveRevision(@Nullable VaultEntry appliedRevEntry, @Nullable VaultEntry revisionsEntry) {
+ long appliedRevision = appliedRevEntry == null ? 0L : ByteUtils.bytesToLong(appliedRevEntry.value());
- // vault iterator should not return nulls as values
- assert value != null;
+ long cfgRevision = appliedRevision;
- if (key.equals(MASTER_KEY)) {
- continue;
- }
+ if (revisionsEntry != null) {
+ byte[] value = revisionsEntry.value();
+ long prevMasterKeyRevision = ByteUtils.bytesToLong(value, 0);
+ long curMasterKeyRevision = ByteUtils.bytesToLong(value, Long.BYTES);
- String dataKey = key.toString().substring(DISTRIBUTED_PREFIX.length());
+ // If current master key revision is higher than applied revision, then node failed
+ // before applied revision changed, so we have to use previous master key revision
+ cfgRevision = curMasterKeyRevision <= appliedRevision ? curMasterKeyRevision : prevMasterKeyRevision;
+ }
- data.put(dataKey, ConfigurationSerializationUtil.fromBytes(value));
- }
- } catch (Exception e) {
- throw new StorageException("Exception when closing a Vault cursor", e);
- }
+ return cfgRevision;
+ }
+
+ private Data readDataOnRecovery0(long cfgRevision) {
+ var data = new HashMap<String, Serializable>();
+
+ try (Cursor<VaultEntry> entries = storedDistributedConfigKeys()) {
+ for (VaultEntry entry : entries) {
+ ByteArray key = entry.key();
+ byte[] value = entry.value();
- assert data.isEmpty() || appliedRevision > 0;
+ // vault iterator should not return nulls as values
+ assert value != null;
- changeId.set(data.isEmpty() ? 0 : appliedRevision);
+ if (key.equals(MASTER_KEY)) {
+ continue;
+ }
+
+ String dataKey = key.toString().substring(DISTRIBUTED_PREFIX.length());
+
+ data.put(dataKey, ConfigurationSerializationUtil.fromBytes(value));
+ }
+ } catch (Exception e) {
+ throw new StorageException("Exception when closing a Vault cursor", e);
+ }
+
+ assert data.isEmpty() || cfgRevision > 0;
+
+ changeId.set(data.isEmpty() ? 0 : cfgRevision);
- return new Data(data, appliedRevision);
- }, threadPool));
+ return new Data(data, cfgRevision);
}
/** {@inheritDoc} */
@@ -247,26 +282,9 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
operations.add(Operations.put(MASTER_KEY, ByteUtils.longToBytes(curChangeId)));
- // Condition for a valid MetaStorage data update. Several possibilities here:
- // - First update ever, MASTER_KEY property must be absent from MetaStorage.
- // - Current node has already performed some updates or received them from MetaStorage watch listener. In this
- // case "curChangeId" must match the MASTER_KEY revision exactly.
- // - Current node has been restarted and received updates from MetaStorage watch listeners after that. Same as
- // above, "curChangeId" must match the MASTER_KEY revision exactly.
- // - Current node has been restarted and have not received any updates from MetaStorage watch listeners yet.
- // In this case "curChangeId" matches APPLIED_REV, which may or may not match the MASTER_KEY revision. Two
- // options here:
- // - MASTER_KEY is missing in local MetaStorage copy. This means that current node have not performed nor
- // observed any configuration changes. Valid condition is "MASTER_KEY does not exist".
- // - MASTER_KEY is present in local MetaStorage copy. The MASTER_KEY revision is unknown but is less than or
- // equal to APPLIED_REV. Obviously, there have been no updates from the future yet. It's also guaranteed
- // that the next received configuration update will have the MASTER_KEY revision strictly greater than
- // current APPLIED_REV. This allows to conclude that "MASTER_KEY revision <= curChangeId" is a valid
- // condition for update.
- // Combining all of the above, it's concluded that the following condition must be used:
SimpleCondition condition = curChangeId == 0L
? Conditions.notExists(MASTER_KEY)
- : Conditions.revision(MASTER_KEY).le(curChangeId);
+ : Conditions.revision(MASTER_KEY).eq(curChangeId);
return metaStorageMgr.invoke(condition, operations, Set.of(Operations.noop()));
}
@@ -340,6 +358,17 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
return metaStorageMgr.get(MASTER_KEY).thenApply(Entry::revision);
}
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> writeConfigurationRevision(long prevRevision, long currentRevision) {
+ byte[] value = new byte[Long.BYTES * 2];
+
+ ByteUtils.longToBytes(prevRevision, value, 0);
+ ByteUtils.longToBytes(currentRevision, value, Long.BYTES);
+
+ return vaultMgr.put(CONFIGURATION_REVISIONS_KEY, value);
+ }
+
/**
* Method that returns all distributed configuration keys from the meta storage that were stored in the vault filtered out by the
* current applied revision as an upper bound. Applied revision is a revision of the last successful vault update.
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
index 7dc189a6a3..2f7e12e17f 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
@@ -119,7 +119,7 @@ public class LocalConfigurationStorage implements ConfigurationStorage {
/** {@inheritDoc} */
@Override
- public CompletableFuture<Data> readAll() {
+ public CompletableFuture<Data> readDataOnRecovery() {
return readAll(LOC_KEYS_START_RANGE, LOC_KEYS_END_RANGE);
}
@@ -223,6 +223,13 @@ public class LocalConfigurationStorage implements ConfigurationStorage {
.thenApply(entry -> entry == null ? 0 : (Long) fromBytes(entry.value()));
}
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> writeConfigurationRevision(long prevRevision, long currentRevision) {
+ // No-op.
+ return CompletableFuture.completedFuture(null);
+ }
+
/**
* Increments the last character of the given string.
*/
diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
new file mode 100644
index 0000000000..1acf996e87
--- /dev/null
+++ b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration.storage;
+
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import org.apache.ignite.configuration.RootKey;
+import org.apache.ignite.configuration.annotation.ConfigurationRoot;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.internal.configuration.TestConfigurationChanger;
+import org.apache.ignite.internal.configuration.asm.ConfigurationAsmGenerator;
+import org.apache.ignite.internal.configuration.tree.ConfigurationSource;
+import org.apache.ignite.internal.configuration.tree.ConstructableTreeNode;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.EntryEvent;
+import org.apache.ignite.internal.metastorage.client.EntryImpl;
+import org.apache.ignite.internal.metastorage.client.Operation;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for the {@link DistributedConfigurationStorage}.
+ */
+public class DistributedConfigurationCatchUpTest {
+ private final VaultManager vaultManager = new VaultManager(new InMemoryVaultService());
+
+ /**
+ * Before each.
+ */
+ @BeforeEach
+ void start() {
+ vaultManager.start();
+ }
+
+ /**
+ * After each.
+ */
+ @AfterEach
+ void stop() throws Exception {
+ vaultManager.stop();
+ }
+
+ /**
+ * Dummy configuration.
+ */
+ @ConfigurationRoot(rootName = "someKey", type = DISTRIBUTED)
+ public static class DistributedTestConfigurationSchema {
+ @Value(hasDefault = true)
+ public final int foobar = 0;
+ }
+
+ /**
+ * Tests that distributed configuration storage correctly picks up latest configuration MetaStorage revision
+ * during recovery process.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testMetaStorageRevisionDifferentFromConfigurationOnRestart() throws Exception {
+ RootKey<DistributedTestConfiguration, DistributedTestView> rootKey = DistributedTestConfiguration.KEY;
+
+ ConfigurationAsmGenerator cgen = new ConfigurationAsmGenerator();
+
+ MetaStorageMockWrapper wrapper = new MetaStorageMockWrapper();
+
+ try (var storage = new DistributedConfigurationStorage(wrapper.metaStorageManager(), vaultManager)) {
+ var changer = new TestConfigurationChanger(cgen, List.of(rootKey), Collections.emptyMap(),
+ storage, Collections.emptyList(), Collections.emptyList());
+
+ try {
+ changer.start();
+
+ ConfigurationSource source = source(
+ rootKey,
+ (DistributedTestChange change) -> change.changeFoobar(1)
+ );
+
+ CompletableFuture<Void> change = changer.change(source);
+
+ change.get();
+ } finally {
+ changer.stop();
+ }
+ }
+
+ // Put a configuration value directly into the vault, so that we start with a non-empty vault.
+ vaultManager.put(MetaStorageMockWrapper.TEST_KEY, new byte[]{4, 1, 2, 3, 4}).get();
+
+ // This emulates a change in MetaStorage that is not related to the configuration.
+ vaultManager.put(MetaStorageManager.APPLIED_REV, ByteUtils.longToBytes(2)).get();
+
+ try (var storage = new DistributedConfigurationStorage(wrapper.metaStorageManager(), vaultManager)) {
+ var changer = new TestConfigurationChanger(cgen, List.of(rootKey), Collections.emptyMap(),
+ storage, Collections.emptyList(), Collections.emptyList());
+
+ try {
+ changer.start();
+
+ // Should return last configuration change, not last MetaStorage change.
+ Long lastConfigurationChangeRevision = storage.lastRevision().get();
+
+ // Should be one, because we only changed configuration once.
+ assertEquals(1, lastConfigurationChangeRevision);
+ } finally {
+ changer.stop();
+ }
+ }
+ }
+
+ /**
+ * This class stores data for {@link MetaStorageManager}'s mock.
+ */
+ private static class MetaStorageMockWrapper {
+ private static final String DISTRIBUTED_PREFIX = "dst-cfg.";
+
+ /**
+ * This field and the previous one are copy-pasted intentionally, so that if something changes,
+ * this test fails and has to be reviewed and rewritten.
+ */
+ private static final ByteArray MASTER_KEY = new ByteArray(DISTRIBUTED_PREFIX + "$master$key");
+
+ private static final ByteArray TEST_KEY = new ByteArray(DISTRIBUTED_PREFIX + "someKey.foobar");
+
+ /** MetaStorage mock. */
+ private final MetaStorageManager mock;
+
+ /** Captured MetaStorage listener. */
+ private WatchListener lsnr;
+
+ /** Current master key revision. */
+ private long masterKeyRevision;
+
+ private MetaStorageMockWrapper() {
+ mock = mock(MetaStorageManager.class);
+
+ setup();
+ }
+
+ private void setup() {
+ // Returns current master key revision.
+ when(mock.get(MASTER_KEY)).then(invocation -> {
+ return CompletableFuture.completedFuture(new EntryImpl(MASTER_KEY, null, masterKeyRevision, -1));
+ });
+
+ // On any invocation - trigger storage listener.
+ when(mock.invoke(any(), anyCollection(), any()))
+ .then(invocation -> {
+ triggerStorageListener();
+
+ return CompletableFuture.completedFuture(true);
+ });
+
+ when(mock.invoke(any(), any(Operation.class), any()))
+ .then(invocation -> {
+ triggerStorageListener();
+
+ return CompletableFuture.completedFuture(true);
+ });
+
+ // This captures the listener.
+ when(mock.registerWatchByPrefix(any(), any())).then(invocation -> {
+ lsnr = invocation.getArgument(1);
+
+ return CompletableFuture.completedFuture(null);
+ });
+ }
+
+ /**
+ * Triggers MetaStorage listener incrementing master key revision.
+ */
+ private void triggerStorageListener() {
+ EntryEvent entryEvent = new EntryEvent(null, new EntryImpl(MASTER_KEY, null, ++masterKeyRevision, -1));
+ lsnr.onUpdate(new WatchEvent(entryEvent));
+ }
+
+ private MetaStorageManager metaStorageManager() {
+ return mock;
+ }
+ }
+
+ private static <CHANGET> ConfigurationSource source(RootKey<?, ? super CHANGET> rootKey, Consumer<CHANGET> changer) {
+ return new ConfigurationSource() {
+ @Override
+ public void descend(ConstructableTreeNode node) {
+ ConfigurationSource changerSrc = new ConfigurationSource() {
+ @Override
+ public void descend(ConstructableTreeNode node) {
+ changer.accept((CHANGET) node);
+ }
+ };
+
+ node.construct(rootKey.key(), changerSrc, true);
+ }
+ };
+ }
+}