You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/05/12 16:21:03 UTC

[ignite-3] branch main updated: IGNITE-14666 Added proper listener handling in DistributedConfigurationStorage and LocalConfigurationStorage. Fixes #114

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 33d0385  IGNITE-14666 Added proper listener handling in DistributedConfigurationStorage and LocalConfigurationStorage. Fixes #114
33d0385 is described below

commit 33d03850962ee0172e38f7a7e9d11007035cc960
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Wed May 12 19:20:28 2021 +0300

    IGNITE-14666 Added proper listener handling in DistributedConfigurationStorage and LocalConfigurationStorage. Fixes #114
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../storage/TestConfigurationStorage.java          |   4 +-
 .../ignite/configuration/ConfigurationChanger.java |   6 +-
 .../storage/ConfigurationStorage.java              |  16 +-
 .../apache/ignite/configuration/storage/Data.java  |  24 +--
 .../metastorage/common/KeyValueStorageImpl.java    |  18 +-
 .../json/TestConfigurationStorage.java             |   4 +-
 .../ignite/internal/runner/app/IgnitionTest.java   |   4 +-
 .../storage/DistributedConfigurationStorage.java   | 205 ++++++++++++++++++---
 .../storage/LocalConfigurationStorage.java         |  79 +++++---
 .../apache/ignite/internal/vault/VaultManager.java |  34 +++-
 .../internal/vault/impl/VaultServiceImpl.java      |  16 +-
 .../internal/vault/service/VaultService.java       |  21 ++-
 12 files changed, 328 insertions(+), 103 deletions(-)

diff --git a/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/storage/TestConfigurationStorage.java b/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/storage/TestConfigurationStorage.java
index 82d47f6..5d66b64 100644
--- a/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/storage/TestConfigurationStorage.java
+++ b/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/storage/TestConfigurationStorage.java
@@ -54,7 +54,7 @@ public class TestConfigurationStorage implements ConfigurationStorage {
         if (fail)
             throw new StorageException("Failed to read data");
 
-        return new Data(new HashMap<>(map), version.get(), 0);
+        return new Data(new HashMap<>(map), version.get());
     }
 
     /** {@inheritDoc} */
@@ -74,7 +74,7 @@ public class TestConfigurationStorage implements ConfigurationStorage {
 
         version.incrementAndGet();
 
-        listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, version.get(), 0)));
+        listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, version.get())));
 
         return CompletableFuture.completedFuture(true);
     }
diff --git a/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java b/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java
index 5f2926b..d7e09ff 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java
@@ -189,7 +189,7 @@ public final class ConfigurationChanger {
             superRoot.addRoot(rootKey, rootNode);
         }
 
-        StorageRoots storageRoots = new StorageRoots(superRoot, data.cfgVersion());
+        StorageRoots storageRoots = new StorageRoots(superRoot, data.changeId());
 
         storagesRootsMap.put(configurationStorage.type(), storageRoots);
 
@@ -422,13 +422,13 @@ public final class ConfigurationChanger {
 
         fillFromPrefixMap(newSuperRoot, dataValuesPrefixMap);
 
-        StorageRoots newStorageRoots = new StorageRoots(newSuperRoot, changedEntries.cfgVersion());
+        StorageRoots newStorageRoots = new StorageRoots(newSuperRoot, changedEntries.changeId());
 
         storagesRootsMap.put(storageType, newStorageRoots);
 
         ConfigurationStorage storage = storageInstances.get(storageType);
 
-        long storageRevision = changedEntries.storageRevision();
+        long storageRevision = changedEntries.changeId();
 
         // This will also be updated during the metastorage integration.
         notificator.notify(
diff --git a/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java b/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java
index 78db7a1..f0bbe00 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java
@@ -34,23 +34,27 @@ public interface ConfigurationStorage {
     /**
      * Write key-value pairs into the storage with last known version.
      * @param newValues Key-value pairs.
-     * @param version Last known version.
+     * @param ver Last known version.
      * @return Future that gives you {@code true} if successfully written, {@code false} if version of the storage is
      *      different from the passed argument and {@link StorageException} if failed to write data.
      */
-    CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version);
+    CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long ver);
 
     /**
      * Add listener to the storage that notifies of data changes.
-     * @param listener Listener.
+     * @param lsnr Listener.
      */
-    void addListener(ConfigurationStorageListener listener);
+    // TODO: seems that it's not needed to have an ability to set several listeners to storage, as far as only one is responsible
+    // TODO: for updating configuration and others are not needed. https://issues.apache.org/jira/browse/IGNITE-14689
+    void addListener(ConfigurationStorageListener lsnr);
 
     /**
      * Remove storage listener.
-     * @param listener Listener.
+     * @param lsnr Listener.
      */
-    void removeListener(ConfigurationStorageListener listener);
+    // TODO: seems that it's not needed to have an ability to set several listeners to storage, as far as only one is responsible
+    // TODO: for updating configuration and others are not needed. https://issues.apache.org/jira/browse/IGNITE-14689
+    void removeListener(ConfigurationStorageListener lsnr);
 
     /**
      * Notify storage that this specific revision was successfully handled and it is not necessary to repeat the same
diff --git a/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/Data.java b/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/Data.java
index a12a295..22e409a 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/Data.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/Data.java
@@ -27,21 +27,16 @@ public class Data {
     private final Map<String, Serializable> values;
 
     /** Configuration storage version. */
-    private final long cfgVersion;
-
-    /** */
-    private final long storageRevision;
+    private final long changeId;
 
     /**
      * Constructor.
      * @param values Values.
-     * @param cfgVersion Version.
-     * @param storageRevision Storage revision.
+     * @param changeId Version.
      */
-    public Data(Map<String, Serializable> values, long cfgVersion, long storageRevision) {
+    public Data(Map<String, Serializable> values, long changeId) {
         this.values = values;
-        this.cfgVersion = cfgVersion;
-        this.storageRevision = storageRevision;
+        this.changeId = changeId;
     }
 
     /**
@@ -56,14 +51,7 @@ public class Data {
      * Get version.
      * @return version.
      */
-    public long cfgVersion() {
-        return cfgVersion;
-    }
-
-    /**
-     * @return Storage revision.
-     */
-    public long storageRevision() {
-        return storageRevision;
+    public long changeId() {
+        return changeId;
     }
 }
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/KeyValueStorageImpl.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/KeyValueStorageImpl.java
index 8708731..cb59227 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/KeyValueStorageImpl.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/KeyValueStorageImpl.java
@@ -111,7 +111,23 @@ public class KeyValueStorageImpl implements KeyValueStorage {
 
     /** {@inheritDoc} */
     @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
-        return null;
+        return new Cursor<Entry>() {
+            @NotNull @Override public Iterator<Entry> iterator() {
+                return new Iterator<Entry>() {
+                    @Override public boolean hasNext() {
+                        return false;
+                    }
+
+                    @Override public Entry next() {
+                        return null;
+                    }
+                };
+            }
+
+            @Override public void close() throws Exception {
+
+            }
+        };
     }
 
     /** {@inheritDoc} */
diff --git a/modules/rest/src/test/java/org/apache/ignite/rest/presentation/json/TestConfigurationStorage.java b/modules/rest/src/test/java/org/apache/ignite/rest/presentation/json/TestConfigurationStorage.java
index f525b1a..628882a 100644
--- a/modules/rest/src/test/java/org/apache/ignite/rest/presentation/json/TestConfigurationStorage.java
+++ b/modules/rest/src/test/java/org/apache/ignite/rest/presentation/json/TestConfigurationStorage.java
@@ -36,13 +36,13 @@ class TestConfigurationStorage implements ConfigurationStorage {
 
     /** {@inheritDoc} */
     @Override public Data readAll() throws StorageException {
-        return new Data(Collections.emptyMap(), 0, 0);
+        return new Data(Collections.emptyMap(), 0);
     }
 
     /** {@inheritDoc} */
     @Override public CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
         for (ConfigurationStorageListener listener : listeners)
-            listener.onEntriesChanged(new Data(newValues, version + 1, 0));
+            listener.onEntriesChanged(new Data(newValues, version + 1));
 
         return CompletableFuture.completedFuture(true);
     }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java
index 6e7657a..f914c51 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.ignite.app.Ignite;
 import org.apache.ignite.app.IgnitionManager;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -84,7 +85,8 @@ class IgnitionTest {
      * Check that Ignition.start() with bootstrap configuration returns Ignite instance.
      */
     @Test
-    void testNodeStartWithoutBootstrapConfiguartion() {
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-14709")
+    void testNodeStartWithoutBootstrapConfiguration() {
         Ignite ignite = IgnitionManager.start(null);
 
         Assertions.assertNotNull(ignite);
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
index 9d53731..bd76c33 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
@@ -18,12 +18,15 @@
 package org.apache.ignite.internal.storage;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.configuration.storage.ConfigurationStorage;
 import org.apache.ignite.configuration.storage.ConfigurationStorageListener;
@@ -31,15 +34,53 @@ import org.apache.ignite.configuration.storage.ConfigurationType;
 import org.apache.ignite.configuration.storage.Data;
 import org.apache.ignite.configuration.storage.StorageException;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operation;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Distributed configuration storage.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class DistributedConfigurationStorage implements ConfigurationStorage {
+public class DistributedConfigurationStorage implements ConfigurationStorage {
+    /** Prefix that we add to configuration keys to distinguish them in metastorage. Must end with dot. */
+    private static final String DISTRIBUTED_PREFIX = "dst-cfg.";
+
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(DistributedConfigurationStorage.class);
+
+    /**
+     * Key for CAS-ing configuration keys to metastorage. This key is expected to be the first key in lexicographical
+     * order of distributed configuration keys.
+     */
+    private static final Key MASTER_KEY = new Key(DISTRIBUTED_PREFIX);
+
+    /**
+     * This key is expected to be the last key in lexicographical order of distributed configuration keys. It is
+     * possible because keys are in lexicographical order in metastorage and adding {@code (char)('.' + 1)} to the end
+     * will produce all keys with prefix {@link DistributedConfigurationStorage#DISTRIBUTED_PREFIX}
+     */
+    private static final Key DST_KEYS_END_RANGE = new Key(DISTRIBUTED_PREFIX.substring(0, DISTRIBUTED_PREFIX.length() - 1) + (char)('.' + 1));
+
+    /** Id of watch that is responsible for configuration update. */
+    private CompletableFuture<Long> watchId;
+
     /** MetaStorage manager */
     private final MetaStorageManager metaStorageMgr;
 
+    /** Change listeners. */
+    private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
+
+    /** Storage version. It stores actual metastorage revision, that is applied to configuration manager. */
+    private AtomicLong ver = new AtomicLong(0L);
+
     /**
      * Constructor.
      *
@@ -49,56 +90,172 @@ import org.apache.ignite.internal.metastorage.MetaStorageManager;
         this.metaStorageMgr = metaStorageMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
+    /** {@inheritDoc} */
+    @Override public synchronized Data readAll() throws StorageException {
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        Iterator<Entry> entries = allDistributedConfigKeys().iterator();
 
-    /** Change listeners. */
-    private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
+        long maxRevision = 0L;
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+        if (!entries.hasNext())
+            return new Data(data, ver.get());
 
-    /** {@inheritDoc} */
-    @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+        Entry entryForMasterKey = entries.next();
+
+        // First key must be the masterKey because it's supposed to be the first in lexicographical order
+        assert entryForMasterKey.key().equals(MASTER_KEY);
+
+        while (entries.hasNext()) {
+            Entry entry = entries.next();
+
+            data.put(entry.key().toString().substring((DISTRIBUTED_PREFIX).length()), (Serializable)ByteUtils.fromBytes(entry.value()));
+
+            // Move to stream
+            if (maxRevision < entry.revision())
+                maxRevision = entry.revision();
+
+        }
+
+        if (!data.isEmpty()) {
+            assert maxRevision == entryForMasterKey.revision();
+
+            assert maxRevision >= ver.get();
+
+            return new Data(data, maxRevision);
+        }
+
+        return new Data(data, ver.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long sentVersion) {
+        assert sentVersion <= ver.get();
+
+        if (sentVersion != ver.get())
+            // This means that sentVersion is less than version and other node has already updated configuration and
+            // write should be retried. Actual version will be set when watch and corresponding configuration listener
+            // updates configuration and notifyApplied is triggered afterwards.
             return CompletableFuture.completedFuture(false);
 
+        HashSet<Operation> operations = new HashSet<>();
+
         for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+            Key key = new Key(DISTRIBUTED_PREFIX + entry.getKey());
+
             if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
+                // TODO: investigate overhead when serialize int, long, double, boolean, string, arrays of above
+                // TODO: https://issues.apache.org/jira/browse/IGNITE-14698
+                operations.add(Operations.put(key, ByteUtils.toBytes(entry.getValue())));
             else
-                map.remove(entry.getKey());
+                operations.add(Operations.remove(key));
         }
 
-        this.version.incrementAndGet();
+        operations.add(Operations.put(MASTER_KEY, ByteUtils.longToBytes(sentVersion)));
 
-        listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, this.version.get(), 0)));
-
-        return CompletableFuture.completedFuture(true);
+        return metaStorageMgr.invoke(
+            Conditions.key(MASTER_KEY).revision().eq(ver.get()),
+            operations,
+            Collections.singleton(Operations.noop()));
     }
 
     /** {@inheritDoc} */
-    @Override public void addListener(ConfigurationStorageListener listener) {
+    @Override public synchronized void addListener(ConfigurationStorageListener listener) {
         listeners.add(listener);
+
+        if (watchId == null) {
+            // TODO: registerWatchByPrefix could throw OperationTimeoutException and CompactedException and we should
+            // TODO: properly handle such cases https://issues.apache.org/jira/browse/IGNITE-14604
+            watchId = metaStorageMgr.registerWatchByPrefix(MASTER_KEY, new WatchListener() {
+                @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                    HashMap<String, Serializable> data = new HashMap<>();
+
+                    long maxRevision = 0L;
+
+                    Entry entryForMasterKey = null;
+
+                    for (WatchEvent event : events) {
+                        Entry e = event.newEntry();
+
+                        if (!e.key().equals(MASTER_KEY)) {
+                            data.put(e.key().toString().substring((DISTRIBUTED_PREFIX).length()),
+                                (Serializable)ByteUtils.fromBytes(e.value()));
+
+                            if (maxRevision < e.revision())
+                                maxRevision = e.revision();
+                        } else
+                            entryForMasterKey = e;
+                    }
+
+                    // Contract of metastorage ensures that all updates of one revision will come in one batch.
+                    // Also masterKey should be updated every time when we update cfg.
+                    // That means that masterKey update must be included in the batch.
+                    assert entryForMasterKey != null;
+
+                    assert maxRevision == entryForMasterKey.revision();
+
+                    assert maxRevision >= ver.get();
+
+                    long finalMaxRevision = maxRevision;
+
+                    listeners.forEach(listener -> listener.onEntriesChanged(new Data(data, finalMaxRevision)));
+
+                    return true;
+                }
+
+                @Override public void onError(@NotNull Throwable e) {
+                    // TODO: need to handle this case and there should some mechanism for registering new watch as far as
+                    // TODO: onError unregisters failed watch https://issues.apache.org/jira/browse/IGNITE-14604
+                    LOG.error("Metastorage listener issue", e);
+                }
+            });
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public void removeListener(ConfigurationStorageListener listener) {
+    @Override public synchronized void removeListener(ConfigurationStorageListener listener) {
         listeners.remove(listener);
+
+        if (listeners.isEmpty()) {
+            try {
+                metaStorageMgr.unregisterWatch(watchId.get());
+            }
+            catch (InterruptedException | ExecutionException e) {
+                LOG.error("Failed to unregister watch in meta storage.", e);
+            }
+
+            watchId = null;
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public void notifyApplied(long storageRevision) {
-        // No-op.
+    @Override public synchronized void notifyApplied(long storageRevision) {
+        assert ver.get() <= storageRevision;
+
+        ver.set(storageRevision);
+
+        // TODO: Also we should persist version,
+        // TODO: this should be done when nodes restart is introduced.
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-14697
     }
 
     /** {@inheritDoc} */
     @Override public ConfigurationType type() {
         return ConfigurationType.DISTRIBUTED;
     }
+
+    /**
+     * Method that returns all distributed configuration keys from metastorage filtered out by the current applied
+     * revision as an upper bound. Applied revision is a revision of the last successful vault update.
+     * <p>
+     * This is possible to distinguish cfg keys from metastorage because we add special prefix {@link
+     * DistributedConfigurationStorage#DISTRIBUTED_PREFIX} to all configuration keys that we put to metastorage.
+     *
+     * @return Cursor built upon all distributed configuration entries.
+     */
+    private Cursor<Entry> allDistributedConfigKeys() {
+        // TODO: rangeWithAppliedRevision could throw OperationTimeoutException and CompactedException and we should
+        // TODO: properly handle such cases https://issues.apache.org/jira/browse/IGNITE-14604
+        return metaStorageMgr.rangeWithAppliedRevision(MASTER_KEY, DST_KEYS_END_RANGE);
+    }
 }
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
index eb99781..eb5822c 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
@@ -19,10 +19,10 @@ package org.apache.ignite.internal.storage;
 
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.configuration.storage.ConfigurationStorage;
@@ -30,16 +30,33 @@ import org.apache.ignite.configuration.storage.ConfigurationStorageListener;
 import org.apache.ignite.configuration.storage.ConfigurationType;
 import org.apache.ignite.configuration.storage.Data;
 import org.apache.ignite.configuration.storage.StorageException;
+import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.common.Entry;
+import org.apache.ignite.lang.ByteArray;
 
 /**
  * Local configuration storage.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class LocalConfigurationStorage implements ConfigurationStorage {
+public class LocalConfigurationStorage implements ConfigurationStorage {
+    /** Prefix that we add to configuration keys to distinguish them in metastorage. */
+    private static final String LOC_PREFIX = "loc-cfg.";
+
     /** Vault manager. */
     private final VaultManager vaultMgr;
 
+    /** Change listeners. */
+    private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
+
+    /** Storage version. */
+    private AtomicLong ver = new AtomicLong(0);
+
+    /** Start key in range for searching local configuration keys. */
+    private static final ByteArray LOC_KEYS_START_RANGE = ByteArray.fromString(LOC_PREFIX);
+
+    /** End key in range for searching local configuration keys. */
+    private static final ByteArray LOC_KEYS_END_RANGE = ByteArray.fromString(LOC_PREFIX.substring(0, LOC_PREFIX.length() - 1) + (char)('.' + 1));
+
     /**
      * Constructor.
      *
@@ -49,52 +66,62 @@ import org.apache.ignite.internal.vault.VaultManager;
         this.vaultMgr = vaultMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
+    /** {@inheritDoc} */
+    @Override public synchronized Data readAll() throws StorageException {
+        Iterator<Entry> iter =
+            vaultMgr.range(LOC_KEYS_START_RANGE, LOC_KEYS_END_RANGE);
 
-    /** Change listeners. */
-    private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
+        HashMap<String, Serializable> data = new HashMap<>();
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+        while (iter.hasNext()) {
+            Entry val = iter.next();
 
-    /** {@inheritDoc} */
-    @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+            data.put(val.key().toString().substring(LOC_KEYS_START_RANGE.toString().length()),
+                (Serializable)ByteUtils.fromBytes(val.value()));
+        }
+
+        // TODO: Need to restore version from pds when restart will be developed
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-14697
+        return new Data(data, ver.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long sentVersion) {
+        if (sentVersion != ver.get())
             return CompletableFuture.completedFuture(false);
 
-        for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
-            if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
-            else
-                map.remove(entry.getKey());
+        Map<ByteArray, byte[]> data = new HashMap<>();
+
+        for (Map.Entry<String, Serializable> e: newValues.entrySet()) {
+            ByteArray key = ByteArray.fromString(LOC_PREFIX + e.getKey());
+
+            data.put(key, e.getValue() == null ? null : ByteUtils.toBytes(e.getValue()));
         }
 
-        this.version.incrementAndGet();
+        Data entries = new Data(newValues, ver.incrementAndGet());
 
-        listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, this.version.get(), 0)));
+        return vaultMgr.putAll(data).thenApply(res -> {
+            listeners.forEach(listener -> listener.onEntriesChanged(entries));
 
-        return CompletableFuture.completedFuture(true);
+            return true;
+        });
     }
 
     /** {@inheritDoc} */
-    @Override public void addListener(ConfigurationStorageListener listener) {
-        listeners.add(listener);
+    @Override public synchronized void addListener(ConfigurationStorageListener lsnr) {
+        listeners.add(lsnr);
     }
 
     /** {@inheritDoc} */
-    @Override public void removeListener(ConfigurationStorageListener listener) {
-        listeners.remove(listener);
+    @Override public synchronized void removeListener(ConfigurationStorageListener lsnr) {
+        listeners.remove(lsnr);
     }
 
     /** {@inheritDoc} */
     @Override public void notifyApplied(long storageRevision) {
         // No-op.
+        // TODO: implement this method when restart mechanism will be introduced
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-14697
     }
 
     /** {@inheritDoc} */
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
index 92264c1..cbbc6f5 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
@@ -66,7 +66,8 @@ public class VaultManager {
      * See {@link VaultService#get(ByteArray)}
      *
      * @param key Key. Couldn't be {@code null}.
-     * @return An entry for the given key. Couldn't be {@code null}.
+     * @return An entry for the given key. Couldn't be {@code null}. If there is no mapping for the provided {@code key},
+     * then {@code Entry} with value that equals to {@code null} will be returned.
      */
     @NotNull public CompletableFuture<Entry> get(@NotNull ByteArray key) {
         return vaultService.get(key);
@@ -76,8 +77,8 @@ public class VaultManager {
      * See {@link VaultService#put(ByteArray, byte[])}
      *
      * @param key Vault key. Couldn't be {@code null}.
-     * @param val Value. Couldn't be {@code null}.
-     * @return Future representing pending completion of the operation.
+     * @param val Value. If value is equal to {@code null}, then previous value with key will be deleted if there was any mapping.
+     * @return Future representing pending completion of the operation. Couldn't be {@code null}.
      */
     @NotNull public CompletableFuture<Void> put(@NotNull ByteArray key, @NotNull byte[] val) {
         return vaultService.put(key, val);
@@ -87,7 +88,7 @@ public class VaultManager {
      * See {@link VaultService#remove(ByteArray)}
      *
      * @param key Vault key. Couldn't be {@code null}.
-     * @return Future representing pending completion of the operation.
+     * @return Future representing pending completion of the operation. Couldn't be {@code null}.
      */
     @NotNull public CompletableFuture<Void> remove(@NotNull ByteArray key) {
         return vaultService.remove(key);
@@ -105,13 +106,27 @@ public class VaultManager {
     }
 
     /**
-     * Inserts or updates entries with given keys and given values and non-negative revision.
+     * Inserts or updates entries with given keys and given values. If the given value in {@code vals} is {@code null},
+     * then corresponding value with key will be deleted if there was any mapping.
+     *
+     * @param vals The map of keys and corresponding values. Couldn't be {@code null} or empty.
+     * @return Future representing pending completion of the operation. Couldn't be {@code null}.
+     */
+    @NotNull public CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals) {
+        synchronized (mux) {
+            return vaultService.putAll(vals);
+        }
+    }
+
+    /**
+     * Inserts or updates entries with given keys and given values and non-negative revision. If the given value in
+     * {@code vals} is {@code null}, then corresponding value with key will be deleted if there was any mapping.
      *
      * @param vals The map of keys and corresponding values. Couldn't be {@code null} or empty.
      * @param revision Revision for entries. Must be positive.
-     * @return Future representing pending completion of the operation.
-     * @throws IgniteInternalCheckedException If revision is inconsistent with applied revision from vault or
-     * if couldn't get applied revision from vault.
+     * @return Future representing pending completion of the operation. Couldn't be {@code null}.
+     * @throws IgniteInternalCheckedException If revision is inconsistent with applied revision from vault or if
+     * couldn't get applied revision from vault.
      */
     public CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals, long revision) throws IgniteInternalCheckedException {
         synchronized (mux) {
@@ -135,11 +150,10 @@ public class VaultManager {
 
             return vaultService.putAll(mergedMap);
         }
-
     }
 
     /**
-     * @return Applied revision for {@link VaultManager#putAll} operation.
+     * @return Applied revision for {@link VaultManager#putAll(Map, long)} operation.
      * @throws IgniteInternalCheckedException If couldn't get applied revision from vault.
      */
     @NotNull public Long appliedRevision() throws IgniteInternalCheckedException {
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java
index f9b6056..3814d91 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.vault.common.Entry;
 import org.apache.ignite.internal.vault.common.VaultWatch;
 import org.apache.ignite.internal.vault.common.WatcherImpl;
@@ -42,6 +43,9 @@ public class VaultServiceImpl implements VaultService {
 
     private final WatcherImpl watcher;
 
+    /**
+     * Default constructor.
+     */
     public VaultServiceImpl() {
         this.watcher = new WatcherImpl();
         this.storage = new TreeMap<>();
@@ -70,6 +74,8 @@ public class VaultServiceImpl implements VaultService {
         synchronized (mux) {
             storage.remove(key);
 
+            watcher.notify(new Entry(key, null));
+
             return CompletableFuture.allOf();
         }
     }
@@ -106,7 +112,15 @@ public class VaultServiceImpl implements VaultService {
     /** {@inheritDoc} */
     @Override public @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals) {
         synchronized (mux) {
-            storage.putAll(vals);
+            vals.forEach((k, v) -> {
+                if (v == null)
+                    storage.remove(k);
+            });
+
+            storage.putAll(vals.entrySet()
+                .stream()
+                .filter(e -> e.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
 
             return CompletableFuture.allOf();
         }
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/service/VaultService.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/service/VaultService.java
index 247cf4f..45bc884 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/service/VaultService.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/service/VaultService.java
@@ -34,24 +34,26 @@ public interface VaultService {
      * Retrieves an entry for the given key.
      *
      * @param key Key. Couldn't be {@code null}.
-     * @return An entry for the given key. Couldn't be {@code null}.
+     * @return An entry for the given key. Couldn't be {@code null}. If there is no mapping for the provided {@code key},
+     * then {@code Entry} with value that equals to null will be returned.
      */
     @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key);
 
     /**
-     * Write value with key to vault.
+     * Write value with key to vault. If value is equal to null, then previous value with key will be deleted if there
+     * was any mapping.
      *
      * @param key Vault key. Couldn't be {@code null}.
-     * @param val Value. Couldn't be {@code null}.
-     * @return Future representing pending completion of the operation.
+     * @param val Value. If value is equal to null, then previous value with key will be deleted if there was any mapping.
+     * @return Future representing pending completion of the operation. Couldn't be {@code null}.
      */
-    @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, @NotNull byte[] val);
+    @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, byte[] val);
 
     /**
      * Remove value with key from vault.
      *
      * @param key Vault key. Couldn't be {@code null}.
-     * @return Future representing pending completion of the operation.
+     * @return Future representing pending completion of the operation. Couldn't be {@code null}.
      */
     @NotNull CompletableFuture<Void> remove(@NotNull ByteArray key);
 
@@ -76,15 +78,16 @@ public interface VaultService {
      * Cancels subscription for the given identifier.
      *
      * @param id Subscription identifier.
-     * @return Completed future in case of operation success. Couldn't be {@code null}.
+     * @return Future representing pending completion of the operation. Couldn't be {@code null}.
      */
     @NotNull CompletableFuture<Void> stopWatch(@NotNull Long id);
 
     /**
-     * Inserts or updates entries with given keys and given values.
+     * Inserts or updates entries with given keys and given values. If the given value in {@code vals} is null,
+     * then corresponding value with key will be deleted if there was any mapping.
      *
      * @param vals The map of keys and corresponding values. Couldn't be {@code null} or empty.
-     * @return Completed future.
+     * @return Future representing pending completion of the operation. Couldn't be {@code null}.
      */
     @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals);
 }