You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/05/12 16:21:03 UTC
[ignite-3] branch main updated: IGNITE-14666 Added proper listener
handling in DistributedConfigurationStorage and LocalConfigurationStorage.
Fixes #114
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 33d0385 IGNITE-14666 Added proper listener handling in DistributedConfigurationStorage and LocalConfigurationStorage. Fixes #114
33d0385 is described below
commit 33d03850962ee0172e38f7a7e9d11007035cc960
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Wed May 12 19:20:28 2021 +0300
IGNITE-14666 Added proper listener handling in DistributedConfigurationStorage and LocalConfigurationStorage. Fixes #114
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../storage/TestConfigurationStorage.java | 4 +-
.../ignite/configuration/ConfigurationChanger.java | 6 +-
.../storage/ConfigurationStorage.java | 16 +-
.../apache/ignite/configuration/storage/Data.java | 24 +--
.../metastorage/common/KeyValueStorageImpl.java | 18 +-
.../json/TestConfigurationStorage.java | 4 +-
.../ignite/internal/runner/app/IgnitionTest.java | 4 +-
.../storage/DistributedConfigurationStorage.java | 205 ++++++++++++++++++---
.../storage/LocalConfigurationStorage.java | 79 +++++---
.../apache/ignite/internal/vault/VaultManager.java | 34 +++-
.../internal/vault/impl/VaultServiceImpl.java | 16 +-
.../internal/vault/service/VaultService.java | 21 ++-
12 files changed, 328 insertions(+), 103 deletions(-)
diff --git a/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/storage/TestConfigurationStorage.java b/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/storage/TestConfigurationStorage.java
index 82d47f6..5d66b64 100644
--- a/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/storage/TestConfigurationStorage.java
+++ b/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/storage/TestConfigurationStorage.java
@@ -54,7 +54,7 @@ public class TestConfigurationStorage implements ConfigurationStorage {
if (fail)
throw new StorageException("Failed to read data");
- return new Data(new HashMap<>(map), version.get(), 0);
+ return new Data(new HashMap<>(map), version.get());
}
/** {@inheritDoc} */
@@ -74,7 +74,7 @@ public class TestConfigurationStorage implements ConfigurationStorage {
version.incrementAndGet();
- listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, version.get(), 0)));
+ listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, version.get())));
return CompletableFuture.completedFuture(true);
}
diff --git a/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java b/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java
index 5f2926b..d7e09ff 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java
@@ -189,7 +189,7 @@ public final class ConfigurationChanger {
superRoot.addRoot(rootKey, rootNode);
}
- StorageRoots storageRoots = new StorageRoots(superRoot, data.cfgVersion());
+ StorageRoots storageRoots = new StorageRoots(superRoot, data.changeId());
storagesRootsMap.put(configurationStorage.type(), storageRoots);
@@ -422,13 +422,13 @@ public final class ConfigurationChanger {
fillFromPrefixMap(newSuperRoot, dataValuesPrefixMap);
- StorageRoots newStorageRoots = new StorageRoots(newSuperRoot, changedEntries.cfgVersion());
+ StorageRoots newStorageRoots = new StorageRoots(newSuperRoot, changedEntries.changeId());
storagesRootsMap.put(storageType, newStorageRoots);
ConfigurationStorage storage = storageInstances.get(storageType);
- long storageRevision = changedEntries.storageRevision();
+ long storageRevision = changedEntries.changeId();
// This will also be updated during the metastorage integration.
notificator.notify(
diff --git a/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java b/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java
index 78db7a1..f0bbe00 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java
@@ -34,23 +34,27 @@ public interface ConfigurationStorage {
/**
* Write key-value pairs into the storage with last known version.
* @param newValues Key-value pairs.
- * @param version Last known version.
+ * @param ver Last known version.
* @return Future that gives you {@code true} if successfully written, {@code false} if version of the storage is
* different from the passed argument and {@link StorageException} if failed to write data.
*/
- CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version);
+ CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long ver);
/**
* Add listener to the storage that notifies of data changes.
- * @param listener Listener.
+ * @param lsnr Listener.
*/
- void addListener(ConfigurationStorageListener listener);
+ // TODO: seems that it's not needed to have an ability to set several listeners to storage, as far as only one is responsible
+ // TODO: for updating configuration and others are not needed. https://issues.apache.org/jira/browse/IGNITE-14689
+ void addListener(ConfigurationStorageListener lsnr);
/**
* Remove storage listener.
- * @param listener Listener.
+ * @param lsnr Listener.
*/
- void removeListener(ConfigurationStorageListener listener);
+ // TODO: seems that it's not needed to have an ability to set several listeners to storage, as far as only one is responsible
+ // TODO: for updating configuration and others are not needed. https://issues.apache.org/jira/browse/IGNITE-14689
+ void removeListener(ConfigurationStorageListener lsnr);
/**
* Notify storage that this specific revision was successfully handled and it is not necessary to repeat the same
diff --git a/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/Data.java b/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/Data.java
index a12a295..22e409a 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/Data.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/Data.java
@@ -27,21 +27,16 @@ public class Data {
private final Map<String, Serializable> values;
/** Configuration storage version. */
- private final long cfgVersion;
-
- /** */
- private final long storageRevision;
+ private final long changeId;
/**
* Constructor.
* @param values Values.
- * @param cfgVersion Version.
- * @param storageRevision Storage revision.
+ * @param changeId Version.
*/
- public Data(Map<String, Serializable> values, long cfgVersion, long storageRevision) {
+ public Data(Map<String, Serializable> values, long changeId) {
this.values = values;
- this.cfgVersion = cfgVersion;
- this.storageRevision = storageRevision;
+ this.changeId = changeId;
}
/**
@@ -56,14 +51,7 @@ public class Data {
* Get version.
* @return version.
*/
- public long cfgVersion() {
- return cfgVersion;
- }
-
- /**
- * @return Storage revision.
- */
- public long storageRevision() {
- return storageRevision;
+ public long changeId() {
+ return changeId;
}
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/KeyValueStorageImpl.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/KeyValueStorageImpl.java
index 8708731..cb59227 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/KeyValueStorageImpl.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/KeyValueStorageImpl.java
@@ -111,7 +111,23 @@ public class KeyValueStorageImpl implements KeyValueStorage {
/** {@inheritDoc} */
@Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
- return null;
+ return new Cursor<Entry>() {
+ @NotNull @Override public Iterator<Entry> iterator() {
+ return new Iterator<Entry>() {
+ @Override public boolean hasNext() {
+ return false;
+ }
+
+ @Override public Entry next() {
+ return null;
+ }
+ };
+ }
+
+ @Override public void close() throws Exception {
+
+ }
+ };
}
/** {@inheritDoc} */
diff --git a/modules/rest/src/test/java/org/apache/ignite/rest/presentation/json/TestConfigurationStorage.java b/modules/rest/src/test/java/org/apache/ignite/rest/presentation/json/TestConfigurationStorage.java
index f525b1a..628882a 100644
--- a/modules/rest/src/test/java/org/apache/ignite/rest/presentation/json/TestConfigurationStorage.java
+++ b/modules/rest/src/test/java/org/apache/ignite/rest/presentation/json/TestConfigurationStorage.java
@@ -36,13 +36,13 @@ class TestConfigurationStorage implements ConfigurationStorage {
/** {@inheritDoc} */
@Override public Data readAll() throws StorageException {
- return new Data(Collections.emptyMap(), 0, 0);
+ return new Data(Collections.emptyMap(), 0);
}
/** {@inheritDoc} */
@Override public CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
for (ConfigurationStorageListener listener : listeners)
- listener.onEntriesChanged(new Data(newValues, version + 1, 0));
+ listener.onEntriesChanged(new Data(newValues, version + 1));
return CompletableFuture.completedFuture(true);
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java
index 6e7657a..f914c51 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.app.IgnitionManager;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -84,7 +85,8 @@ class IgnitionTest {
* Check that Ignition.start() with bootstrap configuration returns Ignite instance.
*/
@Test
- void testNodeStartWithoutBootstrapConfiguartion() {
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-14709")
+ void testNodeStartWithoutBootstrapConfiguration() {
Ignite ignite = IgnitionManager.start(null);
Assertions.assertNotNull(ignite);
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
index 9d53731..bd76c33 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
@@ -18,12 +18,15 @@
package org.apache.ignite.internal.storage;
import java.io.Serializable;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.configuration.storage.ConfigurationStorage;
import org.apache.ignite.configuration.storage.ConfigurationStorageListener;
@@ -31,15 +34,53 @@ import org.apache.ignite.configuration.storage.ConfigurationType;
import org.apache.ignite.configuration.storage.Data;
import org.apache.ignite.configuration.storage.StorageException;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operation;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
/**
* Distributed configuration storage.
*/
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class DistributedConfigurationStorage implements ConfigurationStorage {
+public class DistributedConfigurationStorage implements ConfigurationStorage {
+ /** Prefix that we add to configuration keys to distinguish them in metastorage. Must end with dot. */
+ private static final String DISTRIBUTED_PREFIX = "dst-cfg.";
+
+ /** Logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(DistributedConfigurationStorage.class);
+
+ /**
+ * Key for CAS-ing configuration keys to metastorage. This key is expected to be the first key in lexicographical
+ * order of distributed configuration keys.
+ */
+ private static final Key MASTER_KEY = new Key(DISTRIBUTED_PREFIX);
+
+ /**
+ * This key is expected to be the last key in lexicographical order of distributed configuration keys. It is
+ * possible because keys are in lexicographical order in metastorage and adding {@code (char)('.' + 1)} to the end
+ * will produce all keys with prefix {@link DistributedConfigurationStorage#DISTRIBUTED_PREFIX}
+ */
+ private static final Key DST_KEYS_END_RANGE = new Key(DISTRIBUTED_PREFIX.substring(0, DISTRIBUTED_PREFIX.length() - 1) + (char)('.' + 1));
+
+ /** Id of watch that is responsible for configuration update. */
+ private CompletableFuture<Long> watchId;
+
/** MetaStorage manager */
private final MetaStorageManager metaStorageMgr;
+ /** Change listeners. */
+ private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
+
+ /** Storage version. It stores actual metastorage revision, that is applied to configuration manager. */
+ private AtomicLong ver = new AtomicLong(0L);
+
/**
* Constructor.
*
@@ -49,56 +90,172 @@ import org.apache.ignite.internal.metastorage.MetaStorageManager;
this.metaStorageMgr = metaStorageMgr;
}
- /** Map to store values. */
- private Map<String, Serializable> map = new ConcurrentHashMap<>();
+ /** {@inheritDoc} */
+ @Override public synchronized Data readAll() throws StorageException {
+ HashMap<String, Serializable> data = new HashMap<>();
+
+ Iterator<Entry> entries = allDistributedConfigKeys().iterator();
- /** Change listeners. */
- private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
+ long maxRevision = 0L;
- /** Storage version. */
- private AtomicLong version = new AtomicLong(0);
+ if (!entries.hasNext())
+ return new Data(data, ver.get());
- /** {@inheritDoc} */
- @Override public synchronized Data readAll() throws StorageException {
- return new Data(new HashMap<>(map), version.get(), 0);
+ Entry entryForMasterKey = entries.next();
+
+ // First key must be the masterKey because it's supposed to be the first in lexicographical order
+ assert entryForMasterKey.key().equals(MASTER_KEY);
+
+ while (entries.hasNext()) {
+ Entry entry = entries.next();
+
+ data.put(entry.key().toString().substring((DISTRIBUTED_PREFIX).length()), (Serializable)ByteUtils.fromBytes(entry.value()));
+
+ // Move to stream
+ if (maxRevision < entry.revision())
+ maxRevision = entry.revision();
+
+ }
+
+ if (!data.isEmpty()) {
+ assert maxRevision == entryForMasterKey.revision();
+
+ assert maxRevision >= ver.get();
+
+ return new Data(data, maxRevision);
+ }
+
+ return new Data(data, ver.get());
}
/** {@inheritDoc} */
- @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
- if (version != this.version.get())
+ @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long sentVersion) {
+ assert sentVersion <= ver.get();
+
+ if (sentVersion != ver.get())
+ // This means that sentVersion is less than version and other node has already updated configuration and
+ // write should be retried. Actual version will be set when watch and corresponding configuration listener
+ // updates configuration and notifyApplied is triggered afterwards.
return CompletableFuture.completedFuture(false);
+ HashSet<Operation> operations = new HashSet<>();
+
for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+ Key key = new Key(DISTRIBUTED_PREFIX + entry.getKey());
+
if (entry.getValue() != null)
- map.put(entry.getKey(), entry.getValue());
+ // TODO: investigate overhead when serialize int, long, double, boolean, string, arrays of above
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-14698
+ operations.add(Operations.put(key, ByteUtils.toBytes(entry.getValue())));
else
- map.remove(entry.getKey());
+ operations.add(Operations.remove(key));
}
- this.version.incrementAndGet();
+ operations.add(Operations.put(MASTER_KEY, ByteUtils.longToBytes(sentVersion)));
- listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, this.version.get(), 0)));
-
- return CompletableFuture.completedFuture(true);
+ return metaStorageMgr.invoke(
+ Conditions.key(MASTER_KEY).revision().eq(ver.get()),
+ operations,
+ Collections.singleton(Operations.noop()));
}
/** {@inheritDoc} */
- @Override public void addListener(ConfigurationStorageListener listener) {
+ @Override public synchronized void addListener(ConfigurationStorageListener listener) {
listeners.add(listener);
+
+ if (watchId == null) {
+ // TODO: registerWatchByPrefix could throw OperationTimeoutException and CompactedException and we should
+ // TODO: properly handle such cases https://issues.apache.org/jira/browse/IGNITE-14604
+ watchId = metaStorageMgr.registerWatchByPrefix(MASTER_KEY, new WatchListener() {
+ @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+ HashMap<String, Serializable> data = new HashMap<>();
+
+ long maxRevision = 0L;
+
+ Entry entryForMasterKey = null;
+
+ for (WatchEvent event : events) {
+ Entry e = event.newEntry();
+
+ if (!e.key().equals(MASTER_KEY)) {
+ data.put(e.key().toString().substring((DISTRIBUTED_PREFIX).length()),
+ (Serializable)ByteUtils.fromBytes(e.value()));
+
+ if (maxRevision < e.revision())
+ maxRevision = e.revision();
+ } else
+ entryForMasterKey = e;
+ }
+
+ // Contract of metastorage ensures that all updates of one revision will come in one batch.
+ // Also masterKey should be updated every time when we update cfg.
+ // That means that masterKey update must be included in the batch.
+ assert entryForMasterKey != null;
+
+ assert maxRevision == entryForMasterKey.revision();
+
+ assert maxRevision >= ver.get();
+
+ long finalMaxRevision = maxRevision;
+
+ listeners.forEach(listener -> listener.onEntriesChanged(new Data(data, finalMaxRevision)));
+
+ return true;
+ }
+
+ @Override public void onError(@NotNull Throwable e) {
+ // TODO: need to handle this case and there should some mechanism for registering new watch as far as
+ // TODO: onError unregisters failed watch https://issues.apache.org/jira/browse/IGNITE-14604
+ LOG.error("Metastorage listener issue", e);
+ }
+ });
+ }
}
/** {@inheritDoc} */
- @Override public void removeListener(ConfigurationStorageListener listener) {
+ @Override public synchronized void removeListener(ConfigurationStorageListener listener) {
listeners.remove(listener);
+
+ if (listeners.isEmpty()) {
+ try {
+ metaStorageMgr.unregisterWatch(watchId.get());
+ }
+ catch (InterruptedException | ExecutionException e) {
+ LOG.error("Failed to unregister watch in meta storage.", e);
+ }
+
+ watchId = null;
+ }
}
/** {@inheritDoc} */
- @Override public void notifyApplied(long storageRevision) {
- // No-op.
+ @Override public synchronized void notifyApplied(long storageRevision) {
+ assert ver.get() <= storageRevision;
+
+ ver.set(storageRevision);
+
+ // TODO: Also we should persist version,
+ // TODO: this should be done when nodes restart is introduced.
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-14697
}
/** {@inheritDoc} */
@Override public ConfigurationType type() {
return ConfigurationType.DISTRIBUTED;
}
+
+ /**
+ * Method that returns all distributed configuration keys from metastorage filtered out by the current applied
+ * revision as an upper bound. Applied revision is a revision of the last successful vault update.
+ * <p>
+ * This is possible to distinguish cfg keys from metastorage because we add special prefix {@link
+ * DistributedConfigurationStorage#DISTRIBUTED_PREFIX} to all configuration keys that we put to metastorage.
+ *
+ * @return Cursor built upon all distributed configuration entries.
+ */
+ private Cursor<Entry> allDistributedConfigKeys() {
+ // TODO: rangeWithAppliedRevision could throw OperationTimeoutException and CompactedException and we should
+ // TODO: properly handle such cases https://issues.apache.org/jira/browse/IGNITE-14604
+ return metaStorageMgr.rangeWithAppliedRevision(MASTER_KEY, DST_KEYS_END_RANGE);
+ }
}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
index eb99781..eb5822c 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
@@ -19,10 +19,10 @@ package org.apache.ignite.internal.storage;
import java.io.Serializable;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.configuration.storage.ConfigurationStorage;
@@ -30,16 +30,33 @@ import org.apache.ignite.configuration.storage.ConfigurationStorageListener;
import org.apache.ignite.configuration.storage.ConfigurationType;
import org.apache.ignite.configuration.storage.Data;
import org.apache.ignite.configuration.storage.StorageException;
+import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.common.Entry;
+import org.apache.ignite.lang.ByteArray;
/**
* Local configuration storage.
*/
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class LocalConfigurationStorage implements ConfigurationStorage {
+public class LocalConfigurationStorage implements ConfigurationStorage {
+ /** Prefix that we add to configuration keys to distinguish them in metastorage. */
+ private static final String LOC_PREFIX = "loc-cfg.";
+
/** Vault manager. */
private final VaultManager vaultMgr;
+ /** Change listeners. */
+ private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
+
+ /** Storage version. */
+ private AtomicLong ver = new AtomicLong(0);
+
+ /** Start key in range for searching local configuration keys. */
+ private static final ByteArray LOC_KEYS_START_RANGE = ByteArray.fromString(LOC_PREFIX);
+
+ /** End key in range for searching local configuration keys. */
+ private static final ByteArray LOC_KEYS_END_RANGE = ByteArray.fromString(LOC_PREFIX.substring(0, LOC_PREFIX.length() - 1) + (char)('.' + 1));
+
/**
* Constructor.
*
@@ -49,52 +66,62 @@ import org.apache.ignite.internal.vault.VaultManager;
this.vaultMgr = vaultMgr;
}
- /** Map to store values. */
- private Map<String, Serializable> map = new ConcurrentHashMap<>();
+ /** {@inheritDoc} */
+ @Override public synchronized Data readAll() throws StorageException {
+ Iterator<Entry> iter =
+ vaultMgr.range(LOC_KEYS_START_RANGE, LOC_KEYS_END_RANGE);
- /** Change listeners. */
- private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
+ HashMap<String, Serializable> data = new HashMap<>();
- /** Storage version. */
- private AtomicLong version = new AtomicLong(0);
+ while (iter.hasNext()) {
+ Entry val = iter.next();
- /** {@inheritDoc} */
- @Override public synchronized Data readAll() throws StorageException {
- return new Data(new HashMap<>(map), version.get(), 0);
+ data.put(val.key().toString().substring(LOC_KEYS_START_RANGE.toString().length()),
+ (Serializable)ByteUtils.fromBytes(val.value()));
+ }
+
+ // TODO: Need to restore version from pds when restart will be developed
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-14697
+ return new Data(data, ver.get());
}
/** {@inheritDoc} */
- @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
- if (version != this.version.get())
+ @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long sentVersion) {
+ if (sentVersion != ver.get())
return CompletableFuture.completedFuture(false);
- for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
- if (entry.getValue() != null)
- map.put(entry.getKey(), entry.getValue());
- else
- map.remove(entry.getKey());
+ Map<ByteArray, byte[]> data = new HashMap<>();
+
+ for (Map.Entry<String, Serializable> e: newValues.entrySet()) {
+ ByteArray key = ByteArray.fromString(LOC_PREFIX + e.getKey());
+
+ data.put(key, e.getValue() == null ? null : ByteUtils.toBytes(e.getValue()));
}
- this.version.incrementAndGet();
+ Data entries = new Data(newValues, ver.incrementAndGet());
- listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, this.version.get(), 0)));
+ return vaultMgr.putAll(data).thenApply(res -> {
+ listeners.forEach(listener -> listener.onEntriesChanged(entries));
- return CompletableFuture.completedFuture(true);
+ return true;
+ });
}
/** {@inheritDoc} */
- @Override public void addListener(ConfigurationStorageListener listener) {
- listeners.add(listener);
+ @Override public synchronized void addListener(ConfigurationStorageListener lsnr) {
+ listeners.add(lsnr);
}
/** {@inheritDoc} */
- @Override public void removeListener(ConfigurationStorageListener listener) {
- listeners.remove(listener);
+ @Override public synchronized void removeListener(ConfigurationStorageListener lsnr) {
+ listeners.remove(lsnr);
}
/** {@inheritDoc} */
@Override public void notifyApplied(long storageRevision) {
// No-op.
+ // TODO: implement this method when restart mechanism will be introduced
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-14697
}
/** {@inheritDoc} */
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
index 92264c1..cbbc6f5 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
@@ -66,7 +66,8 @@ public class VaultManager {
* See {@link VaultService#get(ByteArray)}
*
* @param key Key. Couldn't be {@code null}.
- * @return An entry for the given key. Couldn't be {@code null}.
+ * @return An entry for the given key. Couldn't be {@code null}. If there is no mapping for the provided {@code key},
+ * then {@code Entry} with value that equals to {@code null} will be returned.
*/
@NotNull public CompletableFuture<Entry> get(@NotNull ByteArray key) {
return vaultService.get(key);
@@ -76,8 +77,8 @@ public class VaultManager {
* See {@link VaultService#put(ByteArray, byte[])}
*
* @param key Vault key. Couldn't be {@code null}.
- * @param val Value. Couldn't be {@code null}.
- * @return Future representing pending completion of the operation.
+ * @param val Value. If value is equal to {@code null}, then previous value with key will be deleted if there was any mapping.
+ * @return Future representing pending completion of the operation. Couldn't be {@code null}.
*/
@NotNull public CompletableFuture<Void> put(@NotNull ByteArray key, @NotNull byte[] val) {
return vaultService.put(key, val);
@@ -87,7 +88,7 @@ public class VaultManager {
* See {@link VaultService#remove(ByteArray)}
*
* @param key Vault key. Couldn't be {@code null}.
- * @return Future representing pending completion of the operation.
+ * @return Future representing pending completion of the operation. Couldn't be {@code null}.
*/
@NotNull public CompletableFuture<Void> remove(@NotNull ByteArray key) {
return vaultService.remove(key);
@@ -105,13 +106,27 @@ public class VaultManager {
}
/**
- * Inserts or updates entries with given keys and given values and non-negative revision.
+ * Inserts or updates entries with given keys and given values. If the given value in {@code vals} is {@code null},
+ * then corresponding value with key will be deleted if there was any mapping.
+ *
+ * @param vals The map of keys and corresponding values. Couldn't be {@code null} or empty.
+ * @return Future representing pending completion of the operation. Couldn't be {@code null}.
+ */
+ @NotNull public CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals) {
+ synchronized (mux) {
+ return vaultService.putAll(vals);
+ }
+ }
+
+ /**
+ * Inserts or updates entries with given keys and given values and non-negative revision. If the given value in
+ * {@code vals} is {@code null}, then corresponding value with key will be deleted if there was any mapping.
*
* @param vals The map of keys and corresponding values. Couldn't be {@code null} or empty.
* @param revision Revision for entries. Must be positive.
- * @return Future representing pending completion of the operation.
- * @throws IgniteInternalCheckedException If revision is inconsistent with applied revision from vault or
- * if couldn't get applied revision from vault.
+ * @return Future representing pending completion of the operation. Couldn't be {@code null}.
+ * @throws IgniteInternalCheckedException If revision is inconsistent with applied revision from vault or if
+ * couldn't get applied revision from vault.
*/
public CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals, long revision) throws IgniteInternalCheckedException {
synchronized (mux) {
@@ -135,11 +150,10 @@ public class VaultManager {
return vaultService.putAll(mergedMap);
}
-
}
/**
- * @return Applied revision for {@link VaultManager#putAll} operation.
+ * @return Applied revision for {@link VaultManager#putAll(Map, long)} operation.
* @throws IgniteInternalCheckedException If couldn't get applied revision from vault.
*/
@NotNull public Long appliedRevision() throws IgniteInternalCheckedException {
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java
index f9b6056..3814d91 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.vault.common.Entry;
import org.apache.ignite.internal.vault.common.VaultWatch;
import org.apache.ignite.internal.vault.common.WatcherImpl;
@@ -42,6 +43,9 @@ public class VaultServiceImpl implements VaultService {
private final WatcherImpl watcher;
+ /**
+ * Default constructor.
+ */
public VaultServiceImpl() {
this.watcher = new WatcherImpl();
this.storage = new TreeMap<>();
@@ -70,6 +74,8 @@ public class VaultServiceImpl implements VaultService {
synchronized (mux) {
storage.remove(key);
+ watcher.notify(new Entry(key, null));
+
return CompletableFuture.allOf();
}
}
@@ -106,7 +112,15 @@ public class VaultServiceImpl implements VaultService {
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals) {
synchronized (mux) {
- storage.putAll(vals);
+ vals.forEach((k, v) -> {
+ if (v == null)
+ storage.remove(k);
+ });
+
+ storage.putAll(vals.entrySet()
+ .stream()
+ .filter(e -> e.getValue() != null)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
return CompletableFuture.allOf();
}
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/service/VaultService.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/service/VaultService.java
index 247cf4f..45bc884 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/service/VaultService.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/service/VaultService.java
@@ -34,24 +34,26 @@ public interface VaultService {
* Retrieves an entry for the given key.
*
* @param key Key. Couldn't be {@code null}.
- * @return An entry for the given key. Couldn't be {@code null}.
+ * @return An entry for the given key. Couldn't be {@code null}. If there is no mapping for the provided {@code key},
+ * then {@code Entry} with value that equals to null will be returned.
*/
@NotNull CompletableFuture<Entry> get(@NotNull ByteArray key);
/**
- * Write value with key to vault.
+ * Write value with key to vault. If value is equal to null, then previous value with key will be deleted if there
+ * was any mapping.
*
* @param key Vault key. Couldn't be {@code null}.
- * @param val Value. Couldn't be {@code null}.
- * @return Future representing pending completion of the operation.
+ * @param val Value. If value is equal to null, then previous value with key will be deleted if there was any mapping.
+ * @return Future representing pending completion of the operation. Couldn't be {@code null}.
*/
- @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, @NotNull byte[] val);
+ @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, byte[] val);
/**
* Remove value with key from vault.
*
* @param key Vault key. Couldn't be {@code null}.
- * @return Future representing pending completion of the operation.
+ * @return Future representing pending completion of the operation. Couldn't be {@code null}.
*/
@NotNull CompletableFuture<Void> remove(@NotNull ByteArray key);
@@ -76,15 +78,16 @@ public interface VaultService {
* Cancels subscription for the given identifier.
*
* @param id Subscription identifier.
- * @return Completed future in case of operation success. Couldn't be {@code null}.
+ * @return Future representing pending completion of the operation. Couldn't be {@code null}.
*/
@NotNull CompletableFuture<Void> stopWatch(@NotNull Long id);
/**
- * Inserts or updates entries with given keys and given values.
+ * Inserts or updates entries with given keys and given values. If the given value in {@code vals} is null,
+ * then corresponding value with key will be deleted if there was any mapping.
*
* @param vals The map of keys and corresponding values. Couldn't be {@code null} or empty.
- * @return Completed future.
+ * @return Future representing pending completion of the operation. Couldn't be {@code null}.
*/
@NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals);
}