You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/01/21 11:50:04 UTC
[ignite-3] branch main updated: IGNITE-16263 Invoke configuration listeners when components were restored (#568)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 73a2f3c IGNITE-16263 Invoke configuration listeners when components were restored (#568)
73a2f3c is described below
commit 73a2f3c9f2792e0a2964cb491e552d57659c903e
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Fri Jan 21 14:49:57 2022 +0300
IGNITE-16263 Invoke configuration listeners when components were restored (#568)
---
.../ConfigurationNotificationEvent.java | 2 +
.../configuration/ConfigurationChanger.java | 19 +-
.../configuration/ConfigurationRegistry.java | 48 +++-
.../notifications/ConfigurationNotifier.java | 85 +++---
.../notifications/ConfigurationListenerTest.java | 314 +++++++++++++--------
.../org/apache/ignite/internal/app/IgniteImpl.java | 15 +
.../internal/table/distributed/TableManager.java | 88 +++---
7 files changed, 365 insertions(+), 206 deletions(-)
diff --git a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationNotificationEvent.java b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationNotificationEvent.java
index 7b1332f..b3e0e4c 100644
--- a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationNotificationEvent.java
+++ b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationNotificationEvent.java
@@ -32,6 +32,8 @@ public interface ConfigurationNotificationEvent<VIEWT> {
/**
* Returns the previous value of the updated configuration.
*
+ * <p>NOTE: For a new configuration/property will be {@code null}.
+ *
* @return Previous value of the updated configuration.
*/
@Nullable VIEWT oldValue();
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
index 8a3b331..9f98434 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
@@ -70,7 +70,7 @@ import org.apache.ignite.internal.configuration.validation.MemberKey;
import org.apache.ignite.internal.configuration.validation.ValidationUtil;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.NodeStoppingException;
-import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Class that handles configuration changes, by validating them, passing to storage and listening to storage updates.
@@ -106,12 +106,12 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange
/**
* Invoked every time when the configuration is updated.
*
- * @param oldRoot Old roots values. All these roots always belong to a single storage.
- * @param newRoot New values for the same roots as in {@code oldRoot}.
+ * @param oldRoot Old roots values. All these roots always belong to a single storage.
+ * @param newRoot New values for the same roots as in {@code oldRoot}.
* @param storageRevision Revision of the storage.
* @return Not-null future that must signify when processing is completed. Exceptional completion is not expected.
*/
- @NotNull CompletableFuture<Void> notify(SuperRoot oldRoot, SuperRoot newRoot, long storageRevision);
+ CompletableFuture<Void> notify(@Nullable SuperRoot oldRoot, SuperRoot newRoot, long storageRevision);
}
/**
@@ -565,4 +565,15 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange
});
}
}
+
+ /**
+ * Notifies all listeners of the current configuration.
+ *
+ * @return Future that must signify when processing is completed.
+ */
+ CompletableFuture<Void> notifyCurrentConfigurationListeners() {
+ StorageRoots storageRoots = this.storageRoots;
+
+ return notificator.notify(null, storageRoots.roots, storageRoots.version);
+ }
}
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java
index c2362fc..a59eb06 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java
@@ -51,6 +51,9 @@ import org.apache.ignite.configuration.annotation.ConfigurationRoot;
import org.apache.ignite.configuration.annotation.InternalConfiguration;
import org.apache.ignite.configuration.annotation.PolymorphicConfigInstance;
import org.apache.ignite.configuration.annotation.PolymorphicId;
+import org.apache.ignite.configuration.notifications.ConfigurationListener;
+import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
+import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.configuration.validation.ExceptKeys;
import org.apache.ignite.configuration.validation.Immutable;
import org.apache.ignite.configuration.validation.Max;
@@ -72,6 +75,7 @@ import org.apache.ignite.internal.configuration.validation.MinValidator;
import org.apache.ignite.internal.configuration.validation.OneOfValidator;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
/**
* Configuration registry.
@@ -261,32 +265,46 @@ public class ConfigurationRegistry implements IgniteComponent {
/**
* Configuration change notifier.
*
- * @param oldSuperRoot Old roots values. All these roots always belong to a single storage.
- * @param newSuperRoot New values for the same roots as in {@code oldRoot}.
+ * @param oldSuperRoot Old roots values. All these roots always belong to a single storage.
+ * @param newSuperRoot New values for the same roots as in {@code oldRoot}.
* @param storageRevision Revision of the storage.
* @return Future that must signify when processing is completed.
*/
- private CompletableFuture<Void> notificator(SuperRoot oldSuperRoot, SuperRoot newSuperRoot, long storageRevision) {
- List<CompletableFuture<?>> futures = new ArrayList<>();
+ private CompletableFuture<Void> notificator(
+ @Nullable SuperRoot oldSuperRoot,
+ SuperRoot newSuperRoot,
+ long storageRevision
+ ) {
+ Collection<CompletableFuture<?>> futures = new ArrayList<>();
newSuperRoot.traverseChildren(new ConfigurationVisitor<Void>() {
/** {@inheritDoc} */
@Override
public Void visitInnerNode(String key, InnerNode newRoot) {
- InnerNode oldRoot = oldSuperRoot.traverseChild(key, innerNodeVisitor(), true);
+ DynamicConfiguration<InnerNode, ?> config = (DynamicConfiguration<InnerNode, ?>) configs.get(key);
+
+ assert config != null : key;
- var cfg = (DynamicConfiguration<InnerNode, ?>) configs.get(key);
+ InnerNode oldRoot;
- assert oldRoot != null && cfg != null : key;
+ if (oldSuperRoot != null) {
+ oldRoot = oldSuperRoot.traverseChild(key, innerNodeVisitor(), true);
- if (oldRoot != newRoot) {
- futures.addAll(notifyListeners(oldRoot, newRoot, cfg, storageRevision));
+ assert oldRoot != null : key;
+ } else {
+ oldRoot = null;
}
+ futures.addAll(notifyListeners(oldRoot, newRoot, config, storageRevision));
+
return null;
}
}, true);
+ if (futures.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
// Map futures is only for logging errors.
Function<CompletableFuture<?>, CompletableFuture<?>> mapping = fut -> fut.whenComplete((res, throwable) -> {
if (throwable != null) {
@@ -300,6 +318,18 @@ public class ConfigurationRegistry implements IgniteComponent {
}
/**
+ * Notifies all listeners of the current configuration.
+ *
+ * <p>{@link ConfigurationListener#onUpdate} and {@link ConfigurationNamedListListener#onCreate} will be called and the value will
+ * only be in {@link ConfigurationNotificationEvent#newValue}.
+ *
+ * @return Future that must signify when processing is completed.
+ */
+ public CompletableFuture<Void> notifyCurrentConfigurationListeners() {
+ return changer.notifyCurrentConfigurationListeners();
+ }
+
+ /**
* Get configuration schemas and their validated internal extensions with checks.
*
* @param allSchemas All configuration schemas.
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotifier.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotifier.java
index 5bbf7bf..4be6808 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotifier.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotifier.java
@@ -56,6 +56,10 @@ public class ConfigurationNotifier {
/**
* Recursive notification of all configuration listeners.
*
+ * <p>NOTE: If {@code oldInnerNode == null}, then {@link ConfigurationListener#onUpdate} and
+ * {@link ConfigurationNamedListListener#onCreate} will be called and the value will only be in
+ * {@link ConfigurationNotificationEvent#newValue}.
+ *
* @param oldInnerNode Old configuration values root.
* @param newInnerNode New configuration values root.
* @param config Public configuration tree node corresponding to the current inner nodes.
@@ -70,19 +74,23 @@ public class ConfigurationNotifier {
DynamicConfiguration<InnerNode, ?> config,
long storageRevision
) {
- if (oldInnerNode == null || oldInnerNode == newInnerNode) {
+ if (oldInnerNode == newInnerNode) {
return List.of();
}
- ConfigurationNotificationContext configCtx = new ConfigurationNotificationContext(storageRevision);
+ ConfigurationNotificationContext notificationCtx = new ConfigurationNotificationContext(storageRevision);
- configCtx.addContainer(config, null);
+ notificationCtx.addContainer(config, null);
- notifyListeners(oldInnerNode, newInnerNode, config, List.of(), configCtx);
+ if (oldInnerNode == null) {
+ notifyListeners(newInnerNode, config, List.of(), notificationCtx);
+ } else {
+ notifyListeners(oldInnerNode, newInnerNode, config, List.of(), notificationCtx);
+ }
- configCtx.removeContainer(config);
+ notificationCtx.removeContainer(config);
- return configCtx.futures;
+ return notificationCtx.futures;
}
private static void notifyListeners(
@@ -208,7 +216,7 @@ public class ConfigurationNotifier {
);
}
- notifyAnyListenersOnCreate(
+ notifyListeners(
newVal,
newNodeCfg,
newAnyConfigs,
@@ -318,16 +326,23 @@ public class ConfigurationNotifier {
}, true);
}
- private static void notifyAnyListenersOnCreate(
+ /**
+ * Recursive notification of all configuration listeners.
+ *
+ * <p>NOTE: Only {@link ConfigurationListener#onUpdate} and {@link ConfigurationNamedListListener#onCreate} will be called.
+ *
+ * <p>NOTE: Value will only be in {@link ConfigurationNotificationEvent#newValue}.
+ */
+ private static void notifyListeners(
InnerNode innerNode,
- DynamicConfiguration<InnerNode, ?> cfgNode,
+ DynamicConfiguration<InnerNode, ?> config,
Collection<DynamicConfiguration<InnerNode, ?>> anyConfigs,
ConfigurationNotificationContext notificationCtx
) {
- assert !(cfgNode instanceof NamedListConfiguration);
+ assert !(config instanceof NamedListConfiguration);
notifyPublicListeners(
- List.of(),
+ config.listeners(),
viewReadOnly(anyConfigs, ConfigurationNode::listeners),
null,
innerNode.specificNode(),
@@ -338,12 +353,12 @@ public class ConfigurationNotifier {
innerNode.traverseChildren(new ConfigurationVisitor<Void>() {
/** {@inheritDoc} */
@Override
- public Void visitLeafNode(String key, Serializable newLeaf) {
+ public Void visitLeafNode(String key, Serializable leaf) {
notifyPublicListeners(
- List.of(),
+ listeners(dynamicProperty(config, key)),
viewReadOnly(anyConfigs, anyConfig -> listeners(dynamicProperty(anyConfig, key))),
null,
- newLeaf,
+ leaf,
notificationCtx,
ConfigurationListener::onUpdate
);
@@ -353,19 +368,19 @@ public class ConfigurationNotifier {
/** {@inheritDoc} */
@Override
- public Void visitInnerNode(String key, InnerNode newNode) {
- DynamicConfiguration<InnerNode, ?> innerNodeCfg = dynamicConfig(cfgNode, key);
+ public Void visitInnerNode(String key, InnerNode nestedInnerNode) {
+ DynamicConfiguration<InnerNode, ?> nestedNodeConfig = dynamicConfig(config, key);
- notificationCtx.addContainer(innerNodeCfg, null);
+ notificationCtx.addContainer(nestedNodeConfig, null);
- notifyAnyListenersOnCreate(
- newNode,
- innerNodeCfg,
- viewReadOnly(anyConfigs, cfg -> dynamicConfig(cfg, key)),
+ notifyListeners(
+ nestedInnerNode,
+ nestedNodeConfig,
+ viewReadOnly(anyConfigs, anyConfig -> dynamicConfig(anyConfig, key)),
notificationCtx
);
- notificationCtx.removeContainer(innerNodeCfg);
+ notificationCtx.removeContainer(nestedNodeConfig);
return null;
}
@@ -374,7 +389,7 @@ public class ConfigurationNotifier {
@Override
public Void visitNamedListNode(String key, NamedListNode<?> newNamedList) {
notifyPublicListeners(
- List.of(),
+ listeners(namedDynamicConfig(config, key)),
viewReadOnly(anyConfigs, anyConfig -> listeners(namedDynamicConfig(anyConfig, key))),
null,
newNamedList,
@@ -388,37 +403,37 @@ public class ConfigurationNotifier {
Collection<DynamicConfiguration<InnerNode, ?>> newAnyConfigs = null;
for (String name : newNamedList.namedListKeys()) {
- DynamicConfiguration<InnerNode, ?> newNodeCfg =
- (DynamicConfiguration<InnerNode, ?>) namedDynamicConfig(cfgNode, key).getConfig(name);
+ DynamicConfiguration<InnerNode, ?> namedNodeConfig =
+ (DynamicConfiguration<InnerNode, ?>) namedDynamicConfig(config, key).getConfig(name);
- notificationCtx.addContainer(newNodeCfg, name);
+ notificationCtx.addContainer(namedNodeConfig, name);
- InnerNode newVal = newNamedList.getInnerNode(name);
+ InnerNode namedInnerNode = newNamedList.getInnerNode(name);
notifyPublicListeners(
- List.of(),
+ extendedListeners(namedDynamicConfig(config, key)),
viewReadOnly(anyConfigs, anyConfig -> extendedListeners(namedDynamicConfig(anyConfig, key))),
null,
- newVal.specificNode(),
+ namedInnerNode.specificNode(),
notificationCtx,
ConfigurationNamedListListener::onCreate
);
if (newAnyConfigs == null) {
newAnyConfigs = mergeAnyConfigs(
- viewReadOnly(anyConfigs, cfg -> any(namedDynamicConfig(cfg, key))),
- any(namedDynamicConfig(cfgNode, key))
+ viewReadOnly(anyConfigs, anyConfig -> any(namedDynamicConfig(anyConfig, key))),
+ any(namedDynamicConfig(config, key))
);
}
- notifyAnyListenersOnCreate(
- newVal,
- newNodeCfg,
+ notifyListeners(
+ namedInnerNode,
+ namedNodeConfig,
newAnyConfigs,
notificationCtx
);
- notificationCtx.removeContainer(newNodeCfg);
+ notificationCtx.removeContainer(namedNodeConfig);
}
return null;
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/notifications/ConfigurationListenerTest.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/notifications/ConfigurationListenerTest.java
index 8ee75f9..f876844 100644
--- a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/notifications/ConfigurationListenerTest.java
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/notifications/ConfigurationListenerTest.java
@@ -27,6 +27,7 @@ import static org.apache.ignite.internal.configuration.notifications.Configurati
import static org.apache.ignite.internal.configuration.notifications.ConfigurationListenerTestUtils.configNamedListenerOnRename;
import static org.apache.ignite.internal.configuration.notifications.ConfigurationListenerTestUtils.configNamedListenerOnUpdate;
import static org.apache.ignite.internal.configuration.notifications.ConfigurationListenerTestUtils.doNothingConsumer;
+import static org.apache.ignite.internal.configuration.notifications.ConfigurationNotifier.notifyListeners;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -40,6 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -63,7 +65,9 @@ import org.apache.ignite.configuration.notifications.ConfigurationListener;
import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.internal.configuration.DynamicConfiguration;
import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.configuration.tree.InnerNode;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -144,7 +148,7 @@ public class ConfigurationListenerTest {
private ConfigurationRegistry registry;
- private ParentConfiguration configuration;
+ private ParentConfiguration config;
/**
* Before each.
@@ -165,7 +169,7 @@ public class ConfigurationListenerTest {
registry.initializeDefaults();
- configuration = registry.getConfiguration(ParentConfiguration.KEY);
+ config = registry.getConfiguration(ParentConfiguration.KEY);
}
@AfterEach
@@ -177,7 +181,7 @@ public class ConfigurationListenerTest {
public void childNode() throws Exception {
List<String> log = new ArrayList<>();
- configuration.listen(ctx -> {
+ config.listen(ctx -> {
assertEquals(ctx.oldValue().child().str(), "default");
assertEquals(ctx.newValue().child().str(), "foo");
@@ -186,7 +190,7 @@ public class ConfigurationListenerTest {
return completedFuture(null);
});
- configuration.child().listen(ctx -> {
+ config.child().listen(ctx -> {
assertEquals(ctx.oldValue().str(), "default");
assertEquals(ctx.newValue().str(), "foo");
@@ -195,7 +199,7 @@ public class ConfigurationListenerTest {
return completedFuture(null);
});
- configuration.child().str().listen(ctx -> {
+ config.child().str().listen(ctx -> {
assertEquals(ctx.oldValue(), "default");
assertEquals(ctx.newValue(), "foo");
@@ -204,13 +208,13 @@ public class ConfigurationListenerTest {
return completedFuture(null);
});
- configuration.children().listen(ctx -> {
+ config.children().listen(ctx -> {
log.add("elements");
return completedFuture(null);
});
- configuration.change(parent -> parent.changeChild(child -> child.changeStr("foo"))).get(1, SECONDS);
+ config.change(parent -> parent.changeChild(child -> child.changeStr("foo"))).get(1, SECONDS);
assertEquals(List.of("parent", "child", "str"), log);
}
@@ -222,19 +226,19 @@ public class ConfigurationListenerTest {
public void namedListNodeOnCreate() throws Exception {
List<String> log = new ArrayList<>();
- configuration.listen(ctx -> {
+ config.listen(ctx -> {
log.add("parent");
return completedFuture(null);
});
- configuration.child().listen(ctx -> {
+ config.child().listen(ctx -> {
log.add("child");
return completedFuture(null);
});
- configuration.children().listen(ctx -> {
+ config.children().listen(ctx -> {
assertEquals(0, ctx.oldValue().size());
ChildView newValue = ctx.newValue().get("name");
@@ -247,7 +251,7 @@ public class ConfigurationListenerTest {
return completedFuture(null);
});
- configuration.children().listenElements(new ConfigurationNamedListListener<ChildView>() {
+ config.children().listenElements(new ConfigurationNamedListListener<ChildView>() {
/** {@inheritDoc} */
@Override
public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<ChildView> ctx) {
@@ -292,7 +296,7 @@ public class ConfigurationListenerTest {
}
});
- configuration.change(parent ->
+ config.change(parent ->
parent.changeChildren(elements -> elements.create("name", element -> {
}))
).get(1, SECONDS);
@@ -305,26 +309,26 @@ public class ConfigurationListenerTest {
*/
@Test
public void namedListNodeOnUpdate() throws Exception {
- configuration.change(parent ->
+ config.change(parent ->
parent.changeChildren(elements -> elements.create("name", element -> {
}))
).get(1, SECONDS);
List<String> log = new ArrayList<>();
- configuration.listen(ctx -> {
+ config.listen(ctx -> {
log.add("parent");
return completedFuture(null);
});
- configuration.child().listen(ctx -> {
+ config.child().listen(ctx -> {
log.add("child");
return completedFuture(null);
});
- configuration.children().listen(ctx -> {
+ config.children().listen(ctx -> {
ChildView oldValue = ctx.oldValue().get("name");
@@ -341,7 +345,7 @@ public class ConfigurationListenerTest {
return completedFuture(null);
});
- configuration.children().listenElements(new ConfigurationNamedListListener<ChildView>() {
+ config.children().listenElements(new ConfigurationNamedListListener<ChildView>() {
/** {@inheritDoc} */
@Override
public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<ChildView> ctx) {
@@ -389,7 +393,7 @@ public class ConfigurationListenerTest {
}
});
- configuration.change(parent ->
+ config.change(parent ->
parent.changeChildren(elements -> elements.createOrUpdate("name", element -> element.changeStr("foo")))
).get(1, SECONDS);
@@ -401,26 +405,26 @@ public class ConfigurationListenerTest {
*/
@Test
public void namedListNodeOnRename() throws Exception {
- configuration.change(parent ->
+ config.change(parent ->
parent.changeChildren(elements -> elements.create("name", element -> {
}))
).get(1, SECONDS);
List<String> log = new ArrayList<>();
- configuration.listen(ctx -> {
+ config.listen(ctx -> {
log.add("parent");
return completedFuture(null);
});
- configuration.child().listen(ctx -> {
+ config.child().listen(ctx -> {
log.add("child");
return completedFuture(null);
});
- configuration.children().listen(ctx -> {
+ config.children().listen(ctx -> {
assertEquals(1, ctx.oldValue().size());
ChildView oldValue = ctx.oldValue().get("name");
@@ -439,7 +443,7 @@ public class ConfigurationListenerTest {
return completedFuture(null);
});
- configuration.children().listenElements(new ConfigurationNamedListListener<ChildView>() {
+ config.children().listenElements(new ConfigurationNamedListListener<ChildView>() {
/** {@inheritDoc} */
@Override
public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<ChildView> ctx) {
@@ -489,7 +493,7 @@ public class ConfigurationListenerTest {
}
});
- configuration.change(parent ->
+ config.change(parent ->
parent.changeChildren(elements -> elements.rename("name", "newName"))
).get(1, SECONDS);
@@ -501,26 +505,26 @@ public class ConfigurationListenerTest {
*/
@Test
public void namedListNodeOnRenameAndUpdate() throws Exception {
- configuration.change(parent ->
+ config.change(parent ->
parent.changeChildren(elements -> elements.create("name", element -> {
}))
).get(1, SECONDS);
List<String> log = new ArrayList<>();
- configuration.listen(ctx -> {
+ config.listen(ctx -> {
log.add("parent");
return completedFuture(null);
});
- configuration.child().listen(ctx -> {
+ config.child().listen(ctx -> {
log.add("child");
return completedFuture(null);
});
- configuration.children().listen(ctx -> {
+ config.children().listen(ctx -> {
assertEquals(1, ctx.oldValue().size());
ChildView oldValue = ctx.oldValue().get("name");
@@ -540,7 +544,7 @@ public class ConfigurationListenerTest {
return completedFuture(null);
});
- configuration.children().listenElements(new ConfigurationNamedListListener<ChildView>() {
+ config.children().listenElements(new ConfigurationNamedListListener<ChildView>() {
/** {@inheritDoc} */
@Override
public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<ChildView> ctx) {
@@ -591,7 +595,7 @@ public class ConfigurationListenerTest {
}
});
- configuration.change(parent ->
+ config.change(parent ->
parent.changeChildren(elements -> elements
.rename("name", "newName")
.createOrUpdate("newName", element -> element.changeStr("foo"))
@@ -606,26 +610,26 @@ public class ConfigurationListenerTest {
*/
@Test
public void namedListNodeOnDelete() throws Exception {
- configuration.change(parent ->
+ config.change(parent ->
parent.changeChildren(elements -> elements.create("name", element -> {
}))
).get(1, SECONDS);
List<String> log = new ArrayList<>();
- configuration.listen(ctx -> {
+ config.listen(ctx -> {
log.add("parent");
return completedFuture(null);
});
- configuration.child().listen(ctx -> {
+ config.child().listen(ctx -> {
log.add("child");
return completedFuture(null);
});
- configuration.children().listen(ctx -> {
+ config.children().listen(ctx -> {
assertEquals(0, ctx.newValue().size());
ChildView oldValue = ctx.oldValue().get("name");
@@ -638,7 +642,7 @@ public class ConfigurationListenerTest {
return completedFuture(null);
});
- configuration.children().listenElements(new ConfigurationNamedListListener<ChildView>() {
+ config.children().listenElements(new ConfigurationNamedListListener<ChildView>() {
/** {@inheritDoc} */
@Override
public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<ChildView> ctx) {
@@ -683,11 +687,11 @@ public class ConfigurationListenerTest {
}
});
- configuration.children().get("name").listen(ctx -> {
+ config.children().get("name").listen(ctx -> {
return completedFuture(null);
});
- configuration.change(parent ->
+ config.change(parent ->
parent.changeChildren(elements -> elements.delete("name"))
).get(1, SECONDS);
@@ -699,7 +703,7 @@ public class ConfigurationListenerTest {
*/
@Test
public void dataRace() throws Exception {
- configuration.change(parent -> parent.changeChildren(elements ->
+ config.change(parent -> parent.changeChildren(elements ->
elements.create("name", e -> {
}))
).get(1, SECONDS);
@@ -709,7 +713,7 @@ public class ConfigurationListenerTest {
List<String> log = new ArrayList<>();
- configuration.listen(ctx -> {
+ config.listen(ctx -> {
try {
wait.await(1, SECONDS);
} catch (InterruptedException e) {
@@ -721,7 +725,7 @@ public class ConfigurationListenerTest {
return completedFuture(null);
});
- configuration.children().get("name").listen(ctx -> {
+ config.children().get("name").listen(ctx -> {
assertNull(ctx.newValue());
log.add("deleted");
@@ -729,13 +733,13 @@ public class ConfigurationListenerTest {
return completedFuture(null);
});
- final Future<Void> fut = configuration.change(parent -> parent.changeChildren(elements ->
+ final Future<Void> fut = config.change(parent -> parent.changeChildren(elements ->
elements.delete("name"))
);
wait.countDown();
- configuration.children();
+ config.children();
release.await(1, SECONDS);
@@ -754,36 +758,36 @@ public class ConfigurationListenerTest {
ConfigurationNamedListListener<ChildView> listener2 = configNamedListenerOnUpdate(ctx -> events.add("2"));
final ConfigurationNamedListListener<ChildView> listener3 = configNamedListenerOnUpdate(ctx -> events.add("3"));
- configuration.listen(listener0);
- configuration.listen(listener1);
+ config.listen(listener0);
+ config.listen(listener1);
- configuration.children().listenElements(listener2);
- configuration.children().listenElements(listener3);
+ config.children().listenElements(listener2);
+ config.children().listenElements(listener3);
- configuration.children().change(c -> c.create("0", doNothingConsumer())).get(1, SECONDS);
+ config.children().change(c -> c.create("0", doNothingConsumer())).get(1, SECONDS);
checkContainsListeners(
- () -> configuration.children().get("0").str().update(UUID.randomUUID().toString()),
+ () -> config.children().get("0").str().update(UUID.randomUUID().toString()),
events,
List.of("0", "1", "2", "3"),
List.of()
);
- configuration.stopListen(listener0);
- configuration.children().stopListenElements(listener2);
+ config.stopListen(listener0);
+ config.children().stopListenElements(listener2);
checkContainsListeners(
- () -> configuration.children().get("0").str().update(UUID.randomUUID().toString()),
+ () -> config.children().get("0").str().update(UUID.randomUUID().toString()),
events,
List.of("1", "3"),
List.of("0", "2")
);
- configuration.stopListen(listener1);
- configuration.children().stopListenElements(listener3);
+ config.stopListen(listener1);
+ config.children().stopListenElements(listener3);
checkContainsListeners(
- () -> configuration.children().get("0").str().update(UUID.randomUUID().toString()),
+ () -> config.children().get("0").str().update(UUID.randomUUID().toString()),
events,
List.of(),
List.of("0", "1", "2", "3")
@@ -794,7 +798,7 @@ public class ConfigurationListenerTest {
void testGetConfigFromNotificationEvent() throws Exception {
String newVal = UUID.randomUUID().toString();
- configuration.listen(configListener(ctx -> {
+ config.listen(configListener(ctx -> {
ParentConfiguration parent = ctx.config(ParentConfiguration.class);
assertNotNull(parent);
@@ -803,7 +807,7 @@ public class ConfigurationListenerTest {
assertEquals(newVal, parent.child().str().value());
}));
- configuration.child().listen(configListener(ctx -> {
+ config.child().listen(configListener(ctx -> {
assertNotNull(ctx.config(ParentConfiguration.class));
ChildConfiguration child = ctx.config(ChildConfiguration.class);
@@ -814,7 +818,7 @@ public class ConfigurationListenerTest {
assertEquals(newVal, child.str().value());
}));
- configuration.child().str().listen(configListener(ctx -> {
+ config.child().str().listen(configListener(ctx -> {
assertNotNull(ctx.config(ParentConfiguration.class));
ChildConfiguration child = ctx.config(ChildConfiguration.class);
@@ -825,7 +829,7 @@ public class ConfigurationListenerTest {
assertEquals(newVal, child.str().value());
}));
- configuration.change(c0 -> c0.changeChild(c1 -> c1.changeStr(newVal))).get(1, SECONDS);
+ config.change(c0 -> c0.changeChild(c1 -> c1.changeStr(newVal))).get(1, SECONDS);
}
@Test
@@ -833,7 +837,7 @@ public class ConfigurationListenerTest {
String newVal = UUID.randomUUID().toString();
String key = UUID.randomUUID().toString();
- configuration.children().listen(configListener(ctx -> {
+ config.children().listen(configListener(ctx -> {
ParentConfiguration parent = ctx.config(ParentConfiguration.class);
assertNotNull(parent);
@@ -845,7 +849,7 @@ public class ConfigurationListenerTest {
assertEquals(newVal, parent.children().get(key).str().value());
}));
- configuration.children().listenElements(configNamedListenerOnCreate(ctx -> {
+ config.children().listenElements(configNamedListenerOnCreate(ctx -> {
assertNotNull(ctx.config(ParentConfiguration.class));
assertNull(ctx.name(ParentConfiguration.class));
@@ -857,7 +861,7 @@ public class ConfigurationListenerTest {
assertEquals(newVal, child.str().value());
}));
- configuration.children().change(c -> c.create(key, c1 -> c1.changeStr(newVal))).get(1, SECONDS);
+ config.children().change(c -> c.create(key, c1 -> c1.changeStr(newVal))).get(1, SECONDS);
}
@Test
@@ -866,9 +870,9 @@ public class ConfigurationListenerTest {
String oldKey = UUID.randomUUID().toString();
String newKey = UUID.randomUUID().toString();
- configuration.children().change(c -> c.create(oldKey, doNothingConsumer())).get(1, SECONDS);
+ config.children().change(c -> c.create(oldKey, doNothingConsumer())).get(1, SECONDS);
- configuration.children().listen(configListener(ctx -> {
+ config.children().listen(configListener(ctx -> {
ParentConfiguration parent = ctx.config(ParentConfiguration.class);
assertNotNull(parent);
@@ -881,7 +885,7 @@ public class ConfigurationListenerTest {
assertEquals(val, parent.children().get(newKey).str().value());
}));
- configuration.children().listenElements(configNamedListenerOnRename(ctx -> {
+ config.children().listenElements(configNamedListenerOnRename(ctx -> {
assertNotNull(ctx.config(ParentConfiguration.class));
assertNull(ctx.name(ParentConfiguration.class));
@@ -893,16 +897,16 @@ public class ConfigurationListenerTest {
assertEquals(val, child.str().value());
}));
- configuration.children().change(c -> c.rename(oldKey, newKey));
+ config.children().change(c -> c.rename(oldKey, newKey));
}
@Test
void testGetConfigFromNotificationEventOnDelete() throws Exception {
String key = UUID.randomUUID().toString();
- configuration.children().change(c -> c.create(key, doNothingConsumer())).get(1, SECONDS);
+ config.children().change(c -> c.create(key, doNothingConsumer())).get(1, SECONDS);
- configuration.children().listen(configListener(ctx -> {
+ config.children().listen(configListener(ctx -> {
ParentConfiguration parent = ctx.config(ParentConfiguration.class);
assertNotNull(parent);
@@ -914,7 +918,7 @@ public class ConfigurationListenerTest {
assertNull(parent.children().get(key));
}));
- configuration.children().listenElements(configNamedListenerOnDelete(ctx -> {
+ config.children().listenElements(configNamedListenerOnDelete(ctx -> {
assertNotNull(ctx.config(ParentConfiguration.class));
assertNull(ctx.name(ParentConfiguration.class));
@@ -922,7 +926,7 @@ public class ConfigurationListenerTest {
assertEquals(key, ctx.name(ChildConfiguration.class));
}));
- configuration.children().get(key).listen(configListener(ctx -> {
+ config.children().get(key).listen(configListener(ctx -> {
assertNotNull(ctx.config(ParentConfiguration.class));
assertNull(ctx.name(ParentConfiguration.class));
@@ -930,7 +934,7 @@ public class ConfigurationListenerTest {
assertEquals(key, ctx.name(ChildConfiguration.class));
}));
- configuration.children().change(c -> c.delete(key)).get(1, SECONDS);
+ config.children().change(c -> c.delete(key)).get(1, SECONDS);
}
@Test
@@ -938,9 +942,9 @@ public class ConfigurationListenerTest {
String newVal = UUID.randomUUID().toString();
String key = UUID.randomUUID().toString();
- configuration.children().change(c -> c.create(key, doNothingConsumer())).get(1, SECONDS);
+ config.children().change(c -> c.create(key, doNothingConsumer())).get(1, SECONDS);
- configuration.children().listen(configListener(ctx -> {
+ config.children().listen(configListener(ctx -> {
ParentConfiguration parent = ctx.config(ParentConfiguration.class);
assertNotNull(parent);
@@ -952,7 +956,7 @@ public class ConfigurationListenerTest {
assertEquals(newVal, parent.children().get(key).str().value());
}));
- configuration.children().listenElements(configNamedListenerOnUpdate(ctx -> {
+ config.children().listenElements(configNamedListenerOnUpdate(ctx -> {
assertNotNull(ctx.config(ParentConfiguration.class));
assertNull(ctx.name(ParentConfiguration.class));
@@ -964,7 +968,7 @@ public class ConfigurationListenerTest {
assertEquals(newVal, child.str().value());
}));
- configuration.children().get(key).listen(configListener(ctx -> {
+ config.children().get(key).listen(configListener(ctx -> {
assertNotNull(ctx.config(ParentConfiguration.class));
assertNull(ctx.name(ParentConfiguration.class));
@@ -976,26 +980,26 @@ public class ConfigurationListenerTest {
assertEquals(newVal, child.str().value());
}));
- configuration.children().get(key).str().update(newVal).get(1, SECONDS);
+ config.children().get(key).str().update(newVal).get(1, SECONDS);
}
@Test
void polymorphicParentFieldChangeNotificationHappens() throws Exception {
AtomicInteger intHolder = new AtomicInteger();
- configuration.polyChild().commonIntVal().listen(event -> {
+ config.polyChild().commonIntVal().listen(event -> {
intHolder.set(event.newValue());
return CompletableFuture.completedFuture(null);
});
- configuration.polyChild().commonIntVal().update(42).get(1, SECONDS);
+ config.polyChild().commonIntVal().update(42).get(1, SECONDS);
assertThat(intHolder.get(), is(42));
}
@Test
void testNotificationEventConfigForNestedConfiguration() throws Exception {
- configuration.child().listen(ctx -> {
+ config.child().listen(ctx -> {
assertInstanceOf(ChildConfiguration.class, ctx.config(ChildConfiguration.class));
assertInstanceOf(InternalChildConfiguration.class, ctx.config(InternalChildConfiguration.class));
@@ -1005,12 +1009,12 @@ public class ConfigurationListenerTest {
return CompletableFuture.completedFuture(null);
});
- configuration.child().str().update(UUID.randomUUID().toString()).get(1, SECONDS);
+ config.child().str().update(UUID.randomUUID().toString()).get(1, SECONDS);
}
@Test
void testNotificationEventConfigForNamedConfiguration() throws Exception {
- configuration.children().listenElements(new ConfigurationNamedListListener<>() {
+ config.children().listenElements(new ConfigurationNamedListListener<>() {
/** {@inheritDoc} */
@Override
public @NotNull CompletableFuture<?> onCreate(@NotNull ConfigurationNotificationEvent<ChildView> ctx) {
@@ -1064,15 +1068,15 @@ public class ConfigurationListenerTest {
}
});
- configuration.children().change(c -> c.create("0", c1 -> {})).get(1, SECONDS);
- configuration.children().change(c -> c.rename("0", "1")).get(1, SECONDS);
- configuration.children().change(c -> c.update("1", c1 -> c1.changeStr(UUID.randomUUID().toString()))).get(1, SECONDS);
- configuration.children().change(c -> c.delete("1")).get(1, SECONDS);
+ config.children().change(c -> c.create("0", c1 -> {})).get(1, SECONDS);
+ config.children().change(c -> c.rename("0", "1")).get(1, SECONDS);
+ config.children().change(c -> c.update("1", c1 -> c1.changeStr(UUID.randomUUID().toString()))).get(1, SECONDS);
+ config.children().change(c -> c.delete("1")).get(1, SECONDS);
}
@Test
void testNotificationEventConfigForNestedPolymorphicConfiguration() throws Exception {
- configuration.polyChild().listen(ctx -> {
+ config.polyChild().listen(ctx -> {
assertInstanceOf(PolyConfiguration.class, ctx.config(PolyConfiguration.class));
assertInstanceOf(StringPolyConfiguration.class, ctx.config(StringPolyConfiguration.class));
@@ -1085,12 +1089,12 @@ public class ConfigurationListenerTest {
return CompletableFuture.completedFuture(null);
});
- configuration.polyChild().commonIntVal().update(22).get(1, SECONDS);
+ config.polyChild().commonIntVal().update(22).get(1, SECONDS);
}
@Test
void testNotificationEventConfigForNamedPolymorphicConfiguration() throws Exception {
- configuration.polyChildren().listenElements(new ConfigurationNamedListListener<>() {
+ config.polyChildren().listenElements(new ConfigurationNamedListListener<>() {
/** {@inheritDoc} */
@Override
public @NotNull CompletableFuture<?> onCreate(@NotNull ConfigurationNotificationEvent<PolyView> ctx) {
@@ -1159,17 +1163,17 @@ public class ConfigurationListenerTest {
}
});
- configuration.polyChildren().change(c -> c.create("0", c1 -> {})).get(1, SECONDS);
- configuration.polyChildren().change(c -> c.rename("0", "1")).get(1, SECONDS);
- configuration.polyChildren().change(c -> c.update("1", c1 -> c1.changeCommonIntVal(22))).get(1, SECONDS);
- configuration.polyChildren().change(c -> c.delete("1")).get(1, SECONDS);
+ config.polyChildren().change(c -> c.create("0", c1 -> {})).get(1, SECONDS);
+ config.polyChildren().change(c -> c.rename("0", "1")).get(1, SECONDS);
+ config.polyChildren().change(c -> c.update("1", c1 -> c1.changeCommonIntVal(22))).get(1, SECONDS);
+ config.polyChildren().change(c -> c.delete("1")).get(1, SECONDS);
}
@Test
void testNotificationListenerForNestedPolymorphicConfig() throws Exception {
AtomicBoolean invokeListener = new AtomicBoolean();
- configuration.polyChild().listen(configListener(ctx -> {
+ config.polyChild().listen(configListener(ctx -> {
invokeListener.set(true);
assertInstanceOf(PolyView.class, ctx.newValue());
@@ -1188,7 +1192,7 @@ public class ConfigurationListenerTest {
assertNull(ctx.name(StringPolyConfiguration.class));
}));
- configuration.polyChild()
+ config.polyChild()
.change(c -> c.convert(LongPolyChange.class).changeSpecificVal(0).changeCommonIntVal(0))
.get(1, SECONDS);
@@ -1199,7 +1203,7 @@ public class ConfigurationListenerTest {
void testNotificationListenerOnCreateNamedPolymorphicConfig() throws Exception {
AtomicBoolean invokeListener = new AtomicBoolean();
- configuration.polyChildren().listenElements(configNamedListenerOnCreate(ctx -> {
+ config.polyChildren().listenElements(configNamedListenerOnCreate(ctx -> {
invokeListener.set(true);
assertInstanceOf(PolyView.class, ctx.newValue());
@@ -1218,7 +1222,7 @@ public class ConfigurationListenerTest {
assertNull(ctx.name(LongPolyConfiguration.class));
}));
- configuration.polyChildren()
+ config.polyChildren()
.change(c -> c.create("0", c1 -> c1.convert(StringPolyChange.class).changeSpecificVal("").changeCommonIntVal(0)))
.get(1, SECONDS);
@@ -1227,13 +1231,13 @@ public class ConfigurationListenerTest {
@Test
void testNotificationListenerOnUpdateNamedPolymorphicConfig() throws Exception {
- configuration.polyChildren()
+ config.polyChildren()
.change(c -> c.create("0", c1 -> c1.convert(StringPolyChange.class).changeSpecificVal("").changeCommonIntVal(0)))
.get(1, SECONDS);
AtomicBoolean invokeListener = new AtomicBoolean();
- configuration.polyChildren().listenElements(configNamedListenerOnUpdate(ctx -> {
+ config.polyChildren().listenElements(configNamedListenerOnUpdate(ctx -> {
invokeListener.set(true);
assertInstanceOf(PolyView.class, ctx.newValue());
@@ -1253,7 +1257,7 @@ public class ConfigurationListenerTest {
assertNull(ctx.name(StringPolyConfiguration.class));
}));
- configuration.polyChildren()
+ config.polyChildren()
.change(c -> c.update("0", c1 -> c1.convert(LongPolyChange.class).changeSpecificVal(0).changeCommonIntVal(0)))
.get(1, SECONDS);
@@ -1262,13 +1266,13 @@ public class ConfigurationListenerTest {
@Test
void testNotificationListenerOnRenameNamedPolymorphicConfig() throws Exception {
- configuration.polyChildren()
+ config.polyChildren()
.change(c -> c.create("0", c1 -> c1.convert(StringPolyChange.class).changeSpecificVal("").changeCommonIntVal(0)))
.get(1, SECONDS);
AtomicBoolean invokeListener = new AtomicBoolean();
- configuration.polyChildren().listenElements(configNamedListenerOnRename(ctx -> {
+ config.polyChildren().listenElements(configNamedListenerOnRename(ctx -> {
invokeListener.set(true);
assertInstanceOf(PolyView.class, ctx.newValue());
@@ -1288,7 +1292,7 @@ public class ConfigurationListenerTest {
assertNull(ctx.name(LongPolyConfiguration.class));
}));
- configuration.polyChildren()
+ config.polyChildren()
.change(c -> c.rename("0", "1"))
.get(1, SECONDS);
@@ -1297,13 +1301,13 @@ public class ConfigurationListenerTest {
@Test
void testNotificationListenerOnDeleteNamedPolymorphicConfig() throws Exception {
- configuration.polyChildren()
+ config.polyChildren()
.change(c -> c.create("0", c1 -> c1.convert(StringPolyChange.class).changeSpecificVal("").changeCommonIntVal(0)))
.get(1, SECONDS);
AtomicBoolean invokeListener = new AtomicBoolean();
- configuration.polyChildren().listenElements(configNamedListenerOnDelete(ctx -> {
+ config.polyChildren().listenElements(configNamedListenerOnDelete(ctx -> {
invokeListener.set(true);
assertNull(ctx.newValue());
@@ -1321,7 +1325,7 @@ public class ConfigurationListenerTest {
assertNull(ctx.name(LongPolyConfiguration.class));
}));
- configuration.polyChildren()
+ config.polyChildren()
.change(c -> c.delete("0"))
.get(1, SECONDS);
@@ -1332,9 +1336,9 @@ public class ConfigurationListenerTest {
void testNotificationEventForNestedConfigAfterNotifyListeners() throws Exception {
AtomicReference<ConfigurationNotificationEvent<?>> eventRef = new AtomicReference<>();
- configuration.child().listen(configListener(eventRef::set));
+ config.child().listen(configListener(eventRef::set));
- configuration.child().str().update(UUID.randomUUID().toString()).get(1, SECONDS);
+ config.child().str().update(UUID.randomUUID().toString()).get(1, SECONDS);
ConfigurationNotificationEvent<?> event = eventRef.get();
@@ -1359,9 +1363,9 @@ public class ConfigurationListenerTest {
void testNotificationEventForNamedConfigAfterNotifyListeners() throws Exception {
AtomicReference<ConfigurationNotificationEvent<?>> eventRef = new AtomicReference<>();
- configuration.children().listenElements(configNamedListenerOnCreate(eventRef::set));
+ config.children().listenElements(configNamedListenerOnCreate(eventRef::set));
- configuration.children().change(c -> c.create("0", doNothingConsumer())).get(1, SECONDS);
+ config.children().change(c -> c.create("0", doNothingConsumer())).get(1, SECONDS);
ConfigurationNotificationEvent<?> event = eventRef.get();
@@ -1383,13 +1387,13 @@ public class ConfigurationListenerTest {
@Test
void testGetErrorFromListener() {
- configuration.child().listen(configListener(ctx -> {
+ config.child().listen(configListener(ctx -> {
throw new RuntimeException("from test");
}));
ExecutionException ex = assertThrows(
ExecutionException.class,
- () -> configuration.child().str().update(UUID.randomUUID().toString()).get(1, SECONDS)
+ () -> config.child().str().update(UUID.randomUUID().toString()).get(1, SECONDS)
);
assertTrue(hasCause(ex, RuntimeException.class, "from test"));
@@ -1397,13 +1401,99 @@ public class ConfigurationListenerTest {
@Test
void testGetErrorFromListenerFuture() {
- configuration.child().listen(ctx -> CompletableFuture.failedFuture(new RuntimeException("from test")));
+ config.child().listen(ctx -> CompletableFuture.failedFuture(new RuntimeException("from test")));
ExecutionException ex = assertThrows(
ExecutionException.class,
- () -> configuration.child().str().update(UUID.randomUUID().toString()).get(1, SECONDS)
+ () -> config.child().str().update(UUID.randomUUID().toString()).get(1, SECONDS)
);
assertTrue(hasCause(ex, RuntimeException.class, "from test"));
}
+
+ @Test
+ void testNotifyListenersOnCurrentConfigWithoutChange() throws Exception {
+ config.children().change(c -> c.create("0", doNothingConsumer())).get(1, SECONDS);
+
+ config.polyChildren().change(c -> c.create("0", doNothingConsumer())).get(1, SECONDS);
+
+ List<String> events = new ArrayList<>();
+
+ config.listen(configListener(ctx -> events.add("root")));
+
+ config.child().listen(configListener(ctx -> events.add("child")));
+ config.child().str().listen(configListener(ctx -> events.add("child.str")));
+
+ config.children().listen(configListener(ctx -> events.add("children")));
+ config.children().listenElements(configNamedListenerOnCreate(ctx -> events.add("children.onCreate")));
+ config.children().listenElements(configNamedListenerOnUpdate(ctx -> events.add("children.onUpdate")));
+ config.children().listenElements(configNamedListenerOnRename(ctx -> events.add("children.onRename")));
+ config.children().listenElements(configNamedListenerOnDelete(ctx -> events.add("children.onDelete")));
+
+ config.children().get("0").listen(configListener(ctx -> events.add("children.0")));
+ config.children().get("0").str().listen(configListener(ctx -> events.add("children.0.str")));
+
+ config.children().any().listen(configListener(ctx -> events.add("children.any")));
+ config.children().any().str().listen(configListener(ctx -> events.add("children.any.str")));
+
+ // Polymorphic configs.
+
+ config.polyChild().listen(configListener(ctx -> events.add("polyChild")));
+ config.polyChild().commonIntVal().listen(configListener(ctx -> events.add("polyChild.int")));
+ ((StringPolyConfiguration) config.polyChild()).specificVal().listen(configListener(ctx -> events.add("polyChild.str")));
+
+ config.polyChildren().listen(configListener(ctx -> events.add("polyChildren")));
+ config.polyChildren().listenElements(configNamedListenerOnCreate(ctx -> events.add("polyChildren.onCreate")));
+ config.polyChildren().listenElements(configNamedListenerOnUpdate(ctx -> events.add("polyChildren.onUpdate")));
+ config.polyChildren().listenElements(configNamedListenerOnRename(ctx -> events.add("polyChildren.onRename")));
+ config.polyChildren().listenElements(configNamedListenerOnDelete(ctx -> events.add("polyChildren.onDelete")));
+
+ config.polyChildren().get("0").listen(configListener(ctx -> events.add("polyChildren.0")));
+ config.polyChildren().get("0").commonIntVal().listen(configListener(ctx -> events.add("polyChildren.0.int")));
+ ((StringPolyConfiguration) config.polyChildren().get("0")).specificVal()
+ .listen(configListener(ctx -> events.add("polyChildren.0.str")));
+
+ config.polyChildren().any().listen(configListener(ctx -> events.add("polyChildren.any")));
+ config.polyChildren().any().commonIntVal().listen(configListener(ctx -> events.add("polyChildren.any.int")));
+
+ Collection<CompletableFuture<?>> futs = notifyListeners(
+ null,
+ (InnerNode) config.value(),
+ (DynamicConfiguration) config,
+ 0
+ );
+
+ for (CompletableFuture<?> fut : futs) {
+ fut.get(1, SECONDS);
+ }
+
+ assertEquals(
+ List.of(
+ "root",
+ "child", "child.str",
+ "children", "children.onCreate", "children.any", "children.0",
+ "children.any.str", "children.0.str",
+ "polyChild", "polyChild.int", "polyChild.str",
+ "polyChildren", "polyChildren.onCreate", "polyChildren.any", "polyChildren.0",
+ "polyChildren.any.int", "polyChildren.0.int", "polyChildren.0.str"
+ ),
+ events
+ );
+ }
+
+ @Test
+ void testNotifyCurrentConfigurationListeners() throws Exception {
+ AtomicBoolean invokeListener = new AtomicBoolean();
+
+ config.listen(configListener(ctx -> {
+ invokeListener.set(true);
+
+ assertNull(ctx.oldValue());
+ assertNotNull(ctx.newValue());
+ }));
+
+ registry.notifyCurrentConfigurationListeners().get(1, SECONDS);
+
+ assertTrue(invokeListener.get());
+ }
}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index a98bc0c..0a5afe3 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
@@ -326,6 +327,8 @@ public class IgniteImpl implements Ignite {
doStartComponent(name, startedComponents, component);
}
+ notifyConfigurationListeners();
+
// Deploy all registered watches because all components are ready and have registered their listeners.
metaStorageMgr.deployWatches();
@@ -479,6 +482,18 @@ public class IgniteImpl implements Ignite {
}
/**
+ * Notify all listeners of current configurations.
+ *
+ * @throws Exception If failed.
+ */
+ private void notifyConfigurationListeners() throws Exception {
+ CompletableFuture.allOf(
+ nodeConfiguration().notifyCurrentConfigurationListeners(),
+ clusterConfiguration().notifyCurrentConfigurationListeners()
+ ).get();
+ }
+
+ /**
* Starts the Vault component.
*/
private static VaultManager createVault(Path workDir) {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index b8659cd..86fd5c8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -193,7 +193,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
@Override
public void start() {
tablesCfg.tables()
- .listenElements(new ConfigurationNamedListListener<TableView>() {
+ .listenElements(new ConfigurationNamedListListener<>() {
@Override
public @NotNull CompletableFuture<?> onCreate(@NotNull ConfigurationNotificationEvent<TableView> ctx) {
if (!busyLock.enterBusy()) {
@@ -202,7 +202,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
fireEvent(TableEvent.CREATE,
new TableEventParameters(tblId, tblName),
- new NodeStoppingException());
+ new NodeStoppingException()
+ );
return CompletableFuture.completedFuture(new NodeStoppingException());
}
@@ -235,21 +236,29 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
.listenElements(new ConfigurationNamedListListener<>() {
@Override
public @NotNull CompletableFuture<?> onCreate(
- @NotNull ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+ @NotNull ConfigurationNotificationEvent<SchemaView> schemasCtx
+ ) {
if (!busyLock.enterBusy()) {
- fireEvent(TableEvent.ALTER, new TableEventParameters(tblId, tblName),
- new NodeStoppingException());
+ fireEvent(
+ TableEvent.ALTER,
+ new TableEventParameters(tblId, tblName),
+ new NodeStoppingException()
+ );
return CompletableFuture.completedFuture(new NodeStoppingException());
}
try {
- ((SchemaRegistryImpl) tables.get(tblName).schemaView())
- .onSchemaRegistered(
- SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()))
- );
-
- fireEvent(TableEvent.ALTER, new TableEventParameters(tablesById.get(tblId)), null);
+ // Avoid calling listener immediately after the listener completes to create the current table.
+ // FIXME: https://issues.apache.org/jira/browse/IGNITE-16231
+ if (ctx.storageRevision() != schemasCtx.storageRevision()) {
+ ((SchemaRegistryImpl) tables.get(tblName).schemaView())
+ .onSchemaRegistered(
+ SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()))
+ );
+
+ fireEvent(TableEvent.ALTER, new TableEventParameters(tablesById.get(tblId)), null);
+ }
} catch (Exception e) {
fireEvent(TableEvent.ALTER, new TableEventParameters(tblId, tblName), e);
} finally {
@@ -258,25 +267,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return CompletableFuture.completedFuture(null);
}
-
- @Override
- public @NotNull CompletableFuture<?> onRename(@NotNull String oldName,
- @NotNull String newName,
- @NotNull ConfigurationNotificationEvent<SchemaView> ctx) {
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public @NotNull CompletableFuture<?> onDelete(
- @NotNull ConfigurationNotificationEvent<SchemaView> ctx) {
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public @NotNull CompletableFuture<?> onUpdate(
- @NotNull ConfigurationNotificationEvent<SchemaView> ctx) {
- return CompletableFuture.completedFuture(null);
- }
});
((ExtendedTableConfiguration) tablesCfg.tables().get(tblName)).assignments()
@@ -286,7 +276,13 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
try {
- return updateAssignmentInternal(tblId, assignmentsCtx);
+ // Avoid calling listener immediately after the listener completes to create the current table.
+ // FIXME: https://issues.apache.org/jira/browse/IGNITE-16231
+ if (ctx.storageRevision() == assignmentsCtx.storageRevision()) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return updateAssignmentInternal(tblId, assignmentsCtx);
+ }
} finally {
busyLock.leaveBusy();
}
@@ -302,8 +298,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
@NotNull
- private CompletableFuture<?> updateAssignmentInternal(IgniteUuid tblId,
- @NotNull ConfigurationNotificationEvent<byte[]> assignmentsCtx) {
+ private CompletableFuture<?> updateAssignmentInternal(
+ IgniteUuid tblId,
+ @NotNull ConfigurationNotificationEvent<byte[]> assignmentsCtx
+ ) {
List<List<ClusterNode>> oldAssignments =
(List<List<ClusterNode>>) ByteUtils.fromBytes(assignmentsCtx.oldValue());
@@ -368,8 +366,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
String tblName = ctx.oldValue().name();
IgniteUuid tblId = IgniteUuid.fromString(((ExtendedTableView) ctx.oldValue()).id());
- fireEvent(TableEvent.DROP, new TableEventParameters(tblId, tblName),
- new NodeStoppingException());
+ fireEvent(
+ TableEvent.DROP,
+ new TableEventParameters(tblId, tblName),
+ new NodeStoppingException()
+ );
return CompletableFuture.completedFuture(new NodeStoppingException());
}
@@ -386,11 +387,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return CompletableFuture.completedFuture(null);
}
-
- @Override
- public @NotNull CompletableFuture<?> onUpdate(@NotNull ConfigurationNotificationEvent<TableView> ctx) {
- return CompletableFuture.completedFuture(null);
- }
});
engine.start();
@@ -580,13 +576,13 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return getSchemaDescriptorLocally(schemaVer, tblCfg);
}
- CompletableFuture<SchemaDescriptor> fur = new CompletableFuture<>();
+ CompletableFuture<SchemaDescriptor> fut = new CompletableFuture<>();
var clo = new EventListener<TableEventParameters>() {
@Override
public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) {
if (tblId.equals(parameters.tableId()) && schemaVer <= parameters.table().schemaView().lastSchemaVersion()) {
- fur.complete(getSchemaDescriptorLocally(schemaVer, tblCfg));
+ fut.complete(getSchemaDescriptorLocally(schemaVer, tblCfg));
return true;
}
@@ -596,21 +592,21 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
@Override
public void remove(@NotNull Throwable exception) {
- fur.completeExceptionally(exception);
+ fut.completeExceptionally(exception);
}
};
listen(TableEvent.ALTER, clo);
if (schemaVer <= table.schemaView().lastSchemaVersion()) {
- fur.complete(getSchemaDescriptorLocally(schemaVer, tblCfg));
+ fut.complete(getSchemaDescriptorLocally(schemaVer, tblCfg));
}
- if (!isSchemaExists(tblId, schemaVer) && fur.complete(null)) {
+ if (!isSchemaExists(tblId, schemaVer) && fut.complete(null)) {
removeListener(TableEvent.ALTER, clo);
}
- return fur.get();
+ return fut.get();
} catch (InterruptedException | ExecutionException e) {
throw new SchemaException("Can't read schema from vault: ver=" + schemaVer, e);
}