Posted to notifications@ignite.apache.org by "alievmirza (via GitHub)" <gi...@apache.org> on 2023/04/10 14:35:45 UTC

[GitHub] [ignite-3] alievmirza commented on a diff in pull request #1729: IGNITE-18756 Awaiting for data nodes updating in distribution zone manager.

alievmirza commented on code in PR #1729:
URL: https://github.com/apache/ignite-3/pull/1729#discussion_r1161731849


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -505,6 +543,133 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * The flow for the future completion:
+     * Waiting for DistributionZoneManager observe passed topology version or greater version in topologyWatchListener.
+     * If the {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 than wait for writing data nodes triggered
+     * by started nodes and corresponding to the passed topology version or greater topology version
+     * to the data nodes into the meta storage.
+     * If the {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 than wait for writing data nodes
+     * triggered by stopped nodes and corresponding to the passed topology version or greater topology version
+     * to the data nodes into the meta storage.
+     * After waiting it completes the future with data nodes of the specified zone.
+     * This method must be invoked in a change configuration closure to guarantee that the zone is exists and values of scaleUp/scaleDown
+     * timers are up to date.
+     * The returned future can be completed with {@link DistributionZoneNotFoundException} and {@link DistributionZoneWasRemovedException}
+     * in case when the distribution zone was removed during method execution.
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> topologyVersionedDataNodes(int zoneId, long topVer) {
+        CompletableFuture<IgniteBiTuple<Boolean, Boolean>> timerValuesFut = awaitTopologyVersion(topVer)
+                .thenCompose(ignored -> getImmediateTimers(zoneId));
+
+        return allOf(
+                timerValuesFut.thenCompose(timerValues -> scaleUpAwaiting(zoneId, timerValues.get1())),
+                timerValuesFut.thenCompose(timerValues -> scaleDownAwaiting(zoneId, timerValues.get2()))
+        ).thenCompose(ignored -> getDataNodesFuture(zoneId));
+    }
+
+    /**
+     * Waits for DistributionZoneManager observe passed topology version or greater version in topologyWatchListener.

Review Comment:
   "Waits for observing"; also, please add a {@link } for topologyWatchListener.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -505,6 +543,133 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * The flow for the future completion:
+     * Waiting for DistributionZoneManager observe passed topology version or greater version in topologyWatchListener.
+     * If the {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 than wait for writing data nodes triggered
+     * by started nodes and corresponding to the passed topology version or greater topology version
+     * to the data nodes into the meta storage.
+     * If the {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 than wait for writing data nodes
+     * triggered by stopped nodes and corresponding to the passed topology version or greater topology version
+     * to the data nodes into the meta storage.
+     * After waiting it completes the future with data nodes of the specified zone.
+     * This method must be invoked in a change configuration closure to guarantee that the zone is exists and values of scaleUp/scaleDown
+     * timers are up to date.
+     * The returned future can be completed with {@link DistributionZoneNotFoundException} and {@link DistributionZoneWasRemovedException}
+     * in case when the distribution zone was removed during method execution.
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> topologyVersionedDataNodes(int zoneId, long topVer) {
+        CompletableFuture<IgniteBiTuple<Boolean, Boolean>> timerValuesFut = awaitTopologyVersion(topVer)
+                .thenCompose(ignored -> getImmediateTimers(zoneId));
+
+        return allOf(
+                timerValuesFut.thenCompose(timerValues -> scaleUpAwaiting(zoneId, timerValues.get1())),
+                timerValuesFut.thenCompose(timerValues -> scaleDownAwaiting(zoneId, timerValues.get2()))
+        ).thenCompose(ignored -> getDataNodesFuture(zoneId));
+    }
+
+    /**
+     * Waits for DistributionZoneManager observe passed topology version or greater version in topologyWatchListener.
+     *
+     * @param topVer Topology version.
+     * @return Future for chaining.
+     */
+    private CompletableFuture<Void> awaitTopologyVersion(long topVer) {
+        return inBusyLock(busyLock, () -> topVerTracker.waitFor(topVer));
+    }
+
+    /**
+     * Transforms {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+     * and {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} values to boolean values.
+     * True if it equals to zero and false if it greater than zero. Zero means that data nodes changing must be started immediate.

Review Comment:
   Typo: "immediate" should be "immediately".



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -484,6 +537,189 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.

Review Comment:
   Alex's comment is still relevant. You have added implementation details of the awaiting mechanism, which is fine, but these details are too complicated and contain grammar mistakes. Please try to simplify them: maybe explain only the scale up scenario and mention that the scale down logic is the same. First of all, though, we must add a short explanation of the method's purpose.
   
   > The method for obtaining data nodes of the specified zone.
   
   This is not just a method for obtaining data nodes: it takes the provided {@code topVer} into account and compares it with the topology version observed by DZM. Please add a short Javadoc summary at the beginning so that it is clear what this method actually does.
   
   > @return The data nodes future.
   
   This is also too short. We should say that the completion of this future depends on the topology version observed by DZM, etc.
   
   Also, please fix typos such as `than` -> `then`.
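
To make the requested summary concrete, here is a minimal, self-contained sketch of the behaviour the Javadoc should describe: the returned future completes only after the requested topology version has been observed and, when scale up is immediate, after the corresponding data nodes write has been seen. This is not the PR's implementation. `VersionTrackerSketch`, `DataNodesAwaitSketch`, and `dataNodesFor` are hypothetical names, the revision to wait for is passed in explicitly as a simplification, and the scale down branch is omitted because it would mirror scale up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a version tracker; not the Ignite class. */
final class VersionTrackerSketch {
    private static final class Waiter {
        final long version;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        Waiter(long version) {
            this.version = version;
        }
    }

    private long current;
    private final List<Waiter> waiters = new ArrayList<>();

    /** Returns a future that completes once the tracked version is greater than or equal to {@code version}. */
    synchronized CompletableFuture<Void> waitFor(long version) {
        if (current >= version) {
            return CompletableFuture.completedFuture(null);
        }
        Waiter waiter = new Waiter(version);
        waiters.add(waiter);
        return waiter.future;
    }

    /** Advances the tracked version and completes every waiter that is now satisfied. */
    void update(long version) {
        List<Waiter> ready = new ArrayList<>();
        synchronized (this) {
            if (version <= current) {
                return;
            }
            current = version;
            waiters.removeIf(w -> {
                if (w.version <= version) {
                    ready.add(w);
                    return true;
                }
                return false;
            });
        }
        // Complete outside the lock so dependent stages do not run while holding it.
        ready.forEach(w -> w.future.complete(null));
    }
}

/** Hypothetical, simplified model of the awaiting flow (scale up only). */
final class DataNodesAwaitSketch {
    final VersionTrackerSketch topVerTracker = new VersionTrackerSketch();
    final VersionTrackerSketch scaleUpRevisionTracker = new VersionTrackerSketch();
    volatile Set<String> dataNodes = Set.of();

    /**
     * Completes with the current data nodes after {@code topVer} (or a greater version) has been observed
     * and, if scale up is immediate, after {@code scaleUpRevision} has been observed as well.
     */
    CompletableFuture<Set<String>> dataNodesFor(long topVer, long scaleUpRevision, boolean immediateScaleUp) {
        return topVerTracker.waitFor(topVer)
                .thenCompose(ignored -> immediateScaleUp
                        ? scaleUpRevisionTracker.waitFor(scaleUpRevision)
                        : CompletableFuture.<Void>completedFuture(null))
                .thenApply(ignored -> dataNodes);
    }
}
```

In this sketch the future stays incomplete after the topology version alone is observed; it completes only once the scale up revision is reached as well, which is the dependency the `@return` description should spell out.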



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java:
##########
@@ -0,0 +1,910 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfigurationSchema;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.distributionzones.exception.DistributionZoneWasRemovedException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.StatementResultImpl;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.schema.configuration.TableChange;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableView;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests awaiting data nodes algorithm in distribution zone manager in case when
+ * {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * or {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} are immediate.
+ */
+public class DistributionZoneAwaitDataNodesTest extends IgniteAbstractTest {
+    private static final IgniteLogger LOG = Loggers.forClass(DistributionZoneAwaitDataNodesTest.class);
+
+    private MetaStorageManager metaStorageManager;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private LogicalTopology logicalTopology;
+
+    private ClusterStateStorage clusterStateStorage;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private ClusterManagementGroupManager cmgManager;
+
+    private VaultManager vaultManager;
+
+    private WatchListener topologyWatchListener;
+
+    private WatchListener dataNodesWatchListener;
+
+    @BeforeEach
+    void setUp() {
+        vaultManager = mock(VaultManager.class);
+
+        when(vaultManager.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        when(vaultManager.get(zonesLogicalTopologyVersionKey()))
+                .thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyVersionKey(), longToBytes(0))));
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        doAnswer(invocation -> {
+            ByteArray key = invocation.getArgument(0);
+
+            WatchListener watchListener = invocation.getArgument(1);
+
+            if (Arrays.equals(key.bytes(), zoneLogicalTopologyPrefix().bytes())) {
+                topologyWatchListener = watchListener;
+            } else if (Arrays.equals(key.bytes(), zonesDataNodesPrefix().bytes())) {
+                dataNodesWatchListener = watchListener;
+            }
+
+            return null;
+        }).when(metaStorageManager).registerPrefixWatch(any(), any());
+
+        when(metaStorageManager.invoke(any()))
+                .thenReturn(completedFuture(StatementResultImpl.builder().result(new byte[] {0}).build()));
+
+        clusterStateStorage = new TestClusterStateStorage();
+
+        logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+        LogicalTopologyServiceImpl logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+        TablesConfiguration tablesConfiguration = mock(TablesConfiguration.class);
+
+        NamedConfigurationTree<TableConfiguration, TableView, TableChange> tables = mock(NamedConfigurationTree.class);
+
+        when(tablesConfiguration.tables()).thenReturn(tables);
+
+        NamedListView<TableView> tablesValue = mock(NamedListView.class);
+
+        when(tables.value()).thenReturn(tablesValue);
+
+        when(tablesValue.size()).thenReturn(0);
+
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Set.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        clusterCfgMgr.start();
+
+        zonesConfiguration = clusterCfgMgr.configurationRegistry()
+                .getConfiguration(DistributionZonesConfiguration.KEY);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                tablesConfiguration,
+                metaStorageManager,
+                logicalTopologyService,
+                vaultManager,
+                "node"
+        );
+
+        mockCmgLocalNodes();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        distributionZoneManager.stop();
+        clusterCfgMgr.stop();
+        clusterStateStorage.stop();
+    }
+
+    /**
+     * Test checks that data nodes futures are completed on topology with added and removed nodes.
+     */
+    @Test
+    void testSeveralScaleUpAndSeveralScaleDownThenScaleUpAndScaleDown() throws Exception {
+        startZoneManager(0);
+
+        TestSeveralScaleUpAndSeveralScaleDownDataObject testData = testSeveralScaleUpAndSeveralScaleDownGeneral();
+
+        LOG.info("Topology with added and removed nodes.");
+
+        Set<String> dataNodes = Set.of("node0", "node2");
+
+        topologyWatchListenerOnUpdate(dataNodes, testData.topVer2, testData.dataNodesRevision2);
+
+        assertTrue(testData.topVerUpFut2.isDone());
+        assertTrue(testData.topVerDownFut2.isDone());
+
+        CompletableFuture<Void> revisionUpFut = distributionZoneManager.zonesState().get(DEFAULT_ZONE_ID)
+                .scaleUpRevisionTracker().waitFor(testData.dataNodesRevision2);
+
+        assertFalse(revisionUpFut.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, true, testData.dataNodesRevision2,
+                testData.dataNodesRevision2 + 1);
+
+        assertTrue(waitForCondition(revisionUpFut::isDone, 3000));
+
+        CompletableFuture<Void> revisionDownFut = distributionZoneManager.zonesState().get(DEFAULT_ZONE_ID).scaleDownRevisionTracker()
+                    .waitFor(testData.dataNodesRevision2);
+
+        assertFalse(revisionDownFut.isDone());
+
+        assertFalse(testData.dataNodesUpFut3.isDone());
+        assertFalse(testData.dataNodesDownFut3.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, false, testData.dataNodesRevision2,
+                testData.dataNodesRevision2 + 2);
+
+        assertTrue(waitForCondition(revisionDownFut::isDone, 3000));
+
+        assertEquals(dataNodes, testData.dataNodesUpFut3.get(3, SECONDS));
+        assertEquals(dataNodes, testData.dataNodesDownFut3.get(3, SECONDS));
+    }
+
+    /**
+     * Test checks that data nodes futures are completed on topology with added nodes.
+     */
+    @Test
+    void testSeveralScaleUpAndSeveralScaleDownThenScaleUp() throws Exception {
+        startZoneManager(0);
+
+        TestSeveralScaleUpAndSeveralScaleDownDataObject testData = testSeveralScaleUpAndSeveralScaleDownGeneral();
+
+        LOG.info("Topology with added nodes.");
+
+        Set<String> dataNodes = Set.of("node0", "node1", "node2");
+
+        topologyWatchListenerOnUpdate(dataNodes, testData.topVer2, testData.dataNodesRevision2);
+
+        assertTrue(testData.topVerUpFut2.isDone());
+        assertTrue(testData.topVerDownFut2.isDone());
+
+        CompletableFuture<Void> revisionUpFut = distributionZoneManager.zonesState().get(DEFAULT_ZONE_ID)
+                    .scaleUpRevisionTracker().waitFor(testData.dataNodesRevision2);
+
+        assertFalse(revisionUpFut.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, true, testData.dataNodesRevision2,
+                testData.dataNodesRevision2 + 1);
+
+        assertTrue(waitForCondition(revisionUpFut::isDone, 3000));
+        assertEquals(dataNodes, testData.dataNodesUpFut3.get(3, SECONDS));
+        assertEquals(dataNodes, testData.dataNodesDownFut3.get(3, SECONDS));
+    }
+
+    /**
+     * Test checks that data nodes futures are completed on topology with removed nodes.
+     */
+    @Test
+    void testSeveralScaleUpAndSeveralScaleDownThenScaleDown() throws Exception {
+        startZoneManager(0);
+
+        TestSeveralScaleUpAndSeveralScaleDownDataObject testData = testSeveralScaleUpAndSeveralScaleDownGeneral();
+
+        LOG.info("Topology with removed nodes.");
+
+        Set<String> dataNodes = Set.of("node0");
+
+        topologyWatchListenerOnUpdate(dataNodes, testData.topVer2, testData.dataNodesRevision2);
+
+        assertTrue(testData.topVerUpFut2.isDone());
+        assertTrue(testData.topVerDownFut2.isDone());
+
+        CompletableFuture<Void> revisionDownFut = distributionZoneManager.zonesState().get(DEFAULT_ZONE_ID)
+                .scaleDownRevisionTracker().waitFor((long) testData.dataNodesRevision2);
+
+        assertFalse(revisionDownFut.isDone());
+
+        dataNodesWatchListenerOnUpdate(DEFAULT_ZONE_ID, dataNodes, false, testData.dataNodesRevision2,
+                testData.dataNodesRevision2 + 1);
+
+        assertTrue(waitForCondition(revisionDownFut::isDone, 3000));
+        assertEquals(dataNodes, testData.dataNodesUpFut3.get(3, SECONDS));
+        assertEquals(dataNodes, testData.dataNodesDownFut3.get(3, SECONDS));
+    }
+
+    private static class TestSeveralScaleUpAndSeveralScaleDownDataObject {
+        private final long topVer2;
+        private final long dataNodesRevision2;
+        private final CompletableFuture<Void> topVerUpFut2;
+        private final CompletableFuture<Void> topVerDownFut2;
+        private final CompletableFuture<Set<String>> dataNodesUpFut3;
+        private final CompletableFuture<Set<String>> dataNodesDownFut3;
+
+        TestSeveralScaleUpAndSeveralScaleDownDataObject(
+                long topVer2,
+                long dataNodesRevision2,
+                CompletableFuture<Void> topVerUpFut2,
+                CompletableFuture<Void> topVerDownFut2,
+                CompletableFuture<Set<String>> dataNodesUpFut3,
+                CompletableFuture<Set<String>> dataNodesDownFut3) {
+            this.topVer2 = topVer2;
+            this.dataNodesRevision2 = dataNodesRevision2;
+            this.topVerUpFut2 = topVerUpFut2;
+            this.topVerDownFut2 = topVerDownFut2;
+            this.dataNodesUpFut3 = dataNodesUpFut3;
+            this.dataNodesDownFut3 = dataNodesDownFut3;
+        }
+    }
+
+    /**
+     * This method invokes {@link DistributionZoneManager#topologyVersionedDataNodes(int, long)} with default and non-default zone id
+     * and different logical topology version. Collects data nodes futures.
+     * Simulates new logical topology with new nodes. Check that some of data nodes futures are completed.
+     * Simulates new logical topology with removed nodes. Check that some of data nodes futures are completed.
+     *
+     * @return Structure with data for continue testing.
+     */
+    private TestSeveralScaleUpAndSeveralScaleDownDataObject testSeveralScaleUpAndSeveralScaleDownGeneral()

Review Comment:
   I do not understand the idea behind `TestSeveralScaleUpAndSeveralScaleDownDataObject`. Too many things are done in one method, which makes this code hard to read. Please restructure it.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -505,6 +543,133 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * The flow for the future completion:
+     * Waiting for DistributionZoneManager observe passed topology version or greater version in topologyWatchListener.
+     * If the {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 than wait for writing data nodes triggered
+     * by started nodes and corresponding to the passed topology version or greater topology version
+     * to the data nodes into the meta storage.
+     * If the {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 than wait for writing data nodes
+     * triggered by stopped nodes and corresponding to the passed topology version or greater topology version
+     * to the data nodes into the meta storage.
+     * After waiting it completes the future with data nodes of the specified zone.
+     * This method must be invoked in a change configuration closure to guarantee that the zone is exists and values of scaleUp/scaleDown
+     * timers are up to date.
+     * The returned future can be completed with {@link DistributionZoneNotFoundException} and {@link DistributionZoneWasRemovedException}
+     * in case when the distribution zone was removed during method execution.
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> topologyVersionedDataNodes(int zoneId, long topVer) {
+        CompletableFuture<IgniteBiTuple<Boolean, Boolean>> timerValuesFut = awaitTopologyVersion(topVer)
+                .thenCompose(ignored -> getImmediateTimers(zoneId));
+
+        return allOf(
+                timerValuesFut.thenCompose(timerValues -> scaleUpAwaiting(zoneId, timerValues.get1())),
+                timerValuesFut.thenCompose(timerValues -> scaleDownAwaiting(zoneId, timerValues.get2()))
+        ).thenCompose(ignored -> getDataNodesFuture(zoneId));
+    }
+
+    /**
+     * Waits for DistributionZoneManager observe passed topology version or greater version in topologyWatchListener.
+     *
+     * @param topVer Topology version.
+     * @return Future for chaining.
+     */
+    private CompletableFuture<Void> awaitTopologyVersion(long topVer) {
+        return inBusyLock(busyLock, () -> topVerTracker.waitFor(topVer));
+    }
+
+    /**
+     * Transforms {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+     * and {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} values to boolean values.
+     * True if it equals to zero and false if it greater than zero. Zero means that data nodes changing must be started immediate.
+     *
+     * @param zoneId Zone id.
+     * @return Future with the result.
+     */
+    private CompletableFuture<IgniteBiTuple<Boolean, Boolean>> getImmediateTimers(int zoneId) {
+        return inBusyLock(busyLock, () -> {
+            DistributionZoneConfiguration zoneCfg = getZoneById(zonesConfiguration, zoneId);
+
+            return completedFuture(new IgniteBiTuple<>(
+                    zoneCfg.dataNodesAutoAdjustScaleUp().value() == IMMEDIATE_TIMER_VALUE,
+                    zoneCfg.dataNodesAutoAdjustScaleDown().value() == IMMEDIATE_TIMER_VALUE
+            ));
+        });
+    }
+
+    /**
+     * If the {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 than waits for writing data nodes triggered

Review Comment:
   It could also be useful to mention the case with `DistributionZoneWasRemovedException`.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -898,45 +1083,62 @@ private void initDataNodesFromVaultManager() {
         }
 
         try {
-            vaultMgr.get(zonesLogicalTopologyKey())
-                    .thenAccept(vaultEntry -> {
-                        if (!busyLock.enterBusy()) {
-                            throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
-                        }
+            long appliedRevision = metaStorageManager.appliedRevision();
 
-                        try {
-                            long appliedRevision = metaStorageManager.appliedRevision();
+            lastScaleUpRevision = appliedRevision;
 
-                            if (vaultEntry != null && vaultEntry.value() != null) {
-                                logicalTopology = fromBytes(vaultEntry.value());
+            lastScaleDownRevision = appliedRevision;
 
-                                // init keys and data nodes for default zone
-                                saveDataNodesAndUpdateTriggerKeysInMetaStorage(
-                                        DEFAULT_ZONE_ID,
-                                        appliedRevision,
-                                        logicalTopology
-                                );
+            VaultEntry topVerEntry = vaultMgr.get(zonesLogicalTopologyVersionKey()).join();
 
-                                zonesConfiguration.distributionZones().value().forEach(zone -> {
-                                    int zoneId = zone.zoneId();
+            if (topVerEntry != null && topVerEntry.value() != null) {
+                topVerTracker.update(bytesToLong(topVerEntry.value()));
+            }
 
-                                    saveDataNodesAndUpdateTriggerKeysInMetaStorage(
-                                            zoneId,
-                                            appliedRevision,
-                                            logicalTopology
-                                    );
-                                });
-                            }
-                        } finally {
-                            busyLock.leaveBusy();
-                        }
-                    });
+            VaultEntry topologyEntry = vaultMgr.get(zonesLogicalTopologyKey()).join();
+
+            if (topologyEntry != null && topologyEntry.value() != null) {
+                logicalTopology = fromBytes(topologyEntry.value());
+
+                // init keys and data nodes for default zone
+                saveDataNodesAndUpdateTriggerKeysInMetaStorage(
+                        DEFAULT_ZONE_ID,
+                        appliedRevision,
+                        logicalTopology
+                );
+
+                zonesConfiguration.distributionZones().value().forEach(zone -> {
+                    int zoneId = zone.zoneId();
+
+                    saveDataNodesAndUpdateTriggerKeysInMetaStorage(
+                            zoneId,
+                            appliedRevision,
+                            logicalTopology
+                    );
+                });
+            }
+
+            zonesState.values().forEach(zoneState -> {
+                zoneState.scaleUpRevisionTracker().update(lastScaleUpRevision);
+
+                zoneState.scaleDownRevisionTracker().update(lastScaleDownRevision);
+
+                zoneState.nodes(logicalTopology);
+            });
+
+            assert topologyEntry == null || topologyEntry.value() == null || logicalTopology.equals(fromBytes(topologyEntry.value()))
+                    : "DistributionZoneManager.logicalTopology was changed after initialization from the vault manager.";

Review Comment:
   Please remove `DistributionZoneManager.logicalTopology` from this message and try to rephrase it. This message can be seen by the user.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -505,6 +543,133 @@ public int getZoneId(String name) {
         }
     }
 
+    /**
+     * The method for obtaining data nodes of the specified zone.
+     * The flow for the future completion:
+     * Waiting for DistributionZoneManager observe passed topology version or greater version in topologyWatchListener.
+     * If the {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 than wait for writing data nodes triggered
+     * by started nodes and corresponding to the passed topology version or greater topology version
+     * to the data nodes into the meta storage.
+     * If the {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} equals to 0 than wait for writing data nodes
+     * triggered by stopped nodes and corresponding to the passed topology version or greater topology version
+     * to the data nodes into the meta storage.
+     * After waiting it completes the future with data nodes of the specified zone.
+     * This method must be invoked in a change configuration closure to guarantee that the zone is exists and values of scaleUp/scaleDown
+     * timers are up to date.
+     * The returned future can be completed with {@link DistributionZoneNotFoundException} and {@link DistributionZoneWasRemovedException}
+     * in case when the distribution zone was removed during method execution.
+     *
+     * @param zoneId Zone id.
+     * @param topVer Topology version.
+     * @return The data nodes future.
+     */
+    public CompletableFuture<Set<String>> topologyVersionedDataNodes(int zoneId, long topVer) {
+        CompletableFuture<IgniteBiTuple<Boolean, Boolean>> timerValuesFut = awaitTopologyVersion(topVer)
+                .thenCompose(ignored -> getImmediateTimers(zoneId));
+
+        return allOf(
+                timerValuesFut.thenCompose(timerValues -> scaleUpAwaiting(zoneId, timerValues.get1())),
+                timerValuesFut.thenCompose(timerValues -> scaleDownAwaiting(zoneId, timerValues.get2()))
+        ).thenCompose(ignored -> getDataNodesFuture(zoneId));
+    }
+
+    /**
+     * Waits for DistributionZoneManager observe passed topology version or greater version in topologyWatchListener.
+     *
+     * @param topVer Topology version.
+     * @return Future for chaining.
+     */
+    private CompletableFuture<Void> awaitTopologyVersion(long topVer) {
+        return inBusyLock(busyLock, () -> topVerTracker.waitFor(topVer));
+    }
+
+    /**
+     * Transforms {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+     * and {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} values to boolean values.
+     * True if it equals to zero and false if it greater than zero. Zero means that data nodes changing must be started immediate.
+     *
+     * @param zoneId Zone id.
+     * @return Future with the result.
+     */
+    private CompletableFuture<IgniteBiTuple<Boolean, Boolean>> getImmediateTimers(int zoneId) {
+        return inBusyLock(busyLock, () -> {
+            DistributionZoneConfiguration zoneCfg = getZoneById(zonesConfiguration, zoneId);
+
+            return completedFuture(new IgniteBiTuple<>(
+                    zoneCfg.dataNodesAutoAdjustScaleUp().value() == IMMEDIATE_TIMER_VALUE,
+                    zoneCfg.dataNodesAutoAdjustScaleDown().value() == IMMEDIATE_TIMER_VALUE
+            ));
+        });
+    }
+
+    /**
+     * If the {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp} equals to 0 than waits for writing data nodes triggered

Review Comment:
   Please rephrase this javadoc and the one below; it is really hard to understand what is going on here.
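
   To make the "observe the passed topology version or a greater version" part of these javadocs concrete, here is a minimal sketch of a version tracker. It is an illustration only: `VersionTrackerSketch` and its methods are hypothetical names, and it does not claim to reproduce how `topVerTracker.waitFor` is implemented in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: completes waiters once the requested version or a greater one is observed. */
public class VersionTrackerSketch {
    private long current;

    private final TreeMap<Long, CompletableFuture<Void>> waiters = new TreeMap<>();

    /** Returns a future that completes when {@code version} or a greater version has been observed. */
    synchronized CompletableFuture<Void> waitFor(long version) {
        if (current >= version) {
            return CompletableFuture.completedFuture(null);
        }

        return waiters.computeIfAbsent(version, v -> new CompletableFuture<>());
    }

    /** Records a newly observed version and completes every waiter it satisfies. */
    void update(long version) {
        List<CompletableFuture<Void>> ready = new ArrayList<>();

        synchronized (this) {
            if (version <= current) {
                return;
            }

            current = version;

            // All waiters registered for this version or a lower one are now satisfied.
            NavigableMap<Long, CompletableFuture<Void>> satisfied = waiters.headMap(version, true);

            ready.addAll(satisfied.values());

            satisfied.clear();
        }

        // Complete outside the lock so that dependent callbacks do not run while holding it.
        ready.forEach(fut -> fut.complete(null));
    }

    public static void main(String[] args) {
        VersionTrackerSketch tracker = new VersionTrackerSketch();

        CompletableFuture<Void> fut = tracker.waitFor(3);

        tracker.update(2);
        System.out.println(fut.isDone()); // false: version 3 has not been observed yet

        tracker.update(5);
        System.out.println(fut.isDone()); // true: a greater version also satisfies the waiter
    }
}
```

   A javadoc phrased in these terms ("completes when version N or greater has been observed") may be easier to follow than the current wording.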



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -945,24 +1147,57 @@ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
                 }
 
                 try {
-                    assert evt.single() : "Expected an event with one entry but was an event with several entries with keys: "
+                    assert evt.entryEvents().size() == 2 :
+                            "Expected an event with logical topology and logical topology version entries but was events with keys: "
                             + evt.entryEvents().stream().map(entry -> entry.newEntry() == null ? "null" : entry.newEntry().key())
                             .collect(toList());
 
-                    Entry newEntry = evt.entryEvent().newEntry();
+                    long topVer = 0;
+
+                    byte[] newLogicalTopologyBytes = null;
+
+                    Set<String> newLogicalTopology = null;
+
+                    long revision = 0;
+
+                    for (EntryEvent event : evt.entryEvents()) {
+                        Entry e = event.newEntry();
 
-                    long revision = newEntry.revision();
+                        if (Arrays.equals(e.key(), zonesLogicalTopologyVersionKey().bytes())) {
+                            topVer = bytesToLong(e.value());
 
-                    byte[] newLogicalTopologyBytes = newEntry.value();
+                            revision = e.revision();
+                        } else if (Arrays.equals(e.key(), zonesLogicalTopologyKey().bytes())) {
+                            newLogicalTopologyBytes = e.value();
 
-                    Set<String> newLogicalTopology = fromBytes(newLogicalTopologyBytes);
+                            newLogicalTopology = fromBytes(newLogicalTopologyBytes);
+                        }
+                    }
+
+                    assert newLogicalTopology != null;
+                    assert revision > 0;
+
+                    Set<String> newLogicalTopology0 = newLogicalTopology;
 
                     Set<String> removedNodes =
-                            logicalTopology.stream().filter(node -> !newLogicalTopology.contains(node)).collect(toSet());
+                            logicalTopology.stream().filter(node -> !newLogicalTopology0.contains(node)).collect(toSet());
 
                     Set<String> addedNodes =
                             newLogicalTopology.stream().filter(node -> !logicalTopology.contains(node)).collect(toSet());
 
+                    if (!addedNodes.isEmpty()) {
+                        lastScaleUpRevision = revision;
+                    }
+
+                    if (!removedNodes.isEmpty()) {
+                        lastScaleDownRevision = revision;
+                    }
+
+                    //The topology version must be updated after the lastScaleUpRevision and lastScaleDownRevision are updated.
+                    //This is necessary in order to when topology version waiters will be notified that topology version is updated

Review Comment:
   Please try to rephrase the second sentence; it is unclear what you wanted to say.
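
   One possible reading of the comment is that waiters woken by the topology version update must already see the new `lastScaleUpRevision` and `lastScaleDownRevision`. If that is the intent, the ordering can be sketched as below. This is an assumption about the intent, and `PublishBeforeNotifySketch` with its fields and methods is a hypothetical stand-in, not the manager's code.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: publish state first, then notify the waiters that read it. */
public class PublishBeforeNotifySketch {
    // Stand-in for the manager's field; the name mirrors the diff only for readability.
    private volatile long lastScaleUpRevision;

    private final CompletableFuture<Long> topologyVersionReached = new CompletableFuture<>();

    /** Called by the (simulated) watch listener. */
    void onTopologyUpdate(long topVer, long revision) {
        // 1. Publish the revision first.
        lastScaleUpRevision = revision;

        // 2. Only then wake the waiters, so their callbacks observe the new revision.
        topologyVersionReached.complete(topVer);
    }

    /** A waiter that reads the revision as soon as the topology version is reached. */
    CompletableFuture<Long> revisionSeenByWaiter() {
        return topologyVersionReached.thenApply(ignored -> lastScaleUpRevision);
    }

    public static void main(String[] args) {
        PublishBeforeNotifySketch sketch = new PublishBeforeNotifySketch();

        CompletableFuture<Long> seen = sketch.revisionSeenByWaiter();

        sketch.onTopologyUpdate(2L, 10L);

        System.out.println(seen.join()); // prints 10
    }
}
```

   If the two steps in `onTopologyUpdate` were swapped, the waiter's callback would run before the assignment and read the stale revision, which is presumably what the code comment is trying to warn about.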



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java:
##########
@@ -92,6 +93,8 @@ protected IgniteSql igniteSql() {
         return CLUSTER_NODES.get(0).sql();
     }
 
+
+    @Disabled

Review Comment:
   Disabled without a ticket. Is this issue still valid?



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java:
##########
@@ -0,0 +1,910 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfigurationSchema;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.distributionzones.exception.DistributionZoneWasRemovedException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.StatementResultImpl;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.schema.configuration.TableChange;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableView;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests awaiting data nodes algorithm in distribution zone manager in case when
+ * {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * or {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} are immediate.
+ */
+public class DistributionZoneAwaitDataNodesTest extends IgniteAbstractTest {
+    private static final IgniteLogger LOG = Loggers.forClass(DistributionZoneAwaitDataNodesTest.class);
+
+    private MetaStorageManager metaStorageManager;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private LogicalTopology logicalTopology;
+
+    private ClusterStateStorage clusterStateStorage;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private ClusterManagementGroupManager cmgManager;
+
+    private VaultManager vaultManager;
+
+    private WatchListener topologyWatchListener;
+
+    private WatchListener dataNodesWatchListener;
+
+    @BeforeEach
+    void setUp() {
+        vaultManager = mock(VaultManager.class);
+
+        when(vaultManager.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        when(vaultManager.get(zonesLogicalTopologyVersionKey()))
+                .thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyVersionKey(), longToBytes(0))));
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        doAnswer(invocation -> {
+            ByteArray key = invocation.getArgument(0);
+
+            WatchListener watchListener = invocation.getArgument(1);
+
+            if (Arrays.equals(key.bytes(), zoneLogicalTopologyPrefix().bytes())) {
+                topologyWatchListener = watchListener;
+            } else if (Arrays.equals(key.bytes(), zonesDataNodesPrefix().bytes())) {
+                dataNodesWatchListener = watchListener;
+            }
+
+            return null;
+        }).when(metaStorageManager).registerPrefixWatch(any(), any());
+
+        when(metaStorageManager.invoke(any()))
+                .thenReturn(completedFuture(StatementResultImpl.builder().result(new byte[] {0}).build()));
+
+        clusterStateStorage = new TestClusterStateStorage();
+
+        logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+        LogicalTopologyServiceImpl logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+
+        TablesConfiguration tablesConfiguration = mock(TablesConfiguration.class);

Review Comment:
   Please rewrite it to use `@InjectConfiguration`.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -945,24 +1147,57 @@ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
                 }
 
                 try {
-                    assert evt.single() : "Expected an event with one entry but was an event with several entries with keys: "
+                    assert evt.entryEvents().size() == 2 :
+                            "Expected an event with logical topology and logical topology version entries but was events with keys: "
                             + evt.entryEvents().stream().map(entry -> entry.newEntry() == null ? "null" : entry.newEntry().key())
                             .collect(toList());
 
-                    Entry newEntry = evt.entryEvent().newEntry();
+                    long topVer = 0;
+
+                    byte[] newLogicalTopologyBytes = null;
+
+                    Set<String> newLogicalTopology = null;
+
+                    long revision = 0;
+
+                    for (EntryEvent event : evt.entryEvents()) {
+                        Entry e = event.newEntry();
 
-                    long revision = newEntry.revision();
+                        if (Arrays.equals(e.key(), zonesLogicalTopologyVersionKey().bytes())) {
+                            topVer = bytesToLong(e.value());
 
-                    byte[] newLogicalTopologyBytes = newEntry.value();
+                            revision = e.revision();
+                        } else if (Arrays.equals(e.key(), zonesLogicalTopologyKey().bytes())) {
+                            newLogicalTopologyBytes = e.value();
 
-                    Set<String> newLogicalTopology = fromBytes(newLogicalTopologyBytes);
+                            newLogicalTopology = fromBytes(newLogicalTopologyBytes);
+                        }
+                    }
+
+                    assert newLogicalTopology != null;

Review Comment:
   These asserts have no message; please add one to each.
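
   For illustration, a minimal sketch of the same two checks with messages attached. The class, the method, and the message wording are hypothetical and only show the shape; they are not taken from the PR.

```java
import java.util.Set;

/** Hypothetical sketch: the same kind of checks, each with a diagnostic message. */
public class AssertMessageSketch {
    /** Stand-in for the checks performed in the watch listener. */
    static void checkTopologyEvent(Set<String> newLogicalTopology, long revision) {
        // The message names what is missing, so a failure can be diagnosed from the stack trace alone.
        assert newLogicalTopology != null : "The event does not contain a logical topology entry.";

        assert revision > 0 : "The event has a non-positive revision [revision=" + revision + ']';
    }

    public static void main(String[] args) {
        // Assertions are evaluated only when the JVM runs with -ea.
        checkTopologyEvent(Set.of("node1"), 1L);

        System.out.println("Checks passed");
    }
}
```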



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java:
##########
@@ -0,0 +1,910 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.Collections.emptySet;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfigurationSchema;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.distributionzones.exception.DistributionZoneWasRemovedException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.dsl.StatementResultImpl;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.schema.configuration.TableChange;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TableView;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests awaiting data nodes algorithm in distribution zone manager in case when
+ * {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleUp}
+ * or {@link DistributionZoneConfigurationSchema#dataNodesAutoAdjustScaleDown} are immediate.
+ */
+public class DistributionZoneAwaitDataNodesTest extends IgniteAbstractTest {

Review Comment:
   Please improve the readability of this class. It is hard to understand the scenarios, the javadocs and test names do not help, and `TestSeveralScaleUpAndSeveralScaleDownDataObject` looks too complicated.


