You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/12/08 12:22:14 UTC

[GitHub] [ignite-3] sergeyuttsel opened a new pull request, #1426: IGNITE-18115

sergeyuttsel opened a new pull request, #1426:
URL: https://github.com/apache/ignite-3/pull/1426

   Populate DistributionZoneManager with MetaStorage listeners to logical topology events.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1050679847


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -235,6 +257,98 @@ public CompletableFuture<Void> dropZone(String name) {
     @Override
     public void start() {
         zonesConfiguration.distributionZones().listenElements(new ZonesConfigurationListener());
+
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(e);

Review Comment:
   I created new error code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1057776809


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -172,7 +192,12 @@ public CompletableFuture<Void> createZone(DistributionZoneConfigurationParameter
                 } catch (IllegalArgumentException e) {
                     throw new DistributionZoneAlreadyExistsException(distributionZoneCfg.name(), e);
                 } catch (Exception e) {
-                    throw new IgniteInternalException(UNEXPECTED_ERR, distributionZoneCfg.name(), e);
+                    throw withCause(

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sanpwc commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1057492859


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -108,24 +120,32 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
         }
     };
 
+    /** The logical topology on the last watch event. */
+    private volatile Set<String> logicalTopology;

Review Comment:
   Please add comment that will explain why volatile is enough here.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -192,7 +217,7 @@ public CompletableFuture<Void> alterZone(String name, DistributionZoneConfigurat
         Objects.requireNonNull(distributionZoneCfg, "Distribution zone configuration is null.");
 
         if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
+            throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());

Review Comment:
   What's the point of given code?



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -275,7 +310,9 @@ public void start() {
 
         logicalTopologyService.addEventListener(topologyEventListener);
 
-        initMetaStorageKeysOnStart();
+        registerMetaStorageWatchListener()

Review Comment:
   What will happen in case of concurrent start/stop calls?
   Busy locks are missing almost everywhere.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -204,7 +229,12 @@ public CompletableFuture<Void> alterZone(String name, DistributionZoneConfigurat
                 } catch (IllegalArgumentException e) {
                     throw new DistributionZoneRenameException(name, distributionZoneCfg.name(), e);
                 } catch (Exception e) {
-                    throw new IgniteInternalException(UNEXPECTED_ERR, distributionZoneCfg.name(), e);
+                    throw withCause(

Review Comment:
   Same as above.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -137,7 +156,7 @@ public CompletableFuture<Void> createZone(DistributionZoneConfigurationParameter
         Objects.requireNonNull(distributionZoneCfg, "Distribution zone configuration is null.");
 
         if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
+            throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());

Review Comment:
   How did you fix it?



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -172,7 +192,12 @@ public CompletableFuture<Void> createZone(DistributionZoneConfigurationParameter
                 } catch (IllegalArgumentException e) {
                     throw new DistributionZoneAlreadyExistsException(distributionZoneCfg.name(), e);
                 } catch (Exception e) {
-                    throw new IgniteInternalException(UNEXPECTED_ERR, distributionZoneCfg.name(), e);
+                    throw withCause(

Review Comment:
   Please add all possible exceptions to the @throws clause explaining why it's happening.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -172,7 +192,12 @@ public CompletableFuture<Void> createZone(DistributionZoneConfigurationParameter
                 } catch (IllegalArgumentException e) {
                     throw new DistributionZoneAlreadyExistsException(distributionZoneCfg.name(), e);
                 } catch (Exception e) {
-                    throw new IgniteInternalException(UNEXPECTED_ERR, distributionZoneCfg.name(), e);
+                    throw withCause(

Review Comment:
   What's the point in catching an exception and re-throwing IgniteInternalException here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1057776995


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -204,7 +229,12 @@ public CompletableFuture<Void> alterZone(String name, DistributionZoneConfigurat
                 } catch (IllegalArgumentException e) {
                     throw new DistributionZoneRenameException(name, distributionZoneCfg.name(), e);
                 } catch (Exception e) {
-                    throw new IgniteInternalException(UNEXPECTED_ERR, distributionZoneCfg.name(), e);
+                    throw withCause(

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1054509462


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -137,7 +156,7 @@ public CompletableFuture<Void> createZone(DistributionZoneConfigurationParameter
         Objects.requireNonNull(distributionZoneCfg, "Distribution zone configuration is null.");
 
         if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
+            throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());

Review Comment:
   Fixed, I hope.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -107,24 +120,30 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
         }
     };
 
+    /** The logical topology on the last watch event. */
+    private Set<String> logicalTopology = Collections.emptySet();

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1049805801


##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.metastorage.MetaStorageManager.APPLIED_REV;
+import static org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl.toIfInfo;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.EntryEvent;
+import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.StatementResult;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.metastorage.common.StatementResultInfo;
+import org.apache.ignite.internal.metastorage.common.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.common.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+/**
+ * Tests distribution zones logical topology changes and reaction to that changes.
+ */
+public class DistributionZoneManagerWatchListenerTest extends IgniteAbstractTest {
+    private static final String ZONE_NAME = "zone1";
+
+    @Mock
+    private ClusterManagementGroupManager cmgManager;
+
+    private VaultManager vaultMgr;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private WatchListener watchListener;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private MetaStorageManager metaStorageManager;
+
+    @BeforeEach
+    public void setUp() {
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Map.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        zonesConfiguration = mock(DistributionZonesConfiguration.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        vaultMgr = mock(VaultManager.class);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                metaStorageManager,
+                cmgManager,
+                vaultMgr
+        );
+
+        clusterCfgMgr.start();
+
+        mockVaultAppliedRevision(1);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        when(metaStorageManager.registerWatch(any(ByteArray.class), any())).then(invocation -> {
+            watchListener = invocation.getArgument(1);
+
+            return CompletableFuture.completedFuture(null);
+        });
+
+        mockEmptyZonesList();
+
+        AtomicLong raftIndex = new AtomicLong();
+
+        keyValueStorage = spy(new SimpleInMemoryKeyValueStorage());
+
+        MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage);
+
+        RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+        // Delegate directly to listener.
+        lenient().doAnswer(
+                invocationClose -> {
+                    Command cmd = invocationClose.getArgument(0);
+
+                    long commandIndex = raftIndex.incrementAndGet();
+
+                    CompletableFuture<Serializable> res = new CompletableFuture<>();
+
+                    CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+                        /** {@inheritDoc} */
+                        @Override
+                        public long index() {
+                            return commandIndex;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public WriteCommand command() {
+                            return (WriteCommand) cmd;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public void result(@Nullable Serializable r) {
+                            if (r instanceof Throwable) {
+                                res.completeExceptionally((Throwable) r);
+                            } else {
+                                res.complete(r);
+                            }
+                        }
+                    };
+
+                    try {
+                        metaStorageListener.onWrite(List.of(clo).iterator());
+                    } catch (Throwable e) {
+                        res.completeExceptionally(new IgniteInternalException(e));
+                    }
+
+                    return res;
+                }
+        ).when(metaStorageService).run(any());
+
+        MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory();
+
+        lenient().doAnswer(invocationClose -> {
+            If iif = invocationClose.getArgument(0);
+
+            MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand().iif(toIfInfo(iif, commandsFactory)).build();
+
+            return metaStorageService.run(multiInvokeCommand).thenApply(bi -> new StatementResult(((StatementResultInfo) bi).result()));
+        }).when(metaStorageManager).invoke(any());
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        vaultMgr.stop();
+
+        distributionZoneManager.stop();
+
+        clusterCfgMgr.stop();
+
+        keyValueStorage.close();
+    }
+
+    @Test
+    void testDataNodesUpdatedOnWatchListenerEvent() {
+        distributionZoneManager.start();
+
+        mockCreateZone();
+
+        //first event
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        watchListenerOnUpdate(nodes, 1);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        Set<String> newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.containsAll(nodes));
+        assertEquals(nodes.size(), newDataNodes.size());
+
+        //second event
+
+        nodes = Set.of("node1", "node3");
+
+        watchListenerOnUpdate(nodes, 2);
+
+        verify(keyValueStorage, timeout(1000).times(3)).invoke(any());
+
+        entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.containsAll(nodes));
+        assertEquals(nodes.size(), newDataNodes.size());
+
+        //third event
+
+        nodes = Collections.emptySet();
+
+        watchListenerOnUpdate(nodes, 3);
+
+        verify(keyValueStorage, timeout(1000).times(4)).invoke(any());
+
+        entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.isEmpty());
+    }
+
+    @Test
+    void testStaleWatchEvent() {
+        distributionZoneManager.start();
+
+        mockCreateZone();
+
+        mockVaultAppliedRevision(1);
+
+        long revision = 100;
+
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), ByteUtils.longToBytes(revision));
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        watchListenerOnUpdate(nodes, revision);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        assertNull(entry.value());
+    }
+
+    @Test
+    void testStaleVaultRevisionOnZoneManagerStart() {
+        mockCreateZone();
+
+        long revision = 100;
+
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), ByteUtils.longToBytes(revision));
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        mockVaultZonesLogicalTopologyKey(nodes);
+
+        mockVaultAppliedRevision(revision);
+
+        distributionZoneManager.start();
+
+        verify(metaStorageManager, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        assertNull(entry.value());
+    }
+
+    @Test
+    void testDataNodesUpdatedOnZoneManagerStart() {
+        mockCreateZone();
+
+        mockVaultAppliedRevision(2);
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        mockVaultZonesLogicalTopologyKey(nodes);
+
+        distributionZoneManager.start();
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        Set<String> newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.containsAll(nodes));
+        assertEquals(2, newDataNodes.size());
+    }
+
+    private void mockEmptyZonesList() {
+        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> namedConfigurationTree =
+                mock(NamedConfigurationTree.class);
+        when(zonesConfiguration.distributionZones()).thenReturn(namedConfigurationTree);
+
+        NamedListView<DistributionZoneView> namedListView = mock(NamedListView.class);
+        when(namedConfigurationTree.value()).thenReturn(namedListView);
+
+        when(zonesConfiguration.distributionZones().value().namedListKeys()).thenReturn(Collections.emptyList());
+    }
+
+    private void mockCreateZone() {
+        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> namedConfigurationTree =
+                mock(NamedConfigurationTree.class);
+        when(zonesConfiguration.distributionZones()).thenReturn(namedConfigurationTree);
+
+        NamedListView<DistributionZoneView> namedListView = mock(NamedListView.class);
+        when(namedConfigurationTree.value()).thenReturn(namedListView);
+
+        when(zonesConfiguration.distributionZones().value().namedListKeys()).thenReturn(List.of(ZONE_NAME));
+
+        DistributionZoneConfiguration distributionZoneConfiguration1 = mock(DistributionZoneConfiguration.class);
+        when(namedConfigurationTree.get(ZONE_NAME)).thenReturn(distributionZoneConfiguration1);
+
+        ConfigurationValue<Integer> zoneIdValue1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.zoneId()).thenReturn(zoneIdValue1);
+        when(zoneIdValue1.value()).thenReturn(1);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjust1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.dataNodesAutoAdjust()).thenReturn(dataNodesAutoAdjust1);
+        when(dataNodesAutoAdjust1.value()).thenReturn(100);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjustScaleUp1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.dataNodesAutoAdjustScaleUp()).thenReturn(dataNodesAutoAdjustScaleUp1);
+        when(dataNodesAutoAdjustScaleUp1.value()).thenReturn(200);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjustScaleDown1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.dataNodesAutoAdjustScaleDown()).thenReturn(dataNodesAutoAdjustScaleDown1);
+        when(dataNodesAutoAdjustScaleDown1.value()).thenReturn(300);
+    }
+
+    private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
+        byte[] newlogicalTopology = toBytes(nodes);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey()))
+                .thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), newlogicalTopology)));
+    }
+
+    private void watchListenerOnUpdate(Set<String> nodes, long rev) {
+        byte[] newlogicalTopology = toBytes(nodes);
+
+        org.apache.ignite.internal.metastorage.client.Entry newEntry =

Review Comment:
   It is necessary because we have `org.apache.ignite.internal.metastorage.server.Entry` and `org.apache.ignite.internal.metastorage.client.Entry` in one class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1050680613


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -373,4 +487,39 @@ private void updateMetaStorageOnZoneDelete(int zoneId, long revision) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Returns applied vault revision.
+     */
+    private long vaultAppliedRevision() {
+        try {
+            return vaultMgr.get(APPLIED_REV)
+                    .thenApply(appliedRevision -> appliedRevision == null ? 0L : bytesToLong(appliedRevision.value()))
+                    .get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(e);

Review Comment:
   I created new error code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] alievmirza commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
alievmirza commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1048327003


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -373,4 +487,39 @@ private void updateMetaStorageOnZoneDelete(int zoneId, long revision) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Returns applied vault revision.
+     */
+    private long vaultAppliedRevision() {
+        try {
+            return vaultMgr.get(APPLIED_REV)
+                    .thenApply(appliedRevision -> appliedRevision == null ? 0L : bytesToLong(appliedRevision.value()))
+                    .get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(e);

Review Comment:
   Do not use deprecated exception



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -235,6 +257,98 @@ public CompletableFuture<Void> dropZone(String name) {
     @Override
     public void start() {
         zonesConfiguration.distributionZones().listenElements(new ZonesConfigurationListener());
+
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(e);

Review Comment:
   Please do not use deprecated exception



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -235,6 +257,98 @@ public CompletableFuture<Void> dropZone(String name) {
     @Override
     public void start() {
         zonesConfiguration.distributionZones().listenElements(new ZonesConfigurationListener());
+
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(e);
+        }
+
+        if (vaultEntry != null && vaultEntry.value() != null) {
+            byte[] newLogicalTopology = vaultEntry.value();
+
+            logicalTopology = ByteUtils.fromBytes(newLogicalTopology);
+
+            zonesConfiguration.distributionZones().value().namedListKeys()
+                    .forEach(zoneName -> {
+                        int zoneId = zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                        saveDataNodesToMetastorage(zoneId, newLogicalTopology, vaultAppliedRevision);
+                    });
+        }
+
+        metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new WatchListener() {
+            @Override
+            public boolean onUpdate(@NotNull WatchEvent evt) {
+                if (!busyLock.enterBusy()) {
+                    throw new IgniteInternalException(new NodeStoppingException());

Review Comment:
   Do not use deprecated exception



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -235,6 +257,98 @@ public CompletableFuture<Void> dropZone(String name) {
     @Override
     public void start() {
         zonesConfiguration.distributionZones().listenElements(new ZonesConfigurationListener());
+
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(e);
+        }
+
+        if (vaultEntry != null && vaultEntry.value() != null) {
+            byte[] newLogicalTopology = vaultEntry.value();
+
+            logicalTopology = ByteUtils.fromBytes(newLogicalTopology);
+
+            zonesConfiguration.distributionZones().value().namedListKeys()
+                    .forEach(zoneName -> {
+                        int zoneId = zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                        saveDataNodesToMetastorage(zoneId, newLogicalTopology, vaultAppliedRevision);
+                    });
+        }
+
+        metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new WatchListener() {
+            @Override
+            public boolean onUpdate(@NotNull WatchEvent evt) {
+                if (!busyLock.enterBusy()) {
+                    throw new IgniteInternalException(new NodeStoppingException());
+                }
+
+                try {
+                    assert evt.single();
+
+                    Entry newEntry = evt.entryEvent().newEntry();
+
+                    Set<String> newlogicalTopology = ByteUtils.fromBytes(newEntry.value());

Review Comment:
   newLogicalTopology



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -235,6 +257,98 @@ public CompletableFuture<Void> dropZone(String name) {
     @Override
     public void start() {
         zonesConfiguration.distributionZones().listenElements(new ZonesConfigurationListener());
+
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(e);
+        }
+
+        if (vaultEntry != null && vaultEntry.value() != null) {
+            byte[] newLogicalTopology = vaultEntry.value();
+
+            logicalTopology = ByteUtils.fromBytes(newLogicalTopology);
+
+            zonesConfiguration.distributionZones().value().namedListKeys()
+                    .forEach(zoneName -> {
+                        int zoneId = zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                        saveDataNodesToMetastorage(zoneId, newLogicalTopology, vaultAppliedRevision);
+                    });
+        }
+
+        metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new WatchListener() {
+            @Override
+            public boolean onUpdate(@NotNull WatchEvent evt) {
+                if (!busyLock.enterBusy()) {
+                    throw new IgniteInternalException(new NodeStoppingException());
+                }
+
+                try {
+                    assert evt.single();
+
+                    Entry newEntry = evt.entryEvent().newEntry();
+
+                    Set<String> newlogicalTopology = ByteUtils.fromBytes(newEntry.value());
+
+                    List<String> removedNodes =
+                            logicalTopology.stream().filter(node -> !newlogicalTopology.contains(node)).collect(toList());
+
+                    List<String> addedNodes =
+                            newlogicalTopology.stream().filter(node -> !logicalTopology.contains(node)).collect(toList());
+
+                    logicalTopology = newlogicalTopology;
+
+                    zonesConfiguration.distributionZones().value().namedListKeys()
+                            .forEach(zoneName -> {
+                                DistributionZoneConfiguration zoneCfg = zonesConfiguration.distributionZones().get(zoneName);
+
+                                int autoAdjust = zoneCfg.dataNodesAutoAdjust().value();
+                                int autoAdjustScaleDown = zoneCfg.dataNodesAutoAdjustScaleDown().value();
+                                int autoAdjustScaleUp = zoneCfg.dataNodesAutoAdjustScaleUp().value();
+
+                                Integer zoneId = zoneCfg.zoneId().value();
+
+                                if (!removedNodes.isEmpty()) {
+                                    if (autoAdjust != Integer.MAX_VALUE) {
+                                        saveDataNodesToMetastorage(

Review Comment:
   here and below lets add todos with the corresponding tickets to timers 



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.metastorage.MetaStorageManager.APPLIED_REV;
+import static org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl.toIfInfo;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.EntryEvent;
+import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.StatementResult;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.metastorage.common.StatementResultInfo;
+import org.apache.ignite.internal.metastorage.common.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.common.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+/**
+ * Tests distribution zones logical topology changes and reaction to that changes.
+ */
+public class DistributionZoneManagerWatchListenerTest extends IgniteAbstractTest {
+    private static final String ZONE_NAME = "zone1";
+
+    @Mock
+    private ClusterManagementGroupManager cmgManager;
+
+    private VaultManager vaultMgr;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private WatchListener watchListener;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private MetaStorageManager metaStorageManager;
+
+    @BeforeEach
+    public void setUp() {
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Map.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        zonesConfiguration = mock(DistributionZonesConfiguration.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        vaultMgr = mock(VaultManager.class);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                metaStorageManager,
+                cmgManager,
+                vaultMgr
+        );
+
+        clusterCfgMgr.start();
+
+        mockVaultAppliedRevision(1);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        when(metaStorageManager.registerWatch(any(ByteArray.class), any())).then(invocation -> {
+            watchListener = invocation.getArgument(1);
+
+            return CompletableFuture.completedFuture(null);
+        });
+
+        mockEmptyZonesList();
+
+        AtomicLong raftIndex = new AtomicLong();
+
+        keyValueStorage = spy(new SimpleInMemoryKeyValueStorage());
+
+        MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage);
+
+        RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+        // Delegate directly to listener.
+        lenient().doAnswer(
+                invocationClose -> {
+                    Command cmd = invocationClose.getArgument(0);
+
+                    long commandIndex = raftIndex.incrementAndGet();
+
+                    CompletableFuture<Serializable> res = new CompletableFuture<>();
+
+                    CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+                        /** {@inheritDoc} */
+                        @Override
+                        public long index() {
+                            return commandIndex;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public WriteCommand command() {
+                            return (WriteCommand) cmd;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public void result(@Nullable Serializable r) {
+                            if (r instanceof Throwable) {
+                                res.completeExceptionally((Throwable) r);
+                            } else {
+                                res.complete(r);
+                            }
+                        }
+                    };
+
+                    try {
+                        metaStorageListener.onWrite(List.of(clo).iterator());
+                    } catch (Throwable e) {
+                        res.completeExceptionally(new IgniteInternalException(e));
+                    }
+
+                    return res;
+                }
+        ).when(metaStorageService).run(any());
+
+        MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory();
+
+        lenient().doAnswer(invocationClose -> {
+            If iif = invocationClose.getArgument(0);
+
+            MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand().iif(toIfInfo(iif, commandsFactory)).build();
+
+            return metaStorageService.run(multiInvokeCommand).thenApply(bi -> new StatementResult(((StatementResultInfo) bi).result()));
+        }).when(metaStorageManager).invoke(any());
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        vaultMgr.stop();
+
+        distributionZoneManager.stop();
+
+        clusterCfgMgr.stop();
+
+        keyValueStorage.close();
+    }
+
+    @Test
+    void testDataNodesUpdatedOnWatchListenerEvent() {
+        distributionZoneManager.start();
+
+        mockCreateZone();
+
+        //first event
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        watchListenerOnUpdate(nodes, 1);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        Set<String> newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.containsAll(nodes));
+        assertEquals(nodes.size(), newDataNodes.size());
+
+        //second event
+
+        nodes = Set.of("node1", "node3");
+
+        watchListenerOnUpdate(nodes, 2);
+
+        verify(keyValueStorage, timeout(1000).times(3)).invoke(any());
+
+        entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.containsAll(nodes));
+        assertEquals(nodes.size(), newDataNodes.size());
+
+        //third event
+
+        nodes = Collections.emptySet();
+
+        watchListenerOnUpdate(nodes, 3);
+
+        verify(keyValueStorage, timeout(1000).times(4)).invoke(any());
+
+        entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.isEmpty());
+    }
+
+    @Test
+    void testStaleWatchEvent() {
+        distributionZoneManager.start();
+
+        mockCreateZone();
+
+        mockVaultAppliedRevision(1);
+
+        long revision = 100;
+
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), ByteUtils.longToBytes(revision));
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        watchListenerOnUpdate(nodes, revision);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        assertNull(entry.value());
+    }
+
+    @Test
+    void testStaleVaultRevisionOnZoneManagerStart() {
+        mockCreateZone();
+
+        long revision = 100;
+
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), ByteUtils.longToBytes(revision));
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        mockVaultZonesLogicalTopologyKey(nodes);
+
+        mockVaultAppliedRevision(revision);
+
+        distributionZoneManager.start();
+
+        verify(metaStorageManager, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        assertNull(entry.value());
+    }
+
+    @Test
+    void testDataNodesUpdatedOnZoneManagerStart() {
+        mockCreateZone();
+
+        mockVaultAppliedRevision(2);
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        mockVaultZonesLogicalTopologyKey(nodes);
+
+        distributionZoneManager.start();
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        Set<String> newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.containsAll(nodes));
+        assertEquals(2, newDataNodes.size());
+    }
+
+    private void mockEmptyZonesList() {
+        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> namedConfigurationTree =
+                mock(NamedConfigurationTree.class);
+        when(zonesConfiguration.distributionZones()).thenReturn(namedConfigurationTree);
+
+        NamedListView<DistributionZoneView> namedListView = mock(NamedListView.class);
+        when(namedConfigurationTree.value()).thenReturn(namedListView);
+
+        when(zonesConfiguration.distributionZones().value().namedListKeys()).thenReturn(Collections.emptyList());
+    }
+
+    private void mockCreateZone() {
+        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> namedConfigurationTree =
+                mock(NamedConfigurationTree.class);
+        when(zonesConfiguration.distributionZones()).thenReturn(namedConfigurationTree);
+
+        NamedListView<DistributionZoneView> namedListView = mock(NamedListView.class);
+        when(namedConfigurationTree.value()).thenReturn(namedListView);
+
+        when(zonesConfiguration.distributionZones().value().namedListKeys()).thenReturn(List.of(ZONE_NAME));
+
+        DistributionZoneConfiguration distributionZoneConfiguration1 = mock(DistributionZoneConfiguration.class);
+        when(namedConfigurationTree.get(ZONE_NAME)).thenReturn(distributionZoneConfiguration1);
+
+        ConfigurationValue<Integer> zoneIdValue1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.zoneId()).thenReturn(zoneIdValue1);
+        when(zoneIdValue1.value()).thenReturn(1);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjust1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.dataNodesAutoAdjust()).thenReturn(dataNodesAutoAdjust1);
+        when(dataNodesAutoAdjust1.value()).thenReturn(100);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjustScaleUp1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.dataNodesAutoAdjustScaleUp()).thenReturn(dataNodesAutoAdjustScaleUp1);
+        when(dataNodesAutoAdjustScaleUp1.value()).thenReturn(200);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjustScaleDown1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.dataNodesAutoAdjustScaleDown()).thenReturn(dataNodesAutoAdjustScaleDown1);
+        when(dataNodesAutoAdjustScaleDown1.value()).thenReturn(300);
+    }
+
+    private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
+        byte[] newlogicalTopology = toBytes(nodes);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey()))
+                .thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), newlogicalTopology)));
+    }
+
+    private void watchListenerOnUpdate(Set<String> nodes, long rev) {
+        byte[] newlogicalTopology = toBytes(nodes);
+
+        org.apache.ignite.internal.metastorage.client.Entry newEntry =

Review Comment:
   `org.apache.ignite.internal.metastorage.client.` is redundant



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.metastorage.MetaStorageManager.APPLIED_REV;
+import static org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl.toIfInfo;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.EntryEvent;
+import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.StatementResult;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.metastorage.common.StatementResultInfo;
+import org.apache.ignite.internal.metastorage.common.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.common.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+/**
+ * Tests distribution zones logical topology changes and reaction to that changes.
+ */
+public class DistributionZoneManagerWatchListenerTest extends IgniteAbstractTest {
+    private static final String ZONE_NAME = "zone1";
+
+    @Mock
+    private ClusterManagementGroupManager cmgManager;
+
+    private VaultManager vaultMgr;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private WatchListener watchListener;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private MetaStorageManager metaStorageManager;
+
+    @BeforeEach
+    public void setUp() {
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Map.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        zonesConfiguration = mock(DistributionZonesConfiguration.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        vaultMgr = mock(VaultManager.class);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                metaStorageManager,
+                cmgManager,
+                vaultMgr
+        );
+
+        clusterCfgMgr.start();
+
+        mockVaultAppliedRevision(1);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        when(metaStorageManager.registerWatch(any(ByteArray.class), any())).then(invocation -> {
+            watchListener = invocation.getArgument(1);
+
+            return CompletableFuture.completedFuture(null);
+        });
+
+        mockEmptyZonesList();
+
+        AtomicLong raftIndex = new AtomicLong();
+
+        keyValueStorage = spy(new SimpleInMemoryKeyValueStorage());
+
+        MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage);
+
+        RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+        // Delegate directly to listener.
+        lenient().doAnswer(
+                invocationClose -> {
+                    Command cmd = invocationClose.getArgument(0);
+
+                    long commandIndex = raftIndex.incrementAndGet();
+
+                    CompletableFuture<Serializable> res = new CompletableFuture<>();
+
+                    CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+                        /** {@inheritDoc} */
+                        @Override
+                        public long index() {
+                            return commandIndex;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public WriteCommand command() {
+                            return (WriteCommand) cmd;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public void result(@Nullable Serializable r) {
+                            if (r instanceof Throwable) {
+                                res.completeExceptionally((Throwable) r);
+                            } else {
+                                res.complete(r);
+                            }
+                        }
+                    };
+
+                    try {
+                        metaStorageListener.onWrite(List.of(clo).iterator());
+                    } catch (Throwable e) {
+                        res.completeExceptionally(new IgniteInternalException(e));
+                    }
+
+                    return res;
+                }
+        ).when(metaStorageService).run(any());
+
+        MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory();
+
+        lenient().doAnswer(invocationClose -> {
+            If iif = invocationClose.getArgument(0);
+
+            MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand().iif(toIfInfo(iif, commandsFactory)).build();
+
+            return metaStorageService.run(multiInvokeCommand).thenApply(bi -> new StatementResult(((StatementResultInfo) bi).result()));
+        }).when(metaStorageManager).invoke(any());
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        vaultMgr.stop();
+
+        distributionZoneManager.stop();
+
+        clusterCfgMgr.stop();
+
+        keyValueStorage.close();
+    }
+
+    @Test
+    void testDataNodesUpdatedOnWatchListenerEvent() {
+        distributionZoneManager.start();
+
+        mockCreateZone();
+
+        //first event
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        watchListenerOnUpdate(nodes, 1);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        Set<String> newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.containsAll(nodes));
+        assertEquals(nodes.size(), newDataNodes.size());
+
+        //second event
+
+        nodes = Set.of("node1", "node3");
+
+        watchListenerOnUpdate(nodes, 2);
+
+        verify(keyValueStorage, timeout(1000).times(3)).invoke(any());
+
+        entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.containsAll(nodes));
+        assertEquals(nodes.size(), newDataNodes.size());
+
+        //third event
+
+        nodes = Collections.emptySet();
+
+        watchListenerOnUpdate(nodes, 3);
+
+        verify(keyValueStorage, timeout(1000).times(4)).invoke(any());
+
+        entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.isEmpty());
+    }
+
+    @Test
+    void testStaleWatchEvent() {
+        distributionZoneManager.start();
+
+        mockCreateZone();
+
+        mockVaultAppliedRevision(1);
+
+        long revision = 100;
+
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), ByteUtils.longToBytes(revision));
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        watchListenerOnUpdate(nodes, revision);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        assertNull(entry.value());
+    }
+
+    @Test
+    void testStaleVaultRevisionOnZoneManagerStart() {
+        mockCreateZone();
+
+        long revision = 100;
+
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), ByteUtils.longToBytes(revision));
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        mockVaultZonesLogicalTopologyKey(nodes);
+
+        mockVaultAppliedRevision(revision);
+
+        distributionZoneManager.start();
+
+        verify(metaStorageManager, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        assertNull(entry.value());
+    }
+
+    @Test
+    void testDataNodesUpdatedOnZoneManagerStart() {
+        mockCreateZone();
+
+        mockVaultAppliedRevision(2);
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        mockVaultZonesLogicalTopologyKey(nodes);
+
+        distributionZoneManager.start();
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        Set<String> newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.containsAll(nodes));
+        assertEquals(2, newDataNodes.size());
+    }
+
+    private void mockEmptyZonesList() {
+        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> namedConfigurationTree =
+                mock(NamedConfigurationTree.class);
+        when(zonesConfiguration.distributionZones()).thenReturn(namedConfigurationTree);
+
+        NamedListView<DistributionZoneView> namedListView = mock(NamedListView.class);
+        when(namedConfigurationTree.value()).thenReturn(namedListView);
+
+        when(zonesConfiguration.distributionZones().value().namedListKeys()).thenReturn(Collections.emptyList());
+    }
+
+    private void mockCreateZone() {
+        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> namedConfigurationTree =
+                mock(NamedConfigurationTree.class);
+        when(zonesConfiguration.distributionZones()).thenReturn(namedConfigurationTree);
+
+        NamedListView<DistributionZoneView> namedListView = mock(NamedListView.class);
+        when(namedConfigurationTree.value()).thenReturn(namedListView);
+
+        when(zonesConfiguration.distributionZones().value().namedListKeys()).thenReturn(List.of(ZONE_NAME));
+
+        DistributionZoneConfiguration distributionZoneConfiguration1 = mock(DistributionZoneConfiguration.class);
+        when(namedConfigurationTree.get(ZONE_NAME)).thenReturn(distributionZoneConfiguration1);
+
+        ConfigurationValue<Integer> zoneIdValue1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.zoneId()).thenReturn(zoneIdValue1);
+        when(zoneIdValue1.value()).thenReturn(1);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjust1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.dataNodesAutoAdjust()).thenReturn(dataNodesAutoAdjust1);
+        when(dataNodesAutoAdjust1.value()).thenReturn(100);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjustScaleUp1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.dataNodesAutoAdjustScaleUp()).thenReturn(dataNodesAutoAdjustScaleUp1);
+        when(dataNodesAutoAdjustScaleUp1.value()).thenReturn(200);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjustScaleDown1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.dataNodesAutoAdjustScaleDown()).thenReturn(dataNodesAutoAdjustScaleDown1);
+        when(dataNodesAutoAdjustScaleDown1.value()).thenReturn(300);
+    }
+
+    private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
+        byte[] newlogicalTopology = toBytes(nodes);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey()))
+                .thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), newlogicalTopology)));
+    }
+
+    private void watchListenerOnUpdate(Set<String> nodes, long rev) {
+        byte[] newlogicalTopology = toBytes(nodes);
+
+        org.apache.ignite.internal.metastorage.client.Entry newEntry =
+                new org.apache.ignite.internal.metastorage.client.EntryImpl(null, newlogicalTopology, rev, 1);
+
+        EntryEvent entryEvent = new EntryEvent(null, newEntry);
+
+        WatchEvent evt = new WatchEvent(entryEvent);
+
+        watchListener.onUpdate(evt);
+    }
+
+    private void mockVaultAppliedRevision(long revision) {
+        when(vaultMgr.get(APPLIED_REV)).thenReturn(completedFuture(new VaultEntry(APPLIED_REV, longToBytes(revision))));
+    }
+
+    private LogicalTopologySnapshot mockCmgLocalNodes(Set<ClusterNode> clusterNodes) {
+        LogicalTopologySnapshot logicalTopologySnapshot = mock(LogicalTopologySnapshot.class);
+
+        when(cmgManager.logicalTopology()).thenReturn(completedFuture(logicalTopologySnapshot));
+
+        when(logicalTopologySnapshot.nodes()).thenReturn(clusterNodes);
+
+        return logicalTopologySnapshot;
+    }
+
+    private void assertDataNodesForZone(int zoneId, @Nullable Set<ClusterNode> clusterNodes) throws InterruptedException {
+        byte[] nodes = clusterNodes == null
+                ? null
+                : ByteUtils.toBytes(clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()));

Review Comment:
   please, check all PR with such kind of redundancies, there are several places 



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -235,6 +257,98 @@ public CompletableFuture<Void> dropZone(String name) {
     @Override
     public void start() {
         zonesConfiguration.distributionZones().listenElements(new ZonesConfigurationListener());
+
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(e);
+        }
+
+        if (vaultEntry != null && vaultEntry.value() != null) {

Review Comment:
   What if this condition do not pass? We will come to the situation that `logicalTopology` will be null and after that you'll got NPE in the metastorage watch handler. I would recommend to initialise it with `cmgManager.logicalTopology().get().nodes();` if we cannot read it from vault (seems like it is possible only on the first start). And please add test scenario for that 
   
   Also please use `logicalTopology` in the `DistributionZoneManager#updateMetaStorageOnZoneCreate` instead of `cmgManager.logicalTopology().get().nodes();` (I've added TODO in that place)



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -235,6 +257,98 @@ public CompletableFuture<Void> dropZone(String name) {
     @Override
     public void start() {
         zonesConfiguration.distributionZones().listenElements(new ZonesConfigurationListener());
+
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(e);
+        }
+
+        if (vaultEntry != null && vaultEntry.value() != null) {
+            byte[] newLogicalTopology = vaultEntry.value();
+
+            logicalTopology = ByteUtils.fromBytes(newLogicalTopology);
+
+            zonesConfiguration.distributionZones().value().namedListKeys()
+                    .forEach(zoneName -> {
+                        int zoneId = zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                        saveDataNodesToMetastorage(zoneId, newLogicalTopology, vaultAppliedRevision);
+                    });
+        }
+
+        metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new WatchListener() {
+            @Override
+            public boolean onUpdate(@NotNull WatchEvent evt) {
+                if (!busyLock.enterBusy()) {
+                    throw new IgniteInternalException(new NodeStoppingException());
+                }
+
+                try {
+                    assert evt.single();
+
+                    Entry newEntry = evt.entryEvent().newEntry();
+
+                    Set<String> newlogicalTopology = ByteUtils.fromBytes(newEntry.value());
+
+                    List<String> removedNodes =
+                            logicalTopology.stream().filter(node -> !newlogicalTopology.contains(node)).collect(toList());
+
+                    List<String> addedNodes =
+                            newlogicalTopology.stream().filter(node -> !logicalTopology.contains(node)).collect(toList());
+
+                    logicalTopology = newlogicalTopology;
+
+                    zonesConfiguration.distributionZones().value().namedListKeys()
+                            .forEach(zoneName -> {
+                                DistributionZoneConfiguration zoneCfg = zonesConfiguration.distributionZones().get(zoneName);
+
+                                int autoAdjust = zoneCfg.dataNodesAutoAdjust().value();
+                                int autoAdjustScaleDown = zoneCfg.dataNodesAutoAdjustScaleDown().value();
+                                int autoAdjustScaleUp = zoneCfg.dataNodesAutoAdjustScaleUp().value();
+
+                                Integer zoneId = zoneCfg.zoneId().value();
+
+                                if (!removedNodes.isEmpty()) {

Review Comment:
   please, consider refactoring of this code, to something like this, so we don't need to save data nodes twice when `autoAdjust != Integer.MAX_VALUE`
   
   ```
                                   if (autoAdjust != Integer.MAX_VALUE) {
                                       saveDataNodesToMetastorage(zoneId, newEntry.value(), newEntry.revision());
                                       
                                       return;
                                   } 
   
                                   if (!removedNodes.isEmpty()) {
                                       if (autoAdjustScaleDown != Integer.MAX_VALUE) {
                                           saveDataNodesToMetastorage(zoneId, newEntry.value(), newEntry.revision());
                                       }
                                   }
   
                                   if (!addedNodes.isEmpty()) {
                                       if (autoAdjustScaleUp != Integer.MAX_VALUE) {
                                           saveDataNodesToMetastorage(zoneId, newEntry.value(), newEntry.revision());
                                       }
                                   }
   
   ```



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.metastorage.MetaStorageManager.APPLIED_REV;
+import static org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl.toIfInfo;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.EntryEvent;
+import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.StatementResult;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.metastorage.common.StatementResultInfo;
+import org.apache.ignite.internal.metastorage.common.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.common.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+/**
+ * Tests distribution zones logical topology changes and reaction to that changes.
+ */
+public class DistributionZoneManagerWatchListenerTest extends IgniteAbstractTest {
+    private static final String ZONE_NAME = "zone1";
+
+    @Mock
+    private ClusterManagementGroupManager cmgManager;
+
+    private VaultManager vaultMgr;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private WatchListener watchListener;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private MetaStorageManager metaStorageManager;
+
+    @BeforeEach
+    public void setUp() {
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Map.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        zonesConfiguration = mock(DistributionZonesConfiguration.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        vaultMgr = mock(VaultManager.class);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                metaStorageManager,
+                cmgManager,
+                vaultMgr
+        );
+
+        clusterCfgMgr.start();
+
+        mockVaultAppliedRevision(1);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        when(metaStorageManager.registerWatch(any(ByteArray.class), any())).then(invocation -> {
+            watchListener = invocation.getArgument(1);
+
+            return CompletableFuture.completedFuture(null);
+        });
+
+        mockEmptyZonesList();
+
+        AtomicLong raftIndex = new AtomicLong();
+
+        keyValueStorage = spy(new SimpleInMemoryKeyValueStorage());
+
+        MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage);
+
+        RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+        // Delegate directly to listener.
+        lenient().doAnswer(
+                invocationClose -> {
+                    Command cmd = invocationClose.getArgument(0);
+
+                    long commandIndex = raftIndex.incrementAndGet();
+
+                    CompletableFuture<Serializable> res = new CompletableFuture<>();
+
+                    CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+                        /** {@inheritDoc} */
+                        @Override
+                        public long index() {
+                            return commandIndex;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public WriteCommand command() {
+                            return (WriteCommand) cmd;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public void result(@Nullable Serializable r) {
+                            if (r instanceof Throwable) {
+                                res.completeExceptionally((Throwable) r);
+                            } else {
+                                res.complete(r);
+                            }
+                        }
+                    };
+
+                    try {
+                        metaStorageListener.onWrite(List.of(clo).iterator());
+                    } catch (Throwable e) {
+                        res.completeExceptionally(new IgniteInternalException(e));
+                    }
+
+                    return res;
+                }
+        ).when(metaStorageService).run(any());
+
+        MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory();
+
+        lenient().doAnswer(invocationClose -> {
+            If iif = invocationClose.getArgument(0);
+
+            MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand().iif(toIfInfo(iif, commandsFactory)).build();
+
+            return metaStorageService.run(multiInvokeCommand).thenApply(bi -> new StatementResult(((StatementResultInfo) bi).result()));
+        }).when(metaStorageManager).invoke(any());
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        vaultMgr.stop();
+
+        distributionZoneManager.stop();
+
+        clusterCfgMgr.stop();
+
+        keyValueStorage.close();
+    }
+
+    @Test
+    void testDataNodesUpdatedOnWatchListenerEvent() {

Review Comment:
   Please add other test scenarios with several zones, so the are all updated.
   
   Also please separate this test with scenarios when we have autoAdjust, scaleUp and scaleDown 



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -235,6 +257,98 @@ public CompletableFuture<Void> dropZone(String name) {
     @Override
     public void start() {
         zonesConfiguration.distributionZones().listenElements(new ZonesConfigurationListener());
+
+        long vaultAppliedRevision = vaultAppliedRevision();

Review Comment:
   Please refactor code and extract new logic form start method to separate methods. Start method looks overloaded 



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -373,4 +487,39 @@ private void updateMetaStorageOnZoneDelete(int zoneId, long revision) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Returns applied vault revision.
+     */
+    private long vaultAppliedRevision() {
+        try {
+            return vaultMgr.get(APPLIED_REV)
+                    .thenApply(appliedRevision -> appliedRevision == null ? 0L : bytesToLong(appliedRevision.value()))
+                    .get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(e);
+        }
+    }
+
+    /**
+     * Method updates data nodes value for the specified zone,
+     * also sets {@code revision} to the {@link DistributionZonesUtil#zonesChangeTriggerKey()} if it passes the condition.
+     *
+     * @param zoneId Unique id of a zone
+     * @param dataNodes Data nodes of a zone
+     * @param revision Revision of an event that has triggered this method.
+     */
+    private void saveDataNodesToMetastorage(int zoneId, byte[] dataNodes, long revision) {
+        Update dataNodesAndTriggerKeyUpd = updateDataNodesAndTriggerKey(zoneId, revision, dataNodes);
+
+        var iif = If.iif(triggerKeyCondition(revision), dataNodesAndTriggerKeyUpd, ops().yield(false));
+
+        metaStorageManager.invoke(iif).thenAccept(res -> {
+            if (res.getAsBoolean()) {
+                LOG.info("Delete zones' dataNodes key [zoneId = {}", zoneId);

Review Comment:
   lets use debug level here, otherwise we will spam logs 



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.metastorage.MetaStorageManager.APPLIED_REV;
+import static org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl.toIfInfo;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.EntryEvent;
+import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.StatementResult;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.metastorage.common.StatementResultInfo;
+import org.apache.ignite.internal.metastorage.common.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.common.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+/**
+ * Tests distribution zones logical topology changes and reaction to that changes.
+ */
+public class DistributionZoneManagerWatchListenerTest extends IgniteAbstractTest {
+    private static final String ZONE_NAME = "zone1";
+
+    @Mock
+    private ClusterManagementGroupManager cmgManager;
+
+    private VaultManager vaultMgr;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private WatchListener watchListener;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private MetaStorageManager metaStorageManager;
+
+    @BeforeEach
+    public void setUp() {
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Map.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        zonesConfiguration = mock(DistributionZonesConfiguration.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        vaultMgr = mock(VaultManager.class);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                metaStorageManager,
+                cmgManager,
+                vaultMgr
+        );
+
+        clusterCfgMgr.start();
+
+        mockVaultAppliedRevision(1);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        when(metaStorageManager.registerWatch(any(ByteArray.class), any())).then(invocation -> {
+            watchListener = invocation.getArgument(1);
+
+            return CompletableFuture.completedFuture(null);

Review Comment:
   return completedFuture(null);



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -235,6 +257,98 @@ public CompletableFuture<Void> dropZone(String name) {
     @Override
     public void start() {
         zonesConfiguration.distributionZones().listenElements(new ZonesConfigurationListener());
+
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(e);
+        }
+
+        if (vaultEntry != null && vaultEntry.value() != null) {
+            byte[] newLogicalTopology = vaultEntry.value();
+
+            logicalTopology = ByteUtils.fromBytes(newLogicalTopology);
+
+            zonesConfiguration.distributionZones().value().namedListKeys()
+                    .forEach(zoneName -> {
+                        int zoneId = zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                        saveDataNodesToMetastorage(zoneId, newLogicalTopology, vaultAppliedRevision);
+                    });
+        }
+
+        metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new WatchListener() {

Review Comment:
   should we `MetaStorageManager#unregisterWatch` in the stop section? 



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.metastorage.MetaStorageManager.APPLIED_REV;
+import static org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl.toIfInfo;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.EntryEvent;
+import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.StatementResult;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.metastorage.common.StatementResultInfo;
+import org.apache.ignite.internal.metastorage.common.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.common.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+/**
+ * Tests distribution zones logical topology changes and reaction to that changes.
+ */
+public class DistributionZoneManagerWatchListenerTest extends IgniteAbstractTest {
+    private static final String ZONE_NAME = "zone1";
+
+    @Mock
+    private ClusterManagementGroupManager cmgManager;
+
+    private VaultManager vaultMgr;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private WatchListener watchListener;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private MetaStorageManager metaStorageManager;
+
+    @BeforeEach
+    public void setUp() {
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Map.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        zonesConfiguration = mock(DistributionZonesConfiguration.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        vaultMgr = mock(VaultManager.class);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                metaStorageManager,
+                cmgManager,
+                vaultMgr
+        );
+
+        clusterCfgMgr.start();
+
+        mockVaultAppliedRevision(1);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        when(metaStorageManager.registerWatch(any(ByteArray.class), any())).then(invocation -> {
+            watchListener = invocation.getArgument(1);
+
+            return CompletableFuture.completedFuture(null);
+        });
+
+        mockEmptyZonesList();
+
+        AtomicLong raftIndex = new AtomicLong();
+
+        keyValueStorage = spy(new SimpleInMemoryKeyValueStorage());
+
+        MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage);
+
+        RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+        // Delegate directly to listener.
+        lenient().doAnswer(
+                invocationClose -> {
+                    Command cmd = invocationClose.getArgument(0);
+
+                    long commandIndex = raftIndex.incrementAndGet();
+
+                    CompletableFuture<Serializable> res = new CompletableFuture<>();
+
+                    CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+                        /** {@inheritDoc} */
+                        @Override
+                        public long index() {
+                            return commandIndex;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public WriteCommand command() {
+                            return (WriteCommand) cmd;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public void result(@Nullable Serializable r) {
+                            if (r instanceof Throwable) {
+                                res.completeExceptionally((Throwable) r);
+                            } else {
+                                res.complete(r);
+                            }
+                        }
+                    };
+
+                    try {
+                        metaStorageListener.onWrite(List.of(clo).iterator());
+                    } catch (Throwable e) {
+                        res.completeExceptionally(new IgniteInternalException(e));
+                    }
+
+                    return res;
+                }
+        ).when(metaStorageService).run(any());
+
+        MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory();
+
+        lenient().doAnswer(invocationClose -> {
+            If iif = invocationClose.getArgument(0);
+
+            MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand().iif(toIfInfo(iif, commandsFactory)).build();
+
+            return metaStorageService.run(multiInvokeCommand).thenApply(bi -> new StatementResult(((StatementResultInfo) bi).result()));
+        }).when(metaStorageManager).invoke(any());
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        vaultMgr.stop();
+
+        distributionZoneManager.stop();
+
+        clusterCfgMgr.stop();
+
+        keyValueStorage.close();
+    }
+
+    @Test
+    void testDataNodesUpdatedOnWatchListenerEvent() {
+        distributionZoneManager.start();
+
+        mockCreateZone();
+
+        //first event
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        watchListenerOnUpdate(nodes, 1);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        Set<String> newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.containsAll(nodes));
+        assertEquals(nodes.size(), newDataNodes.size());
+
+        //second event
+
+        nodes = Set.of("node1", "node3");
+
+        watchListenerOnUpdate(nodes, 2);
+
+        verify(keyValueStorage, timeout(1000).times(3)).invoke(any());
+
+        entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.containsAll(nodes));
+        assertEquals(nodes.size(), newDataNodes.size());
+
+        //third event
+
+        nodes = Collections.emptySet();
+
+        watchListenerOnUpdate(nodes, 3);
+
+        verify(keyValueStorage, timeout(1000).times(4)).invoke(any());
+
+        entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.isEmpty());
+    }
+
+    @Test
+    void testStaleWatchEvent() {
+        distributionZoneManager.start();
+
+        mockCreateZone();
+
+        mockVaultAppliedRevision(1);
+
+        long revision = 100;
+
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), ByteUtils.longToBytes(revision));
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        watchListenerOnUpdate(nodes, revision);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        assertNull(entry.value());
+    }
+
+    @Test
+    void testStaleVaultRevisionOnZoneManagerStart() {
+        mockCreateZone();
+
+        long revision = 100;
+
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), ByteUtils.longToBytes(revision));
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        mockVaultZonesLogicalTopologyKey(nodes);
+
+        mockVaultAppliedRevision(revision);
+
+        distributionZoneManager.start();
+
+        verify(metaStorageManager, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        assertNull(entry.value());
+    }
+
+    @Test
+    void testDataNodesUpdatedOnZoneManagerStart() {
+        mockCreateZone();
+
+        mockVaultAppliedRevision(2);
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        mockVaultZonesLogicalTopologyKey(nodes);
+
+        distributionZoneManager.start();
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        Set<String> newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.containsAll(nodes));
+        assertEquals(2, newDataNodes.size());
+    }
+
+    private void mockEmptyZonesList() {
+        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> namedConfigurationTree =
+                mock(NamedConfigurationTree.class);
+        when(zonesConfiguration.distributionZones()).thenReturn(namedConfigurationTree);
+
+        NamedListView<DistributionZoneView> namedListView = mock(NamedListView.class);
+        when(namedConfigurationTree.value()).thenReturn(namedListView);
+
+        when(zonesConfiguration.distributionZones().value().namedListKeys()).thenReturn(Collections.emptyList());
+    }
+
+    private void mockCreateZone() {
+        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> namedConfigurationTree =
+                mock(NamedConfigurationTree.class);
+        when(zonesConfiguration.distributionZones()).thenReturn(namedConfigurationTree);
+
+        NamedListView<DistributionZoneView> namedListView = mock(NamedListView.class);
+        when(namedConfigurationTree.value()).thenReturn(namedListView);
+
+        when(zonesConfiguration.distributionZones().value().namedListKeys()).thenReturn(List.of(ZONE_NAME));
+
+        DistributionZoneConfiguration distributionZoneConfiguration1 = mock(DistributionZoneConfiguration.class);
+        when(namedConfigurationTree.get(ZONE_NAME)).thenReturn(distributionZoneConfiguration1);
+
+        ConfigurationValue<Integer> zoneIdValue1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.zoneId()).thenReturn(zoneIdValue1);
+        when(zoneIdValue1.value()).thenReturn(1);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjust1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.dataNodesAutoAdjust()).thenReturn(dataNodesAutoAdjust1);
+        when(dataNodesAutoAdjust1.value()).thenReturn(100);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjustScaleUp1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.dataNodesAutoAdjustScaleUp()).thenReturn(dataNodesAutoAdjustScaleUp1);
+        when(dataNodesAutoAdjustScaleUp1.value()).thenReturn(200);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjustScaleDown1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.dataNodesAutoAdjustScaleDown()).thenReturn(dataNodesAutoAdjustScaleDown1);
+        when(dataNodesAutoAdjustScaleDown1.value()).thenReturn(300);
+    }
+
+    private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
+        byte[] newlogicalTopology = toBytes(nodes);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey()))
+                .thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), newlogicalTopology)));
+    }
+
+    private void watchListenerOnUpdate(Set<String> nodes, long rev) {
+        byte[] newlogicalTopology = toBytes(nodes);
+
+        org.apache.ignite.internal.metastorage.client.Entry newEntry =
+                new org.apache.ignite.internal.metastorage.client.EntryImpl(null, newlogicalTopology, rev, 1);
+
+        EntryEvent entryEvent = new EntryEvent(null, newEntry);
+
+        WatchEvent evt = new WatchEvent(entryEvent);
+
+        watchListener.onUpdate(evt);
+    }
+
+    private void mockVaultAppliedRevision(long revision) {
+        when(vaultMgr.get(APPLIED_REV)).thenReturn(completedFuture(new VaultEntry(APPLIED_REV, longToBytes(revision))));
+    }
+
+    private LogicalTopologySnapshot mockCmgLocalNodes(Set<ClusterNode> clusterNodes) {
+        LogicalTopologySnapshot logicalTopologySnapshot = mock(LogicalTopologySnapshot.class);
+
+        when(cmgManager.logicalTopology()).thenReturn(completedFuture(logicalTopologySnapshot));
+
+        when(logicalTopologySnapshot.nodes()).thenReturn(clusterNodes);
+
+        return logicalTopologySnapshot;
+    }
+
+    private void assertDataNodesForZone(int zoneId, @Nullable Set<ClusterNode> clusterNodes) throws InterruptedException {
+        byte[] nodes = clusterNodes == null
+                ? null
+                : ByteUtils.toBytes(clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()));

Review Comment:
   ByteUtils is redundat



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.metastorage.MetaStorageManager.APPLIED_REV;
+import static org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl.toIfInfo;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.EntryEvent;
+import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.StatementResult;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.metastorage.common.StatementResultInfo;
+import org.apache.ignite.internal.metastorage.common.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.common.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+/**
+ * Tests distribution zones logical topology changes and reaction to that changes.
+ */
+public class DistributionZoneManagerWatchListenerTest extends IgniteAbstractTest {
+    private static final String ZONE_NAME = "zone1";
+
+    @Mock
+    private ClusterManagementGroupManager cmgManager;
+
+    private VaultManager vaultMgr;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private WatchListener watchListener;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private MetaStorageManager metaStorageManager;
+
+    @BeforeEach
+    public void setUp() {
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Map.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        zonesConfiguration = mock(DistributionZonesConfiguration.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        vaultMgr = mock(VaultManager.class);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                metaStorageManager,
+                cmgManager,
+                vaultMgr
+        );
+
+        clusterCfgMgr.start();
+
+        mockVaultAppliedRevision(1);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        when(metaStorageManager.registerWatch(any(ByteArray.class), any())).then(invocation -> {
+            watchListener = invocation.getArgument(1);
+
+            return CompletableFuture.completedFuture(null);
+        });
+
+        mockEmptyZonesList();
+
+        AtomicLong raftIndex = new AtomicLong();
+
+        keyValueStorage = spy(new SimpleInMemoryKeyValueStorage());
+
+        MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage);
+
+        RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+        // Delegate directly to listener.
+        lenient().doAnswer(
+                invocationClose -> {
+                    Command cmd = invocationClose.getArgument(0);
+
+                    long commandIndex = raftIndex.incrementAndGet();
+
+                    CompletableFuture<Serializable> res = new CompletableFuture<>();
+
+                    CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+                        /** {@inheritDoc} */
+                        @Override
+                        public long index() {
+                            return commandIndex;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public WriteCommand command() {
+                            return (WriteCommand) cmd;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public void result(@Nullable Serializable r) {
+                            if (r instanceof Throwable) {
+                                res.completeExceptionally((Throwable) r);
+                            } else {
+                                res.complete(r);
+                            }
+                        }
+                    };
+
+                    try {
+                        metaStorageListener.onWrite(List.of(clo).iterator());
+                    } catch (Throwable e) {
+                        res.completeExceptionally(new IgniteInternalException(e));
+                    }
+
+                    return res;
+                }
+        ).when(metaStorageService).run(any());
+
+        MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory();
+
+        lenient().doAnswer(invocationClose -> {
+            If iif = invocationClose.getArgument(0);
+
+            MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand().iif(toIfInfo(iif, commandsFactory)).build();
+
+            return metaStorageService.run(multiInvokeCommand).thenApply(bi -> new StatementResult(((StatementResultInfo) bi).result()));
+        }).when(metaStorageManager).invoke(any());
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        vaultMgr.stop();
+
+        distributionZoneManager.stop();
+
+        clusterCfgMgr.stop();
+
+        keyValueStorage.close();
+    }
+
+    @Test
+    void testDataNodesUpdatedOnWatchListenerEvent() {
+        distributionZoneManager.start();
+
+        mockCreateZone();
+
+        //first event
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        watchListenerOnUpdate(nodes, 1);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        Set<String> newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.containsAll(nodes));
+        assertEquals(nodes.size(), newDataNodes.size());
+
+        //second event
+
+        nodes = Set.of("node1", "node3");
+
+        watchListenerOnUpdate(nodes, 2);
+
+        verify(keyValueStorage, timeout(1000).times(3)).invoke(any());
+
+        entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.containsAll(nodes));
+        assertEquals(nodes.size(), newDataNodes.size());
+
+        //third event
+
+        nodes = Collections.emptySet();
+
+        watchListenerOnUpdate(nodes, 3);
+
+        verify(keyValueStorage, timeout(1000).times(4)).invoke(any());
+
+        entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.isEmpty());
+    }
+
+    @Test
+    void testStaleWatchEvent() {
+        distributionZoneManager.start();
+
+        mockCreateZone();
+
+        mockVaultAppliedRevision(1);
+
+        long revision = 100;
+
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), ByteUtils.longToBytes(revision));
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        watchListenerOnUpdate(nodes, revision);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        assertNull(entry.value());
+    }
+
+    @Test
+    void testStaleVaultRevisionOnZoneManagerStart() {
+        mockCreateZone();
+
+        long revision = 100;
+
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), ByteUtils.longToBytes(revision));
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        mockVaultZonesLogicalTopologyKey(nodes);
+
+        mockVaultAppliedRevision(revision);
+
+        distributionZoneManager.start();
+
+        verify(metaStorageManager, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        assertNull(entry.value());
+    }
+
+    @Test
+    void testDataNodesUpdatedOnZoneManagerStart() {
+        mockCreateZone();
+
+        mockVaultAppliedRevision(2);
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        mockVaultZonesLogicalTopologyKey(nodes);
+
+        distributionZoneManager.start();
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        Set<String> newDataNodes = ByteUtils.fromBytes(entry.value());
+
+        assertTrue(newDataNodes.containsAll(nodes));
+        assertEquals(2, newDataNodes.size());
+    }
+
+    private void mockEmptyZonesList() {
+        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> namedConfigurationTree =
+                mock(NamedConfigurationTree.class);
+        when(zonesConfiguration.distributionZones()).thenReturn(namedConfigurationTree);
+
+        NamedListView<DistributionZoneView> namedListView = mock(NamedListView.class);
+        when(namedConfigurationTree.value()).thenReturn(namedListView);
+
+        when(zonesConfiguration.distributionZones().value().namedListKeys()).thenReturn(Collections.emptyList());
+    }
+
+    private void mockCreateZone() {
+        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> namedConfigurationTree =
+                mock(NamedConfigurationTree.class);
+        when(zonesConfiguration.distributionZones()).thenReturn(namedConfigurationTree);
+
+        NamedListView<DistributionZoneView> namedListView = mock(NamedListView.class);
+        when(namedConfigurationTree.value()).thenReturn(namedListView);
+
+        when(zonesConfiguration.distributionZones().value().namedListKeys()).thenReturn(List.of(ZONE_NAME));
+
+        DistributionZoneConfiguration distributionZoneConfiguration1 = mock(DistributionZoneConfiguration.class);
+        when(namedConfigurationTree.get(ZONE_NAME)).thenReturn(distributionZoneConfiguration1);
+
+        ConfigurationValue<Integer> zoneIdValue1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.zoneId()).thenReturn(zoneIdValue1);
+        when(zoneIdValue1.value()).thenReturn(1);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjust1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.dataNodesAutoAdjust()).thenReturn(dataNodesAutoAdjust1);
+        when(dataNodesAutoAdjust1.value()).thenReturn(100);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjustScaleUp1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.dataNodesAutoAdjustScaleUp()).thenReturn(dataNodesAutoAdjustScaleUp1);
+        when(dataNodesAutoAdjustScaleUp1.value()).thenReturn(200);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjustScaleDown1 = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration1.dataNodesAutoAdjustScaleDown()).thenReturn(dataNodesAutoAdjustScaleDown1);
+        when(dataNodesAutoAdjustScaleDown1.value()).thenReturn(300);
+    }
+
+    private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
+        byte[] newlogicalTopology = toBytes(nodes);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey()))
+                .thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), newlogicalTopology)));
+    }
+
+    private void watchListenerOnUpdate(Set<String> nodes, long rev) {
+        byte[] newlogicalTopology = toBytes(nodes);

Review Comment:
   newLogicalTopology



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1049902475


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -235,6 +257,98 @@ public CompletableFuture<Void> dropZone(String name) {
     @Override
     public void start() {
         zonesConfiguration.distributionZones().listenElements(new ZonesConfigurationListener());
+
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(e);
+        }
+
+        if (vaultEntry != null && vaultEntry.value() != null) {
+            byte[] newLogicalTopology = vaultEntry.value();
+
+            logicalTopology = ByteUtils.fromBytes(newLogicalTopology);
+
+            zonesConfiguration.distributionZones().value().namedListKeys()
+                    .forEach(zoneName -> {
+                        int zoneId = zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                        saveDataNodesToMetastorage(zoneId, newLogicalTopology, vaultAppliedRevision);
+                    });
+        }
+
+        metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new WatchListener() {
+            @Override
+            public boolean onUpdate(@NotNull WatchEvent evt) {
+                if (!busyLock.enterBusy()) {
+                    throw new IgniteInternalException(new NodeStoppingException());
+                }
+
+                try {
+                    assert evt.single();
+
+                    Entry newEntry = evt.entryEvent().newEntry();
+
+                    Set<String> newlogicalTopology = ByteUtils.fromBytes(newEntry.value());
+
+                    List<String> removedNodes =
+                            logicalTopology.stream().filter(node -> !newlogicalTopology.contains(node)).collect(toList());
+
+                    List<String> addedNodes =
+                            newlogicalTopology.stream().filter(node -> !logicalTopology.contains(node)).collect(toList());
+
+                    logicalTopology = newlogicalTopology;
+
+                    zonesConfiguration.distributionZones().value().namedListKeys()
+                            .forEach(zoneName -> {
+                                DistributionZoneConfiguration zoneCfg = zonesConfiguration.distributionZones().get(zoneName);
+
+                                int autoAdjust = zoneCfg.dataNodesAutoAdjust().value();
+                                int autoAdjustScaleDown = zoneCfg.dataNodesAutoAdjustScaleDown().value();
+                                int autoAdjustScaleUp = zoneCfg.dataNodesAutoAdjustScaleUp().value();
+
+                                Integer zoneId = zoneCfg.zoneId().value();
+
+                                if (!removedNodes.isEmpty()) {

Review Comment:
   Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] alievmirza commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
alievmirza commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1053992256


##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.metastorage.MetaStorageManager.APPLIED_REV;
+import static org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl.toIfInfo;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.EntryEvent;
+import org.apache.ignite.internal.metastorage.client.EntryImpl;
+import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.StatementResult;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.metastorage.common.StatementResultInfo;
+import org.apache.ignite.internal.metastorage.common.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.common.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests distribution zones logical topology changes and reaction to that changes.
+ */
+public class DistributionZoneManagerWatchListenerTest extends IgniteAbstractTest {
+    private static final String ZONE_NAME_1 = "zone1";
+    private static final String ZONE_NAME_2 = "zone2";
+
+    private VaultManager vaultMgr;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private WatchListener watchListener;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private MetaStorageManager metaStorageManager;
+
+    @BeforeEach
+    public void setUp() {
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Map.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        zonesConfiguration = mock(DistributionZonesConfiguration.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        LogicalTopologyServiceImpl logicalTopologyService = mock(LogicalTopologyServiceImpl.class);
+
+        LogicalTopologySnapshot topologySnapshot = mock(LogicalTopologySnapshot.class);
+
+        when(topologySnapshot.version()).thenReturn(1L);
+        when(topologySnapshot.nodes()).thenReturn(Collections.emptySet());
+
+        when(logicalTopologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(topologySnapshot));
+
+        vaultMgr = mock(VaultManager.class);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                metaStorageManager,
+                logicalTopologyService,
+                vaultMgr
+        );
+
+        clusterCfgMgr.start();
+
+        mockVaultAppliedRevision(1);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        when(metaStorageManager.registerWatch(any(ByteArray.class), any())).then(invocation -> {
+            watchListener = invocation.getArgument(1);
+
+            return completedFuture(null);
+        });
+
+        mockEmptyZonesList();
+
+        AtomicLong raftIndex = new AtomicLong();
+
+        keyValueStorage = spy(new SimpleInMemoryKeyValueStorage());
+
+        MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage);
+
+        RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+        // Delegate directly to listener.
+        lenient().doAnswer(
+                invocationClose -> {
+                    Command cmd = invocationClose.getArgument(0);
+
+                    long commandIndex = raftIndex.incrementAndGet();
+
+                    CompletableFuture<Serializable> res = new CompletableFuture<>();
+
+                    CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+                        /** {@inheritDoc} */
+                        @Override
+                        public long index() {
+                            return commandIndex;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public WriteCommand command() {
+                            return (WriteCommand) cmd;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public void result(@Nullable Serializable r) {
+                            if (r instanceof Throwable) {
+                                res.completeExceptionally((Throwable) r);
+                            } else {
+                                res.complete(r);
+                            }
+                        }
+                    };
+
+                    try {
+                        metaStorageListener.onWrite(List.of(clo).iterator());
+                    } catch (Throwable e) {
+                        res.completeExceptionally(new IgniteInternalException(e));
+                    }
+
+                    return res;
+                }
+        ).when(metaStorageService).run(any());
+
+        MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory();
+
+        lenient().doAnswer(invocationClose -> {
+            If iif = invocationClose.getArgument(0);
+
+            MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand().iif(toIfInfo(iif, commandsFactory)).build();
+
+            return metaStorageService.run(multiInvokeCommand).thenApply(bi -> new StatementResult(((StatementResultInfo) bi).result()));
+        }).when(metaStorageManager).invoke(any());
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        vaultMgr.stop();
+
+        distributionZoneManager.stop();
+
+        clusterCfgMgr.stop();
+
+        keyValueStorage.close();
+    }
+
+    @Test
+    void testDataNodesUpdatedOnWatchListenerEvent() {
+        mockVaultZonesLogicalTopologyKey(Set.of());
+
+        distributionZoneManager.start();
+
+        //TODO: Add second distribution zone, when distributionZones.change.trigger per zone will be created.
+        mockZones(mockZoneWithAutoAdjust());
+
+        //first event
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        watchListenerOnUpdate(nodes, 1);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        checkDataNodesOfZone(1, nodes);
+
+        //second event
+
+        nodes = Set.of("node1", "node3");
+
+        watchListenerOnUpdate(nodes, 2);
+
+        verify(keyValueStorage, timeout(1000).times(2)).invoke(any());
+
+        checkDataNodesOfZone(1, nodes);
+
+        //third event
+
+        nodes = Collections.emptySet();
+
+        watchListenerOnUpdate(nodes, 3);
+
+        verify(keyValueStorage, timeout(1000).times(3)).invoke(any());
+
+        checkDataNodesOfZone(1, nodes);
+    }
+
+    private void checkDataNodesOfZone(int zoneId, Set<String> nodes) {
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(zoneId).bytes());
+
+        if (nodes == null) {
+            assertNull(entry.value());
+        } else {
+            Set<String> newDataNodes = fromBytes(entry.value());
+
+            assertTrue(newDataNodes.containsAll(nodes));
+            assertEquals(nodes.size(), newDataNodes.size());
+        }
+    }
+
+    @Test
+    void testStaleWatchEvent() {
+        mockVaultZonesLogicalTopologyKey(Set.of());
+
+        distributionZoneManager.start();
+
+        mockZones(mockZoneWithAutoAdjust());
+
+        mockVaultAppliedRevision(1);
+
+        long revision = 100;
+
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(revision));
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        watchListenerOnUpdate(nodes, revision);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        checkDataNodesOfZone(1, null);
+    }
+
+    @Test
+    void testStaleVaultRevisionOnZoneManagerStart() {
+        mockZones(mockZoneWithAutoAdjust());
+
+        long revision = 100;
+
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(revision));
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        mockVaultZonesLogicalTopologyKey(nodes);
+
+        mockVaultAppliedRevision(revision);
+
+        distributionZoneManager.start();
+
+        verify(metaStorageManager, timeout(1000).times(1)).invoke(any());
+
+        checkDataNodesOfZone(1, null);
+    }
+
+    @Test
+    void testDataNodesUpdatedOnZoneManagerStart() {
+        mockZones(mockZoneWithAutoAdjust());
+
+        mockVaultAppliedRevision(2);
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        mockVaultZonesLogicalTopologyKey(nodes);
+
+        distributionZoneManager.start();
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        checkDataNodesOfZone(1, nodes);
+    }
+
+    @Test
+    void testLogicalTopologyIsNullOnZoneManagerStart1() {
+        mockZones(mockZoneWithAutoAdjust());
+
+        mockVaultAppliedRevision(2);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey()))
+                .thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        distributionZoneManager.start();
+
+        verify(keyValueStorage, after(500).never()).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        assertNull(entry.value());
+    }
+
+    private void mockEmptyZonesList() {
+        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> namedConfigurationTree =
+                mock(NamedConfigurationTree.class);
+        when(zonesConfiguration.distributionZones()).thenReturn(namedConfigurationTree);
+
+        NamedListView<DistributionZoneView> namedListView = mock(NamedListView.class);
+        when(namedConfigurationTree.value()).thenReturn(namedListView);
+
+        when(zonesConfiguration.distributionZones().value().namedListKeys()).thenReturn(Collections.emptyList());
+    }
+
+    private void mockZones(DistributionZoneConfiguration... zones) {
+        List<String> names = new ArrayList<>();
+
+        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> namedConfigurationTree =
+                mock(NamedConfigurationTree.class);
+        when(zonesConfiguration.distributionZones()).thenReturn(namedConfigurationTree);
+
+        NamedListView<DistributionZoneView> namedListView = mock(NamedListView.class);
+        when(namedConfigurationTree.value()).thenReturn(namedListView);
+
+        for (DistributionZoneConfiguration zone : zones) {
+            names.add(zone.name().value());
+
+            when(namedConfigurationTree.get(zone.name().value())).thenReturn(zone);
+        }
+
+        when(namedListView.namedListKeys()).thenReturn(names);
+    }
+
+    private DistributionZoneConfiguration mockZone(
+            Integer zoneId,
+            String name,
+            Integer dataNodesAutoAdjustTime,
+            Integer dataNodesAutoAdjustScaleUpTime,
+            Integer dataNodesAutoAdjustScaleDownTime
+    ) {
+        DistributionZoneConfiguration distributionZoneConfiguration = mock(DistributionZoneConfiguration.class);
+
+        ConfigurationValue<String> nameValue = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration.name()).thenReturn(nameValue);
+        when(nameValue.value()).thenReturn(name);
+
+        ConfigurationValue<Integer> zoneIdValue = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration.zoneId()).thenReturn(zoneIdValue);
+        when(zoneIdValue.value()).thenReturn(zoneId);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjust = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration.dataNodesAutoAdjust()).thenReturn(dataNodesAutoAdjust);
+        when(dataNodesAutoAdjust.value()).thenReturn(dataNodesAutoAdjustTime);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjustScaleUp = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration.dataNodesAutoAdjustScaleUp()).thenReturn(dataNodesAutoAdjustScaleUp);
+        when(dataNodesAutoAdjustScaleUp.value()).thenReturn(dataNodesAutoAdjustScaleUpTime);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjustScaleDown = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration.dataNodesAutoAdjustScaleDown()).thenReturn(dataNodesAutoAdjustScaleDown);
+        when(dataNodesAutoAdjustScaleDown.value()).thenReturn(dataNodesAutoAdjustScaleDownTime);
+
+        return distributionZoneConfiguration;
+    }
+
+    private DistributionZoneConfiguration mockZoneWithAutoAdjust() {
+        return mockZone(1, ZONE_NAME_1, 100, Integer.MAX_VALUE, Integer.MAX_VALUE);
+    }
+
+    private DistributionZoneConfiguration mockZoneWithAutoAdjustScaleUpScaleDown() {
+        return mockZone(2, ZONE_NAME_2, Integer.MAX_VALUE, 200, 300);
+    }
+
+    private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
+        byte[] newLogicalTopology = toBytes(nodes);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey()))
+                .thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), newLogicalTopology)));
+    }
+
+    private void watchListenerOnUpdate(Set<String> nodes, long rev) {
+        byte[] newLogicalTopology = toBytes(nodes);
+
+        org.apache.ignite.internal.metastorage.client.Entry newEntry =
+                new EntryImpl(zonesLogicalTopologyKey(), newLogicalTopology, rev, 1);
+
+        EntryEvent entryEvent = new EntryEvent(null, newEntry);
+
+        WatchEvent evt = new WatchEvent(entryEvent);
+
+        watchListener.onUpdate(evt);
+    }
+
+    private void mockVaultAppliedRevision(long revision) {
+        when(vaultMgr.get(APPLIED_REV)).thenReturn(completedFuture(new VaultEntry(APPLIED_REV, longToBytes(revision))));
+    }
+
+    private void assertDataNodesForZone(int zoneId, @Nullable Set<ClusterNode> clusterNodes) throws InterruptedException {

Review Comment:
   Here and below: unused code



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1049876734


##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.metastorage.MetaStorageManager.APPLIED_REV;
+import static org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl.toIfInfo;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.EntryEvent;
+import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.StatementResult;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.metastorage.common.StatementResultInfo;
+import org.apache.ignite.internal.metastorage.common.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.common.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+/**
+ * Tests distribution zones logical topology changes and reaction to that changes.
+ */
+public class DistributionZoneManagerWatchListenerTest extends IgniteAbstractTest {
+    private static final String ZONE_NAME = "zone1";
+
+    @Mock
+    private ClusterManagementGroupManager cmgManager;
+
+    private VaultManager vaultMgr;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private WatchListener watchListener;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private MetaStorageManager metaStorageManager;
+
+    @BeforeEach
+    public void setUp() {
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Map.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        zonesConfiguration = mock(DistributionZonesConfiguration.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        vaultMgr = mock(VaultManager.class);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                metaStorageManager,
+                cmgManager,
+                vaultMgr
+        );
+
+        clusterCfgMgr.start();
+
+        mockVaultAppliedRevision(1);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        when(metaStorageManager.registerWatch(any(ByteArray.class), any())).then(invocation -> {
+            watchListener = invocation.getArgument(1);
+
+            return CompletableFuture.completedFuture(null);
+        });
+
+        mockEmptyZonesList();
+
+        AtomicLong raftIndex = new AtomicLong();
+
+        keyValueStorage = spy(new SimpleInMemoryKeyValueStorage());
+
+        MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage);
+
+        RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+        // Delegate directly to listener.
+        lenient().doAnswer(
+                invocationClose -> {
+                    Command cmd = invocationClose.getArgument(0);
+
+                    long commandIndex = raftIndex.incrementAndGet();
+
+                    CompletableFuture<Serializable> res = new CompletableFuture<>();
+
+                    CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+                        /** {@inheritDoc} */
+                        @Override
+                        public long index() {
+                            return commandIndex;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public WriteCommand command() {
+                            return (WriteCommand) cmd;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public void result(@Nullable Serializable r) {
+                            if (r instanceof Throwable) {
+                                res.completeExceptionally((Throwable) r);
+                            } else {
+                                res.complete(r);
+                            }
+                        }
+                    };
+
+                    try {
+                        metaStorageListener.onWrite(List.of(clo).iterator());
+                    } catch (Throwable e) {
+                        res.completeExceptionally(new IgniteInternalException(e));
+                    }
+
+                    return res;
+                }
+        ).when(metaStorageService).run(any());
+
+        MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory();
+
+        lenient().doAnswer(invocationClose -> {
+            If iif = invocationClose.getArgument(0);
+
+            MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand().iif(toIfInfo(iif, commandsFactory)).build();
+
+            return metaStorageService.run(multiInvokeCommand).thenApply(bi -> new StatementResult(((StatementResultInfo) bi).result()));
+        }).when(metaStorageManager).invoke(any());
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        vaultMgr.stop();
+
+        distributionZoneManager.stop();
+
+        clusterCfgMgr.stop();
+
+        keyValueStorage.close();
+    }
+
+    @Test
+    void testDataNodesUpdatedOnWatchListenerEvent() {

Review Comment:
   We cannot create a test with several zone because we have `distributionZones.change.trigger` for all zones.
   When watch listener handle event and try to save data nodes for two zones it does two invokes with the same revision. So only one invoke is successful.
   I added the TODO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1049890192


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -235,6 +257,98 @@ public CompletableFuture<Void> dropZone(String name) {
     @Override
     public void start() {
         zonesConfiguration.distributionZones().listenElements(new ZonesConfigurationListener());
+
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(e);
+        }
+
+        if (vaultEntry != null && vaultEntry.value() != null) {
+            byte[] newLogicalTopology = vaultEntry.value();
+
+            logicalTopology = ByteUtils.fromBytes(newLogicalTopology);
+
+            zonesConfiguration.distributionZones().value().namedListKeys()
+                    .forEach(zoneName -> {
+                        int zoneId = zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                        saveDataNodesToMetastorage(zoneId, newLogicalTopology, vaultAppliedRevision);
+                    });
+        }
+
+        metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new WatchListener() {

Review Comment:
   Sounds right



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1050679258


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -235,6 +257,98 @@ public CompletableFuture<Void> dropZone(String name) {
     @Override
     public void start() {
         zonesConfiguration.distributionZones().listenElements(new ZonesConfigurationListener());
+
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(e);
+        }
+
+        if (vaultEntry != null && vaultEntry.value() != null) {

Review Comment:
   As it was discussed I don't use `cmgManager` on start and in `DistributionZoneManager#updateMetaStorageOnZoneCreate`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] alievmirza commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
alievmirza commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1052223469


##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java:
##########
@@ -328,44 +297,40 @@ void testTriggerKeyNotPropagatedAfterZoneUpdate() throws Exception {
 
         assertZonesChangeTriggerKey(100);
 
-        assertDataNodesForZone(1, clusterNodes);
+        assertDataNodesForZone(1, nodes);
     }
 
     @Test
     void testZoneDeleteDoNotRemoveMetaStorageKey() throws Exception {
-        Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1", null));
-
-        mockCmgLocalNodes(clusterNodes);
-
-        distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build());
+        distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
-        assertDataNodesForZone(1, clusterNodes);
+        assertDataNodesForZone(1, nodes);
 
-        keyValueStorage.put(zonesChangeTriggerKey().bytes(), ByteUtils.longToBytes(100));
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(100));
 
-        distributionZoneManager.dropZone(ZONE_NAME);
+        distributionZoneManager.dropZone(ZONE_NAME).get();
 
         verify(keyValueStorage, timeout(1000).times(2)).invoke(any());
 
-        assertDataNodesForZone(1, clusterNodes);
+        assertDataNodesForZone(1, nodes);
     }
 
-    private LogicalTopologySnapshot mockCmgLocalNodes(Set<ClusterNode> clusterNodes) {
-        LogicalTopologySnapshot logicalTopologySnapshot = mock(LogicalTopologySnapshot.class);
-
-        when(cmgManager.logicalTopology()).thenReturn(completedFuture(logicalTopologySnapshot));
-
-        when(logicalTopologySnapshot.nodes()).thenReturn(clusterNodes);
+    private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
+        byte[] newLogicalTopology = toBytes(nodes);
 
-        return logicalTopologySnapshot;
+        when(vaultMgr.get(zonesLogicalTopologyKey()))
+                .thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), newLogicalTopology)));
     }
 
-    private void assertDataNodesForZone(int zoneId, @Nullable Set<ClusterNode> clusterNodes) throws InterruptedException {
-        byte[] nodes = clusterNodes == null
-                ? null
-                : ByteUtils.toBytes(clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+    private void assertDataNodesForZone(int zoneId, @Nullable Set<String> expectedNodes) throws InterruptedException {
+        if (expectedNodes == null) {
+            assertNull(keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value());
+        } else {
+            Set<String> actual = fromBytes(keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value());
 
-        assertTrue(waitForCondition(() -> Arrays.equals(keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value(), nodes), 1000));
+            assertTrue(expectedNodes.containsAll(actual));
+            assertTrue(expectedNodes.size() == actual.size());

Review Comment:
   assertEquals(expectedNodes.size(), actual.size());



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java:
##########
@@ -196,65 +207,42 @@ public void tearDown() throws Exception {
 
     @Test
     void testDataNodesPropagationAfterZoneCreation() throws Exception {
-        Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1", null));
-
-        mockCmgLocalNodes(clusterNodes);
-
         distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
-        assertDataNodesForZone(1, clusterNodes);
+        assertDataNodesForZone(1, nodes);
 
         assertZonesChangeTriggerKey(1);
     }
 
     @Test
     void testTriggerKeyPropagationAfterZoneUpdate() throws Exception {
-        Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1", null));
-
-        LogicalTopologySnapshot logicalTopologySnapshot = mockCmgLocalNodes(clusterNodes);
-
         distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();

Review Comment:
   lets add at the beginning of the test 
   
   ```
   assertNull(keyValueStorage.get(zonesChangeTriggerKey().bytes()).value());
   ```



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java:
##########
@@ -196,65 +207,42 @@ public void tearDown() throws Exception {
 
     @Test
     void testDataNodesPropagationAfterZoneCreation() throws Exception {

Review Comment:
   lets add at the beginning of the test 
   ```
   assertDataNodesForZone(1, null);
   ```



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -524,4 +541,136 @@ private void initMetaStorageKeysOnStart() {
             }
         });
     }
+
+    private void initDataNodesFromVaultManager() {

Review Comment:
   Javadoc



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java:
##########
@@ -264,10 +252,6 @@ void testSeveralZoneCreationsUpdatesTriggerKey() throws Exception {
 
     @Test
     void testSeveralZoneUpdatesUpdatesTriggerKey() throws Exception {
-        Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1", null));
-
-        mockCmgLocalNodes(clusterNodes);
-
         distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();

Review Comment:
   lets add at the beginning of the test
   
   ```
   assertNull(keyValueStorage.get(zonesChangeTriggerKey().bytes()).value());
   ```



##########
modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java:
##########
@@ -334,6 +334,9 @@ public static class DistributionZones {
 
         /** Distribution zone rename error. */
         public static final int ZONE_RENAME_ERR = DISTRIBUTION_ZONES_ERR_GROUP.registerErrorCode(3);
+
+        /** Distribution zone update error. */

Review Comment:
   This is a too wide exception, what does it mean "Distribution zone update"? You use this exception when you try to init DistributionZoneManager, we don't have any zone that is being updated.



##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java:
##########
@@ -196,65 +207,42 @@ public void tearDown() throws Exception {
 
     @Test
     void testDataNodesPropagationAfterZoneCreation() throws Exception {
-        Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1", null));
-
-        mockCmgLocalNodes(clusterNodes);
-
         distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
-        assertDataNodesForZone(1, clusterNodes);
+        assertDataNodesForZone(1, nodes);
 
         assertZonesChangeTriggerKey(1);
     }
 
     @Test
     void testTriggerKeyPropagationAfterZoneUpdate() throws Exception {
-        Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1", null));
-
-        LogicalTopologySnapshot logicalTopologySnapshot = mockCmgLocalNodes(clusterNodes);
-
         distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
         assertZonesChangeTriggerKey(1);
 
-        var clusterNodes2 = Set.of(
-                new ClusterNode("1", "name1", null),
-                new ClusterNode("2", "name2", null)
-        );
-
-        when(logicalTopologySnapshot.nodes()).thenReturn(clusterNodes2);
-
         distributionZoneManager.alterZone(
                 ZONE_NAME,
                 new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).dataNodesAutoAdjust(100).build()
         ).get();
 
         assertZonesChangeTriggerKey(2);
 
-        assertDataNodesForZone(1, clusterNodes);
+        assertDataNodesForZone(1, nodes);
     }
 
     @Test
     void testZoneDeleteRemovesMetaStorageKey() throws Exception {
-        Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1", null));
-
-        mockCmgLocalNodes(clusterNodes);
-
-        distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build());
+        distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
-        assertDataNodesForZone(1, clusterNodes);
+        assertDataNodesForZone(1, nodes);
 
-        distributionZoneManager.dropZone(ZONE_NAME);
+        distributionZoneManager.dropZone(ZONE_NAME).get();
 
         assertTrue(waitForCondition(() -> keyValueStorage.get(zoneDataNodesKey(1).bytes()).value() == null, 5000));
     }
 
     @Test
     void testSeveralZoneCreationsUpdatesTriggerKey() throws Exception {
-        Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1", null));
-
-        mockCmgLocalNodes(clusterNodes);
-
         distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();

Review Comment:
   lets add at the beginning of the test
   
   ```
   assertNull(keyValueStorage.get(zonesChangeTriggerKey().bytes()).value());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] alievmirza commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
alievmirza commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1055186202


##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java:
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.metastorage.MetaStorageManager.APPLIED_REV;
+import static org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl.toIfInfo;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.EntryEvent;
+import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.StatementResult;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.metastorage.common.StatementResultInfo;
+import org.apache.ignite.internal.metastorage.common.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.common.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+/**
+ * Tests distribution zones logical topology changes and reaction to that changes.
+ */
+public class DistributionZoneManagerWatchListenerTest extends IgniteAbstractTest {
+    private static final String ZONE_NAME = "zone1";
+
+    @Mock
+    private ClusterManagementGroupManager cmgManager;
+
+    private VaultManager vaultMgr;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private WatchListener watchListener;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private MetaStorageManager metaStorageManager;
+
+    @BeforeEach
+    public void setUp() {
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Map.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        zonesConfiguration = mock(DistributionZonesConfiguration.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        cmgManager = mock(ClusterManagementGroupManager.class);
+
+        vaultMgr = mock(VaultManager.class);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                metaStorageManager,
+                cmgManager,
+                vaultMgr
+        );
+
+        clusterCfgMgr.start();
+
+        mockVaultAppliedRevision(1);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        when(metaStorageManager.registerWatch(any(ByteArray.class), any())).then(invocation -> {
+            watchListener = invocation.getArgument(1);
+
+            return CompletableFuture.completedFuture(null);
+        });
+
+        mockEmptyZonesList();
+
+        AtomicLong raftIndex = new AtomicLong();
+
+        keyValueStorage = spy(new SimpleInMemoryKeyValueStorage());
+
+        MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage);
+
+        RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+        // Delegate directly to listener.
+        lenient().doAnswer(
+                invocationClose -> {
+                    Command cmd = invocationClose.getArgument(0);
+
+                    long commandIndex = raftIndex.incrementAndGet();
+
+                    CompletableFuture<Serializable> res = new CompletableFuture<>();
+
+                    CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+                        /** {@inheritDoc} */
+                        @Override
+                        public long index() {
+                            return commandIndex;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public WriteCommand command() {
+                            return (WriteCommand) cmd;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public void result(@Nullable Serializable r) {
+                            if (r instanceof Throwable) {
+                                res.completeExceptionally((Throwable) r);
+                            } else {
+                                res.complete(r);
+                            }
+                        }
+                    };
+
+                    try {
+                        metaStorageListener.onWrite(List.of(clo).iterator());
+                    } catch (Throwable e) {
+                        res.completeExceptionally(new IgniteInternalException(e));
+                    }
+
+                    return res;
+                }
+        ).when(metaStorageService).run(any());
+
+        MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory();
+
+        lenient().doAnswer(invocationClose -> {
+            If iif = invocationClose.getArgument(0);
+
+            MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand().iif(toIfInfo(iif, commandsFactory)).build();
+
+            return metaStorageService.run(multiInvokeCommand).thenApply(bi -> new StatementResult(((StatementResultInfo) bi).result()));
+        }).when(metaStorageManager).invoke(any());
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        vaultMgr.stop();
+
+        distributionZoneManager.stop();
+
+        clusterCfgMgr.stop();
+
+        keyValueStorage.close();
+    }
+
+    @Test
+    void testDataNodesUpdatedOnWatchListenerEvent() {

Review Comment:
   Please add ticket to that todo



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1058811996


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -275,7 +310,9 @@ public void start() {
 
         logicalTopologyService.addEventListener(topologyEventListener);
 
-        initMetaStorageKeysOnStart();
+        registerMetaStorageWatchListener()

Review Comment:
   I added busy lock.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sanpwc commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sanpwc commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1053470125


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -107,24 +120,30 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
         }
     };
 
+    /** The logical topology on the last watch event. */
+    private Set<String> logicalTopology = Collections.emptySet();

Review Comment:
   Commonly, it's better to instantiate the state in one place only, meaning that it's better to move `= Collections.emptySet()` to constructor. 



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -137,7 +156,7 @@ public CompletableFuture<Void> createZone(DistributionZoneConfigurationParameter
         Objects.requireNonNull(distributionZoneCfg, "Distribution zone configuration is null.");
 
         if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
+            throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());

Review Comment:
   Here and in other places, please use withCause in order not to regenerate traceId.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -107,24 +120,30 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
         }
     };
 
+    /** The logical topology on the last watch event. */
+    private Set<String> logicalTopology = Collections.emptySet();
+
+    /** Watch listener id to unregister the watch listener on {@link DistributionZoneManager#stop()}. */
+    private Long watchListenerId;

Review Comment:
   Same as above, this variable should be thread-safe. It's possible to you will try to set and read it from different threads.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -524,4 +540,140 @@ private void initMetaStorageKeysOnStart() {
             }
         });
     }
+
+    /**
+     * Initialises data nodes of distribution zones in meta storage
+     * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
+     */
+    private void initDataNodesFromVaultManager() {
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();

Review Comment:
   Same, as above. It's not valid to call get() on futures in such cases. 
   There's no sense in going further with review until you fix it. 



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -524,4 +540,140 @@ private void initMetaStorageKeysOnStart() {
             }
         });
     }
+
+    /**
+     * Initialises data nodes of distribution zones in meta storage
+     * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
+     */
+    private void initDataNodesFromVaultManager() {
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(UNEXPECTED_ERR, e);
+        }
+
+        if (vaultEntry != null && vaultEntry.value() != null) {
+            logicalTopology = ByteUtils.fromBytes(vaultEntry.value());
+
+            zonesConfiguration.distributionZones().value().namedListKeys()
+                    .forEach(zoneName -> {
+                        int zoneId = zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                        saveDataNodesToMetaStorage(zoneId, toBytes(logicalTopology), vaultAppliedRevision);
+                    });
+        }
+    }
+
+    /**
+     * Registers {@link WatchListener} which updates data nodes of distribution zones on logical topology changing event.
+     */
+    private void registerMetaStorageWatchListener() {
+        metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new WatchListener() {
+                    @Override
+                    public boolean onUpdate(@NotNull WatchEvent evt) {
+                        if (!busyLock.enterBusy()) {
+                            throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                        }
+
+                        try {
+                            assert evt.single();
+
+                            Entry newEntry = evt.entryEvent().newEntry();
+
+                            Set<String> newLogicalTopology = ByteUtils.fromBytes(newEntry.value());
+
+                            List<String> removedNodes =
+                                    logicalTopology.stream().filter(node -> !newLogicalTopology.contains(node)).collect(toList());
+
+                            List<String> addedNodes =
+                                    newLogicalTopology.stream().filter(node -> !logicalTopology.contains(node)).collect(toList());
+
+                            logicalTopology = newLogicalTopology;
+
+                            zonesConfiguration.distributionZones().value().namedListKeys()
+                                    .forEach(zoneName -> {
+                                        DistributionZoneConfiguration zoneCfg = zonesConfiguration.distributionZones().get(zoneName);
+
+                                        int autoAdjust = zoneCfg.dataNodesAutoAdjust().value();
+                                        int autoAdjustScaleDown = zoneCfg.dataNodesAutoAdjustScaleDown().value();
+                                        int autoAdjustScaleUp = zoneCfg.dataNodesAutoAdjustScaleUp().value();
+
+                                        Integer zoneId = zoneCfg.zoneId().value();
+
+                                        if ((!addedNodes.isEmpty() || !removedNodes.isEmpty()) && autoAdjust != Integer.MAX_VALUE) {
+                                            //TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust timer.
+                                            saveDataNodesToMetaStorage(
+                                                    zoneId, newEntry.value(), newEntry.revision()
+                                            );
+                                        } else {
+                                            if (!addedNodes.isEmpty() && autoAdjustScaleUp != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18121 Create scale up scheduler with dataNodesAutoAdjustScaleUp timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+
+                                            if (!removedNodes.isEmpty() && autoAdjustScaleDown != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18132 Create scale down scheduler with dataNodesAutoAdjustScaleDown timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+                                        }
+                                    });
+
+                            return true;
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    }
+
+                    @Override
+                    public void onError(@NotNull Throwable e) {
+                        LOG.warn("Unable to process logical topology event", e);
+                    }
+                })
+                .thenAccept(id -> watchListenerId = id);
+    }
+
+    /**
+     * Returns applied vault revision.
+     *
+     * @return Applied vault revision.
+     */
+    private long vaultAppliedRevision() {
+        try {
+            return vaultMgr.get(APPLIED_REV)
+                    .thenApply(appliedRevision -> appliedRevision == null ? 0L : bytesToLong(appliedRevision.value()))
+                    .get();

Review Comment:
   We should never-ever call get() or join() on futures in such cases. It's an important one.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -107,24 +120,30 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
         }
     };
 
+    /** The logical topology on the last watch event. */
+    private Set<String> logicalTopology = Collections.emptySet();

Review Comment:
   This variable should be thread safe, it's possible that you will init in one thread(node startup thread) and will try to read in watch thread, thus visibility is broken.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1052433706


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -524,4 +541,136 @@ private void initMetaStorageKeysOnStart() {
             }
         });
     }
+
+    private void initDataNodesFromVaultManager() {

Review Comment:
   fixed



##########
modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java:
##########
@@ -334,6 +334,9 @@ public static class DistributionZones {
 
         /** Distribution zone rename error. */
         public static final int ZONE_RENAME_ERR = DISTRIBUTION_ZONES_ERR_GROUP.registerErrorCode(3);
+
+        /** Distribution zone update error. */

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1054507717


##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.metastorage.MetaStorageManager.APPLIED_REV;
+import static org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl.toIfInfo;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.configuration.NamedConfigurationTree;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
+import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.EntryEvent;
+import org.apache.ignite.internal.metastorage.client.EntryImpl;
+import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.StatementResult;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.metastorage.common.StatementResultInfo;
+import org.apache.ignite.internal.metastorage.common.command.MetaStorageCommandsFactory;
+import org.apache.ignite.internal.metastorage.common.command.MultiInvokeCommand;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.Command;
+import org.apache.ignite.internal.raft.WriteCommand;
+import org.apache.ignite.internal.raft.service.CommandClosure;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests distribution zones logical topology changes and reaction to that changes.
+ */
+public class DistributionZoneManagerWatchListenerTest extends IgniteAbstractTest {
+    private static final String ZONE_NAME_1 = "zone1";
+    private static final String ZONE_NAME_2 = "zone2";
+
+    private VaultManager vaultMgr;
+
+    private DistributionZoneManager distributionZoneManager;
+
+    private SimpleInMemoryKeyValueStorage keyValueStorage;
+
+    private ConfigurationManager clusterCfgMgr;
+
+    private WatchListener watchListener;
+
+    private DistributionZonesConfiguration zonesConfiguration;
+
+    private MetaStorageManager metaStorageManager;
+
+    @BeforeEach
+    public void setUp() {
+        clusterCfgMgr = new ConfigurationManager(
+                List.of(DistributionZonesConfiguration.KEY),
+                Map.of(),
+                new TestConfigurationStorage(DISTRIBUTED),
+                List.of(),
+                List.of()
+        );
+
+        zonesConfiguration = mock(DistributionZonesConfiguration.class);
+
+        metaStorageManager = mock(MetaStorageManager.class);
+
+        LogicalTopologyServiceImpl logicalTopologyService = mock(LogicalTopologyServiceImpl.class);
+
+        LogicalTopologySnapshot topologySnapshot = mock(LogicalTopologySnapshot.class);
+
+        when(topologySnapshot.version()).thenReturn(1L);
+        when(topologySnapshot.nodes()).thenReturn(Collections.emptySet());
+
+        when(logicalTopologyService.logicalTopologyOnLeader()).thenReturn(completedFuture(topologySnapshot));
+
+        vaultMgr = mock(VaultManager.class);
+
+        distributionZoneManager = new DistributionZoneManager(
+                zonesConfiguration,
+                metaStorageManager,
+                logicalTopologyService,
+                vaultMgr
+        );
+
+        clusterCfgMgr.start();
+
+        mockVaultAppliedRevision(1);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey())).thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        when(metaStorageManager.registerWatch(any(ByteArray.class), any())).then(invocation -> {
+            watchListener = invocation.getArgument(1);
+
+            return completedFuture(null);
+        });
+
+        mockEmptyZonesList();
+
+        AtomicLong raftIndex = new AtomicLong();
+
+        keyValueStorage = spy(new SimpleInMemoryKeyValueStorage());
+
+        MetaStorageListener metaStorageListener = new MetaStorageListener(keyValueStorage);
+
+        RaftGroupService metaStorageService = mock(RaftGroupService.class);
+
+        // Delegate directly to listener.
+        lenient().doAnswer(
+                invocationClose -> {
+                    Command cmd = invocationClose.getArgument(0);
+
+                    long commandIndex = raftIndex.incrementAndGet();
+
+                    CompletableFuture<Serializable> res = new CompletableFuture<>();
+
+                    CommandClosure<WriteCommand> clo = new CommandClosure<>() {
+                        /** {@inheritDoc} */
+                        @Override
+                        public long index() {
+                            return commandIndex;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public WriteCommand command() {
+                            return (WriteCommand) cmd;
+                        }
+
+                        /** {@inheritDoc} */
+                        @Override
+                        public void result(@Nullable Serializable r) {
+                            if (r instanceof Throwable) {
+                                res.completeExceptionally((Throwable) r);
+                            } else {
+                                res.complete(r);
+                            }
+                        }
+                    };
+
+                    try {
+                        metaStorageListener.onWrite(List.of(clo).iterator());
+                    } catch (Throwable e) {
+                        res.completeExceptionally(new IgniteInternalException(e));
+                    }
+
+                    return res;
+                }
+        ).when(metaStorageService).run(any());
+
+        MetaStorageCommandsFactory commandsFactory = new MetaStorageCommandsFactory();
+
+        lenient().doAnswer(invocationClose -> {
+            If iif = invocationClose.getArgument(0);
+
+            MultiInvokeCommand multiInvokeCommand = commandsFactory.multiInvokeCommand().iif(toIfInfo(iif, commandsFactory)).build();
+
+            return metaStorageService.run(multiInvokeCommand).thenApply(bi -> new StatementResult(((StatementResultInfo) bi).result()));
+        }).when(metaStorageManager).invoke(any());
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        vaultMgr.stop();
+
+        distributionZoneManager.stop();
+
+        clusterCfgMgr.stop();
+
+        keyValueStorage.close();
+    }
+
+    @Test
+    void testDataNodesUpdatedOnWatchListenerEvent() {
+        mockVaultZonesLogicalTopologyKey(Set.of());
+
+        distributionZoneManager.start();
+
+        //TODO: Add second distribution zone, when distributionZones.change.trigger per zone will be created.
+        mockZones(mockZoneWithAutoAdjust());
+
+        //first event
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        watchListenerOnUpdate(nodes, 1);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        checkDataNodesOfZone(1, nodes);
+
+        //second event
+
+        nodes = Set.of("node1", "node3");
+
+        watchListenerOnUpdate(nodes, 2);
+
+        verify(keyValueStorage, timeout(1000).times(2)).invoke(any());
+
+        checkDataNodesOfZone(1, nodes);
+
+        //third event
+
+        nodes = Collections.emptySet();
+
+        watchListenerOnUpdate(nodes, 3);
+
+        verify(keyValueStorage, timeout(1000).times(3)).invoke(any());
+
+        checkDataNodesOfZone(1, nodes);
+    }
+
+    private void checkDataNodesOfZone(int zoneId, Set<String> nodes) {
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(zoneId).bytes());
+
+        if (nodes == null) {
+            assertNull(entry.value());
+        } else {
+            Set<String> newDataNodes = fromBytes(entry.value());
+
+            assertTrue(newDataNodes.containsAll(nodes));
+            assertEquals(nodes.size(), newDataNodes.size());
+        }
+    }
+
+    @Test
+    void testStaleWatchEvent() {
+        mockVaultZonesLogicalTopologyKey(Set.of());
+
+        distributionZoneManager.start();
+
+        mockZones(mockZoneWithAutoAdjust());
+
+        mockVaultAppliedRevision(1);
+
+        long revision = 100;
+
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(revision));
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        watchListenerOnUpdate(nodes, revision);
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        checkDataNodesOfZone(1, null);
+    }
+
+    @Test
+    void testStaleVaultRevisionOnZoneManagerStart() {
+        mockZones(mockZoneWithAutoAdjust());
+
+        long revision = 100;
+
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(revision));
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        mockVaultZonesLogicalTopologyKey(nodes);
+
+        mockVaultAppliedRevision(revision);
+
+        distributionZoneManager.start();
+
+        verify(metaStorageManager, timeout(1000).times(1)).invoke(any());
+
+        checkDataNodesOfZone(1, null);
+    }
+
+    @Test
+    void testDataNodesUpdatedOnZoneManagerStart() {
+        mockZones(mockZoneWithAutoAdjust());
+
+        mockVaultAppliedRevision(2);
+
+        Set<String> nodes = Set.of("node1", "node2");
+
+        mockVaultZonesLogicalTopologyKey(nodes);
+
+        distributionZoneManager.start();
+
+        verify(keyValueStorage, timeout(1000).times(1)).invoke(any());
+
+        checkDataNodesOfZone(1, nodes);
+    }
+
+    @Test
+    void testLogicalTopologyIsNullOnZoneManagerStart1() {
+        mockZones(mockZoneWithAutoAdjust());
+
+        mockVaultAppliedRevision(2);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey()))
+                .thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), null)));
+
+        distributionZoneManager.start();
+
+        verify(keyValueStorage, after(500).never()).invoke(any());
+
+        Entry entry = keyValueStorage.get(zoneDataNodesKey(1).bytes());
+
+        assertNull(entry.value());
+    }
+
+    private void mockEmptyZonesList() {
+        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> namedConfigurationTree =
+                mock(NamedConfigurationTree.class);
+        when(zonesConfiguration.distributionZones()).thenReturn(namedConfigurationTree);
+
+        NamedListView<DistributionZoneView> namedListView = mock(NamedListView.class);
+        when(namedConfigurationTree.value()).thenReturn(namedListView);
+
+        when(zonesConfiguration.distributionZones().value().namedListKeys()).thenReturn(Collections.emptyList());
+    }
+
+    private void mockZones(DistributionZoneConfiguration... zones) {
+        List<String> names = new ArrayList<>();
+
+        NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> namedConfigurationTree =
+                mock(NamedConfigurationTree.class);
+        when(zonesConfiguration.distributionZones()).thenReturn(namedConfigurationTree);
+
+        NamedListView<DistributionZoneView> namedListView = mock(NamedListView.class);
+        when(namedConfigurationTree.value()).thenReturn(namedListView);
+
+        for (DistributionZoneConfiguration zone : zones) {
+            names.add(zone.name().value());
+
+            when(namedConfigurationTree.get(zone.name().value())).thenReturn(zone);
+        }
+
+        when(namedListView.namedListKeys()).thenReturn(names);
+    }
+
+    private DistributionZoneConfiguration mockZone(
+            Integer zoneId,
+            String name,
+            Integer dataNodesAutoAdjustTime,
+            Integer dataNodesAutoAdjustScaleUpTime,
+            Integer dataNodesAutoAdjustScaleDownTime
+    ) {
+        DistributionZoneConfiguration distributionZoneConfiguration = mock(DistributionZoneConfiguration.class);
+
+        ConfigurationValue<String> nameValue = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration.name()).thenReturn(nameValue);
+        when(nameValue.value()).thenReturn(name);
+
+        ConfigurationValue<Integer> zoneIdValue = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration.zoneId()).thenReturn(zoneIdValue);
+        when(zoneIdValue.value()).thenReturn(zoneId);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjust = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration.dataNodesAutoAdjust()).thenReturn(dataNodesAutoAdjust);
+        when(dataNodesAutoAdjust.value()).thenReturn(dataNodesAutoAdjustTime);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjustScaleUp = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration.dataNodesAutoAdjustScaleUp()).thenReturn(dataNodesAutoAdjustScaleUp);
+        when(dataNodesAutoAdjustScaleUp.value()).thenReturn(dataNodesAutoAdjustScaleUpTime);
+
+        ConfigurationValue<Integer> dataNodesAutoAdjustScaleDown = mock(ConfigurationValue.class);
+        when(distributionZoneConfiguration.dataNodesAutoAdjustScaleDown()).thenReturn(dataNodesAutoAdjustScaleDown);
+        when(dataNodesAutoAdjustScaleDown.value()).thenReturn(dataNodesAutoAdjustScaleDownTime);
+
+        return distributionZoneConfiguration;
+    }
+
+    private DistributionZoneConfiguration mockZoneWithAutoAdjust() {
+        return mockZone(1, ZONE_NAME_1, 100, Integer.MAX_VALUE, Integer.MAX_VALUE);
+    }
+
+    private DistributionZoneConfiguration mockZoneWithAutoAdjustScaleUpScaleDown() {
+        return mockZone(2, ZONE_NAME_2, Integer.MAX_VALUE, 200, 300);
+    }
+
+    private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
+        byte[] newLogicalTopology = toBytes(nodes);
+
+        when(vaultMgr.get(zonesLogicalTopologyKey()))
+                .thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), newLogicalTopology)));
+    }
+
+    private void watchListenerOnUpdate(Set<String> nodes, long rev) {
+        byte[] newLogicalTopology = toBytes(nodes);
+
+        org.apache.ignite.internal.metastorage.client.Entry newEntry =
+                new EntryImpl(zonesLogicalTopologyKey(), newLogicalTopology, rev, 1);
+
+        EntryEvent entryEvent = new EntryEvent(null, newEntry);
+
+        WatchEvent evt = new WatchEvent(entryEvent);
+
+        watchListener.onUpdate(evt);
+    }
+
+    private void mockVaultAppliedRevision(long revision) {
+        when(vaultMgr.get(APPLIED_REV)).thenReturn(completedFuture(new VaultEntry(APPLIED_REV, longToBytes(revision))));
+    }
+
+    private void assertDataNodesForZone(int zoneId, @Nullable Set<ClusterNode> clusterNodes) throws InterruptedException {

Review Comment:
   fixed



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -524,4 +540,140 @@ private void initMetaStorageKeysOnStart() {
             }
         });
     }
+
+    /**
+     * Initialises data nodes of distribution zones in meta storage
+     * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
+     */
+    private void initDataNodesFromVaultManager() {
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1054508013


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -524,4 +540,140 @@ private void initMetaStorageKeysOnStart() {
             }
         });
     }
+
+    /**
+     * Initialises data nodes of distribution zones in meta storage
+     * from {@link DistributionZonesUtil#zonesLogicalTopologyKey()} in vault.
+     */
+    private void initDataNodesFromVaultManager() {
+        long vaultAppliedRevision = vaultAppliedRevision();
+
+        VaultEntry vaultEntry;
+
+        try {
+            vaultEntry = vaultMgr.get(zonesLogicalTopologyKey()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IgniteInternalException(UNEXPECTED_ERR, e);
+        }
+
+        if (vaultEntry != null && vaultEntry.value() != null) {
+            logicalTopology = ByteUtils.fromBytes(vaultEntry.value());
+
+            zonesConfiguration.distributionZones().value().namedListKeys()
+                    .forEach(zoneName -> {
+                        int zoneId = zonesConfiguration.distributionZones().get(zoneName).zoneId().value();
+
+                        saveDataNodesToMetaStorage(zoneId, toBytes(logicalTopology), vaultAppliedRevision);
+                    });
+        }
+    }
+
+    /**
+     * Registers {@link WatchListener} which updates data nodes of distribution zones on logical topology changing event.
+     */
+    private void registerMetaStorageWatchListener() {
+        metaStorageManager.registerWatch(zonesLogicalTopologyKey(), new WatchListener() {
+                    @Override
+                    public boolean onUpdate(@NotNull WatchEvent evt) {
+                        if (!busyLock.enterBusy()) {
+                            throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+                        }
+
+                        try {
+                            assert evt.single();
+
+                            Entry newEntry = evt.entryEvent().newEntry();
+
+                            Set<String> newLogicalTopology = ByteUtils.fromBytes(newEntry.value());
+
+                            List<String> removedNodes =
+                                    logicalTopology.stream().filter(node -> !newLogicalTopology.contains(node)).collect(toList());
+
+                            List<String> addedNodes =
+                                    newLogicalTopology.stream().filter(node -> !logicalTopology.contains(node)).collect(toList());
+
+                            logicalTopology = newLogicalTopology;
+
+                            zonesConfiguration.distributionZones().value().namedListKeys()
+                                    .forEach(zoneName -> {
+                                        DistributionZoneConfiguration zoneCfg = zonesConfiguration.distributionZones().get(zoneName);
+
+                                        int autoAdjust = zoneCfg.dataNodesAutoAdjust().value();
+                                        int autoAdjustScaleDown = zoneCfg.dataNodesAutoAdjustScaleDown().value();
+                                        int autoAdjustScaleUp = zoneCfg.dataNodesAutoAdjustScaleUp().value();
+
+                                        Integer zoneId = zoneCfg.zoneId().value();
+
+                                        if ((!addedNodes.isEmpty() || !removedNodes.isEmpty()) && autoAdjust != Integer.MAX_VALUE) {
+                                            //TODO: IGNITE-18134 Create scheduler with dataNodesAutoAdjust timer.
+                                            saveDataNodesToMetaStorage(
+                                                    zoneId, newEntry.value(), newEntry.revision()
+                                            );
+                                        } else {
+                                            if (!addedNodes.isEmpty() && autoAdjustScaleUp != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18121 Create scale up scheduler with dataNodesAutoAdjustScaleUp timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+
+                                            if (!removedNodes.isEmpty() && autoAdjustScaleDown != Integer.MAX_VALUE) {
+                                                //TODO: IGNITE-18132 Create scale down scheduler with dataNodesAutoAdjustScaleDown timer.
+                                                saveDataNodesToMetaStorage(
+                                                        zoneId, newEntry.value(), newEntry.revision()
+                                                );
+                                            }
+                                        }
+                                    });
+
+                            return true;
+                        } finally {
+                            busyLock.leaveBusy();
+                        }
+                    }
+
+                    @Override
+                    public void onError(@NotNull Throwable e) {
+                        LOG.warn("Unable to process logical topology event", e);
+                    }
+                })
+                .thenAccept(id -> watchListenerId = id);
+    }
+
+    /**
+     * Returns applied vault revision.
+     *
+     * @return Applied vault revision.
+     */
+    private long vaultAppliedRevision() {
+        try {
+            return vaultMgr.get(APPLIED_REV)
+                    .thenApply(appliedRevision -> appliedRevision == null ? 0L : bytesToLong(appliedRevision.value()))
+                    .get();

Review Comment:
   fixed



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -107,24 +120,30 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
         }
     };
 
+    /** The logical topology on the last watch event. */
+    private Set<String> logicalTopology = Collections.emptySet();
+
+    /** Watch listener id to unregister the watch listener on {@link DistributionZoneManager#stop()}. */
+    private Long watchListenerId;

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1054508290


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -107,24 +120,30 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
         }
     };
 
+    /** The logical topology on the last watch event. */
+    private Set<String> logicalTopology = Collections.emptySet();

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] alievmirza commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
alievmirza commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1054811350


##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java:
##########
@@ -328,44 +306,40 @@ void testTriggerKeyNotPropagatedAfterZoneUpdate() throws Exception {
 
         assertZonesChangeTriggerKey(100);
 
-        assertDataNodesForZone(1, clusterNodes);
+        assertDataNodesForZone(1, nodes);
     }
 
     @Test
     void testZoneDeleteDoNotRemoveMetaStorageKey() throws Exception {
-        Set<ClusterNode> clusterNodes = Set.of(new ClusterNode("1", "name1", null));
-
-        mockCmgLocalNodes(clusterNodes);
-
-        distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build());
+        distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
-        assertDataNodesForZone(1, clusterNodes);
+        assertDataNodesForZone(1, nodes);
 
-        keyValueStorage.put(zonesChangeTriggerKey().bytes(), ByteUtils.longToBytes(100));
+        keyValueStorage.put(zonesChangeTriggerKey().bytes(), longToBytes(100));
 
-        distributionZoneManager.dropZone(ZONE_NAME);
+        distributionZoneManager.dropZone(ZONE_NAME).get();
 
         verify(keyValueStorage, timeout(1000).times(2)).invoke(any());
 
-        assertDataNodesForZone(1, clusterNodes);
+        assertDataNodesForZone(1, nodes);
     }
 
-    private LogicalTopologySnapshot mockCmgLocalNodes(Set<ClusterNode> clusterNodes) {
-        LogicalTopologySnapshot logicalTopologySnapshot = mock(LogicalTopologySnapshot.class);
-
-        when(cmgManager.logicalTopology()).thenReturn(completedFuture(logicalTopologySnapshot));
-
-        when(logicalTopologySnapshot.nodes()).thenReturn(clusterNodes);
+    private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
+        byte[] newLogicalTopology = toBytes(nodes);
 
-        return logicalTopologySnapshot;
+        when(vaultMgr.get(zonesLogicalTopologyKey()))
+                .thenReturn(completedFuture(new VaultEntry(zonesLogicalTopologyKey(), newLogicalTopology)));
     }
 
-    private void assertDataNodesForZone(int zoneId, @Nullable Set<ClusterNode> clusterNodes) throws InterruptedException {
-        byte[] nodes = clusterNodes == null
-                ? null
-                : ByteUtils.toBytes(clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()));
+    private void assertDataNodesForZone(int zoneId, @Nullable Set<String> expectedNodes) throws InterruptedException {

Review Comment:
   Why did you decide to change that method? It is possible that changes will be propagated with some delay, lets keep it as it was.
   
   ```
       private void assertDataNodesForZone(int zoneId, @Nullable Set<String> clusterNodes) throws InterruptedException {
           byte[] nodes = clusterNodes == null ? null : toBytes(clusterNodes);
   
           assertTrue(waitForCondition(() -> Arrays.equals(keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value(), nodes), 1000));
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1057776707


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -108,24 +120,32 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
         }
     };
 
+    /** The logical topology on the last watch event. */
+    private volatile Set<String> logicalTopology;

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sergeyuttsel commented on a diff in pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sergeyuttsel commented on code in PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426#discussion_r1058813054


##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java:
##########
@@ -192,7 +217,7 @@ public CompletableFuture<Void> alterZone(String name, DistributionZoneConfigurat
         Objects.requireNonNull(distributionZoneCfg, "Distribution zone configuration is null.");
 
         if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
+            throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());

Review Comment:
   fixed in https://github.com/gridgain/apache-ignite-3/pull/85



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sanpwc merged pull request #1426: IGNITE-18115 Populate DistributionZoneManager with MetaStorage listeners to logical topology events.

Posted by GitBox <gi...@apache.org>.
sanpwc merged PR #1426:
URL: https://github.com/apache/ignite-3/pull/1426


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org