You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/12/02 10:06:47 UTC

[GitHub] [ignite-3] ibessonov commented on a change in pull request #483: IGNITE-15891 Configuration does not concerned global state when handling a consumer

ibessonov commented on a change in pull request #483:
URL: https://github.com/apache/ignite-3/pull/483#discussion_r760930600



##########
File path: modules/configuration/src/main/java/org/apache/ignite/internal/configuration/storage/ConfigurationStorage.java
##########
@@ -67,4 +67,9 @@
      * @return Type of this configuration storage.
      */
     ConfigurationType type();
+
+    /**
+     * Returns a future that will be completed when the latest revision of the storage is received.
+     */
+    CompletableFuture<Long> revisionLatest();

Review comment:
       Interesting words order

##########
File path: modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
##########
@@ -374,68 +374,94 @@ public SuperRoot superRoot() {
     }
 
     /**
-     * Internal configuration change method that completes provided future.
+     * Entry point for configuration changes.
      *
      * @param src Configuration source.
      * @return fut Future that will be completed after changes are written to the storage.
      */
     private CompletableFuture<Void> changeInternally(ConfigurationSource src) {
         StorageRoots localRoots = storageRoots;
 
+        return storage.revisionLatest()
+            .thenCompose(storageRevision -> {
+                assert storageRevision != null;
+
+                if (localRoots.version < storageRevision) {
+                    // Need to wait for the configuration updates from the storage, then try to update again (loop).
+                    return localRoots.changeFuture.thenCompose(v -> changeInternally(src));
+                } else {
+                    return changeInternally0(localRoots, src);
+                }
+            })
+            .exceptionally(throwable -> {
+                Throwable cause = throwable.getCause();
+
+                if (cause instanceof ConfigurationChangeException) {
+                    throw ((ConfigurationChangeException) cause);
+                } else {
+                    throw new ConfigurationChangeException("Failed to change configuration", cause);
+                }
+            });
+    }
+
+    /**
+     * Internal configuration change method that completes provided future.
+     *
+     * @param src Configuration source.
+     * @return fut Future that will be completed after changes are written to the storage.
+     */
+    private CompletableFuture<Void> changeInternally0(StorageRoots localRoots, ConfigurationSource src) {
         return CompletableFuture
-                .supplyAsync(() -> {
-                    SuperRoot curRoots = localRoots.roots;
+            .supplyAsync(() -> {
+                SuperRoot curRoots = localRoots.roots;
 
-                    SuperRoot changes = curRoots.copy();
+                SuperRoot changes = curRoots.copy();
 
-                    src.reset();
+                src.reset();
 
-                    src.descend(changes);
+                src.descend(changes);
 
-                    addDefaults(changes);
+                addDefaults(changes);
 
-                    Map<String, Serializable> allChanges = createFlattenedUpdatesMap(curRoots, changes);
+                Map<String, Serializable> allChanges = createFlattenedUpdatesMap(curRoots, changes);
 
-                    // Unlikely but still possible.
-                    if (allChanges.isEmpty()) {
-                        return null;
-                    }
+                // Unlikely but still possible.
+                if (allChanges.isEmpty()) {
+                    return null;
+                }
 
-                    dropNulls(changes);
+                dropNulls(changes);
 
-                    List<ValidationIssue> validationIssues = ValidationUtil.validate(
-                            curRoots,
-                            changes,
-                            this::getRootNode,
-                            cachedAnnotations,
-                            validators
-                    );
+                List<ValidationIssue> validationIssues = ValidationUtil.validate(
+                        curRoots,
+                        changes,
+                        this::getRootNode,
+                        cachedAnnotations,
+                        validators
+                );
 
-                    if (!validationIssues.isEmpty()) {
-                        throw new ConfigurationValidationException(validationIssues);
-                    }
+                if (!validationIssues.isEmpty()) {
+                    throw new ConfigurationValidationException(validationIssues);
+                }
 
-                    return allChanges;
-                }, pool)
-                .thenCompose(allChanges -> {
-                    if (allChanges == null) {
-                        return completedFuture(null);
-                    }
+                return allChanges;
+            }, pool)
+            .thenCompose(allChanges -> {
+                if (allChanges == null) {

Review comment:
       Maybe we need to remove this "optimization" as I asked you

##########
File path: modules/configuration/src/test/java/org/apache/ignite/internal/configuration/hocon/HoconConverterTest.java
##########
@@ -614,7 +614,7 @@ private void change(String hocon) throws Throwable {
         try {
             registry.change(hoconSource(ConfigFactory.parseString(hocon).root())).get(1, SECONDS);
         } catch (ExecutionException e) {
-            throw e.getCause();
+            throw e.getCause().getCause();

Review comment:
       Wow, please add a comment

##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -784,20 +785,26 @@ public void remove(@NotNull Throwable e) {
                     }
             );
         }).exceptionally(t -> {
-            Throwable ex = t.getCause();
+            Throwable cause = t.getCause();

Review comment:
       What a mess, I think this code deserves some comments as well

##########
File path: modules/configuration/src/test/java/org/apache/ignite/internal/configuration/storage/TestConfigurationStorage.java
##########
@@ -128,4 +128,18 @@ public synchronized void registerConfigurationListener(@NotNull ConfigurationSto
     public ConfigurationType type() {
         return configurationType;
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public synchronized CompletableFuture<Long> revisionLatest() {
+        return CompletableFuture.completedFuture(version);
+    }
+
+    public synchronized long incrementAndGetRevision() {

Review comment:
       please document what it is, people won't understand these methods by themselves

##########
File path: modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java
##########
@@ -466,6 +470,60 @@ public void testGetLatestMissingKey() throws Exception {
         assertThat(e.getMessage(), containsString("def.childrenList.name.defStr"));
     }
 
+    /**
+     * Check that {@link ConfigurationStorage#revisionLatest} always returns the latest revision of the storage,
+     * and that the change will only be applied on the last revision of the storage.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    void testLatestRevision() throws Exception {
+        ConfigurationChanger changer = createChanger(DefaultsConfiguration.KEY);
+
+        changer.start();
+
+        changer.initializeDefaults();
+
+        assertEquals(0, storage.revisionLatest().get(1, SECONDS));
+
+        changer.change(source(DefaultsConfiguration.KEY, (DefaultsChange c) -> c.changeDefStr("test0"))).get(1, SECONDS);
+        assertEquals(1, storage.revisionLatest().get(1, SECONDS));
+
+        // Increase the revision so that the change waits for the latest revision of the configuration to be received from the storage.
+        storage.incrementAndGetRevision();
+        assertEquals(2, storage.revisionLatest().get(1, SECONDS));
+
+        AtomicInteger invokeConsumerCnt = new AtomicInteger();
+
+        CompletableFuture<Void> changeFut = changer.change(source(
+                DefaultsConfiguration.KEY,
+                (DefaultsChange c) -> {
+                    invokeConsumerCnt.incrementAndGet();
+
+                    try {
+                        // Let's check that the consumer will be called on the last revision of the repository.
+                        assertEquals(2, storage.revisionLatest().get(1, SECONDS));
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);

Review comment:
       `fail(e.getMessage())` maybe?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org