You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/02/15 15:02:23 UTC
[ignite-3] branch main updated: IGNITE-14178 Configuration storage interface made asynchronous. (#52)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7e08e60 IGNITE-14178 Configuration storage interface made asynchronous. (#52)
7e08e60 is described below
commit 7e08e60dc110ee159f306c3be0f98f6666093a23
Author: ibessonov <be...@gmail.com>
AuthorDate: Mon Feb 15 18:01:48 2021 +0300
IGNITE-14178 Configuration storage interface made asynchronous. (#52)
---
.../sample/storage/ConfigurationChangerTest.java | 20 ++++-----
.../sample/storage/TestConfigurationStorage.java | 9 ++--
.../ignite/configuration/ConfigurationChanger.java | 52 +++++++++++++---------
.../storage/ConfigurationStorage.java | 8 ++--
4 files changed, 50 insertions(+), 39 deletions(-)
diff --git a/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/storage/ConfigurationChangerTest.java b/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/storage/ConfigurationChangerTest.java
index 8460862..976efa8 100644
--- a/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/storage/ConfigurationChangerTest.java
+++ b/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/storage/ConfigurationChangerTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.configuration.sample.storage;
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import org.apache.ignite.configuration.ConfigurationChangeException;
import org.apache.ignite.configuration.ConfigurationChanger;
import org.apache.ignite.configuration.Configurator;
@@ -29,7 +30,6 @@ import org.apache.ignite.configuration.annotation.NamedConfigValue;
import org.apache.ignite.configuration.annotation.Value;
import org.apache.ignite.configuration.sample.storage.impl.ANode;
import org.apache.ignite.configuration.storage.Data;
-import org.apache.ignite.configuration.validation.ConfigurationValidationException;
import org.apache.ignite.configuration.validation.ValidationIssue;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -82,7 +82,7 @@ public class ConfigurationChangerTest {
* Test simple change of configuration.
*/
@Test
- public void testSimpleConfigurationChange() {
+ public void testSimpleConfigurationChange() throws Exception {
final TestConfigurationStorage storage = new TestConfigurationStorage();
final ConfiguratorController configuratorController = new ConfiguratorController();
@@ -97,7 +97,7 @@ public class ConfigurationChangerTest {
changer.registerConfiguration(KEY, configurator);
- changer.change(Collections.singletonMap(KEY, data));
+ changer.change(Collections.singletonMap(KEY, data)).get();
final Data dataFromStorage = storage.readAll();
final Map<String, Serializable> dataMap = dataFromStorage.values();
@@ -112,7 +112,7 @@ public class ConfigurationChangerTest {
* Test subsequent change of configuration via different changers.
*/
@Test
- public void testModifiedFromAnotherStorage() {
+ public void testModifiedFromAnotherStorage() throws Exception {
final TestConfigurationStorage storage = new TestConfigurationStorage();
final ConfiguratorController configuratorController = new ConfiguratorController();
@@ -138,8 +138,8 @@ public class ConfigurationChangerTest {
changer1.registerConfiguration(KEY, configurator);
changer2.registerConfiguration(KEY, configurator);
- changer1.change(Collections.singletonMap(KEY, data1));
- changer2.change(Collections.singletonMap(KEY, data2));
+ changer1.change(Collections.singletonMap(KEY, data1)).get();
+ changer2.change(Collections.singletonMap(KEY, data2)).get();
final Data dataFromStorage = storage.readAll();
final Map<String, Serializable> dataMap = dataFromStorage.values();
@@ -155,7 +155,7 @@ public class ConfigurationChangerTest {
* Test that subsequent change of configuration is failed if changes are incompatible.
*/
@Test
- public void testModifiedFromAnotherStorageWithIncompatibleChanges() {
+ public void testModifiedFromAnotherStorageWithIncompatibleChanges() throws Exception {
final TestConfigurationStorage storage = new TestConfigurationStorage();
final ConfiguratorController configuratorController = new ConfiguratorController();
@@ -181,11 +181,11 @@ public class ConfigurationChangerTest {
changer1.registerConfiguration(KEY, configurator);
changer2.registerConfiguration(KEY, configurator);
- changer1.change(Collections.singletonMap(KEY, data1));
+ changer1.change(Collections.singletonMap(KEY, data1)).get();
configuratorController.hasIssues(true);
- assertThrows(ConfigurationValidationException.class, () -> changer2.change(Collections.singletonMap(KEY, data2)));
+ assertThrows(ExecutionException.class, () -> changer2.change(Collections.singletonMap(KEY, data2)).get());
final Data dataFromStorage = storage.readAll();
final Map<String, Serializable> dataMap = dataFromStorage.values();
@@ -222,7 +222,7 @@ public class ConfigurationChangerTest {
storage.fail(true);
- assertThrows(ConfigurationChangeException.class, () -> changer.change(Collections.singletonMap(KEY, data)));
+ assertThrows(ExecutionException.class, () -> changer.change(Collections.singletonMap(KEY, data)).get());
storage.fail(false);
diff --git a/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/storage/TestConfigurationStorage.java b/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/storage/TestConfigurationStorage.java
index e5b2228..a9f3d32 100644
--- a/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/storage/TestConfigurationStorage.java
+++ b/modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/storage/TestConfigurationStorage.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.configuration.storage.ConfigurationStorage;
@@ -62,12 +63,12 @@ public class TestConfigurationStorage implements ConfigurationStorage {
}
/** {@inheritDoc} */
- @Override public synchronized boolean write(Map<String, Serializable> newValues, int sentVersion) throws StorageException {
+ @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, int sentVersion) throws StorageException {
if (fail)
- throw new StorageException("Failed to write data");
+ return CompletableFuture.failedFuture(new StorageException("Failed to write data"));
if (sentVersion != version.get())
- return false;
+ return CompletableFuture.completedFuture(false);
map.putAll(newValues);
@@ -75,7 +76,7 @@ public class TestConfigurationStorage implements ConfigurationStorage {
listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, version.get())));
- return true;
+ return CompletableFuture.completedFuture(true);
}
/** {@inheritDoc} */
diff --git a/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java b/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java
index afcc0e6..d97d7c1 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChanger.java
@@ -18,10 +18,11 @@ package org.apache.ignite.configuration;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.internal.util.ConfigurationUtil;
@@ -39,6 +40,9 @@ import org.apache.ignite.configuration.validation.ValidationIssue;
* Class that handles configuration changes, by validating them, passing to storage and listening to storage updates.
*/
public class ConfigurationChanger {
+ /** */
+ private final ForkJoinPool pool = new ForkJoinPool(2);
+
/** Map of configurations' configurators. */
private Map<RootKey<?>, Configurator<?>> registry = new HashMap<>();
@@ -86,37 +90,43 @@ public class ConfigurationChanger {
* Change configuration.
* @param changes Map of changes by root key.
*/
- public void change(Map<RootKey<?>, TraversableTreeNode> changes) throws ConfigurationChangeException,
- ConfigurationValidationException {
+ public CompletableFuture<Void> change(Map<RootKey<?>, TraversableTreeNode> changes) {
+ CompletableFuture<Void> fut = new CompletableFuture<>();
+
+ pool.execute(() -> change0(changes, fut));
+
+ return fut;
+ }
+
+ /** */
+ private void change0(Map<RootKey<?>, TraversableTreeNode> changes, CompletableFuture<?> fut) {
Map<String, Serializable> allChanges = changes.entrySet().stream()
.map((Map.Entry<RootKey<?>, TraversableTreeNode> change) -> convertChangesToMap(change.getKey(), change.getValue()))
.flatMap(map -> map.entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- boolean writing = true;
+ final ValidationResult validationResult = validate(changes);
- List<ValidationIssue> validationIssues = Collections.emptyList();
+ List<ValidationIssue> validationIssues = validationResult.issues();
- while (writing) {
- final ValidationResult validationResult = validate(changes);
+ if (!validationIssues.isEmpty()) {
+ fut.completeExceptionally(new ConfigurationValidationException(validationIssues));
- validationIssues = validationResult.issues();
+ return;
+ }
- final int version = validationResult.version();
+ final int version = validationResult.version();
- if (validationIssues.isEmpty())
- try {
- writing = !configurationStorage.write(allChanges, version);
- }
- catch (StorageException e) {
- throw new ConfigurationChangeException("Failed to change configuration: " + e.getMessage(), e);
- }
- else
- break;
- }
+ CompletableFuture<Boolean> writeFut = configurationStorage.write(allChanges, version);
- if (!validationIssues.isEmpty())
- throw new ConfigurationValidationException(validationIssues);
+ writeFut.whenCompleteAsync((casResult, throwable) -> {
+ if (throwable != null)
+ fut.completeExceptionally(new ConfigurationChangeException("Failed to change configuration", throwable));
+ else if (casResult)
+ fut.complete(null);
+ else
+ change0(changes, fut);
+ }, pool);
}
/**
diff --git a/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java b/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java
index 66b8e83..386e500 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/configuration/storage/ConfigurationStorage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.configuration.storage;
import java.io.Serializable;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
/**
* Common interface for configuration storage.
@@ -35,11 +36,10 @@ public interface ConfigurationStorage {
* Write key-value pairs into the storage with last known version.
* @param newValues Key-value pairs.
* @param version Last known version.
- * @return {@code true} if successfully written, {@code false} if version of the storage is different from the passed
- * argument.
- * @throws StorageException If failed to write data.
+ * @return Future that gives you {@code true} if successfully written, {@code false} if version of the storage is
+ * different from the passed argument and {@link StorageException} if failed to write data.
*/
- boolean write(Map<String, Serializable> newValues, int version) throws StorageException;
+ CompletableFuture<Boolean> write(Map<String, Serializable> newValues, int version);
/**
* Get all the keys of the configuration storage.