You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/05/06 08:51:49 UTC

[GitHub] [ignite-3] ibessonov commented on a change in pull request #114: IGNITE-14666 Add proper listener handling in DistributedConfigurationStorage and LocalConfigurationStorage

ibessonov commented on a change in pull request #114:
URL: https://github.com/apache/ignite-3/pull/114#discussion_r627190228



##########
File path: modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java
##########
@@ -428,7 +428,7 @@ private void updateFromListener(
 
         ConfigurationStorage storage = storageInstances.get(storageType);
 
-        long storageRevision = changedEntries.storageRevision();
+        long storageRevision = changedEntries.cfgVersion();

Review comment:
       Can we rename it to "changeId" to avoid further confusion?

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -19,24 +19,47 @@
 
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.configuration.storage.ConfigurationStorage;
 import org.apache.ignite.configuration.storage.ConfigurationStorageListener;
 import org.apache.ignite.configuration.storage.ConfigurationType;
 import org.apache.ignite.configuration.storage.Data;
 import org.apache.ignite.configuration.storage.StorageException;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operation;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Distributed configuration storage.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class DistributedConfigurationStorage implements ConfigurationStorage {
+public class DistributedConfigurationStorage implements ConfigurationStorage {
+    /** Prefix that we add to configuration keys to distinguish them in metastorage. */
+    private static String DISTRIBUTED_PREFIX = ConfigurationType.DISTRIBUTED.name() + "-cfg";

Review comment:
       This field is not final; since it is used as a constant, please declare it `private static final`.

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -19,24 +19,47 @@
 
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.configuration.storage.ConfigurationStorage;
 import org.apache.ignite.configuration.storage.ConfigurationStorageListener;
 import org.apache.ignite.configuration.storage.ConfigurationType;
 import org.apache.ignite.configuration.storage.Data;
 import org.apache.ignite.configuration.storage.StorageException;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operation;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Distributed configuration storage.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class DistributedConfigurationStorage implements ConfigurationStorage {
+public class DistributedConfigurationStorage implements ConfigurationStorage {
+    /** Prefix that we add to configuration keys to distinguish them in metastorage. */
+    private static String DISTRIBUTED_PREFIX = ConfigurationType.DISTRIBUTED.name() + "-cfg";
+
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(DistributedConfigurationStorage.class);
+
+    /** Key for CAS-ing configuration keys to metastorage. */
+    private static Key masterKey = new Key(DISTRIBUTED_PREFIX + ".");

Review comment:
       This field is not final; since it is used as a constant, please declare it `private static final`.

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -49,52 +72,152 @@ public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr) {
         this.metaStorageMgr = metaStorageMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+    /** Storage version. It stores actual metastorage revision, that is applied to configuration manager. */
+    private AtomicLong version = new AtomicLong(0L);
 
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+
+        Cursor<Entry> cur = metaStorageMgr.rangeWithAppliedRevision(new Key(DISTRIBUTED_PREFIX + "."),

Review comment:
       can we reuse "masterKey" constant here?

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -49,52 +72,152 @@ public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr) {
         this.metaStorageMgr = metaStorageMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+    /** Storage version. It stores actual metastorage revision, that is applied to configuration manager. */
+    private AtomicLong version = new AtomicLong(0L);
 
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+
+        Cursor<Entry> cur = metaStorageMgr.rangeWithAppliedRevision(new Key(DISTRIBUTED_PREFIX + "."),
+            new Key(DISTRIBUTED_PREFIX + (char)('.' + 1)));
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        long maxRevision = 0L;

Review comment:
       this should be a revision of master key, right? You don't need to explicitly compute max value.

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -49,52 +72,152 @@ public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr) {
         this.metaStorageMgr = metaStorageMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+    /** Storage version. It stores actual metastorage revision, that is applied to configuration manager. */
+    private AtomicLong version = new AtomicLong(0L);
 
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+
+        Cursor<Entry> cur = metaStorageMgr.rangeWithAppliedRevision(new Key(DISTRIBUTED_PREFIX + "."),
+            new Key(DISTRIBUTED_PREFIX + (char)('.' + 1)));
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        long maxRevision = 0L;
+
+        Entry entryForMasterKey = null;
+
+        for (Entry entry : cur) {
+            if (!entry.key().equals(masterKey)) {
+                data.put(entry.key().toString().replaceFirst(DISTRIBUTED_PREFIX + ".", ""),
+                    (Serializable)ByteUtils.fromBytes(entry.value()));

Review comment:
       How exactly do you serialize these values? You should do this effectively!

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
##########
@@ -49,47 +69,121 @@ public LocalConfigurationStorage(VaultManager vaultMgr) {
         this.vaultMgr = vaultMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
 
     /** Storage version. */
     private AtomicLong version = new AtomicLong(0);
 
+    /** Start key in ragne for searching local configuration keys. */
+    private ByteArray localKeysStartRange = ByteArray.fromString(LOCAL_PREFIX + ".");
+
+    /** End key in range for searching local configuration keys. */
+    private ByteArray localKeysEndRange = ByteArray.fromString(LOCAL_PREFIX + (char)('.' + 1));

Review comment:
       Cool, make a similar constant for metastorage

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
##########
@@ -49,47 +69,121 @@ public LocalConfigurationStorage(VaultManager vaultMgr) {
         this.vaultMgr = vaultMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
 
     /** Storage version. */
     private AtomicLong version = new AtomicLong(0);
 
+    /** Start key in ragne for searching local configuration keys. */
+    private ByteArray localKeysStartRange = ByteArray.fromString(LOCAL_PREFIX + ".");
+
+    /** End key in range for searching local configuration keys. */
+    private ByteArray localKeysEndRange = ByteArray.fromString(LOCAL_PREFIX + (char)('.' + 1));
+
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+        Iterator<Entry> iter =
+            vaultMgr.range(localKeysStartRange, localKeysEndRange);
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        while (iter.hasNext()) {
+            Entry val = iter.next();
+
+            data.put(val.key().toString().replaceFirst(LOCAL_PREFIX + ".", ""),
+                (Serializable)ByteUtils.fromBytes(val.value()));
+        }
+
+        return new Data(data, version.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long sentVersion) {
+        if (sentVersion != version.get())
             return CompletableFuture.completedFuture(false);
 
+        CompletableFuture[] futs = new CompletableFuture[newValues.size()];
+
+        int i = 0;
+
         for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+            ByteArray key = ByteArray.fromString(LOCAL_PREFIX + "." + entry.getKey());
+
             if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
+                futs[i++] = vaultMgr.put(key, ByteUtils.toBytes(entry.getValue()));

Review comment:
       I told you why I don't like this solution.

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
##########
@@ -49,47 +69,121 @@ public LocalConfigurationStorage(VaultManager vaultMgr) {
         this.vaultMgr = vaultMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
 
     /** Storage version. */
     private AtomicLong version = new AtomicLong(0);
 
+    /** Start key in ragne for searching local configuration keys. */
+    private ByteArray localKeysStartRange = ByteArray.fromString(LOCAL_PREFIX + ".");
+
+    /** End key in range for searching local configuration keys. */
+    private ByteArray localKeysEndRange = ByteArray.fromString(LOCAL_PREFIX + (char)('.' + 1));
+
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+        Iterator<Entry> iter =
+            vaultMgr.range(localKeysStartRange, localKeysEndRange);
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        while (iter.hasNext()) {
+            Entry val = iter.next();
+
+            data.put(val.key().toString().replaceFirst(LOCAL_PREFIX + ".", ""),
+                (Serializable)ByteUtils.fromBytes(val.value()));
+        }
+
+        return new Data(data, version.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long sentVersion) {
+        if (sentVersion != version.get())
             return CompletableFuture.completedFuture(false);
 
+        CompletableFuture[] futs = new CompletableFuture[newValues.size()];
+
+        int i = 0;
+
         for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+            ByteArray key = ByteArray.fromString(LOCAL_PREFIX + "." + entry.getKey());
+
             if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
+                futs[i++] = vaultMgr.put(key, ByteUtils.toBytes(entry.getValue()));
             else
-                map.remove(entry.getKey());
+                futs[i++] = vaultMgr.remove(key);
         }
 
-        this.version.incrementAndGet();
+        try {
+            CompletableFuture.allOf(futs).get();
+
+            latch = new CountDownLatch(newValues.size());
+
+            latch.await();
+
+            for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+                ByteArray key = ByteArray.fromString(LOCAL_PREFIX + "." + entry.getKey());
 
-        listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, this.version.get(), 0)));
+                Entry e = vaultMgr.get(key).get();
+
+                if (e.value() != ByteUtils.toBytes(entry.getValue()))

Review comment:
       We should discuss this in person, current approach doesn't look right.

##########
File path: modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java
##########
@@ -44,6 +44,8 @@
      * Add listener to the storage that notifies of data changes.
      * @param listener Listener.
      */
+    // TODO: seems that it's not needed to have an ability to set several listeners to storage, as far as only one is responsible

Review comment:
       Please assign a ticket to every TODO that you put in the code, otherwise we'll lose them

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -49,52 +72,152 @@ public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr) {
         this.metaStorageMgr = metaStorageMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+    /** Storage version. It stores actual metastorage revision, that is applied to configuration manager. */
+    private AtomicLong version = new AtomicLong(0L);
 
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+
+        Cursor<Entry> cur = metaStorageMgr.rangeWithAppliedRevision(new Key(DISTRIBUTED_PREFIX + "."),
+            new Key(DISTRIBUTED_PREFIX + (char)('.' + 1)));

Review comment:
       I know why you did this, but please extract it into a method and add a comment about the range :)

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -49,52 +72,152 @@ public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr) {
         this.metaStorageMgr = metaStorageMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+    /** Storage version. It stores actual metastorage revision, that is applied to configuration manager. */
+    private AtomicLong version = new AtomicLong(0L);
 
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+
+        Cursor<Entry> cur = metaStorageMgr.rangeWithAppliedRevision(new Key(DISTRIBUTED_PREFIX + "."),
+            new Key(DISTRIBUTED_PREFIX + (char)('.' + 1)));
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        long maxRevision = 0L;
+
+        Entry entryForMasterKey = null;
+
+        for (Entry entry : cur) {
+            if (!entry.key().equals(masterKey)) {
+                data.put(entry.key().toString().replaceFirst(DISTRIBUTED_PREFIX + ".", ""),
+                    (Serializable)ByteUtils.fromBytes(entry.value()));
+
+                if (maxRevision < entry.revision())
+                    maxRevision = entry.revision();
+            } else
+                entryForMasterKey = entry;
+        }
+
+        if (!data.isEmpty()) {
+            assert entryForMasterKey != null;
+
+            assert maxRevision == entryForMasterKey.revision();
+
+            assert maxRevision >= version.get();
+
+            return new Data(data, maxRevision);
+        }
+
+        return new Data(data, version.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long sentVersion) {
+        assert sentVersion <= version.get();
+
+        if (sentVersion != version.get())
+            // This means that sentVersion is less than version and other node has already updated configuration and
+            // write should be retried. Actual version will be set when watch and corresponding configuration listener
+            // updates configuration and notifyApplied is triggered afterwards.
             return CompletableFuture.completedFuture(false);
 
+        HashSet<Operation> operations = new HashSet<>();
+
+        HashSet<Operation> failures = new HashSet<>();
+
         for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+            Key key = new Key(DISTRIBUTED_PREFIX + "." + entry.getKey());
+
             if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
+                operations.add(Operations.put(key, ByteUtils.toBytes(entry.getValue())));
             else
-                map.remove(entry.getKey());
-        }
+                operations.add(Operations.remove(key));
 
-        this.version.incrementAndGet();
+            failures.add(Operations.noop());
+        }
 
-        listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, this.version.get(), 0)));
+        operations.add(Operations.put(masterKey, new byte[1]));
 
-        return CompletableFuture.completedFuture(true);
+        return metaStorageMgr.invoke(Conditions.key(masterKey).revision().eq(version.get()), operations, failures);
     }
 
     /** {@inheritDoc} */
-    @Override public void addListener(ConfigurationStorageListener listener) {
+    @Override public synchronized void addListener(ConfigurationStorageListener listener) {
         listeners.add(listener);
+
+        if (watchId == null) {
+            watchId = metaStorageMgr.registerWatchByPrefix(masterKey, new WatchListener() {
+                @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                    HashMap<String, Serializable> data = new HashMap<>();
+
+                    long maxRevision = 0L;
+
+                    Entry entryForMasterKey = null;
+
+                    for (WatchEvent event : events) {
+                        Entry e = event.newEntry();
+
+                        if (!e.key().equals(masterKey)) {
+                            data.put(e.key().toString().replaceFirst(DISTRIBUTED_PREFIX + ".", ""),

Review comment:
       Same here

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -49,52 +72,152 @@ public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr) {
         this.metaStorageMgr = metaStorageMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+    /** Storage version. It stores actual metastorage revision, that is applied to configuration manager. */
+    private AtomicLong version = new AtomicLong(0L);
 
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+
+        Cursor<Entry> cur = metaStorageMgr.rangeWithAppliedRevision(new Key(DISTRIBUTED_PREFIX + "."),
+            new Key(DISTRIBUTED_PREFIX + (char)('.' + 1)));
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        long maxRevision = 0L;
+
+        Entry entryForMasterKey = null;
+
+        for (Entry entry : cur) {
+            if (!entry.key().equals(masterKey)) {
+                data.put(entry.key().toString().replaceFirst(DISTRIBUTED_PREFIX + ".", ""),
+                    (Serializable)ByteUtils.fromBytes(entry.value()));
+
+                if (maxRevision < entry.revision())
+                    maxRevision = entry.revision();
+            } else
+                entryForMasterKey = entry;
+        }
+
+        if (!data.isEmpty()) {
+            assert entryForMasterKey != null;
+
+            assert maxRevision == entryForMasterKey.revision();
+
+            assert maxRevision >= version.get();
+
+            return new Data(data, maxRevision);
+        }
+
+        return new Data(data, version.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long sentVersion) {
+        assert sentVersion <= version.get();
+
+        if (sentVersion != version.get())
+            // This means that sentVersion is less than version and other node has already updated configuration and
+            // write should be retried. Actual version will be set when watch and corresponding configuration listener
+            // updates configuration and notifyApplied is triggered afterwards.
             return CompletableFuture.completedFuture(false);
 
+        HashSet<Operation> operations = new HashSet<>();
+
+        HashSet<Operation> failures = new HashSet<>();
+
         for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+            Key key = new Key(DISTRIBUTED_PREFIX + "." + entry.getKey());
+
             if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
+                operations.add(Operations.put(key, ByteUtils.toBytes(entry.getValue())));
             else
-                map.remove(entry.getKey());
-        }
+                operations.add(Operations.remove(key));
 
-        this.version.incrementAndGet();
+            failures.add(Operations.noop());
+        }
 
-        listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, this.version.get(), 0)));
+        operations.add(Operations.put(masterKey, new byte[1]));

Review comment:
       Will metastorage update revision if value is the same every time?

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -49,52 +72,152 @@ public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr) {
         this.metaStorageMgr = metaStorageMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+    /** Storage version. It stores actual metastorage revision, that is applied to configuration manager. */
+    private AtomicLong version = new AtomicLong(0L);
 
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+
+        Cursor<Entry> cur = metaStorageMgr.rangeWithAppliedRevision(new Key(DISTRIBUTED_PREFIX + "."),
+            new Key(DISTRIBUTED_PREFIX + (char)('.' + 1)));
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        long maxRevision = 0L;
+
+        Entry entryForMasterKey = null;
+
+        for (Entry entry : cur) {
+            if (!entry.key().equals(masterKey)) {
+                data.put(entry.key().toString().replaceFirst(DISTRIBUTED_PREFIX + ".", ""),
+                    (Serializable)ByteUtils.fromBytes(entry.value()));
+
+                if (maxRevision < entry.revision())
+                    maxRevision = entry.revision();
+            } else
+                entryForMasterKey = entry;
+        }
+
+        if (!data.isEmpty()) {
+            assert entryForMasterKey != null;
+
+            assert maxRevision == entryForMasterKey.revision();
+
+            assert maxRevision >= version.get();
+
+            return new Data(data, maxRevision);
+        }
+
+        return new Data(data, version.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long sentVersion) {
+        assert sentVersion <= version.get();
+
+        if (sentVersion != version.get())
+            // This means that sentVersion is less than version and other node has already updated configuration and
+            // write should be retried. Actual version will be set when watch and corresponding configuration listener
+            // updates configuration and notifyApplied is triggered afterwards.
             return CompletableFuture.completedFuture(false);
 
+        HashSet<Operation> operations = new HashSet<>();
+
+        HashSet<Operation> failures = new HashSet<>();
+
         for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+            Key key = new Key(DISTRIBUTED_PREFIX + "." + entry.getKey());
+
             if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
+                operations.add(Operations.put(key, ByteUtils.toBytes(entry.getValue())));
             else
-                map.remove(entry.getKey());
-        }
+                operations.add(Operations.remove(key));
 
-        this.version.incrementAndGet();
+            failures.add(Operations.noop());
+        }
 
-        listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, this.version.get(), 0)));
+        operations.add(Operations.put(masterKey, new byte[1]));
 
-        return CompletableFuture.completedFuture(true);
+        return metaStorageMgr.invoke(Conditions.key(masterKey).revision().eq(version.get()), operations, failures);
     }
 
     /** {@inheritDoc} */
-    @Override public void addListener(ConfigurationStorageListener listener) {
+    @Override public synchronized void addListener(ConfigurationStorageListener listener) {
         listeners.add(listener);
+
+        if (watchId == null) {
+            watchId = metaStorageMgr.registerWatchByPrefix(masterKey, new WatchListener() {
+                @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                    HashMap<String, Serializable> data = new HashMap<>();
+
+                    long maxRevision = 0L;
+
+                    Entry entryForMasterKey = null;
+
+                    for (WatchEvent event : events) {
+                        Entry e = event.newEntry();
+
+                        if (!e.key().equals(masterKey)) {
+                            data.put(e.key().toString().replaceFirst(DISTRIBUTED_PREFIX + ".", ""),
+                                (Serializable)ByteUtils.fromBytes(e.value()));
+
+                            if (maxRevision < e.revision())
+                                maxRevision = e.revision();
+                        } else
+                            entryForMasterKey = e;
+                    }
+
+                    // Contract of metastorage ensures that all updates of one revision will come in one batch.
+                    // Also masterKey should be updated every time when we update cfg.
+                    // That means that masterKey update must be included in the batch.
+                    assert entryForMasterKey != null;
+
+                    assert maxRevision == entryForMasterKey.revision();
+
+                    assert maxRevision >= version.get();
+
+                    long finalMaxRevision = maxRevision;
+
+                    listeners.forEach(listener -> listener.onEntriesChanged(new Data(data, finalMaxRevision)));
+
+                    return true;
+                }
+
+                @Override public void onError(@NotNull Throwable e) {
+                    LOG.error("Metastorage listener issue", e);
+                }
+            });
+
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public void removeListener(ConfigurationStorageListener listener) {
+    @Override public synchronized void removeListener(ConfigurationStorageListener listener) {

Review comment:
       I'm not sure that we even need this method :( No one uses it

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
##########
@@ -49,47 +69,121 @@ public LocalConfigurationStorage(VaultManager vaultMgr) {
         this.vaultMgr = vaultMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
 
     /** Storage version. */
     private AtomicLong version = new AtomicLong(0);
 
+    /** Start key in ragne for searching local configuration keys. */
+    private ByteArray localKeysStartRange = ByteArray.fromString(LOCAL_PREFIX + ".");
+
+    /** End key in range for searching local configuration keys. */
+    private ByteArray localKeysEndRange = ByteArray.fromString(LOCAL_PREFIX + (char)('.' + 1));
+
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+        Iterator<Entry> iter =
+            vaultMgr.range(localKeysStartRange, localKeysEndRange);
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        while (iter.hasNext()) {
+            Entry val = iter.next();
+
+            data.put(val.key().toString().replaceFirst(LOCAL_PREFIX + ".", ""),
+                (Serializable)ByteUtils.fromBytes(val.value()));
+        }
+
+        return new Data(data, version.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long sentVersion) {
+        if (sentVersion != version.get())
             return CompletableFuture.completedFuture(false);
 
+        CompletableFuture[] futs = new CompletableFuture[newValues.size()];
+
+        int i = 0;
+
         for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+            ByteArray key = ByteArray.fromString(LOCAL_PREFIX + "." + entry.getKey());
+
             if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
+                futs[i++] = vaultMgr.put(key, ByteUtils.toBytes(entry.getValue()));
             else
-                map.remove(entry.getKey());
+                futs[i++] = vaultMgr.remove(key);
         }
 
-        this.version.incrementAndGet();
+        try {
+            CompletableFuture.allOf(futs).get();
+
+            latch = new CountDownLatch(newValues.size());
+
+            latch.await();

Review comment:
       WHAT?
   Please explain: how do you avoid races?

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
##########
@@ -49,47 +69,121 @@ public LocalConfigurationStorage(VaultManager vaultMgr) {
         this.vaultMgr = vaultMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
 
     /** Storage version. */
     private AtomicLong version = new AtomicLong(0);
 
+    /** Start key in ragne for searching local configuration keys. */
+    private ByteArray localKeysStartRange = ByteArray.fromString(LOCAL_PREFIX + ".");
+
+    /** End key in range for searching local configuration keys. */
+    private ByteArray localKeysEndRange = ByteArray.fromString(LOCAL_PREFIX + (char)('.' + 1));
+
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+        Iterator<Entry> iter =
+            vaultMgr.range(localKeysStartRange, localKeysEndRange);
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        while (iter.hasNext()) {
+            Entry val = iter.next();
+
+            data.put(val.key().toString().replaceFirst(LOCAL_PREFIX + ".", ""),
+                (Serializable)ByteUtils.fromBytes(val.value()));
+        }
+
+        return new Data(data, version.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long sentVersion) {
+        if (sentVersion != version.get())
             return CompletableFuture.completedFuture(false);
 
+        CompletableFuture[] futs = new CompletableFuture[newValues.size()];
+
+        int i = 0;
+
         for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+            ByteArray key = ByteArray.fromString(LOCAL_PREFIX + "." + entry.getKey());
+
             if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
+                futs[i++] = vaultMgr.put(key, ByteUtils.toBytes(entry.getValue()));
             else
-                map.remove(entry.getKey());
+                futs[i++] = vaultMgr.remove(key);
         }
 
-        this.version.incrementAndGet();
+        try {
+            CompletableFuture.allOf(futs).get();
+
+            latch = new CountDownLatch(newValues.size());
+
+            latch.await();
+
+            for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+                ByteArray key = ByteArray.fromString(LOCAL_PREFIX + "." + entry.getKey());
 
-        listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, this.version.get(), 0)));
+                Entry e = vaultMgr.get(key).get();
+
+                if (e.value() != ByteUtils.toBytes(entry.getValue()))
+                    // value by some key was overwritten, that means that changes not
+                    // from LocalConfigurationStorage.write overlapped with current changes, so write should be retried.
+                    return CompletableFuture.completedFuture(false);
+            }
+        }
+        catch (InterruptedException | ExecutionException e) {
+            return CompletableFuture.completedFuture(false);

Review comment:
       hm...

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
##########
@@ -49,47 +69,121 @@ public LocalConfigurationStorage(VaultManager vaultMgr) {
         this.vaultMgr = vaultMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
 
     /** Storage version. */
     private AtomicLong version = new AtomicLong(0);
 
+    /** Start key in ragne for searching local configuration keys. */
+    private ByteArray localKeysStartRange = ByteArray.fromString(LOCAL_PREFIX + ".");
+
+    /** End key in range for searching local configuration keys. */
+    private ByteArray localKeysEndRange = ByteArray.fromString(LOCAL_PREFIX + (char)('.' + 1));
+
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+        Iterator<Entry> iter =
+            vaultMgr.range(localKeysStartRange, localKeysEndRange);
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        while (iter.hasNext()) {
+            Entry val = iter.next();
+
+            data.put(val.key().toString().replaceFirst(LOCAL_PREFIX + ".", ""),
+                (Serializable)ByteUtils.fromBytes(val.value()));
+        }
+
+        return new Data(data, version.get());
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
-        if (version != this.version.get())
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long sentVersion) {
+        if (sentVersion != version.get())
             return CompletableFuture.completedFuture(false);
 
+        CompletableFuture[] futs = new CompletableFuture[newValues.size()];
+
+        int i = 0;
+
         for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
+            ByteArray key = ByteArray.fromString(LOCAL_PREFIX + "." + entry.getKey());
+
             if (entry.getValue() != null)
-                map.put(entry.getKey(), entry.getValue());
+                futs[i++] = vaultMgr.put(key, ByteUtils.toBytes(entry.getValue()));
             else
-                map.remove(entry.getKey());
+                futs[i++] = vaultMgr.remove(key);
         }
 
-        this.version.incrementAndGet();
+        try {
+            CompletableFuture.allOf(futs).get();

Review comment:
       What? 

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/LocalConfigurationStorage.java
##########
@@ -49,47 +69,121 @@ public LocalConfigurationStorage(VaultManager vaultMgr) {
         this.vaultMgr = vaultMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
 
     /** Storage version. */
     private AtomicLong version = new AtomicLong(0);
 
+    /** Start key in ragne for searching local configuration keys. */
+    private ByteArray localKeysStartRange = ByteArray.fromString(LOCAL_PREFIX + ".");
+
+    /** End key in range for searching local configuration keys. */
+    private ByteArray localKeysEndRange = ByteArray.fromString(LOCAL_PREFIX + (char)('.' + 1));
+
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+        Iterator<Entry> iter =
+            vaultMgr.range(localKeysStartRange, localKeysEndRange);
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        while (iter.hasNext()) {
+            Entry val = iter.next();
+
+            data.put(val.key().toString().replaceFirst(LOCAL_PREFIX + ".", ""),
+                (Serializable)ByteUtils.fromBytes(val.value()));
+        }
+
+        return new Data(data, version.get());

Review comment:
       version is not persisted, right?

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -49,52 +72,152 @@ public DistributedConfigurationStorage(MetaStorageManager metaStorageMgr) {
         this.metaStorageMgr = metaStorageMgr;
     }
 
-    /** Map to store values. */
-    private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
     /** Change listeners. */
     private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
 
-    /** Storage version. */
-    private AtomicLong version = new AtomicLong(0);
+    /** Storage version. It stores actual metastorage revision, that is applied to configuration manager. */
+    private AtomicLong version = new AtomicLong(0L);
 
     /** {@inheritDoc} */
     @Override public synchronized Data readAll() throws StorageException {
-        return new Data(new HashMap<>(map), version.get(), 0);
+
+        Cursor<Entry> cur = metaStorageMgr.rangeWithAppliedRevision(new Key(DISTRIBUTED_PREFIX + "."),
+            new Key(DISTRIBUTED_PREFIX + (char)('.' + 1)));
+
+        HashMap<String, Serializable> data = new HashMap<>();
+
+        long maxRevision = 0L;
+
+        Entry entryForMasterKey = null;
+
+        for (Entry entry : cur) {
+            if (!entry.key().equals(masterKey)) {
+                data.put(entry.key().toString().replaceFirst(DISTRIBUTED_PREFIX + ".", ""),

Review comment:
       This method expects a regexp, so there are two issues:
   - it is very slow;
   - the "." symbol is not properly escaped.
   Why not use "substring" instead?

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/storage/DistributedConfigurationStorage.java
##########
@@ -19,24 +19,47 @@
 
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.configuration.storage.ConfigurationStorage;
 import org.apache.ignite.configuration.storage.ConfigurationStorageListener;
 import org.apache.ignite.configuration.storage.ConfigurationType;
 import org.apache.ignite.configuration.storage.Data;
 import org.apache.ignite.configuration.storage.StorageException;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operation;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Distributed configuration storage.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class DistributedConfigurationStorage implements ConfigurationStorage {
+public class DistributedConfigurationStorage implements ConfigurationStorage {
+    /** Prefix that we add to configuration keys to distinguish them in metastorage. */
+    private static String DISTRIBUTED_PREFIX = ConfigurationType.DISTRIBUTED.name() + "-cfg";

Review comment:
       I'd prefer a shorter string, because this part will be duplicated literally hundreds of times in many messages.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org