Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/12/28 14:51:05 UTC

[GitHub] [ignite-3] sergeyuttsel opened a new pull request, #1485: IGNITE-18470 Exception handling in DistributionZoneManager was reworked.

sergeyuttsel opened a new pull request, #1485:
URL: https://github.com/apache/ignite-3/pull/1485

   https://issues.apache.org/jira/browse/IGNITE-18470


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1485: IGNITE-18470 Exception handling in DistributionZoneManager was reworked.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1485:
URL: https://github.com/apache/ignite-3/pull/1485#discussion_r1060557603


##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java:
##########
@@ -156,30 +157,36 @@ public void testCreateZoneIfExists() throws Exception {
                 new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
         ).get(5, TimeUnit.SECONDS);
 
+        CompletableFuture<Void> fut;
+
+        fut = distributionZoneManager.createZone(
+                new Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
+        );
+
         try {
-            distributionZoneManager.createZone(
-                    new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
-            ).get(5, TimeUnit.SECONDS);
+            fut.get(5, TimeUnit.SECONDS);
         } catch (Exception e0) {
             e = e0;
         }
 
         assertTrue(e != null);
-        assertTrue(e.getCause().getCause() instanceof DistributionZoneAlreadyExistsException, e.toString());
+        assertTrue(e.getCause() instanceof DistributionZoneAlreadyExistsException, e.toString());
     }
 
     @Test
-    public void testDropZoneIfNotExists() {
+    public void testDropZoneIfNotExists() throws NodeStoppingException {

Review Comment:
   Fixed





[GitHub] [ignite-3] sanpwc merged pull request #1485: IGNITE-18470 Exception handling in DistributionZoneManager was reworked.

Posted by GitBox <gi...@apache.org>.
sanpwc merged PR #1485:
URL: https://github.com/apache/ignite-3/pull/1485




[GitHub] [ignite-3] sk0x50 commented on a diff in pull request #1485: IGNITE-18470 Exception handling in DistributionZoneManager was reworked.

Posted by GitBox <gi...@apache.org>.
sk0x50 commented on code in PR #1485:
URL: https://github.com/apache/ignite-3/pull/1485#discussion_r1060339037


##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java:
##########
@@ -156,30 +157,36 @@ public void testCreateZoneIfExists() throws Exception {
                 new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
         ).get(5, TimeUnit.SECONDS);
 
+        CompletableFuture<Void> fut;
+
+        fut = distributionZoneManager.createZone(
+                new Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
+        );
+
         try {
-            distributionZoneManager.createZone(
-                    new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
-            ).get(5, TimeUnit.SECONDS);
+            fut.get(5, TimeUnit.SECONDS);
         } catch (Exception e0) {
             e = e0;
         }
 
         assertTrue(e != null);
-        assertTrue(e.getCause().getCause() instanceof DistributionZoneAlreadyExistsException, e.toString());
+        assertTrue(e.getCause() instanceof DistributionZoneAlreadyExistsException, e.toString());
     }
 
     @Test
-    public void testDropZoneIfNotExists() {
+    public void testDropZoneIfNotExists() throws NodeStoppingException {

Review Comment:
   This test does not throw `NodeStoppingException`. Please check and update the test accordingly (here and below).



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -483,47 +560,206 @@ private void initMetaStorageKeysOnStart() {
             }
 
             try {
-                long topologyVersionFromCmg = snapshot.version();
-
-                byte[] topVerFromMetastorage;
+                metaStorageManager.get(zonesLogicalTopologyVersionKey()).thenAccept(topVerEntry -> {
+                    if (!busyLock.enterBusy()) {
+                        throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                    }
 
-                try {
-                    topVerFromMetastorage = metaStorageManager.get(zonesLogicalTopologyVersionKey()).get().value();
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new IgniteInternalException(UNEXPECTED_ERR, e);
-                }
+                    try {
+                        long topologyVersionFromCmg = snapshot.version();
 
-                if (topVerFromMetastorage == null || ByteUtils.bytesToLong(topVerFromMetastorage) < topologyVersionFromCmg) {
-                    Set<String> topologyFromCmg = snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
+                        byte[] topVerFromMetaStorage = topVerEntry.value();
 
-                    Condition topologyVersionCondition = topVerFromMetastorage == null ? notExists(zonesLogicalTopologyVersionKey()) :
-                            value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetastorage);
+                        if (topVerFromMetaStorage == null || bytesToLong(topVerFromMetaStorage) < topologyVersionFromCmg) {
+                            Set<String> topologyFromCmg = snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
 
-                    If iff = If.iif(topologyVersionCondition,
-                            updateLogicalTopologyAndVersion(topologyFromCmg, topologyVersionFromCmg),
-                            ops().yield(false)
-                    );
+                            Condition topologyVersionCondition = topVerFromMetaStorage == null
+                                    ? notExists(zonesLogicalTopologyVersionKey()) :
+                                    value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetaStorage);
 
-                    metaStorageManager.invoke(iff).thenAccept(res -> {
-                        if (res.getAsBoolean()) {
-                            LOG.debug(
-                                    "Distribution zones' logical topology and version keys were initialised [topology = {}, version = {}]",
-                                    Arrays.toString(topologyFromCmg.toArray()),
-                                    topologyVersionFromCmg
-                            );
-                        } else {
-                            LOG.debug(
-                                    "Failed to initialize distribution zones' logical topology "
-                                            + "and version keys [topology = {}, version = {}]",
-                                    Arrays.toString(topologyFromCmg.toArray()),
-                                    topologyVersionFromCmg
+                            If iff = If.iif(topologyVersionCondition,
+                                    updateLogicalTopologyAndVersion(topologyFromCmg, topologyVersionFromCmg),
+                                    ops().yield(false)
                             );
+
+                            metaStorageManager.invoke(iff).thenAccept(res -> {
+                                if (res.getAsBoolean()) {
+                                    LOG.debug(
+                                            "Distribution zones' logical topology and version keys were initialised "
+                                                    + "[topology = {}, version = {}]",
+                                            Arrays.toString(topologyFromCmg.toArray()),
+                                            topologyVersionFromCmg
+                                    );
+                                } else {
+                                    LOG.debug(
+                                            "Failed to initialize distribution zones' logical topology "
+                                                    + "and version keys [topology = {}, version = {}]",
+                                            Arrays.toString(topologyFromCmg.toArray()),
+                                            topologyVersionFromCmg
+                                    );
+                                }
+                            });
                         }
-                    });
-                }
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                });
+
             } finally {
                 busyLock.leaveBusy();
             }
         });
     }
+
+    /**
+     * Initialises data nodes of distribution zones in meta storage
+     * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
+     */
+    private void initDataNodesFromVaultManager() {
+        vaultMgr.get(APPLIED_REV)
+                .thenApply(appliedRevision -> appliedRevision == null ? 0L : bytesToLong(appliedRevision.value()))
+                .thenAccept(vaultAppliedRevision -> {
+                    if (!busyLock.enterBusy()) {
+                        throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                    }
+
+                    try {
+                        vaultMgr.get(zonesLogicalTopologyKey())
+                                .thenAccept(vaultEntry -> {
+                                    if (!busyLock.enterBusy()) {
+                                        throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                                    }
+
+                                    try {
+                                        if (vaultEntry != null && vaultEntry.value() != null) {
+                                            logicalTopology = ByteUtils.fromBytes(vaultEntry.value());
+
+                                            zonesConfiguration.distributionZones().value().namedListKeys()
+                                                    .forEach(zoneName -> {
+                                                        int zoneId = zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                                                        saveDataNodesToMetaStorage(zoneId, vaultEntry.value(), vaultAppliedRevision);
+                                                    });
+                                        }
+                                    } finally {
+                                        busyLock.leaveBusy();
+                                    }
+                                });
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                });
+    }
+
+    /**
+     * Registers {@link WatchListener} which updates data nodes of distribution zones on logical topology changing event.
+     *
+     * @return Future representing pending completion of the operation.
+     */
+    private CompletableFuture<?> registerMetaStorageWatchListener() {
+        return metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new WatchListener() {
+                    @Override
+                    public boolean onUpdate(@NotNull WatchEvent evt) {
+                        if (!busyLock.enterBusy()) {
+                            throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                        }
+
+                        try {
+                            assert evt.single();
+
+                            Entry newEntry = evt.entryEvent().newEntry();
+
+                            Set<String> newLogicalTopology = ByteUtils.fromBytes(newEntry.value());
+
+                            List<String> removedNodes =
+                                    logicalTopology.stream().filter(node -> !newLogicalTopology.contains(node)).collect(toList());
+
+                            List<String> addedNodes =
+                                    newLogicalTopology.stream().filter(node -> !logicalTopology.contains(node)).collect(toList());
+
+                            logicalTopology = newLogicalTopology;
+
+                            zonesConfiguration.distributionZones().value().namedListKeys()
+                                    .forEach(zoneName -> {
+                                        DistributionZoneConfiguration zoneCfg = zonesConfiguration.distributionZones().get(zoneName);
+
+                                        int autoAdjust = zoneCfg.dataNodesAutoAdjust().value();
+                                        int autoAdjustScaleDown = zoneCfg.dataNodesAutoAdjustScaleDown().value();
+                                        int autoAdjustScaleUp = zoneCfg.dataNodesAutoAdjustScaleUp().value();
+
+                                        Integer zoneId = zoneCfg.zoneId().value();
+
+                                        if ((!addedNodes.isEmpty() || !removedNodes.isEmpty()) && autoAdjust != Integer.MAX_VALUE) {
+                                            //TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust timer.
+                                            saveDataNodesToMetaStorage(
+                                                    zoneId, newEntry.value(), newEntry.revision()
+                                            );
+                                        } else {
+                                            if (!addedNodes.isEmpty() && autoAdjustScaleUp != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18121 Create scale up scheduler with dataNodesAutoAdjustScaleUp timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+
+                                            if (!removedNodes.isEmpty() && autoAdjustScaleDown != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18132 Create scale down scheduler with dataNodesAutoAdjustScaleDown timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+                                        }
+                                    });
+
+                            return true;
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    }
+
+                    @Override
+                    public void onError(@NotNull Throwable e) {
+                        LOG.warn("Unable to process logical topology event", e);
+                    }
+                })
+                .thenAccept(id -> watchListenerId = id);
+    }
+
+    /**
+     * Method updates data nodes value for the specified zone,
+     * also sets {@code revision} to the {@link DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+     *
+     * @param zoneId Unique id of a zone
+     * @param dataNodes Data nodes of a zone
+     * @param revision Revision of an event that has triggered this method.
+     */
+    private void saveDataNodesToMetaStorage(int zoneId, byte[] dataNodes, long revision) {
+        Update dataNodesAndTriggerKeyUpd = updateDataNodesAndTriggerKey(zoneId, revision, dataNodes);
+
+        var iif = If.iif(triggerKeyCondition(revision), dataNodesAndTriggerKeyUpd, ops().yield(false));
+
+        metaStorageManager.invoke(iif).thenAccept(res -> {
+            if (res.getAsBoolean()) {
+                LOG.debug("Delete zones' dataNodes key [zoneId = {}]", zoneId);
+            } else {
+                LOG.debug("Failed to delete zones' dataNodes key [zoneId = {}]", zoneId);
+            }
+        });
+    }
+
+    /**
+     * Unwraps distribution zone exceptions from {@link ConfigurationChangeException} if it is possible.
+     */
+    private static Throwable unwrapDistributionZoneException(Throwable e) {

Review Comment:
   This method looks weird to me, especially the `e.getCause().getCause()` part. I would propose something like the following:
   
   ```
       private static Throwable unwrapDistributionZoneException(Throwable e, Class<? extends Throwable>... expectedClz) {
           Throwable ret = unwrapDistributionZoneExceptionRecursively(e, expectedClz);
   
           return ret != null ? ret : e;
       }
   
       private static Throwable unwrapDistributionZoneExceptionRecursively(Throwable e, Class<? extends Throwable>... expectedClz) {
           if ((e instanceof CompletionException || e instanceof ConfigurationChangeException) && e.getCause() != null) {
               return unwrapDistributionZoneExceptionRecursively(e.getCause(), expectedClz);
           }
   
           for (Class<?> expected : expectedClz) {
               if (expected.isAssignableFrom(e.getClass()))
                   return e;
           }
   
           return null;
       }
   ```
   
   By the way, it lets you clearly define the expected exception types:
   ```
       public CompletableFuture<Void> createZone(DistributionZoneConfigurationParameters distributionZoneCfg) {
           ....
               })).whenComplete((res, e) -> {
                   if (e != null) {
                       fut.completeExceptionally(unwrapDistributionZoneException(e, DistributionZoneAlreadyExistsException.class, ConfigurationValidationException.class));
                   } else {
                       fut.complete(null);
                   }
               });
   ```
   
   What do you think?
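
The proposal above can be tried in isolation. The following is a minimal sketch, not the code from the PR: `WrapperException` and `ZoneExistsException` are hypothetical stand-ins for `ConfigurationChangeException` and `DistributionZoneAlreadyExistsException`, and the recursion is written as a loop. It keeps the same contract as the suggested method: strip known wrapper exceptions, return the inner exception if it is one of the expected types, otherwise return the original exception unchanged.

```java
import java.util.concurrent.CompletionException;

public class UnwrapSketch {
    /** Hypothetical stand-in for a wrapper such as ConfigurationChangeException. */
    static class WrapperException extends RuntimeException {
        WrapperException(Throwable cause) {
            super(cause);
        }
    }

    /** Hypothetical stand-in for DistributionZoneAlreadyExistsException. */
    static class ZoneExistsException extends RuntimeException {
        ZoneExistsException(String msg) {
            super(msg);
        }
    }

    /**
     * Strips known wrapper exceptions and returns the first non-wrapper cause
     * if it matches one of the expected types; otherwise returns {@code e} as is.
     */
    @SafeVarargs
    static Throwable unwrap(Throwable e, Class<? extends Throwable>... expected) {
        Throwable cur = e;

        // Walk down the cause chain while the current exception is only a wrapper.
        while ((cur instanceof CompletionException || cur instanceof WrapperException)
                && cur.getCause() != null) {
            cur = cur.getCause();
        }

        for (Class<? extends Throwable> clz : expected) {
            if (clz.isInstance(cur)) {
                return cur;
            }
        }

        // Nothing matched: keep the original exception so no information is lost.
        return e;
    }

    public static void main(String[] args) {
        Throwable inner = new ZoneExistsException("zone already exists");
        Throwable wrapped = new CompletionException(new WrapperException(inner));

        // The expected type is found under two wrapper layers.
        System.out.println(unwrap(wrapped, ZoneExistsException.class) == inner);

        // An unexpected type leaves the original exception untouched.
        System.out.println(unwrap(wrapped, IllegalStateException.class) == wrapped);
    }
}
```

Listing the expected types at the call site avoids hard-coding the nesting depth, which is what made `e.getCause().getCause()` fragile.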



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java:
##########
@@ -156,30 +157,36 @@ public void testCreateZoneIfExists() throws Exception {
                 new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
         ).get(5, TimeUnit.SECONDS);
 
+        CompletableFuture<Void> fut;
+
+        fut = distributionZoneManager.createZone(
+                new Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
+        );
+
         try {
-            distributionZoneManager.createZone(
-                    new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
-            ).get(5, TimeUnit.SECONDS);
+            fut.get(5, TimeUnit.SECONDS);
         } catch (Exception e0) {
             e = e0;
         }
 
         assertTrue(e != null);
-        assertTrue(e.getCause().getCause() instanceof DistributionZoneAlreadyExistsException, e.toString());
+        assertTrue(e.getCause() instanceof DistributionZoneAlreadyExistsException, e.toString());

Review Comment:
   I prefer the approach that always passes a message; failures are much friendlier to read. Note that the message is the last argument here, as in the existing `assertTrue(..., e.toString())` call.
   ```
       assertTrue(e != null, "Expected exception was not thrown.");
       assertTrue(e.getCause() instanceof DistributionZoneAlreadyExistsException, "Unexpected type of exception (requires DistributionZoneAlreadyExistsException)");
   ```
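
For context on why the test inspects `e.getCause()`: when a `CompletableFuture` is completed exceptionally, `get(...)` throws an `ExecutionException` whose cause is the exception the future was completed with. The sketch below shows this using only the JDK; `failureCause` is a hypothetical helper and `IllegalStateException` stands in for `DistributionZoneAlreadyExistsException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class FutureCauseSketch {
    /**
     * Waits for the future and returns the exception it was completed with,
     * or {@code null} if it completed normally.
     */
    static Throwable failureCause(CompletableFuture<?> fut) throws Exception {
        try {
            fut.get(5, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            // get() wraps the failure exactly once, so one getCause() is enough.
            return e.getCause();
        }

        return null;
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> fut = new CompletableFuture<>();

        // Stand-in for the manager completing the future with an unwrapped exception.
        fut.completeExceptionally(new IllegalStateException("zone already exists"));

        System.out.println(failureCause(fut).getClass().getSimpleName());
    }
}
```

If the future is completed with an already unwrapped exception, as the reworked manager aims to do, the test needs a single `getCause()` call rather than two.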





[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1485: IGNITE-18470 Exception handling in DistributionZoneManager was reworked.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1485:
URL: https://github.com/apache/ignite-3/pull/1485#discussion_r1060559895


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -483,47 +560,206 @@ private void initMetaStorageKeysOnStart() {
             }
 
             try {
-                long topologyVersionFromCmg = snapshot.version();
-
-                byte[] topVerFromMetastorage;
+                metaStorageManager.get(zonesLogicalTopologyVersionKey()).thenAccept(topVerEntry -> {
+                    if (!busyLock.enterBusy()) {
+                        throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                    }
 
-                try {
-                    topVerFromMetastorage = metaStorageManager.get(zonesLogicalTopologyVersionKey()).get().value();
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new IgniteInternalException(UNEXPECTED_ERR, e);
-                }
+                    try {
+                        long topologyVersionFromCmg = snapshot.version();
 
-                if (topVerFromMetastorage == null || ByteUtils.bytesToLong(topVerFromMetastorage) < topologyVersionFromCmg) {
-                    Set<String> topologyFromCmg = snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
+                        byte[] topVerFromMetaStorage = topVerEntry.value();
 
-                    Condition topologyVersionCondition = topVerFromMetastorage == null ? notExists(zonesLogicalTopologyVersionKey()) :
-                            value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetastorage);
+                        if (topVerFromMetaStorage == null || bytesToLong(topVerFromMetaStorage) < topologyVersionFromCmg) {
+                            Set<String> topologyFromCmg = snapshot.nodes().stream().map(ClusterNode::name).collect(Collectors.toSet());
 
-                    If iff = If.iif(topologyVersionCondition,
-                            updateLogicalTopologyAndVersion(topologyFromCmg, topologyVersionFromCmg),
-                            ops().yield(false)
-                    );
+                            Condition topologyVersionCondition = topVerFromMetaStorage == null
+                                    ? notExists(zonesLogicalTopologyVersionKey()) :
+                                    value(zonesLogicalTopologyVersionKey()).eq(topVerFromMetaStorage);
 
-                    metaStorageManager.invoke(iff).thenAccept(res -> {
-                        if (res.getAsBoolean()) {
-                            LOG.debug(
-                                    "Distribution zones' logical topology and version keys were initialised [topology = {}, version = {}]",
-                                    Arrays.toString(topologyFromCmg.toArray()),
-                                    topologyVersionFromCmg
-                            );
-                        } else {
-                            LOG.debug(
-                                    "Failed to initialize distribution zones' logical topology "
-                                            + "and version keys [topology = {}, version = {}]",
-                                    Arrays.toString(topologyFromCmg.toArray()),
-                                    topologyVersionFromCmg
+                            If iff = If.iif(topologyVersionCondition,
+                                    updateLogicalTopologyAndVersion(topologyFromCmg, topologyVersionFromCmg),
+                                    ops().yield(false)
                             );
+
+                            metaStorageManager.invoke(iff).thenAccept(res -> {
+                                if (res.getAsBoolean()) {
+                                    LOG.debug(
+                                            "Distribution zones' logical topology and version keys were initialised "
+                                                    + "[topology = {}, version = {}]",
+                                            Arrays.toString(topologyFromCmg.toArray()),
+                                            topologyVersionFromCmg
+                                    );
+                                } else {
+                                    LOG.debug(
+                                            "Failed to initialize distribution zones' logical topology "
+                                                    + "and version keys [topology = {}, version = {}]",
+                                            Arrays.toString(topologyFromCmg.toArray()),
+                                            topologyVersionFromCmg
+                                    );
+                                }
+                            });
                         }
-                    });
-                }
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                });
+
             } finally {
                 busyLock.leaveBusy();
             }
         });
     }
+
+    /**
+     * Initialises data nodes of distribution zones in meta storage
+     * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
+     */
+    private void initDataNodesFromVaultManager() {
+        vaultMgr.get(APPLIED_REV)
+                .thenApply(appliedRevision -> appliedRevision == null ? 0L : bytesToLong(appliedRevision.value()))
+                .thenAccept(vaultAppliedRevision -> {
+                    if (!busyLock.enterBusy()) {
+                        throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                    }
+
+                    try {
+                        vaultMgr.get(zonesLogicalTopologyKey())
+                                .thenAccept(vaultEntry -> {
+                                    if (!busyLock.enterBusy()) {
+                                        throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                                    }
+
+                                    try {
+                                        if (vaultEntry != null && vaultEntry.value() != null) {
+                                            logicalTopology = ByteUtils.fromBytes(vaultEntry.value());
+
+                                            zonesConfiguration.distributionZones().value().namedListKeys()
+                                                    .forEach(zoneName -> {
+                                                        int zoneId = zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                                                        saveDataNodesToMetaStorage(zoneId, vaultEntry.value(), vaultAppliedRevision);
+                                                    });
+                                        }
+                                    } finally {
+                                        busyLock.leaveBusy();
+                                    }
+                                });
+                    } finally {
+                        busyLock.leaveBusy();
+                    }
+                });
+    }
+
+    /**
+     * Registers {@link WatchListener} which updates data nodes of distribution zones on logical topology changing event.
+     *
+     * @return Future representing pending completion of the operation.
+     */
+    private CompletableFuture<?> registerMetaStorageWatchListener() {
+        return metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new WatchListener() {
+                    @Override
+                    public boolean onUpdate(@NotNull WatchEvent evt) {
+                        if (!busyLock.enterBusy()) {
+                            throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                        }
+
+                        try {
+                            assert evt.single();
+
+                            Entry newEntry = evt.entryEvent().newEntry();
+
+                            Set<String> newLogicalTopology = ByteUtils.fromBytes(newEntry.value());
+
+                            List<String> removedNodes =
+                                    logicalTopology.stream().filter(node -> !newLogicalTopology.contains(node)).collect(toList());
+
+                            List<String> addedNodes =
+                                    newLogicalTopology.stream().filter(node -> !logicalTopology.contains(node)).collect(toList());
+
+                            logicalTopology = newLogicalTopology;
+
+                            zonesConfiguration.distributionZones().value().namedListKeys()
+                                    .forEach(zoneName -> {
+                                        DistributionZoneConfiguration zoneCfg = zonesConfiguration.distributionZones().get(zoneName);
+
+                                        int autoAdjust = zoneCfg.dataNodesAutoAdjust().value();
+                                        int autoAdjustScaleDown = zoneCfg.dataNodesAutoAdjustScaleDown().value();
+                                        int autoAdjustScaleUp = zoneCfg.dataNodesAutoAdjustScaleUp().value();
+
+                                        Integer zoneId = zoneCfg.zoneId().value();
+
+                                        if ((!addedNodes.isEmpty() || !removedNodes.isEmpty()) && autoAdjust != Integer.MAX_VALUE) {
+                                            //TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust timer.
+                                            saveDataNodesToMetaStorage(
+                                                    zoneId, newEntry.value(), newEntry.revision()
+                                            );
+                                        } else {
+                                            if (!addedNodes.isEmpty() && autoAdjustScaleUp != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18121 Create scale up scheduler with dataNodesAutoAdjustScaleUp timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+
+                                            if (!removedNodes.isEmpty() && autoAdjustScaleDown != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18132 Create scale down scheduler with dataNodesAutoAdjustScaleDown timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+                                        }
+                                    });
+
+                            return true;
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    }
+
+                    @Override
+                    public void onError(@NotNull Throwable e) {
+                        LOG.warn("Unable to process logical topology event", e);
+                    }
+                })
+                .thenAccept(id -> watchListenerId = id);
+    }
+
+    /**
+     * Method updates data nodes value for the specified zone,
+     * also sets {@code revision} to the {@link DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+     *
+     * @param zoneId Unique id of a zone
+     * @param dataNodes Data nodes of a zone
+     * @param revision Revision of an event that has triggered this method.
+     */
+    private void saveDataNodesToMetaStorage(int zoneId, byte[] dataNodes, long revision) {
+        Update dataNodesAndTriggerKeyUpd = updateDataNodesAndTriggerKey(zoneId, revision, dataNodes);
+
+        var iif = If.iif(triggerKeyCondition(revision), dataNodesAndTriggerKeyUpd, ops().yield(false));
+
+        metaStorageManager.invoke(iif).thenAccept(res -> {
+            if (res.getAsBoolean()) {
+                LOG.debug("Delete zones' dataNodes key [zoneId = {}]", zoneId);
+            } else {
+                LOG.debug("Failed to delete zones' dataNodes key [zoneId = {}]", zoneId);
+            }
+        });
+    }
+
+    /**
+     * Unwraps distribution zone exceptions from {@link ConfigurationChangeException} if it is possible.
+     */
+    private static Throwable unwrapDistributionZoneException(Throwable e) {

Review Comment:
   Yes, I think it's better.





[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1485: IGNITE-18470 Exception handling in DistributionZoneManager was reworked.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1485:
URL: https://github.com/apache/ignite-3/pull/1485#discussion_r1060559440


##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java:
##########
@@ -156,30 +157,36 @@ public void testCreateZoneIfExists() throws Exception {
                 new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
         ).get(5, TimeUnit.SECONDS);
 
+        CompletableFuture<Void> fut;
+
+        fut = distributionZoneManager.createZone(
+                new Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
+        );
+
         try {
-            distributionZoneManager.createZone(
-                    new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
-            ).get(5, TimeUnit.SECONDS);
+            fut.get(5, TimeUnit.SECONDS);
         } catch (Exception e0) {
             e = e0;
         }
 
         assertTrue(e != null);
-        assertTrue(e.getCause().getCause() instanceof DistributionZoneAlreadyExistsException, e.toString());
+        assertTrue(e.getCause() instanceof DistributionZoneAlreadyExistsException, e.toString());

Review Comment:
   Makes sense. Fixed.
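
   A minimal sketch of why a single getCause() is enough after the change, assuming the manager now strips the wrapper (for example ConfigurationChangeException) before completing the future. The class names ZoneAlreadyExistsException, WrapperException and the helper failureOf are hypothetical stand-ins for illustration, not code from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class UnwrapSketch {
    /** Hypothetical stand-in for DistributionZoneAlreadyExistsException. */
    static class ZoneAlreadyExistsException extends RuntimeException {
        ZoneAlreadyExistsException(String msg) {
            super(msg);
        }
    }

    /** Hypothetical stand-in for a wrapper such as ConfigurationChangeException. */
    static class WrapperException extends RuntimeException {
        WrapperException(Throwable cause) {
            super(cause);
        }
    }

    /** Returns the cause of the ExecutionException thrown by get(), or null if the future succeeded. */
    static Throwable failureOf(CompletableFuture<?> fut) throws Exception {
        try {
            fut.get(5, TimeUnit.SECONDS);
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) throws Exception {
        Throwable zoneErr = new ZoneAlreadyExistsException("zone1");

        // Old behaviour (assumed): the future fails with a wrapper around the zone exception,
        // so a test has to call getCause() twice to reach it.
        CompletableFuture<Void> wrapped = CompletableFuture.failedFuture(new WrapperException(zoneErr));

        // New behaviour (assumed): the wrapper is removed before the future is completed,
        // so the zone exception is the direct cause of the ExecutionException.
        CompletableFuture<Void> unwrapped = CompletableFuture.failedFuture(zoneErr);

        System.out.println(failureOf(wrapped).getCause() instanceof ZoneAlreadyExistsException);
        System.out.println(failureOf(unwrapped) instanceof ZoneAlreadyExistsException);
    }
}
```

   In both cases get() itself still adds one ExecutionException layer, which is why the test keeps one getCause() call.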


