You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/08/11 06:30:04 UTC
[ignite-3] branch main updated: IGNITE-15264 Fixed wrong applied
revision handling in DistributedConfigurationStorage (#262)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 356c766 IGNITE-15264 Fixed wrong applied revision handling in DistributedConfigurationStorage (#262)
356c766 is described below
commit 356c76608d6667323bd5b312fac9590af0368f7d
Author: ibessonov <be...@gmail.com>
AuthorDate: Wed Aug 11 09:29:55 2021 +0300
IGNITE-15264 Fixed wrong applied revision handling in DistributedConfigurationStorage (#262)
---
.../configuration/ConfigurationChanger.java | 21 ++-
.../storage/ConfigurationStorage.java | 7 -
.../storage/ConfigurationStorageListener.java | 6 +-
.../storage/TestConfigurationStorage.java | 6 +-
.../internal/metastorage/MetaStorageManager.java | 5 +-
.../ITDistributedConfigurationStorageTest.java | 3 +-
.../storage/DistributedConfigurationStorage.java | 144 +++++++++++----------
.../storage/LocalConfigurationStorage.java | 15 +--
8 files changed, 95 insertions(+), 112 deletions(-)
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
index 58b1494..96eb0a4 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
@@ -408,11 +408,13 @@ public abstract class ConfigurationChanger {
}
/**
- * Update configuration from storage listener.
+ * Updates configuration from storage listener.
+ *
* @param storageType Type of the storage that propagated these changes.
* @param changedEntries Changed data.
+ * @return Future that signifies update completion.
*/
- private void updateFromListener(
+ private CompletableFuture<Void> updateFromListener(
ConfigurationType storageType,
Data changedEntries
) {
@@ -427,20 +429,17 @@ public abstract class ConfigurationChanger {
fillFromPrefixMap(newSuperRoot, dataValuesPrefixMap);
- StorageRoots newStorageRoots = new StorageRoots(newSuperRoot, changedEntries.changeId());
-
- storagesRootsMap.put(storageType, newStorageRoots);
+ long newChangeId = changedEntries.changeId();
- ConfigurationStorage storage = storageInstances.get(storageType);
+ StorageRoots newStorageRoots = new StorageRoots(newSuperRoot, newChangeId);
- long storageRevision = changedEntries.changeId();
+ storagesRootsMap.put(storageType, newStorageRoots);
- // This will also be updated during the metastorage integration.
- notificator.notify(
+ return notificator.notify(
oldSuperRoot,
newSuperRoot,
- storageRevision
- ).whenCompleteAsync((res, throwable) -> storage.notifyApplied(storageRevision), pool);
+ newChangeId
+ );
}
/**
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
index 8bcd2e1..efc133e 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
@@ -49,13 +49,6 @@ public interface ConfigurationStorage {
void registerConfigurationListener(@NotNull ConfigurationStorageListener lsnr);
/**
- * Notify storage that this specific revision was successfully handled and it is not necessary to repeat the same
- * notification on node restart.
- * @param storageRevision Storage revision.
- */
- void notifyApplied(long storageRevision);
-
- /**
* @return Type of this configuration storage.
*/
ConfigurationType type();
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorageListener.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorageListener.java
index 5445edd..ccbbb1c 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorageListener.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorageListener.java
@@ -16,6 +16,8 @@
*/
package org.apache.ignite.internal.configuration.storage;
+import java.util.concurrent.CompletableFuture;
+
/**
* Configuration storage listener for changes.
*/
@@ -23,7 +25,9 @@ package org.apache.ignite.internal.configuration.storage;
public interface ConfigurationStorageListener {
/**
* Method called when entries in storage change.
+ *
* @param changedEntries Changed entries, key-value pairs and new version of the storage.
+ * @return Completable future that signifies the completion of all custom user listeners execution.
*/
- void onEntriesChanged(Data changedEntries);
+ CompletableFuture<Void> onEntriesChanged(Data changedEntries);
}
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
index e0cb771..1595250 100644
--- a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
@@ -86,7 +86,7 @@ public class TestConfigurationStorage implements ConfigurationStorage {
version.incrementAndGet();
- listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, version.get())));
+ listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, version.get())).join());
return CompletableFuture.completedFuture(true);
}
@@ -97,10 +97,6 @@ public class TestConfigurationStorage implements ConfigurationStorage {
}
/** {@inheritDoc} */
- @Override public void notifyApplied(long storageRevision) {
- }
-
- /** {@inheritDoc} */
@Override public ConfigurationType type() {
return configurationType;
}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index ec8fa1b..a3a8d90 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -74,14 +74,11 @@ public class MetaStorageManager implements IgniteComponent {
/** Meta storage raft group name. */
private static final String METASTORAGE_RAFT_GROUP_NAME = "metastorage_raft_group";
- /** Prefix added to configuration keys to distinguish them in the meta storage. Must end with a dot. */
- public static final String DISTRIBUTED_PREFIX = "dst-cfg.";
-
/**
* Special key for the vault where the applied revision for {@link MetaStorageManager#storeEntries}
* operation is stored. This mechanism is needed for committing processed watches to {@link VaultManager}.
*/
- public static final ByteArray APPLIED_REV = ByteArray.fromString(DISTRIBUTED_PREFIX + "applied_revision");
+ public static final ByteArray APPLIED_REV = ByteArray.fromString("applied_revision");
/** Vault manager in order to commit processed watches with corresponding applied revision. */
private final VaultManager vaultMgr;
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/storage/ITDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/storage/ITDistributedConfigurationStorageTest.java
index 216770c..002f73a 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/storage/ITDistributedConfigurationStorageTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/storage/ITDistributedConfigurationStorageTest.java
@@ -44,6 +44,7 @@ import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@@ -132,7 +133,7 @@ public class ITDistributedConfigurationStorageTest {
Stream.of(clusterService, raftManager, metaStorageManager).forEach(IgniteComponent::start);
// this is needed to avoid assertion errors
- cfgStorage.registerConfigurationListener(changedEntries -> {});
+ cfgStorage.registerConfigurationListener(changedEntries -> completedFuture(null));
// deploy watches to propagate data from the metastore into the vault
metaStorageManager.deployWatches();
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
index e5e81e7..92bc620 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
@@ -18,10 +18,10 @@
package org.apache.ignite.internal.storage;
import java.io.Serializable;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.configuration.annotation.ConfigurationType;
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.configuration.storage.ConfigurationStorageList
import org.apache.ignite.internal.configuration.storage.Data;
import org.apache.ignite.internal.configuration.storage.StorageException;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.Condition;
import org.apache.ignite.internal.metastorage.client.Conditions;
import org.apache.ignite.internal.metastorage.client.Entry;
import org.apache.ignite.internal.metastorage.client.EntryEvent;
@@ -45,8 +46,6 @@ import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
import org.jetbrains.annotations.NotNull;
-import static org.apache.ignite.internal.metastorage.MetaStorageManager.DISTRIBUTED_PREFIX;
-
/**
* Distributed configuration storage.
*/
@@ -54,6 +53,9 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
/** Logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(DistributedConfigurationStorage.class);
+ /** Prefix added to configuration keys to distinguish them in the meta storage. Must end with a dot. */
+ public static final String DISTRIBUTED_PREFIX = "dst-cfg.";
+
/**
* Key for CAS-ing configuration keys to meta storage.
*/
@@ -68,7 +70,7 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
/**
* This key is expected to be the last key in lexicographical order of distributed configuration keys. It is
* possible because keys are in lexicographical order in meta storage and adding {@code (char)('.' + 1)} to the end
- * will produce all keys with prefix {@link MetaStorageManager#DISTRIBUTED_PREFIX}
+ * will produce all keys with prefix {@link DistributedConfigurationStorage#DISTRIBUTED_PREFIX}
*/
private static final ByteArray DST_KEYS_END_RANGE =
new ByteArray(DISTRIBUTED_PREFIX.substring(0, DISTRIBUTED_PREFIX.length() - 1) + (char)('.' + 1));
@@ -82,8 +84,25 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
/** Configuration changes listener. */
private volatile ConfigurationStorageListener lsnr;
- /** Storage version. It stores actual meta storage revision, that is applied to configuration manager. */
- private final AtomicLong ver = new AtomicLong(0L);
+ /**
+ * Currently known change id. Either matches or will soon match the Meta Storage revision of the latest
+ * configuration update. It is possible that {@code changeId} is already updated but notifications are not yet
+ * handled, thus revision is valid but not applied. This is fine.
+ * <p/>
+ * Given that {@link #MASTER_KEY} is updated on every configuration change, one could assume that {@code changeId}
+ * matches the revision of {@link #MASTER_KEY}.
+ * <p/>
+ * This is true for all cases except for node restart. Key-specific revision values are lost on local vault copy
+ * after restart, so stored {@link MetaStorageManager#APPLIED_REV} value is used instead. This fact has a very
+ * important side effect: it's no longer possible to use {@link Condition.RevisionCondition#eq} on
+ * {@link #MASTER_KEY} in {@link DistributedConfigurationStorage#write(Map, long)}.
+ * {@link Condition.RevisionCondition#le(long)} must be used instead.
+ *
+ * @see #MASTER_KEY
+ * @see MetaStorageManager#APPLIED_REV
+ * @see #write(Map, long)
+ */
+ private final AtomicLong changeId = new AtomicLong(0L);
/**
* Constructor.
@@ -98,10 +117,12 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
}
/** {@inheritDoc} */
- @Override public synchronized Data readAll() throws StorageException {
+ @Override public Data readAll() throws StorageException {
var data = new HashMap<String, Serializable>();
- long appliedRevision = 0L;
+ VaultEntry appliedRevEntry = vaultMgr.get(MetaStorageManager.APPLIED_REV).join();
+
+ long appliedRevision = appliedRevEntry.value() == null ? 0L : ByteUtils.bytesToLong(appliedRevEntry.value());
try (Cursor<VaultEntry> entries = storedDistributedConfigKeys()) {
for (VaultEntry entry : entries) {
@@ -111,16 +132,10 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
// vault iterator should not return nulls as values
assert value != null;
- if (key.equals(MetaStorageManager.APPLIED_REV)) {
- appliedRevision = ByteUtils.bytesToLong(value);
-
- continue;
- }
-
if (key.equals(MASTER_KEY))
continue;
- String dataKey = key.toString().substring((DISTRIBUTED_PREFIX).length());
+ String dataKey = key.toString().substring(DISTRIBUTED_PREFIX.length());
data.put(dataKey, (Serializable)ByteUtils.fromBytes(value));
}
@@ -129,30 +144,25 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
throw new StorageException("Exception when closing a Vault cursor", e);
}
- if (!data.isEmpty()) {
- assert appliedRevision > 0;
-
- assert appliedRevision >= ver.get() : "Applied revision cannot be less than storage version " +
- "that is applied to configuration manager.";
+ assert data.isEmpty() || appliedRevision > 0;
- return new Data(data, appliedRevision);
- }
+ changeId.set(data.isEmpty() ? 0 : appliedRevision);
- return new Data(data, ver.get());
+ return new Data(data, appliedRevision);
}
/** {@inheritDoc} */
- @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long sentVersion) {
- assert sentVersion <= ver.get();
+ @Override public CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long curChangeId) {
+ assert curChangeId <= changeId.get();
assert lsnr != null : "Configuration listener must be initialized before write.";
- if (sentVersion != ver.get())
- // This means that sentVersion is less than version and other node has already updated configuration and
+ if (curChangeId < changeId.get())
+ // This means that curChangeId is less than version and other node has already updated configuration and
// write should be retried. Actual version will be set when watch and corresponding configuration listener
- // updates configuration and notifyApplied is triggered afterwards.
+ // updates configuration.
return CompletableFuture.completedFuture(false);
- HashSet<Operation> operations = new HashSet<>();
+ Set<Operation> operations = new HashSet<>();
for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
ByteArray key = new ByteArray(DISTRIBUTED_PREFIX + entry.getKey());
@@ -165,20 +175,30 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
operations.add(Operations.remove(key));
}
- operations.add(Operations.put(MASTER_KEY, ByteUtils.longToBytes(sentVersion)));
-
- if (sentVersion == 0) {
- return metaStorageMgr.invoke(
- Conditions.notExists(MASTER_KEY),
- operations,
- Collections.singleton(Operations.noop()));
- }
- else {
- return metaStorageMgr.invoke(
- Conditions.revision(MASTER_KEY).eq(ver.get()),
- operations,
- Collections.singleton(Operations.noop()));
- }
+ operations.add(Operations.put(MASTER_KEY, ByteUtils.longToBytes(curChangeId)));
+
+ // Condition for a valid MetaStorage data update. Several possibilities here:
+ // - First update ever, MASTER_KEY property must be absent from MetaStorage.
+ // - Current node has already performed some updates or received them from MetaStorage watch listener. In this
+ // case "curChangeId" must match the MASTER_KEY revision exactly.
+ // - Current node has been restarted and received updates from MetaStorage watch listeners after that. Same as
+ // above, "curChangeId" must match the MASTER_KEY revision exactly.
+ // - Current node has been restarted and has not received any updates from MetaStorage watch listeners yet.
+ // In this case "curChangeId" matches APPLIED_REV, which may or may not match the MASTER_KEY revision. Two
+ // options here:
+ // - MASTER_KEY is missing in local MetaStorage copy. This means that current node has neither performed nor
+ // observed any configuration changes. Valid condition is "MASTER_KEY does not exist".
+ // - MASTER_KEY is present in local MetaStorage copy. The MASTER_KEY revision is unknown but is less than or
+ // equal to APPLIED_REV. Obviously, there have been no updates from the future yet. It's also guaranteed
+ // that the next received configuration update will have the MASTER_KEY revision strictly greater than
+ // current APPLIED_REV. This allows to conclude that "MASTER_KEY revision <= curChangeId" is a valid
+ // condition for update.
+ // Joining all of the above, it's concluded that the following condition must be used:
+ Condition condition = curChangeId == 0L
+ ? Conditions.notExists(MASTER_KEY)
+ : Conditions.revision(MASTER_KEY).le(curChangeId);
+
+ return metaStorageMgr.invoke(condition, operations, Set.of(Operations.noop()));
}
/** {@inheritDoc} */
@@ -190,43 +210,38 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
// TODO: properly handle such cases https://issues.apache.org/jira/browse/IGNITE-14604
metaStorageMgr.registerWatchByPrefix(DST_KEYS_START_RANGE, new WatchListener() {
@Override public boolean onUpdate(@NotNull WatchEvent events) {
- HashMap<String, Serializable> data = new HashMap<>();
-
- long maxRevision = 0L;
+ Map<String, Serializable> data = new HashMap<>();
- Entry entryForMasterKey = null;
+ Entry masterKeyEntry = null;
for (EntryEvent event : events.entryEvents()) {
Entry e = event.newEntry();
if (e.key().equals(MASTER_KEY))
- entryForMasterKey = e;
+ masterKeyEntry = e;
else {
- String key = e.key().toString().substring((DISTRIBUTED_PREFIX).length());
+ String key = e.key().toString().substring(DISTRIBUTED_PREFIX.length());
Serializable value = e.value() == null ?
null :
(Serializable)ByteUtils.fromBytes(e.value());
data.put(key, value);
-
- if (maxRevision < e.revision())
- maxRevision = e.revision();
}
}
// Contract of meta storage ensures that all updates of one revision will come in one batch.
// Also masterKey should be updated every time when we update cfg.
// That means that masterKey update must be included in the batch.
- assert entryForMasterKey != null;
+ assert masterKeyEntry != null;
- assert maxRevision == entryForMasterKey.revision();
+ long newChangeId = masterKeyEntry.revision();
- assert maxRevision >= ver.get();
+ assert newChangeId > changeId.get();
- long finalMaxRevision = maxRevision;
+ changeId.set(newChangeId);
- DistributedConfigurationStorage.this.lsnr.onEntriesChanged(new Data(data, finalMaxRevision));
+ lsnr.onEntriesChanged(new Data(data, newChangeId)).join();
return true;
}
@@ -242,17 +257,6 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
}
/** {@inheritDoc} */
- @Override public synchronized void notifyApplied(long storageRevision) {
- assert ver.get() <= storageRevision;
-
- ver.set(storageRevision);
-
- // TODO: Also we should persist version,
- // TODO: this should be done when nodes restart is introduced.
- // TODO: https://issues.apache.org/jira/browse/IGNITE-14697
- }
-
- /** {@inheritDoc} */
@Override public ConfigurationType type() {
return ConfigurationType.DISTRIBUTED;
}
@@ -263,13 +267,11 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
* successful vault update.
* <p>
* This is possible to distinguish cfg keys from meta storage because we add a special prefix {@link
- * MetaStorageManager#DISTRIBUTED_PREFIX} to all configuration keys that we put to the meta storage.
+ * DistributedConfigurationStorage#DISTRIBUTED_PREFIX} to all configuration keys that we put to the meta storage.
*
* @return Iterator built upon all distributed configuration entries stored in vault.
*/
private @NotNull Cursor<VaultEntry> storedDistributedConfigKeys() {
- // TODO: rangeWithAppliedRevision could throw OperationTimeoutException and CompactedException and we should
- // TODO: properly handle such cases https://issues.apache.org/jira/browse/IGNITE-14604
return vaultMgr.range(DST_KEYS_START_RANGE, DST_KEYS_END_RANGE);
}
}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
index ca41af2..103fcf2 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
@@ -111,11 +111,9 @@ public class LocalConfigurationStorage implements ConfigurationStorage {
Data entries = new Data(newValues, ver.incrementAndGet());
- return vaultMgr.putAll(data).thenApply(res -> {
- lsnr.onEntriesChanged(entries);
-
- return true;
- });
+ return vaultMgr.putAll(data)
+ .thenCompose(v -> lsnr.onEntriesChanged(entries))
+ .thenApply(v -> true);
}
/** {@inheritDoc} */
@@ -127,13 +125,6 @@ public class LocalConfigurationStorage implements ConfigurationStorage {
}
/** {@inheritDoc} */
- @Override public void notifyApplied(long storageRevision) {
- // No-op.
- // TODO: implement this method when restart mechanism will be introduced
- // TODO: https://issues.apache.org/jira/browse/IGNITE-14697
- }
-
- /** {@inheritDoc} */
@Override public ConfigurationType type() {
return ConfigurationType.LOCAL;
}