Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2023/01/12 18:18:20 UTC

[GitHub] [ignite-3] sanpwc commented on a diff in pull request #1508: IGNITE-18121 add scale up scheduler

sanpwc commented on code in PR #1508:
URL: https://github.com/apache/ignite-3/pull/1508#discussion_r1067796294


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -103,6 +123,14 @@ public class DistributionZoneManager implements IgniteComponent {
     /** Logical topology service to track topology changes. */
     private final LogicalTopologyService logicalTopologyService;
 
+    private final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(
+            8,

Review Comment:
   It's better to use a value that scales with the host instead of a hard-coded one, something like `Math.min(Utils.cpus() * 3, 20);`
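
   A minimal sketch of the suggested sizing. It assumes `Runtime.getRuntime().availableProcessors()` as a stand-in for Ignite's `Utils.cpus()`; the class, method, and constant names are hypothetical:

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;

final class SchedulerPoolSize {
    /** Upper bound taken from the review suggestion; the constant name is hypothetical. */
    static final int MAX_POOL_SIZE = 20;

    /** Three threads per CPU, but never more than MAX_POOL_SIZE. */
    static int poolSize(int cpus) {
        return Math.min(cpus * 3, MAX_POOL_SIZE);
    }

    /** availableProcessors() stands in for Ignite's Utils.cpus() in this sketch. */
    static ScheduledThreadPoolExecutor newScheduler() {
        return new ScheduledThreadPoolExecutor(poolSize(Runtime.getRuntime().availableProcessors()));
    }
}
```

   On a 4-CPU host this yields 12 threads; from 7 CPUs upward the cap of 20 applies.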



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -362,13 +390,21 @@ public void start() {
         }
 
         try {
+            // Init timers after restart.
+            zonesConfiguration.distributionZones().value().namedListKeys()
+                    .forEach(zoneName -> {
+                        int zoneId = zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                        zonesTimers.put(zoneId, new ZoneState());

Review Comment:
   And don't forget about the default zone, which is not part of `zonesConfiguration.distributionZones()` but lives in `zonesConfiguration.defaultDistributionZone()`.
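
   A sketch of initializing per-zone state for the named zones plus the default zone. `ZoneView`, `ZoneState`, and `ZoneStateInit` are hypothetical stand-ins; the real code would read the ids from `zonesConfiguration.distributionZones()` and `zonesConfiguration.defaultDistributionZone()`:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ZoneStateInit {
    /** Hypothetical stand-in for a zone configuration view: only the id matters here. */
    static final class ZoneView {
        final String name;
        final int zoneId;

        ZoneView(String name, int zoneId) {
            this.name = name;
            this.zoneId = zoneId;
        }
    }

    static final class ZoneState { }

    /** Creates a state entry for every named zone and for the default zone. */
    static Map<Integer, ZoneState> initZoneStates(List<ZoneView> namedZones, ZoneView defaultZone) {
        Map<Integer, ZoneState> states = new ConcurrentHashMap<>();

        for (ZoneView zone : namedZones) {
            states.putIfAbsent(zone.zoneId, new ZoneState());
        }

        // The default zone is not in the named list, so it has to be added explicitly.
        states.putIfAbsent(defaultZone.zoneId, new ZoneState());

        return states;
    }
}
```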



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -362,13 +390,21 @@ public void start() {
         }
 
         try {
+            // Init timers after restart.
+            zonesConfiguration.distributionZones().value().namedListKeys()

Review Comment:
   There's a race here: please populate the initial state only after registering the corresponding listeners.
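
   One way to close the window, sketched with hypothetical types: register the listener first, read the existing zones only afterwards, and let both paths use `putIfAbsent` so that a zone seen by both keeps its first state. This assumes the listener and the startup code can race on the same zone id:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import java.util.function.Supplier;

final class ListenerFirstInit {
    static final class ZoneState { }

    final Map<Integer, ZoneState> zoneStates = new ConcurrentHashMap<>();

    /** Listener body for "zone created" events. */
    void onZoneCreated(int zoneId) {
        zoneStates.putIfAbsent(zoneId, new ZoneState());
    }

    /**
     * 1. Register the listener, so no creation event after this point is missed.
     * 2. Only then read the existing zones; a zone created in between may be seen by
     *    both paths, and putIfAbsent keeps whichever state was installed first.
     */
    void start(Consumer<IntConsumer> registerListener, Supplier<List<Integer>> readExistingZoneIds) {
        registerListener.accept(this::onZoneCreated);

        for (int zoneId : readExistingZoneIds.get()) {
            zoneStates.putIfAbsent(zoneId, new ZoneState());
        }
    }
}
```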



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -402,22 +446,35 @@ public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<Distribution
         public CompletableFuture<?> onDelete(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
             updateMetaStorageOnZoneDelete(ctx.oldValue().zoneId(), ctx.storageRevision());
 
+            zonesTimers.remove(ctx.oldValue().zoneId());
+
             return completedFuture(null);
         }
 
         @Override
         public CompletableFuture<?> onUpdate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
-            updateMetaStorageOnZoneUpdate(ctx.storageRevision());
+            int zoneId = ctx.newValue().zoneId();
+
+            updateMetaStorageOnZoneUpdate(ctx.storageRevision(), zoneId);
+
+            int oldScaleUp = ctx.oldValue().dataNodesAutoAdjustScaleUp();
 
-            //TODO: Also add here rescheduling for the existing timers https://issues.apache.org/jira/browse/IGNITE-18121
+            int newScaleUp = ctx.newValue().dataNodesAutoAdjustScaleUp();
+
+            if (newScaleUp != Integer.MAX_VALUE && oldScaleUp != newScaleUp) {
+                // It is safe to zonesTimers.get(zoneId) in term of NPE because meta storage notifications are one-threaded
+                zonesTimers.get(zoneId).rescheduleOngoingScaleUp(newScaleUp);
+            } else {
+                updateMetaStorageOnZoneUpdate(ctx.storageRevision(), zoneId);

Review Comment:
   Why should we update dataNodes **immediately** if newScaleUp != Integer.MAX_VALUE? We should either leave the current timer as is or just discard it. For consistency purposes, I prefer the latter.
   
   And what about rescheduling to an immediate run: will zonesTimers.get(zoneId).rescheduleOngoingScaleUp(0) work?
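
   A sketch of the "discard" option and of an immediate reschedule, assuming a plain JDK `ScheduledExecutorService` rather than the PR's `Timer` class; `ScaleUpTimer` and its methods are hypothetical. With the JDK executor, a delay of 0 makes the task eligible to run as soon as a pool thread is free:

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class ScaleUpTimer {
    private final ScheduledExecutorService executor;
    private final Runnable action;
    private ScheduledFuture<?> pending; // guarded by this

    ScaleUpTimer(ScheduledExecutorService executor, Runnable action) {
        this.executor = executor;
        this.action = action;
    }

    synchronized void schedule(long delayMs) {
        pending = executor.schedule(action, delayMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Applies a changed scale-up timeout to a task that is still pending.
     * Integer.MAX_VALUE (auto-adjust disabled) discards the task instead of writing data nodes
     * immediately; 0 re-queues it to run as soon as a pool thread is free.
     */
    synchronized void onScaleUpChanged(int newDelayMs) {
        if (pending == null || pending.isDone()) {
            return; // nothing is waiting
        }

        pending.cancel(false);

        pending = newDelayMs == Integer.MAX_VALUE
                ? null
                : executor.schedule(action, newDelayMs, TimeUnit.MILLISECONDS);
    }

    synchronized boolean hasPending() {
        return pending != null && !pending.isDone();
    }
}
```

   Note that `isDone()` is still false while the action is running, and `cancel(false)` does not interrupt it, so a change that arrives mid-run schedules one more run with the new delay.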



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -402,22 +446,35 @@ public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<Distribution
         public CompletableFuture<?> onDelete(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
             updateMetaStorageOnZoneDelete(ctx.oldValue().zoneId(), ctx.storageRevision());
 
+            zonesTimers.remove(ctx.oldValue().zoneId());

Review Comment:
   Should we stop the timers before removing them from the map?
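
   A sketch of stopping the timers first and removing the entry afterwards, so no pending run fires after removal while an action that has already started can still find its state. `ZoneStates` and its members are hypothetical; the futures are assumed to come from a JDK `ScheduledExecutorService`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

final class ZoneStates {
    static final class ZoneState {
        private ScheduledFuture<?> scaleUp;   // guarded by this
        private ScheduledFuture<?> scaleDown; // guarded by this

        synchronized void setScaleUp(ScheduledFuture<?> task) {
            scaleUp = task;
        }

        synchronized void setScaleDown(ScheduledFuture<?> task) {
            scaleDown = task;
        }

        /** Prevents pending runs; an action that has already started is allowed to finish. */
        synchronized void stopTimers() {
            if (scaleUp != null) {
                scaleUp.cancel(false);
            }
            if (scaleDown != null) {
                scaleDown.cancel(false);
            }
        }
    }

    final Map<Integer, ZoneState> states = new ConcurrentHashMap<>();

    /** Zone deletion: stop the timers first, then drop the state. */
    void onZoneDeleted(int zoneId) {
        ZoneState state = states.get(zoneId);

        if (state != null) {
            state.stopTimers();
            states.remove(zoneId);
        }
    }
}
```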



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -806,4 +866,230 @@ private static Throwable unwrapDistributionZoneExceptionRecursively(Throwable e,
 
         return null;
     }
+
+    /**
+     * Method updates data nodes value for the specified zone after scale up timer timeout,
+     * also sets {@code revision} to the {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} if it passes the condition.
+     *
+     * @param zoneId Unique id of a zone
+     * @param revision Revision of an event that has triggered this method.
+     */
+    private CompletableFuture<Boolean> saveDataNodesToMetaStorageOnScaleUp(int zoneId, long revision) {
+        ZoneState zoneState = zonesTimers.get(zoneId);
+
+        if (zoneState == null) {

Review Comment:
   I'd rather stop the timers before removing the zone and replace this null check with an assert.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -806,4 +866,230 @@ private static Throwable unwrapDistributionZoneExceptionRecursively(Throwable e,
 
         return null;
     }
+
+    /**
+     * Method updates data nodes value for the specified zone after scale up timer timeout,
+     * also sets {@code revision} to the {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} if it passes the condition.
+     *
+     * @param zoneId Unique id of a zone
+     * @param revision Revision of an event that has triggered this method.
+     */
+    private CompletableFuture<Boolean> saveDataNodesToMetaStorageOnScaleUp(int zoneId, long revision) {
+        ZoneState zoneState = zonesTimers.get(zoneId);
+
+        if (zoneState == null) {
+            // Zone was deleted
+            return completedFuture(false);
+        }
+
+        ReentrantLock lockForTimers = zoneState.lockForTimers();
+
+        lockForTimers.lock();
+
+        Set<ByteArray> keysToGetFromMs = Set.of(
+                zoneDataNodesKey(zoneId),
+                zoneScaleUpChangeTriggerKey(zoneId),
+                zoneScaleDownChangeTriggerKey(zoneId)
+        );
+
+        return metaStorageManager.getAll(keysToGetFromMs).thenCompose(values -> {
+            if (values.containsValue(null)) {
+                // Zone was deleted
+                return completedFuture(false);
+            }
+
+            Set<String> dataNodesFromMetaStorage = fromBytes(values.get(zoneDataNodesKey(zoneId)).value());
+
+            long scaleUpTriggerRevision = bytesToLong(values.get(zoneScaleUpChangeTriggerKey(zoneId)).value());
+
+            long scaleDownTriggerRevision = bytesToLong(values.get(zoneScaleDownChangeTriggerKey(zoneId)).value());
+
+            if (revision <= scaleUpTriggerRevision) {
+                return completedFuture(false);
+            }
+
+            ReentrantLock lockForZone = zoneState.lock();
+
+            lockForZone.lock();
+
+            Set<String> deltaToAdd;
+
+            try {
+                deltaToAdd = new HashSet<>(zoneState.nodesToAdd());
+            } finally {
+                lockForZone.unlock();
+            }
+
+            Set<String> newDataNodes = new HashSet<>(dataNodesFromMetaStorage);
+
+            newDataNodes.addAll(deltaToAdd);
+
+            Update dataNodesAndTriggerKeyUpd = updateDataNodesAndScaleUpTriggerKey(zoneId, revision, toBytes(newDataNodes));
+
+            If iif = If.iif(
+                    triggerScaleUpScaleDownKeysCondition(scaleUpTriggerRevision, scaleDownTriggerRevision, zoneId),
+                    dataNodesAndTriggerKeyUpd,
+                    ops().yield(false)
+            );
+
+            return metaStorageManager.invoke(iif).thenApply(StatementResult::getAsBoolean).thenCompose(invokeResult -> {
+                if (invokeResult) {
+                    lockForZone.lock();
+                    try {
+                        zoneState.nodesToAdd().clear();

Review Comment:
   I don't think this is valid. Imagine that while we are here, trying to add +D, a new node is added to zoneState.nodesToAdd(), so it becomes +D,E. Clearing the set would lose E; we should remove only the delta we've successfully added.
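
   A minimal sketch of subtracting only the applied delta, assuming the accumulator is guarded by a single lock as in the PR's `ZoneState`. The class and method names are hypothetical:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

final class NodesToAddAccumulator {
    private final ReentrantLock lock = new ReentrantLock();
    private final Set<String> nodesToAdd = new HashSet<>();

    /** Called from the topology watch: accumulates newly joined nodes. */
    void accumulate(Set<String> joined) {
        lock.lock();
        try {
            nodesToAdd.addAll(joined);
        } finally {
            lock.unlock();
        }
    }

    /** Copy of the current delta, taken before building the meta storage update. */
    Set<String> snapshot() {
        lock.lock();
        try {
            return new HashSet<>(nodesToAdd);
        } finally {
            lock.unlock();
        }
    }

    /** Called only after the invoke succeeded: drops exactly the nodes that were written. */
    void removeApplied(Set<String> applied) {
        lock.lock();
        try {
            nodesToAdd.removeAll(applied);
        } finally {
            lock.unlock();
        }
    }
}
```

   Using `removeAll(snapshot)` instead of `clear()` keeps E, which joined while +D was being written, for the next timer run.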



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -806,4 +866,230 @@ private static Throwable unwrapDistributionZoneExceptionRecursively(Throwable e,
 
         return null;
     }
+
+    /**
+     * Method updates data nodes value for the specified zone after scale up timer timeout,
+     * also sets {@code revision} to the {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} if it passes the condition.
+     *
+     * @param zoneId Unique id of a zone
+     * @param revision Revision of an event that has triggered this method.
+     */
+    private CompletableFuture<Boolean> saveDataNodesToMetaStorageOnScaleUp(int zoneId, long revision) {
+        ZoneState zoneState = zonesTimers.get(zoneId);
+
+        if (zoneState == null) {
+            // Zone was deleted
+            return completedFuture(false);
+        }
+
+        ReentrantLock lockForTimers = zoneState.lockForTimers();
+
+        lockForTimers.lock();
+
+        Set<ByteArray> keysToGetFromMs = Set.of(
+                zoneDataNodesKey(zoneId),
+                zoneScaleUpChangeTriggerKey(zoneId),
+                zoneScaleDownChangeTriggerKey(zoneId)
+        );
+
+        return metaStorageManager.getAll(keysToGetFromMs).thenCompose(values -> {
+            if (values.containsValue(null)) {
+                // Zone was deleted
+                return completedFuture(false);
+            }
+
+            Set<String> dataNodesFromMetaStorage = fromBytes(values.get(zoneDataNodesKey(zoneId)).value());
+
+            long scaleUpTriggerRevision = bytesToLong(values.get(zoneScaleUpChangeTriggerKey(zoneId)).value());
+
+            long scaleDownTriggerRevision = bytesToLong(values.get(zoneScaleDownChangeTriggerKey(zoneId)).value());
+
+            if (revision <= scaleUpTriggerRevision) {
+                return completedFuture(false);
+            }
+
+            ReentrantLock lockForZone = zoneState.lock();
+
+            lockForZone.lock();
+
+            Set<String> deltaToAdd;
+
+            try {
+                deltaToAdd = new HashSet<>(zoneState.nodesToAdd());
+            } finally {
+                lockForZone.unlock();
+            }
+
+            Set<String> newDataNodes = new HashSet<>(dataNodesFromMetaStorage);
+
+            newDataNodes.addAll(deltaToAdd);
+
+            Update dataNodesAndTriggerKeyUpd = updateDataNodesAndScaleUpTriggerKey(zoneId, revision, toBytes(newDataNodes));
+
+            If iif = If.iif(
+                    triggerScaleUpScaleDownKeysCondition(scaleUpTriggerRevision, scaleDownTriggerRevision, zoneId),
+                    dataNodesAndTriggerKeyUpd,
+                    ops().yield(false)
+            );
+
+            return metaStorageManager.invoke(iif).thenApply(StatementResult::getAsBoolean).thenCompose(invokeResult -> {
+                if (invokeResult) {
+                    lockForZone.lock();
+                    try {
+                        zoneState.nodesToAdd().clear();
+                    } finally {
+                        lockForZone.unlock();
+                    }
+                } else {
+                    return saveDataNodesToMetaStorageOnScaleUp(zoneId, revision);
+                }
+
+                return completedFuture(invokeResult);
+            });
+        }).handle((v, e) -> {
+            lockForTimers.unlock();
+
+            if (e != null) {
+                return CompletableFuture.<Boolean>failedFuture(e);
+            }
+
+            return completedFuture(v);
+        }).thenCompose(Function.identity());
+    }
+
+    private class ZoneState {
+        private Timer scaleUp;
+
+        private Timer scaleDown;
+
+        private final ReentrantLock lockDeltas;
+
+        private final ReentrantLock lockForTimers;
+
+        private final Set<String> nodesToAdd;
+
+        private final Set<String> nodesToRemove;
+
+        ZoneState() {
+            this.lockDeltas = new ReentrantLock();
+            this.lockForTimers = new ReentrantLock();
+            this.nodesToAdd = new HashSet<>();
+            this.nodesToRemove = new HashSet<>();
+        }
+
+        private void rescheduleScaleUp(long delay, Runnable runnable) {
+            if (scaleUp == null) {
+                scaleUp = new Timer(runnable);
+            }
+
+            scaleUp.reschedule(delay);
+        }
+
+        private void rescheduleOngoingScaleUp(long delay) {

Review Comment:
   The provided (ongoing) rescheduling logic is error-prone and cumbersome; please rethink and reimplement it. There are 3 possible states for a timer:
   1. No task is scheduled.
   2. A task is scheduled but has not fired yet.
   3. A task is executing right now.
   
   In cases 1 and 3 we should schedule a new task, so the timer may be executing one task right now while another is waiting in the pending queue.
   In case 2 we should reschedule the task.
   This scheduling and rescheduling must be thread-safe.
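
   A minimal thread-safe timer covering the three states, sketched with the JDK `ScheduledExecutorService`; `RescheduleTimer` is a hypothetical name. Every call cancels any pending run with `cancel(false)`, which never interrupts a run that has already started, and then schedules a fresh task; `synchronized` keeps concurrent reschedules from losing a future:

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class RescheduleTimer {
    private final ScheduledExecutorService executor;
    private final Runnable action;
    private ScheduledFuture<?> task; // guarded by this

    RescheduleTimer(ScheduledExecutorService executor, Runnable action) {
        this.executor = executor;
        this.action = action;
    }

    /**
     * State 1 (nothing scheduled) and state 3 (action running): cancel(false) does not stop
     * the run, and a new task is scheduled alongside it.
     * State 2 (scheduled, not fired): cancel(false) removes the pending run, and the new task replaces it.
     */
    synchronized void reschedule(long delayMs) {
        if (task != null) {
            task.cancel(false);
        }

        task = executor.schedule(action, delayMs, TimeUnit.MILLISECONDS);
    }

    /** Prevents any pending run; an action that is already running finishes normally. */
    synchronized void stop() {
        if (task != null) {
            task.cancel(false);
            task = null;
        }
    }
}
```

   The sketch deliberately ignores `cancel`'s return value: for a `FutureTask`-based scheduled task it can be `true` even when the run has already started, so it is not a reliable "still pending" signal.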



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -806,4 +866,230 @@ private static Throwable unwrapDistributionZoneExceptionRecursively(Throwable e,
 
         return null;
     }
+
+    /**
+     * Method updates data nodes value for the specified zone after scale up timer timeout,
+     * also sets {@code revision} to the {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} if it passes the condition.
+     *
+     * @param zoneId Unique id of a zone
+     * @param revision Revision of an event that has triggered this method.
+     */
+    private CompletableFuture<Boolean> saveDataNodesToMetaStorageOnScaleUp(int zoneId, long revision) {
+        ZoneState zoneState = zonesTimers.get(zoneId);
+
+        if (zoneState == null) {
+            // Zone was deleted
+            return completedFuture(false);
+        }
+
+        ReentrantLock lockForTimers = zoneState.lockForTimers();
+
+        lockForTimers.lock();
+
+        Set<ByteArray> keysToGetFromMs = Set.of(
+                zoneDataNodesKey(zoneId),
+                zoneScaleUpChangeTriggerKey(zoneId),
+                zoneScaleDownChangeTriggerKey(zoneId)
+        );
+
+        return metaStorageManager.getAll(keysToGetFromMs).thenCompose(values -> {
+            if (values.containsValue(null)) {
+                // Zone was deleted
+                return completedFuture(false);
+            }
+
+            Set<String> dataNodesFromMetaStorage = fromBytes(values.get(zoneDataNodesKey(zoneId)).value());
+
+            long scaleUpTriggerRevision = bytesToLong(values.get(zoneScaleUpChangeTriggerKey(zoneId)).value());
+
+            long scaleDownTriggerRevision = bytesToLong(values.get(zoneScaleDownChangeTriggerKey(zoneId)).value());
+
+            if (revision <= scaleUpTriggerRevision) {
+                return completedFuture(false);
+            }
+
+            ReentrantLock lockForZone = zoneState.lock();
+
+            lockForZone.lock();
+
+            Set<String> deltaToAdd;
+
+            try {
+                deltaToAdd = new HashSet<>(zoneState.nodesToAdd());
+            } finally {
+                lockForZone.unlock();
+            }
+
+            Set<String> newDataNodes = new HashSet<>(dataNodesFromMetaStorage);
+
+            newDataNodes.addAll(deltaToAdd);
+
+            Update dataNodesAndTriggerKeyUpd = updateDataNodesAndScaleUpTriggerKey(zoneId, revision, toBytes(newDataNodes));
+
+            If iif = If.iif(
+                    triggerScaleUpScaleDownKeysCondition(scaleUpTriggerRevision, scaleDownTriggerRevision, zoneId),
+                    dataNodesAndTriggerKeyUpd,
+                    ops().yield(false)
+            );
+
+            return metaStorageManager.invoke(iif).thenApply(StatementResult::getAsBoolean).thenCompose(invokeResult -> {
+                if (invokeResult) {
+                    lockForZone.lock();
+                    try {
+                        zoneState.nodesToAdd().clear();
+                    } finally {
+                        lockForZone.unlock();
+                    }
+                } else {
+                    return saveDataNodesToMetaStorageOnScaleUp(zoneId, revision);
+                }
+
+                return completedFuture(invokeResult);
+            });
+        }).handle((v, e) -> {
+            lockForTimers.unlock();
+
+            if (e != null) {
+                return CompletableFuture.<Boolean>failedFuture(e);
+            }
+
+            return completedFuture(v);
+        }).thenCompose(Function.identity());
+    }
+
+    private class ZoneState {
+        private Timer scaleUp;
+
+        private Timer scaleDown;
+
+        private final ReentrantLock lockDeltas;
+
+        private final ReentrantLock lockForTimers;
+
+        private final Set<String> nodesToAdd;
+
+        private final Set<String> nodesToRemove;
+
+        ZoneState() {
+            this.lockDeltas = new ReentrantLock();
+            this.lockForTimers = new ReentrantLock();
+            this.nodesToAdd = new HashSet<>();
+            this.nodesToRemove = new HashSet<>();
+        }
+
+        private void rescheduleScaleUp(long delay, Runnable runnable) {
+            if (scaleUp == null) {
+                scaleUp = new Timer(runnable);
+            }
+
+            scaleUp.reschedule(delay);
+        }
+
+        private void rescheduleOngoingScaleUp(long delay) {
+            if (scaleUp != null) {
+                scaleUp.rescheduleOngoingTask(delay);
+            }
+        }
+
+        private void rescheduleScaleDown(long delay, Runnable runnable) {
+            if (scaleDown == null) {
+                scaleDown = new Timer(runnable);
+            }
+
+            scaleDown.reschedule(delay);
+        }
+
+        public ReentrantLock lock() {
+            return lockDeltas;
+        }
+
+        public ReentrantLock lockForTimers() {
+            return lockForTimers;
+        }
+
+        public Set<String> nodesToAdd() {
+            return nodesToAdd;
+        }
+
+        public Set<String> nodesToRemove() {
+            return nodesToRemove;
+        }
+
+        public void nodesToAdd(Set<String> nodes) {
+            lockDeltas.lock();
+
+            try {
+                nodesToAdd.addAll(nodes);
+            } finally {
+                lockDeltas.unlock();
+            }
+        }
+
+        public void nodesToRemove(Set<String> nodes) {
+            lockDeltas.lock();
+
+            try {
+                nodesToRemove.addAll(nodes);
+            } finally {
+                lockDeltas.unlock();
+            }
+        }
+    }
+
+    private class Timer implements Runnable {
+        private final Runnable runnable;
+
+        ScheduledFuture<?> task;
+
+        Timer(Runnable runnable) {
+            this.runnable = runnable;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            runnable.run();
+        }
+
+        private void reschedule(long delay) {
+            if (task != null) {
+                task.cancel(false);
+            }
+
+            task = executor.schedule(this, delay, TimeUnit.MILLISECONDS);
+        }
+
+        private void rescheduleOngoingTask(long delay) {
+            if (task != null) {
+                if (task.cancel(false)) {
+                    task = executor.schedule(this, delay, TimeUnit.MILLISECONDS);
+                }
+            }
+        }
+    }
+
+    private void initTriggerKeysAndDataNodes(int zoneId) {
+        If iifScaleUp = If.iif(
+                notExists(zoneScaleUpChangeTriggerKey(zoneId)),
+                ops(put(zoneScaleUpChangeTriggerKey(zoneId), ByteUtils.longToBytes(0L))).yield(true),
+                ops().yield(false)
+        );
+
+        metaStorageManager.invoke(iifScaleUp);

Review Comment:
   Here and in updateMetaStorage... you are ignoring the invoke result. That's probably not a good idea ;)
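
   A sketch of at least observing the result, assuming `invoke` returns a `CompletableFuture<Boolean>` as in the diff. The wrapper name is hypothetical, and `java.util.logging` stands in for Ignite's own logger:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

final class InvokeResultHandling {
    private static final Logger LOG = Logger.getLogger(InvokeResultHandling.class.getName());

    /**
     * Runs an invoke and logs a failure or an unapplied condition instead of dropping the future.
     * The returned future completes with the same result, or exceptionally if the invoke failed,
     * so callers can still chain on it.
     */
    static CompletableFuture<Boolean> invokeAndReport(String operation, Supplier<CompletableFuture<Boolean>> invoke) {
        return invoke.get().whenComplete((applied, ex) -> {
            if (ex != null) {
                LOG.log(Level.SEVERE, "Meta storage invoke failed: " + operation, ex);
            } else if (!Boolean.TRUE.equals(applied)) {
                LOG.log(Level.FINE, "Meta storage condition not met, nothing applied: {0}", operation);
            }
        });
    }
}
```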



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -103,6 +123,14 @@ public class DistributionZoneManager implements IgniteComponent {
     /** Logical topology service to track topology changes. */
     private final LogicalTopologyService logicalTopologyService;
 
+    private final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(
+            8,
+            new NamedThreadFactory("dst-zones-scheduler", LOG),

Review Comment:
   Let's also add the node name as a prefix, e.g.
   `NamedThreadFactory.threadPrefix(clusterNetSvc.localConfiguration().getName(),`
   and extract dst-zones-scheduler to a constant.
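
   A sketch using only JDK types. The prefix format here is an assumption: Ignite's `NamedThreadFactory.threadPrefix(...)` may build it differently, and the constant and method names are hypothetical:

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class ZoneSchedulerThreads {
    /** Pool name extracted to a constant (the constant name is hypothetical). */
    static final String DISTRIBUTION_ZONE_MANAGER_POOL_NAME = "dst-zones-scheduler";

    /** Hypothetical prefix format: "<node>-<pool>-". */
    static String threadPrefix(String nodeName, String poolName) {
        return nodeName + "-" + poolName + "-";
    }

    /** Names threads "<prefix>0", "<prefix>1", ...; making them daemons is an assumption of this sketch. */
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();

        return runnable -> {
            Thread thread = new Thread(runnable, prefix + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    static ScheduledThreadPoolExecutor newScheduler(String nodeName, int poolSize) {
        return new ScheduledThreadPoolExecutor(
                poolSize,
                namedFactory(threadPrefix(nodeName, DISTRIBUTION_ZONE_MANAGER_POOL_NAME)));
    }
}
```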



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -393,7 +429,15 @@ public void stop() throws Exception {
     private class ZonesConfigurationListener implements ConfigurationNamedListListener<DistributionZoneView> {
         @Override
         public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
-            updateMetaStorageOnZoneCreate(ctx.newValue().zoneId(), ctx.storageRevision());
+            int zoneId = ctx.newValue().zoneId();
+
+            ZoneState timers = new ZoneState();
+
+            zonesTimers.put(zoneId, timers);
+
+            initTriggerKeysAndDataNodes(zoneId);

Review Comment:
   We should avoid awaiting network operations from within the meta storage (ms) notification thread. Currently there's no await logic, but that's probably a bug: you simply ignore the CompletableFuture returned by ms.invoke(). So please keep in mind that when you fix it, you shouldn't await the result inside the ms notification thread.
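
   A sketch of a non-blocking listener body with a hypothetical `ZoneMetaStorage` interface: it chains on the invoke future and returns it rather than calling `join()` or `get()`, so the notification thread is not parked on a network round trip. Whether the configuration framework then waits on the returned future is outside this sketch:

```java
import java.util.concurrent.CompletableFuture;

final class NonBlockingOnCreate {
    /** Hypothetical async view of the meta storage operations used on zone creation. */
    interface ZoneMetaStorage {
        CompletableFuture<Boolean> initZoneKeys(int zoneId, long revision);
    }

    private final ZoneMetaStorage metaStorage;

    NonBlockingOnCreate(ZoneMetaStorage metaStorage) {
        this.metaStorage = metaStorage;
    }

    /** Starts the update and hands back its future; never blocks the calling thread. */
    CompletableFuture<Void> onCreate(int zoneId, long revision) {
        return metaStorage.initZoneKeys(zoneId, revision)
                .thenAccept(applied -> {
                    if (!Boolean.TRUE.equals(applied)) {
                        // Hypothetical handling: the keys may already have been initialized elsewhere.
                        System.err.println("Zone " + zoneId + ": keys were not initialized by this call");
                    }
                });
    }
}
```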



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -393,7 +429,15 @@ public void stop() throws Exception {
     private class ZonesConfigurationListener implements ConfigurationNamedListListener<DistributionZoneView> {
         @Override
         public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
-            updateMetaStorageOnZoneCreate(ctx.newValue().zoneId(), ctx.storageRevision());
+            int zoneId = ctx.newValue().zoneId();
+
+            ZoneState timers = new ZoneState();

Review Comment:
   Well, it's not only timers.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -393,7 +429,15 @@ public void stop() throws Exception {
     private class ZonesConfigurationListener implements ConfigurationNamedListListener<DistributionZoneView> {
         @Override
         public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
-            updateMetaStorageOnZoneCreate(ctx.newValue().zoneId(), ctx.storageRevision());
+            int zoneId = ctx.newValue().zoneId();
+
+            ZoneState timers = new ZoneState();
+
+            zonesTimers.put(zoneId, timers);
+
+            initTriggerKeysAndDataNodes(zoneId);
+
+            updateMetaStorageOnZoneCreate(zoneId, ctx.storageRevision());

Review Comment:
   The naming is confusing: initTriggerKeysAndDataNodes also updates the ms state, so what does updateMetaStorageOnZoneCreate actually do? Its behavior seems to clash with initTriggerKeysAndDataNodes, and, despite its name, initTriggerKeysAnd**DataNodes** doesn't update data nodes.
   So,
   
   - what's the point of calling both updateMetaStorageOnZoneCreate and initTriggerKeysAndDataNodes, populating the triggers first with zeros and then with the appropriate values?
   - what's the point of having two very similar methods?



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -724,14 +781,17 @@ public boolean onUpdate(@NotNull WatchEvent evt) {
 
                                         if ((!addedNodes.isEmpty() || !removedNodes.isEmpty()) && autoAdjust != Integer.MAX_VALUE) {
                                             //TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust timer.
-                                            saveDataNodesToMetaStorage(
-                                                    zoneId, newEntry.value(), newEntry.revision()
-                                            );
+                                            saveDataNodesToMetaStorage(zoneId, newEntry.value(), newEntry.revision());
                                         } else {
                                             if (!addedNodes.isEmpty() && autoAdjustScaleUp != Integer.MAX_VALUE) {
                                                 //TODO: IGNITE-18121 Create scale up scheduler with dataNodesAutoAdjustScaleUp timer.
-                                                saveDataNodesToMetaStorage(
-                                                        zoneId, newEntry.value(), newEntry.revision()
+                                                zonesTimers.get(zoneId).nodesToAdd(new HashSet<>(addedNodes));

Review Comment:
   Confusing naming: nodesToAdd() sounds like a getter, but it's actually an accumulator. 



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -724,14 +781,17 @@ public boolean onUpdate(@NotNull WatchEvent evt) {
 
                                         if ((!addedNodes.isEmpty() || !removedNodes.isEmpty()) && autoAdjust != Integer.MAX_VALUE) {
                                             //TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust timer.
-                                            saveDataNodesToMetaStorage(
-                                                    zoneId, newEntry.value(), newEntry.revision()
-                                            );
+                                            saveDataNodesToMetaStorage(zoneId, newEntry.value(), newEntry.revision());
                                         } else {
                                             if (!addedNodes.isEmpty() && autoAdjustScaleUp != Integer.MAX_VALUE) {
                                                 //TODO: IGNITE-18121 Create scale up scheduler with dataNodesAutoAdjustScaleUp timer.
-                                                saveDataNodesToMetaStorage(
-                                                        zoneId, newEntry.value(), newEntry.revision()
+                                                zonesTimers.get(zoneId).nodesToAdd(new HashSet<>(addedNodes));
+
+                                                zonesTimers.get(zoneId).rescheduleScaleUp(autoAdjustScaleUp,

Review Comment:
   Formatting.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -724,14 +781,17 @@ public boolean onUpdate(@NotNull WatchEvent evt) {
 
                                         if ((!addedNodes.isEmpty() || !removedNodes.isEmpty()) && autoAdjust != Integer.MAX_VALUE) {
                                             //TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust timer.
-                                            saveDataNodesToMetaStorage(
-                                                    zoneId, newEntry.value(), newEntry.revision()
-                                            );
+                                            saveDataNodesToMetaStorage(zoneId, newEntry.value(), newEntry.revision());
                                         } else {
                                             if (!addedNodes.isEmpty() && autoAdjustScaleUp != Integer.MAX_VALUE) {
                                                 //TODO: IGNITE-18121 Create scale up scheduler with dataNodesAutoAdjustScaleUp timer.
-                                                saveDataNodesToMetaStorage(
-                                                        zoneId, newEntry.value(), newEntry.revision()
+                                                zonesTimers.get(zoneId).nodesToAdd(new HashSet<>(addedNodes));
+
+                                                zonesTimers.get(zoneId).rescheduleScaleUp(autoAdjustScaleUp,

Review Comment:
   The proposed behavior differs from the original one in that we no longer trigger dataNodes updates when nodes are removed. Some tests may fail.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -806,4 +866,230 @@ private static Throwable unwrapDistributionZoneExceptionRecursively(Throwable e,
 
         return null;
     }
+
+    /**
+     * Method updates data nodes value for the specified zone after scale up timer timeout,
+     * also sets {@code revision} to the {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} if it passes the condition.
+     *
+     * @param zoneId Unique id of a zone
+     * @param revision Revision of an event that has triggered this method.
+     */
+    private CompletableFuture<Boolean> saveDataNodesToMetaStorageOnScaleUp(int zoneId, long revision) {

Review Comment:
   Where and how do you use method result?



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -806,4 +866,230 @@ private static Throwable unwrapDistributionZoneExceptionRecursively(Throwable e,
 
         return null;
     }
+
+    /**
+     * Method updates data nodes value for the specified zone after scale up timer timeout,
+     * also sets {@code revision} to the {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} if it passes the condition.
+     *
+     * @param zoneId Unique id of a zone
+     * @param revision Revision of an event that has triggered this method.
+     */
+    private CompletableFuture<Boolean> saveDataNodesToMetaStorageOnScaleUp(int zoneId, long revision) {
+        ZoneState zoneState = zonesTimers.get(zoneId);
+
+        if (zoneState == null) {
+            // Zone was deleted
+            return completedFuture(false);
+        }
+
+        ReentrantLock lockForTimers = zoneState.lockForTimers();
+
+        lockForTimers.lock();
+
+        Set<ByteArray> keysToGetFromMs = Set.of(
+                zoneDataNodesKey(zoneId),
+                zoneScaleUpChangeTriggerKey(zoneId),
+                zoneScaleDownChangeTriggerKey(zoneId)
+        );
+
+        return metaStorageManager.getAll(keysToGetFromMs).thenCompose(values -> {
+            if (values.containsValue(null)) {
+                // Zone was deleted
+                return completedFuture(false);

Review Comment:
   Same as above.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -806,4 +866,230 @@ private static Throwable unwrapDistributionZoneExceptionRecursively(Throwable e,
 
         return null;
     }
+
+    /**
+     * Method updates data nodes value for the specified zone after scale up timer timeout,
+     * also sets {@code revision} to the {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} if it passes the condition.
+     *
+     * @param zoneId Unique id of a zone
+     * @param revision Revision of an event that has triggered this method.
+     */
+    private CompletableFuture<Boolean> saveDataNodesToMetaStorageOnScaleUp(int zoneId, long revision) {
+        ZoneState zoneState = zonesTimers.get(zoneId);
+
+        if (zoneState == null) {
+            // Zone was deleted
+            return completedFuture(false);
+        }
+
+        ReentrantLock lockForTimers = zoneState.lockForTimers();
+
+        lockForTimers.lock();
+
+        Set<ByteArray> keysToGetFromMs = Set.of(
+                zoneDataNodesKey(zoneId),
+                zoneScaleUpChangeTriggerKey(zoneId),
+                zoneScaleDownChangeTriggerKey(zoneId)
+        );
+
+        return metaStorageManager.getAll(keysToGetFromMs).thenCompose(values -> {
+            if (values.containsValue(null)) {
+                // Zone was deleted
+                return completedFuture(false);
+            }
+
+            Set<String> dataNodesFromMetaStorage = fromBytes(values.get(zoneDataNodesKey(zoneId)).value());
+
+            long scaleUpTriggerRevision = bytesToLong(values.get(zoneScaleUpChangeTriggerKey(zoneId)).value());
+
+            long scaleDownTriggerRevision = bytesToLong(values.get(zoneScaleDownChangeTriggerKey(zoneId)).value());
+
+            if (revision <= scaleUpTriggerRevision) {
+                return completedFuture(false);
+            }
+
+            ReentrantLock lockForZone = zoneState.lock();
+
+            lockForZone.lock();
+
+            Set<String> deltaToAdd;
+
+            try {
+                deltaToAdd = new HashSet<>(zoneState.nodesToAdd());
+            } finally {
+                lockForZone.unlock();
+            }
+
+            Set<String> newDataNodes = new HashSet<>(dataNodesFromMetaStorage);
+
+            newDataNodes.addAll(deltaToAdd);
+
+            Update dataNodesAndTriggerKeyUpd = updateDataNodesAndScaleUpTriggerKey(zoneId, revision, toBytes(newDataNodes));
+
+            If iif = If.iif(
+                    triggerScaleUpScaleDownKeysCondition(scaleUpTriggerRevision, scaleDownTriggerRevision, zoneId),
+                    dataNodesAndTriggerKeyUpd,
+                    ops().yield(false)
+            );
+
+            return metaStorageManager.invoke(iif).thenApply(StatementResult::getAsBoolean).thenCompose(invokeResult -> {
+                if (invokeResult) {
+                    lockForZone.lock();
+                    try {
+                        zoneState.nodesToAdd().clear();
+                    } finally {
+                        lockForZone.unlock();
+                    }
+                } else {
+                    return saveDataNodesToMetaStorageOnScaleUp(zoneId, revision);

Review Comment:
   Logging is missing.
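One way to address this, sketched under assumptions. The loop above reads the current values, computes the new data nodes, and retries when the conditional `invoke` fails, so the retry branch is the natural place for a log line. The model below replaces the meta storage with an `AtomicReference` compare-and-set and uses the JDK's `System.Logger`. `ScaleUpRetrySketch`, `Snapshot` and `saveOnScaleUp` are hypothetical names, and Ignite's own logger API is deliberately not shown.

```java
import java.lang.System.Logger;
import java.lang.System.Logger.Level;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of the scale up update with a logged retry; not Ignite code. */
class ScaleUpRetrySketch {
    private static final Logger LOG = System.getLogger(ScaleUpRetrySketch.class.getName());

    /** Stand-in for the meta storage entries: data nodes and the scale up trigger revision. */
    static final class Snapshot {
        final Set<String> dataNodes;
        final long scaleUpTriggerRevision;

        Snapshot(Set<String> dataNodes, long scaleUpTriggerRevision) {
            this.dataNodes = dataNodes;
            this.scaleUpTriggerRevision = scaleUpTriggerRevision;
        }
    }

    final AtomicReference<Snapshot> store = new AtomicReference<>(new Snapshot(Set.of(), 0));

    CompletableFuture<Boolean> saveOnScaleUp(int zoneId, long revision, Set<String> nodesToAdd) {
        Snapshot current = store.get();

        if (revision <= current.scaleUpTriggerRevision) {
            // This or a newer revision was already applied, so there is nothing to write.
            return CompletableFuture.completedFuture(false);
        }

        Set<String> newDataNodes = new HashSet<>(current.dataNodes);
        newDataNodes.addAll(nodesToAdd);

        // Conditional write: succeeds only if nobody replaced the snapshot since it was read.
        if (store.compareAndSet(current, new Snapshot(Set.copyOf(newDataNodes), revision))) {
            return CompletableFuture.completedFuture(true);
        }

        // The branch the review comment points at: record why another attempt is made.
        LOG.log(Level.DEBUG, "Data nodes update lost a race, retrying [zoneId={0}, revision={1}]", zoneId, revision);

        return saveOnScaleUp(zoneId, revision, nodesToAdd);
    }
}
```

Logging at DEBUG keeps contention noise out of normal output while still explaining repeated attempts when someone investigates a slow or looping update.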



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -402,22 +446,35 @@ public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<Distribution
         public CompletableFuture<?> onDelete(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
             updateMetaStorageOnZoneDelete(ctx.oldValue().zoneId(), ctx.storageRevision());
 
+            zonesTimers.remove(ctx.oldValue().zoneId());
+
             return completedFuture(null);
         }
 
         @Override
         public CompletableFuture<?> onUpdate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
-            updateMetaStorageOnZoneUpdate(ctx.storageRevision());
+            int zoneId = ctx.newValue().zoneId();
+
+            updateMetaStorageOnZoneUpdate(ctx.storageRevision(), zoneId);
+
+            int oldScaleUp = ctx.oldValue().dataNodesAutoAdjustScaleUp();
 
-            //TODO: Also add here rescheduling for the existing timers https://issues.apache.org/jira/browse/IGNITE-18121
+            int newScaleUp = ctx.newValue().dataNodesAutoAdjustScaleUp();
+
+            if (newScaleUp != Integer.MAX_VALUE && oldScaleUp != newScaleUp) {
+                // It is safe to zonesTimers.get(zoneId) in term of NPE because meta storage notifications are one-threaded
+                zonesTimers.get(zoneId).rescheduleOngoingScaleUp(newScaleUp);

Review Comment:
   Why do you expect an ongoing timer at this point? Suppose the timer has already fired: should we still consider it ongoing? Probably not.
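A minimal sketch of the distinction the reviewer draws, assuming the timer is backed by a `ScheduledFuture` obtained from a `ScheduledExecutorService`. A timer counts as ongoing only while its future is not done, so a fired (or cancelled) timer is reported back instead of being rescheduled. `ZoneTimerSketch`, `scheduleScaleUp`, `isScaleUpPending` and `rescheduleIfPending` are hypothetical names, not the PR's `ZoneState` API, and delays are in milliseconds only for illustration.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical per-zone scale up timer; not the PR's ZoneState. */
class ZoneTimerSketch {
    private final ScheduledExecutorService executor;
    private final Runnable scaleUpTask;

    /** Null until the first schedule. Guarded by {@code this}. */
    private ScheduledFuture<?> scaleUpFuture;

    ZoneTimerSketch(ScheduledExecutorService executor, Runnable scaleUpTask) {
        this.executor = executor;
        this.scaleUpTask = scaleUpTask;
    }

    /** (Re)starts the timer unconditionally, cancelling any pending run. */
    synchronized void scheduleScaleUp(long delayMillis) {
        if (scaleUpFuture != null) {
            scaleUpFuture.cancel(false);
        }
        scaleUpFuture = executor.schedule(scaleUpTask, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** A timer is ongoing only while its future has neither completed nor been cancelled. */
    synchronized boolean isScaleUpPending() {
        return scaleUpFuture != null && !scaleUpFuture.isDone();
    }

    /** Applies a new delay only to a pending timer; returns false if there was nothing to reschedule. */
    synchronized boolean rescheduleIfPending(long newDelayMillis) {
        if (!isScaleUpPending()) {
            // Never scheduled, already fired, or cancelled: do not treat it as ongoing.
            return false;
        }
        // cancel(false) fails if the task completed in the meantime. A task that is already
        // running is not interrupted, so the task itself must tolerate running again
        // (for example via a revision check).
        if (!scaleUpFuture.cancel(false)) {
            return false;
        }
        scaleUpFuture = executor.schedule(scaleUpTask, newDelayMillis, TimeUnit.MILLISECONDS);
        return true;
    }
}
```

Returning `false` leaves the caller free to decide what a fired timer means for the new setting; whether it should start a fresh timer with the new delay is a design question the comment leaves open.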


