You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/02/15 15:02:23 UTC

[ignite-3] branch main updated: IGNITE-14178 Configuration storage interface made asynchronous. (#52)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e08e60  IGNITE-14178 Configuration storage interface made asynchronous. (#52)
7e08e60 is described below

commit 7e08e60dc110ee159f306c3be0f98f6666093a23
Author: ibessonov <be...@gmail.com>
AuthorDate: Mon Feb 15 18:01:48 2021 +0300

    IGNITE-14178 Configuration storage interface made asynchronous. (#52)
---
 .../sample/storage/ConfigurationChangerTest.java   | 20 ++++-----
 .../sample/storage/TestConfigurationStorage.java   |  9 ++--
 .../ignite/configuration/ConfigurationChanger.java | 52 +++++++++++++---------
 .../storage/ConfigurationStorage.java              |  8 ++--
 4 files changed, 50 insertions(+), 39 deletions(-)

diff --git a/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/storage/ConfigurationChangerTest.java b/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/storage/ConfigurationChangerTest.java
index 8460862..976efa8 100644
--- a/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/storage/ConfigurationChangerTest.java
+++ b/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/storage/ConfigurationChangerTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.configuration.sample.storage;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import org.apache.ignite.configuration.ConfigurationChangeException;
 import org.apache.ignite.configuration.ConfigurationChanger;
 import org.apache.ignite.configuration.Configurator;
@@ -29,7 +30,6 @@ import org.apache.ignite.configuration.annotation.NamedConfigValue;
 import org.apache.ignite.configuration.annotation.Value;
 import org.apache.ignite.configuration.sample.storage.impl.ANode;
 import org.apache.ignite.configuration.storage.Data;
-import org.apache.ignite.configuration.validation.ConfigurationValidationException;
 import org.apache.ignite.configuration.validation.ValidationIssue;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
@@ -82,7 +82,7 @@ public class ConfigurationChangerTest {
      * Test simple change of configuration.
      */
     @Test
-    public void testSimpleConfigurationChange() {
+    public void testSimpleConfigurationChange() throws Exception {
         final TestConfigurationStorage storage = new TestConfigurationStorage();
 
         final ConfiguratorController configuratorController = new ConfiguratorController();
@@ -97,7 +97,7 @@ public class ConfigurationChangerTest {
 
         changer.registerConfiguration(KEY, configurator);
 
-        changer.change(Collections.singletonMap(KEY, data));
+        changer.change(Collections.singletonMap(KEY, data)).get();
 
         final Data dataFromStorage = storage.readAll();
         final Map<String, Serializable> dataMap = dataFromStorage.values();
@@ -112,7 +112,7 @@ public class ConfigurationChangerTest {
      * Test subsequent change of configuration via different changers.
      */
     @Test
-    public void testModifiedFromAnotherStorage() {
+    public void testModifiedFromAnotherStorage() throws Exception {
         final TestConfigurationStorage storage = new TestConfigurationStorage();
 
         final ConfiguratorController configuratorController = new ConfiguratorController();
@@ -138,8 +138,8 @@ public class ConfigurationChangerTest {
         changer1.registerConfiguration(KEY, configurator);
         changer2.registerConfiguration(KEY, configurator);
 
-        changer1.change(Collections.singletonMap(KEY, data1));
-        changer2.change(Collections.singletonMap(KEY, data2));
+        changer1.change(Collections.singletonMap(KEY, data1)).get();
+        changer2.change(Collections.singletonMap(KEY, data2)).get();
 
         final Data dataFromStorage = storage.readAll();
         final Map<String, Serializable> dataMap = dataFromStorage.values();
@@ -155,7 +155,7 @@ public class ConfigurationChangerTest {
      * Test that subsequent change of configuration is failed if changes are incompatible.
      */
     @Test
-    public void testModifiedFromAnotherStorageWithIncompatibleChanges() {
+    public void testModifiedFromAnotherStorageWithIncompatibleChanges() throws Exception {
         final TestConfigurationStorage storage = new TestConfigurationStorage();
 
         final ConfiguratorController configuratorController = new ConfiguratorController();
@@ -181,11 +181,11 @@ public class ConfigurationChangerTest {
         changer1.registerConfiguration(KEY, configurator);
         changer2.registerConfiguration(KEY, configurator);
 
-        changer1.change(Collections.singletonMap(KEY, data1));
+        changer1.change(Collections.singletonMap(KEY, data1)).get();
 
         configuratorController.hasIssues(true);
 
-        assertThrows(ConfigurationValidationException.class, () -> changer2.change(Collections.singletonMap(KEY, data2)));
+        assertThrows(ExecutionException.class, () -> changer2.change(Collections.singletonMap(KEY, data2)).get());
 
         final Data dataFromStorage = storage.readAll();
         final Map<String, Serializable> dataMap = dataFromStorage.values();
@@ -222,7 +222,7 @@ public class ConfigurationChangerTest {
 
         storage.fail(true);
 
-        assertThrows(ConfigurationChangeException.class, () -> changer.change(Collections.singletonMap(KEY, data)));
+        assertThrows(ExecutionException.class, () -> changer.change(Collections.singletonMap(KEY, data)).get());
 
         storage.fail(false);
 
diff --git a/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/storage/TestConfigurationStorage.java b/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/storage/TestConfigurationStorage.java
index e5b2228..a9f3d32 100644
--- a/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/storage/TestConfigurationStorage.java
+++ b/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/storage/TestConfigurationStorage.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.configuration.storage.ConfigurationStorage;
@@ -62,12 +63,12 @@ public class TestConfigurationStorage implements ConfigurationStorage {
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized boolean write(Map<String, Serializable> newValues, int sentVersion) throws StorageException {
+    @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, int sentVersion) throws StorageException {
         if (fail)
-            throw new StorageException("Failed to write data");
+            return CompletableFuture.failedFuture(new StorageException("Failed to write data"));
 
         if (sentVersion != version.get())
-            return false;
+            return CompletableFuture.completedFuture(false);
 
         map.putAll(newValues);
 
@@ -75,7 +76,7 @@ public class TestConfigurationStorage implements ConfigurationStorage {
 
         listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, version.get())));
 
-        return true;
+        return CompletableFuture.completedFuture(true);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java b/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java
index afcc0e6..d97d7c1 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java
@@ -18,10 +18,11 @@ package org.apache.ignite.configuration;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.ignite.configuration.internal.util.ConfigurationUtil;
@@ -39,6 +40,9 @@ import org.apache.ignite.configuration.validation.ValidationIssue;
  * Class that handles configuration changes, by validating them, passing to storage and listening to storage updates.
  */
 public class ConfigurationChanger {
+    /** */
+    private final ForkJoinPool pool = new ForkJoinPool(2);
+
     /** Map of configurations' configurators. */
     private Map<RootKey<?>, Configurator<?>> registry = new HashMap<>();
 
@@ -86,37 +90,43 @@ public class ConfigurationChanger {
      * Change configuration.
      * @param changes Map of changes by root key.
      */
-    public void change(Map<RootKey<?>, TraversableTreeNode> changes) throws ConfigurationChangeException,
-        ConfigurationValidationException {
+    public CompletableFuture<Void> change(Map<RootKey<?>, TraversableTreeNode> changes) {
+        CompletableFuture<Void> fut = new CompletableFuture<>();
+
+        pool.execute(() -> change0(changes, fut));
+
+        return fut;
+    }
+
+    /** */
+    private void change0(Map<RootKey<?>, TraversableTreeNode> changes, CompletableFuture<?> fut) {
         Map<String, Serializable> allChanges = changes.entrySet().stream()
             .map((Map.Entry<RootKey<?>, TraversableTreeNode> change) -> convertChangesToMap(change.getKey(), change.getValue()))
             .flatMap(map -> map.entrySet().stream())
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
-        boolean writing = true;
+        final ValidationResult validationResult = validate(changes);
 
-        List<ValidationIssue> validationIssues = Collections.emptyList();
+        List<ValidationIssue> validationIssues = validationResult.issues();
 
-        while (writing) {
-            final ValidationResult validationResult = validate(changes);
+        if (!validationIssues.isEmpty()) {
+            fut.completeExceptionally(new ConfigurationValidationException(validationIssues));
 
-            validationIssues = validationResult.issues();
+            return;
+        }
 
-            final int version = validationResult.version();
+        final int version = validationResult.version();
 
-            if (validationIssues.isEmpty())
-                try {
-                    writing = !configurationStorage.write(allChanges, version);
-                }
-                catch (StorageException e) {
-                    throw new ConfigurationChangeException("Failed to change configuration: " + e.getMessage(), e);
-                }
-            else
-                break;
-        }
+        CompletableFuture<Boolean> writeFut = configurationStorage.write(allChanges, version);
 
-        if (!validationIssues.isEmpty())
-            throw new ConfigurationValidationException(validationIssues);
+        writeFut.whenCompleteAsync((casResult, throwable) -> {
+            if (throwable != null)
+                fut.completeExceptionally(new ConfigurationChangeException("Failed to change configuration", throwable));
+            else if (casResult)
+                fut.complete(null);
+            else
+                change0(changes, fut);
+        }, pool);
     }
 
     /**
diff --git a/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java b/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java
index 66b8e83..386e500 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.configuration.storage;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Common interface for configuration storage.
@@ -35,11 +36,10 @@ public interface ConfigurationStorage {
      * Write key-value pairs into the storage with last known version.
      * @param newValues Key-value pairs.
      * @param version Last known version.
-     * @return {@code true} if successfully written, {@code false} if version of the storage is different from the passed
-     * argument.
-     * @throws StorageException If failed to write data.
+     * @return Future that completes with {@code true} if successfully written, with {@code false} if the version of the
+     *      storage differs from the passed argument, or exceptionally with {@link StorageException} if the write failed.
      */
-    boolean write(Map<String, Serializable> newValues, int version) throws StorageException;
+    CompletableFuture<Boolean> write(Map<String, Serializable> newValues, int version);
 
     /**
      * Get all the keys of the configuration storage.