You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sa...@apache.org on 2023/05/17 17:18:28 UTC

[ignite-3] branch main updated: IGNITE-18955 The ability to use filters when data nodes are calculated added. (#1958)

This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ca4914616 IGNITE-18955 The ability to use filters when data nodes are calculated added. (#1958)
9ca4914616 is described below

commit 9ca4914616a719f376b2aa8dc394e5cabbfd6cbd
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Wed May 17 21:18:22 2023 +0400

    IGNITE-18955 The ability to use filters when data nodes are calculated added. (#1958)
---
 modules/distribution-zones/build.gradle            |   3 +
 .../distributionzones/DistributionZoneManager.java | 286 +++++++-------
 .../distributionzones/DistributionZonesUtil.java   |  47 ++-
 .../ignite/internal/distributionzones/Node.java    |  72 ++++
 .../distributionzones/NodeWithAttributes.java      |  80 ++++
 .../rebalance/DistributionZoneRebalanceEngine.java |  14 +-
 .../BaseDistributionZoneManagerTest.java           |   7 +
 .../DistributionZoneAwaitDataNodesTest.java        | 149 ++++----
 .../DistributionZoneFiltersTest.java               |  25 +-
 ...ibutionZoneManagerConfigurationChangesTest.java |  23 +-
 .../DistributionZoneManagerFilterTest.java         | 143 +++++++
 ...butionZoneManagerLogicalTopologyEventsTest.java |   4 +-
 .../DistributionZoneManagerScaleUpTest.java        | 424 ++++++++++-----------
 .../DistributionZoneManagerWatchListenerTest.java  |  68 ++--
 .../DistributionZoneRebalanceEngineTest.java       |   8 +-
 .../util/DistributionZonesTestUtil.java            | 251 ------------
 .../DistributionZonesTestUtil.java                 | 353 +++++++++++++++++
 .../zones/ItDistributionZonesFilterTest.java       | 272 +++++++++++++
 .../sql/engine/exec/ddl/DdlCommandHandler.java     |   4 +
 .../internal/table/distributed/TableManager.java   |   3 +-
 20 files changed, 1478 insertions(+), 758 deletions(-)

diff --git a/modules/distribution-zones/build.gradle b/modules/distribution-zones/build.gradle
index 9219929c8c..c0d38eff4f 100644
--- a/modules/distribution-zones/build.gradle
+++ b/modules/distribution-zones/build.gradle
@@ -64,9 +64,12 @@ dependencies {
     testFixturesImplementation libs.jetbrains.annotations
     testFixturesImplementation libs.mockito.core
     testFixturesImplementation libs.mockito.junit
+    testFixturesImplementation libs.hamcrest.core
     testFixturesImplementation project(':ignite-raft-api')
     testFixturesImplementation project(':ignite-metastorage')
     testFixturesImplementation project(':ignite-schema')
+    testFixturesImplementation project(':ignite-cluster-management')
+    testFixturesImplementation project(':ignite-vault')
     testFixturesImplementation(testFixtures(project(':ignite-core')))
     testFixturesImplementation(testFixtures(project(':ignite-metastorage')))
 }
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 15debe3b5a..6ee1017f77 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -24,11 +24,11 @@ import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.dataNodes;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesAndUpdateTriggerKeys;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractChangeTriggerRevision;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractDataNodes;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.isZoneExist;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
@@ -129,7 +129,6 @@ import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteSystemProperties;
 import org.apache.ignite.lang.NodeStoppingException;
-import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -238,7 +237,14 @@ public class DistributionZoneManager implements IgniteComponent {
      * The logical topology on the last watch event.
      * It's enough to mark this field by volatile because we don't update the collection after it is assigned to the field.
      */
-    private volatile Set<String> logicalTopology;
+    private volatile Set<NodeWithAttributes> logicalTopology;
+
+    /**
+     * Local mapping of {@code nodeId} -> node's attributes, where {@code nodeId} is a node id, that changes between restarts.
+     * This map is updated every time we receive a topology event in a {@code topologyWatchListener}.
+     * TODO: https://issues.apache.org/jira/browse/IGNITE-19491 properly clean up this map
+     */
+    private final Map<String, Map<String, String>> nodesAttributes;
 
     /** Watch listener for logical topology keys. */
     private final WatchListener topologyWatchListener;
@@ -281,6 +287,8 @@ public class DistributionZoneManager implements IgniteComponent {
 
         logicalTopology = emptySet();
 
+        nodesAttributes = new ConcurrentHashMap<>();
+
         executor = new ScheduledThreadPoolExecutor(
                 Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
                 new NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG),
@@ -302,9 +310,73 @@ public class DistributionZoneManager implements IgniteComponent {
         );
     }
 
-    @TestOnly
-    Map<Integer, ZoneState> zonesTimers() {
-        return zonesState;
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());
+        }
+
+        try {
+            ZonesConfigurationListener zonesConfigurationListener = new ZonesConfigurationListener();
+
+            zonesConfiguration.distributionZones().listenElements(zonesConfigurationListener);
+            zonesConfiguration.distributionZones().any().dataNodesAutoAdjustScaleUp().listen(onUpdateScaleUp());
+            zonesConfiguration.distributionZones().any().dataNodesAutoAdjustScaleDown().listen(onUpdateScaleDown());
+
+            zonesConfiguration.defaultDistributionZone().listen(zonesConfigurationListener);
+            zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().listen(onUpdateScaleUp());
+            zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().listen(onUpdateScaleDown());
+
+            rebalanceEngine.start();
+
+            // Init timers after restart.
+            zonesState.putIfAbsent(DEFAULT_ZONE_ID, new ZoneState(executor));
+
+            zonesConfiguration.distributionZones().value().forEach(zone -> {
+                int zoneId = zone.zoneId();
+
+                zonesState.putIfAbsent(zoneId, new ZoneState(executor));
+            });
+
+            logicalTopologyService.addEventListener(topologyEventListener);
+
+            metaStorageManager.registerPrefixWatch(zoneLogicalTopologyPrefix(), topologyWatchListener);
+            metaStorageManager.registerPrefixWatch(zonesDataNodesPrefix(), dataNodesWatchListener);
+
+            initDataNodesFromVaultManager();
+
+            initLogicalTopologyAndVersionInMetaStorageOnStart();
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() throws Exception {
+        if (!stopGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        rebalanceEngine.stop();
+
+        logicalTopologyService.removeEventListener(topologyEventListener);
+
+        metaStorageManager.unregisterWatch(topologyWatchListener);
+        metaStorageManager.unregisterWatch(dataNodesWatchListener);
+
+        //Need to update trackers with max possible value to complete all futures that are waiting for trackers.
+        topVerTracker.update(Long.MAX_VALUE, null);
+
+        zonesState.values().forEach(zoneState -> {
+            zoneState.scaleUpRevisionTracker().update(Long.MAX_VALUE, null);
+            zoneState.scaleDownRevisionTracker().update(Long.MAX_VALUE, null);
+        });
+
+        shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
     }
 
     /**
@@ -611,7 +683,7 @@ public class DistributionZoneManager implements IgniteComponent {
         return allOf(
                 timerValuesFut.thenCompose(timerValues -> scaleUpAwaiting(zoneId, timerValues.get1())),
                 timerValuesFut.thenCompose(timerValues -> scaleDownAwaiting(zoneId, timerValues.get2()))
-        ).thenCompose(ignored -> getDataNodesFuture(zoneId));
+        ).thenApply(ignored -> dataNodes(zoneId));
     }
 
     /**
@@ -703,65 +775,24 @@ public class DistributionZoneManager implements IgniteComponent {
     }
 
     /**
-     * Returns the future with data nodes of the specified zone.
+     * Returns the data nodes of the specified zone.
      *
      * @param zoneId Zone id.
-     * @return Future.
+     * @return The latest data nodes.
      */
-    private CompletableFuture<Set<String>> getDataNodesFuture(int zoneId) {
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-19425 Proper causality token based implementation is expected.
+    public Set<String> dataNodes(int zoneId) {
         return inBusyLock(busyLock, () -> {
             ZoneState zoneState = zonesState.get(zoneId);
 
             if (zoneState != null) {
-                return completedFuture(zonesState.get(zoneId).nodes());
+                return zonesState.get(zoneId).nodes();
             } else {
-                return failedFuture(new DistributionZoneWasRemovedException(zoneId));
+                throw new DistributionZoneWasRemovedException(zoneId);
             }
         });
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void start() {
-        if (!busyLock.enterBusy()) {
-            throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());
-        }
-
-        try {
-            ZonesConfigurationListener zonesConfigurationListener = new ZonesConfigurationListener();
-
-            zonesConfiguration.distributionZones().listenElements(zonesConfigurationListener);
-            zonesConfiguration.distributionZones().any().dataNodesAutoAdjustScaleUp().listen(onUpdateScaleUp());
-            zonesConfiguration.distributionZones().any().dataNodesAutoAdjustScaleDown().listen(onUpdateScaleDown());
-
-            zonesConfiguration.defaultDistributionZone().listen(zonesConfigurationListener);
-            zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().listen(onUpdateScaleUp());
-            zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().listen(onUpdateScaleDown());
-
-            rebalanceEngine.start();
-
-            // Init timers after restart.
-            zonesState.putIfAbsent(DEFAULT_ZONE_ID, new ZoneState(executor));
-
-            zonesConfiguration.distributionZones().value().forEach(zone -> {
-                int zoneId = zone.zoneId();
-
-                zonesState.putIfAbsent(zoneId, new ZoneState(executor));
-            });
-
-            logicalTopologyService.addEventListener(topologyEventListener);
-
-            metaStorageManager.registerPrefixWatch(zoneLogicalTopologyPrefix(), topologyWatchListener);
-            metaStorageManager.registerPrefixWatch(zonesDataNodesPrefix(), dataNodesWatchListener);
-
-            initDataNodesFromVaultManager();
-
-            initLogicalTopologyAndVersionInMetaStorageOnStart();
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
     /**
      * Creates configuration listener for updates of scale up value.
      *
@@ -850,33 +881,6 @@ public class DistributionZoneManager implements IgniteComponent {
         };
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void stop() throws Exception {
-        if (!stopGuard.compareAndSet(false, true)) {
-            return;
-        }
-
-        busyLock.block();
-
-        rebalanceEngine.stop();
-
-        logicalTopologyService.removeEventListener(topologyEventListener);
-
-        metaStorageManager.unregisterWatch(topologyWatchListener);
-        metaStorageManager.unregisterWatch(dataNodesWatchListener);
-
-        //Need to update trackers with max possible value to complete all futures that are waiting for trackers.
-        topVerTracker.update(Long.MAX_VALUE, null);
-
-        zonesState.values().forEach(zoneState -> {
-            zoneState.scaleUpRevisionTracker().update(Long.MAX_VALUE, null);
-            zoneState.scaleDownRevisionTracker().update(Long.MAX_VALUE, null);
-        });
-
-        shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
-    }
-
     private class ZonesConfigurationListener implements ConfigurationNamedListListener<DistributionZoneView> {
         @Override
         public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
@@ -886,7 +890,11 @@ public class DistributionZoneManager implements IgniteComponent {
 
             zonesState.putIfAbsent(zoneId, zoneState);
 
-            saveDataNodesAndUpdateTriggerKeysInMetaStorage(zoneId, ctx.storageRevision(), logicalTopology);
+            saveDataNodesAndUpdateTriggerKeysInMetaStorage(
+                    zoneId,
+                    ctx.storageRevision(),
+                    logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet())
+            );
 
             return completedFuture(null);
         }
@@ -917,7 +925,7 @@ public class DistributionZoneManager implements IgniteComponent {
      * @param revision Revision of an event that has triggered this method.
      * @param dataNodes Data nodes.
      */
-    private void saveDataNodesAndUpdateTriggerKeysInMetaStorage(int zoneId, long revision, Set<String> dataNodes) {
+    private void saveDataNodesAndUpdateTriggerKeysInMetaStorage(int zoneId, long revision, Set<Node> dataNodes) {
         if (!busyLock.enterBusy()) {
             throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
         }
@@ -1032,7 +1040,7 @@ public class DistributionZoneManager implements IgniteComponent {
         }
 
         try {
-            Set<String> topologyFromCmg = newTopology.nodes().stream().map(ClusterNode::name).collect(toSet());
+            Set<LogicalNode> logicalTopology = newTopology.nodes();
 
             Condition updateCondition;
 
@@ -1045,7 +1053,7 @@ public class DistributionZoneManager implements IgniteComponent {
 
             Iif iff = iif(
                     updateCondition,
-                    updateLogicalTopologyAndVersion(topologyFromCmg, newTopology.version()),
+                    updateLogicalTopologyAndVersion(logicalTopology, newTopology.version()),
                     ops().yield(false)
             );
 
@@ -1054,19 +1062,19 @@ public class DistributionZoneManager implements IgniteComponent {
                     LOG.error(
                             "Failed to update distribution zones' logical topology and version keys [topology = {}, version = {}]",
                             e,
-                            Arrays.toString(topologyFromCmg.toArray()),
+                            Arrays.toString(logicalTopology.toArray()),
                             newTopology.version()
                     );
                 } else if (res.getAsBoolean()) {
                     LOG.debug(
                             "Distribution zones' logical topology and version keys were updated [topology = {}, version = {}]",
-                            Arrays.toString(topologyFromCmg.toArray()),
+                            Arrays.toString(logicalTopology.toArray()),
                             newTopology.version()
                     );
                 } else {
                     LOG.debug(
                             "Failed to update distribution zones' logical topology and version keys [topology = {}, version = {}]",
-                            Arrays.toString(topologyFromCmg.toArray()),
+                            Arrays.toString(logicalTopology.toArray()),
                             newTopology.version()
                     );
                 }
@@ -1101,7 +1109,7 @@ public class DistributionZoneManager implements IgniteComponent {
                     byte[] topVerFromMetaStorage = topVerEntry.value();
 
                     if (topVerFromMetaStorage == null || bytesToLong(topVerFromMetaStorage) < topologyVersionFromCmg) {
-                        Set<String> topologyFromCmg = snapshot.nodes().stream().map(ClusterNode::name).collect(toSet());
+                        Set<LogicalNode> topologyFromCmg = snapshot.nodes();
 
                         Condition topologyVersionCondition = topVerFromMetaStorage == null
                                 ? notExists(zonesLogicalTopologyVersionKey()) :
@@ -1176,11 +1184,13 @@ public class DistributionZoneManager implements IgniteComponent {
 
                 logicalTopology = fromBytes(topologyEntry.value());
 
+                logicalTopology.forEach(n -> nodesAttributes.put(n.nodeId(), n.nodeAttributes()));
+
                 // init keys and data nodes for default zone
                 saveDataNodesAndUpdateTriggerKeysInMetaStorage(
                         DEFAULT_ZONE_ID,
                         appliedRevision,
-                        logicalTopology
+                        logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet())
                 );
 
                 zonesConfiguration.distributionZones().value().forEach(zone -> {
@@ -1189,7 +1199,7 @@ public class DistributionZoneManager implements IgniteComponent {
                     saveDataNodesAndUpdateTriggerKeysInMetaStorage(
                             zoneId,
                             appliedRevision,
-                            logicalTopology
+                            logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet())
                     );
                 });
             }
@@ -1199,7 +1209,7 @@ public class DistributionZoneManager implements IgniteComponent {
 
                 zoneState.scaleDownRevisionTracker().update(lastScaleDownRevision, null);
 
-                zoneState.nodes(logicalTopology);
+                zoneState.nodes(logicalTopology.stream().map(NodeWithAttributes::nodeName).collect(toSet()));
             });
 
             assert topologyEntry == null || topologyEntry.value() == null || logicalTopology.equals(fromBytes(topologyEntry.value()))
@@ -1232,7 +1242,7 @@ public class DistributionZoneManager implements IgniteComponent {
 
                     byte[] newLogicalTopologyBytes = null;
 
-                    Set<String> newLogicalTopology = null;
+                    Set<NodeWithAttributes> newLogicalTopology = null;
 
                     long revision = 0;
 
@@ -1253,13 +1263,19 @@ public class DistributionZoneManager implements IgniteComponent {
                     assert newLogicalTopology != null : "The event doesn't contain logical topology";
                     assert revision > 0 : "The event doesn't contain logical topology version";
 
-                    Set<String> newLogicalTopology0 = newLogicalTopology;
+                    Set<NodeWithAttributes> newLogicalTopology0 = newLogicalTopology;
 
-                    Set<String> removedNodes =
-                            logicalTopology.stream().filter(node -> !newLogicalTopology0.contains(node)).collect(toSet());
+                    Set<Node> removedNodes =
+                            logicalTopology.stream()
+                                    .filter(node -> !newLogicalTopology0.contains(node))
+                                    .map(NodeWithAttributes::node)
+                                    .collect(toSet());
 
-                    Set<String> addedNodes =
-                            newLogicalTopology.stream().filter(node -> !logicalTopology.contains(node)).collect(toSet());
+                    Set<Node> addedNodes =
+                            newLogicalTopology.stream()
+                                    .filter(node -> !logicalTopology.contains(node))
+                                    .map(NodeWithAttributes::node)
+                                    .collect(toSet());
 
                     //Firstly update lastScaleUpRevision and lastScaleDownRevision then update topVerTracker to ensure thread-safety.
                     if (!addedNodes.isEmpty()) {
@@ -1272,6 +1288,8 @@ public class DistributionZoneManager implements IgniteComponent {
 
                     topVerTracker.update(topVer, null);
 
+                    newLogicalTopology.forEach(n -> nodesAttributes.put(n.nodeId(), n.nodeAttributes()));
+
                     logicalTopology = newLogicalTopology;
 
                     NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> zones =
@@ -1316,7 +1334,7 @@ public class DistributionZoneManager implements IgniteComponent {
                 try {
                     int zoneId = 0;
 
-                    Set<String> newDataNodes = null;
+                    Set<Node> newDataNodes = null;
 
                     long scaleUpRevision = 0;
 
@@ -1331,7 +1349,7 @@ public class DistributionZoneManager implements IgniteComponent {
                             byte[] dataNodesBytes = e.value();
 
                             if (dataNodesBytes != null) {
-                                newDataNodes = dataNodes(fromBytes(dataNodesBytes));
+                                newDataNodes = DistributionZonesUtil.dataNodes(fromBytes(dataNodesBytes));
                             } else {
                                 newDataNodes = emptySet();
                             }
@@ -1355,7 +1373,9 @@ public class DistributionZoneManager implements IgniteComponent {
 
                     assert newDataNodes != null : "Data nodes was not initialized.";
 
-                    zoneState.nodes(newDataNodes);
+                    String filter = getZoneById(zonesConfiguration, zoneId).filter().value();
+
+                    zoneState.nodes(filterDataNodes(newDataNodes, filter, nodesAttributes()));
 
                     //Associates scale up meta storage revision and data nodes.
                     if (scaleUpRevision > 0) {
@@ -1390,8 +1410,8 @@ public class DistributionZoneManager implements IgniteComponent {
      */
     private void scheduleTimers(
             DistributionZoneView zoneCfg,
-            Set<String> addedNodes,
-            Set<String> removedNodes,
+            Set<Node> addedNodes,
+            Set<Node> removedNodes,
             long revision
     ) {
         scheduleTimers(
@@ -1416,8 +1436,8 @@ public class DistributionZoneManager implements IgniteComponent {
      */
     void scheduleTimers(
             DistributionZoneView zoneCfg,
-            Set<String> addedNodes,
-            Set<String> removedNodes,
+            Set<Node> addedNodes,
+            Set<Node> removedNodes,
             long revision,
             BiFunction<Integer, Long, CompletableFuture<Void>> saveDataNodesOnScaleUp,
             BiFunction<Integer, Long, CompletableFuture<Void>> saveDataNodesOnScaleDown
@@ -1516,7 +1536,7 @@ public class DistributionZoneManager implements IgniteComponent {
                     return completedFuture(null);
                 }
 
-                Map<String, Integer> dataNodesFromMetaStorage = extractDataNodes(values.get(zoneDataNodesKey(zoneId)));
+                Map<Node, Integer> dataNodesFromMetaStorage = extractDataNodes(values.get(zoneDataNodesKey(zoneId)));
 
                 long scaleUpTriggerRevision = extractChangeTriggerRevision(values.get(zoneScaleUpChangeTriggerKey(zoneId)));
 
@@ -1526,12 +1546,16 @@ public class DistributionZoneManager implements IgniteComponent {
                     return completedFuture(null);
                 }
 
-                List<String> deltaToAdd = zoneState.nodesToBeAddedToDataNodes(scaleUpTriggerRevision, revision);
+                List<Node> deltaToAdd = zoneState.nodesToBeAddedToDataNodes(scaleUpTriggerRevision, revision);
 
-                Map<String, Integer> newDataNodes = new HashMap<>(dataNodesFromMetaStorage);
+                Map<Node, Integer> newDataNodes = new HashMap<>(dataNodesFromMetaStorage);
 
                 deltaToAdd.forEach(n -> newDataNodes.merge(n, 1, Integer::sum));
 
+                // Update dataNodes, so nodeId will be updated with the latest seen data on the node.
+                // For example, node could be restarted with new nodeId, we need to update it in the data nodes.
+                deltaToAdd.forEach(n -> newDataNodes.put(n, newDataNodes.remove(n)));
+
                 // Remove redundant nodes that are not presented in the data nodes.
                 newDataNodes.entrySet().removeIf(e -> e.getValue() == 0);
 
@@ -1547,6 +1571,8 @@ public class DistributionZoneManager implements IgniteComponent {
                         .thenApply(StatementResult::getAsBoolean)
                         .thenCompose(invokeResult -> inBusyLock(busyLock, () -> {
                             if (invokeResult) {
+                                // TODO: https://issues.apache.org/jira/browse/IGNITE-19491 Properly utilise this map
+                                // Currently we call clean up only on a node that successfully writes data nodes.
                                 zoneState.cleanUp(Math.min(scaleDownTriggerRevision, revision));
                             } else {
                                 LOG.debug("Updating data nodes for a zone has not succeeded [zoneId = {}]", zoneId);
@@ -1599,7 +1625,7 @@ public class DistributionZoneManager implements IgniteComponent {
                     return completedFuture(null);
                 }
 
-                Map<String, Integer> dataNodesFromMetaStorage = extractDataNodes(values.get(zoneDataNodesKey(zoneId)));
+                Map<Node, Integer> dataNodesFromMetaStorage = extractDataNodes(values.get(zoneDataNodesKey(zoneId)));
 
                 long scaleUpTriggerRevision = extractChangeTriggerRevision(values.get(zoneScaleUpChangeTriggerKey(zoneId)));
 
@@ -1609,9 +1635,9 @@ public class DistributionZoneManager implements IgniteComponent {
                     return completedFuture(null);
                 }
 
-                List<String> deltaToRemove = zoneState.nodesToBeRemovedFromDataNodes(scaleDownTriggerRevision, revision);
+                List<Node> deltaToRemove = zoneState.nodesToBeRemovedFromDataNodes(scaleDownTriggerRevision, revision);
 
-                Map<String, Integer> newDataNodes = new HashMap<>(dataNodesFromMetaStorage);
+                Map<Node, Integer> newDataNodes = new HashMap<>(dataNodesFromMetaStorage);
 
                 deltaToRemove.forEach(n -> newDataNodes.merge(n, -1, Integer::sum));
 
@@ -1630,6 +1656,8 @@ public class DistributionZoneManager implements IgniteComponent {
                         .thenApply(StatementResult::getAsBoolean)
                         .thenCompose(invokeResult -> inBusyLock(busyLock, () -> {
                             if (invokeResult) {
+                                // TODO: https://issues.apache.org/jira/browse/IGNITE-19491 Properly utilise this map
+                                // Currently we call clean up only on a node that successfully writes data nodes.
                                 zoneState.cleanUp(Math.min(scaleUpTriggerRevision, revision));
                             } else {
                                 LOG.debug("Updating data nodes for a zone has not succeeded [zoneId = {}]", zoneId);
@@ -1884,7 +1912,7 @@ public class DistributionZoneManager implements IgniteComponent {
          *                 Nodes that were associated with this event will be included.
          * @return List of nodes that should be added to zone's data nodes.
          */
-        List<String> nodesToBeAddedToDataNodes(long scaleUpRevision, long revision) {
+        List<Node> nodesToBeAddedToDataNodes(long scaleUpRevision, long revision) {
             return accumulateNodes(scaleUpRevision, revision, true);
         }
 
@@ -1897,7 +1925,7 @@ public class DistributionZoneManager implements IgniteComponent {
          *                 Nodes that were associated with this event will be included.
          * @return List of nodes that should be removed from zone's data nodes.
          */
-        List<String> nodesToBeRemovedFromDataNodes(long scaleDownRevision, long revision) {
+        List<Node> nodesToBeRemovedFromDataNodes(long scaleDownRevision, long revision) {
             return accumulateNodes(scaleDownRevision, revision, false);
         }
 
@@ -1907,7 +1935,7 @@ public class DistributionZoneManager implements IgniteComponent {
          * @param nodes Nodes to add to zone's data nodes.
          * @param revision Revision of the event that triggered this addition.
          */
-        void nodesToAddToDataNodes(Set<String> nodes, long revision) {
+        void nodesToAddToDataNodes(Set<Node> nodes, long revision) {
             topologyAugmentationMap.put(revision, new Augmentation(nodes, true));
         }
 
@@ -1917,7 +1945,7 @@ public class DistributionZoneManager implements IgniteComponent {
          * @param nodes Nodes to remove from zone's data nodes.
          * @param revision Revision of the event that triggered this addition.
          */
-        void nodesToRemoveFromDataNodes(Set<String> nodes, long revision) {
+        void nodesToRemoveFromDataNodes(Set<Node> nodes, long revision) {
             topologyAugmentationMap.put(revision, new Augmentation(nodes, false));
         }
 
@@ -1931,11 +1959,11 @@ public class DistributionZoneManager implements IgniteComponent {
          * @param addition Indicates whether we should accumulate nodes that should be added to data nodes, or removed.
          * @return Accumulated nodes.
          */
-        private List<String> accumulateNodes(long fromKey, long toKey, boolean addition) {
+        private List<Node> accumulateNodes(long fromKey, long toKey, boolean addition) {
             return topologyAugmentationMap.subMap(fromKey, false, toKey, true).values()
                     .stream()
                     .filter(a -> a.addition == addition)
-                    .flatMap(a -> a.nodeNames.stream())
+                    .flatMap(a -> a.nodes.stream())
                     .collect(toList());
         }
 
@@ -2018,29 +2046,29 @@ public class DistributionZoneManager implements IgniteComponent {
      */
     private static class Augmentation {
         /** Names of the node. */
-        Set<String> nodeNames;
+        Set<Node> nodes;
 
         /** Flag that indicates whether {@code nodeNames} should be added or removed. */
         boolean addition;
 
-        Augmentation(Set<String> nodeNames, boolean addition) {
-            this.nodeNames = nodeNames;
+        Augmentation(Set<Node> nodes, boolean addition) {
+            this.nodes = nodes;
             this.addition = addition;
         }
     }
 
     /**
-     * Returns a data nodes set for the specified zone.
+     * Returns local mapping of {@code nodeId} -> node's attributes, where {@code nodeId} is the node id that changes between restarts.
+     * This map is updated every time we receive a topology event in a {@code topologyWatchListener}.
      *
-     * @param zoneId Zone id.
-     * @return Data nodes set.
+     * @return Mapping {@code nodeId} -> node's attributes.
      */
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-19425 Proper causality token based implementation is expected.
-    public Set<String> getDataNodesByZoneId(int zoneId) {
-        return inBusyLock(busyLock, () -> {
-            ZoneState zoneState = zonesState.get(zoneId);
+    public Map<String, Map<String, String>> nodesAttributes() {
+        return nodesAttributes;
+    }
 
-            return zoneState.nodes;
-        });
+    @TestOnly
+    Map<Integer, ZoneState> zonesTimers() {
+        return zonesState;
     }
 }
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 371b6c337a..0a4a662922 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.distributionzones;
 
 import static java.util.Collections.emptyMap;
 import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_FILTER;
 import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.and;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
@@ -38,7 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.stream.Collectors;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
 import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
 import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
@@ -307,10 +309,14 @@ public class DistributionZonesUtil {
      * @param topologyVersion Logical topology version.
      * @return Update command for the meta storage.
      */
-    static Update updateLogicalTopologyAndVersion(Set<String> logicalTopology, long topologyVersion) {
+    public static Update updateLogicalTopologyAndVersion(Set<LogicalNode> logicalTopology, long topologyVersion) {
+        Set<NodeWithAttributes> topologyFromCmg = logicalTopology.stream()
+                .map(n -> new NodeWithAttributes(n.name(), n.id(), n.nodeAttributes()))
+                .collect(toSet());
+
         return ops(
                 put(zonesLogicalTopologyVersionKey(), ByteUtils.longToBytes(topologyVersion)),
-                put(zonesLogicalTopologyKey(), ByteUtils.toBytes(logicalTopology))
+                put(zonesLogicalTopologyKey(), ByteUtils.toBytes(topologyFromCmg))
         ).yield(true);
     }
 
@@ -322,20 +328,20 @@ public class DistributionZonesUtil {
      *                     Joining increases the counter, leaving decreases.
      * @return Returns a set of data nodes retrieved from data nodes map, which value is more than 0.
      */
-    public static Set<String> dataNodes(Map<String, Integer> dataNodesMap) {
-        return dataNodesMap.entrySet().stream().filter(e -> e.getValue() > 0).map(Map.Entry::getKey).collect(Collectors.toSet());
+    public static Set<Node> dataNodes(Map<Node, Integer> dataNodesMap) {
+        return dataNodesMap.entrySet().stream().filter(e -> e.getValue() > 0).map(Map.Entry::getKey).collect(toSet());
     }
 
     /**
-     * Returns a map from a set of data nodes. This map has the following structure: node name is mapped to integer,
+     * Returns a map from a set of data nodes. This map has the following structure: node is mapped to integer,
      * integer represents how often node joined or leaved topology. In this case, set of nodes is interpreted as nodes
      * that joined topology, so all mappings will be node -> 1.
      *
      * @param dataNodes Set of data nodes.
      * @return Returns a map from a set of data nodes.
      */
-    public static Map<String, Integer> toDataNodesMap(Set<String> dataNodes) {
-        Map<String, Integer> dataNodesMap = new HashMap<>();
+    public static Map<Node, Integer> toDataNodesMap(Set<Node> dataNodes) {
+        Map<Node, Integer> dataNodesMap = new HashMap<>();
 
         dataNodes.forEach(n -> dataNodesMap.merge(n, 1, Integer::sum));
 
@@ -348,7 +354,7 @@ public class DistributionZonesUtil {
      * @param dataNodesEntry Meta storage entry with data nodes.
      * @return Data nodes.
      */
-    static Map<String, Integer> extractDataNodes(Entry dataNodesEntry) {
+    static Map<Node, Integer> extractDataNodes(Entry dataNodesEntry) {
         if (!dataNodesEntry.empty()) {
             return fromBytes(dataNodesEntry.value());
         } else {
@@ -453,6 +459,9 @@ public class DistributionZonesUtil {
      * @return True if {@code nodeAttributes} satisfy {@code filter}, false otherwise. Returns true if {@code nodeAttributes} is empty.
      */
     public static boolean filter(Map<String, String> nodeAttributes, String filter) {
+        if (filter.equals(DEFAULT_FILTER)) {
+            return true;
+        }
         // We need to convert numbers to Long objects, so they could be parsed to numbers in JSON.
         // nodeAttributes has String values of numbers because nodeAttributes come from configuration,
         // but configuration does not support Object as a configuration value.
@@ -476,4 +485,24 @@ public class DistributionZonesUtil {
 
         return !res.isEmpty();
     }
+
+    /**
+     * Filters {@code dataNodes} according to the provided {@code filter}.
+     * Nodes' attributes are taken from {@code nodesAttributes} map.
+     *
+     * @param dataNodes Data nodes.
+     * @param filter Filter for data nodes.
+     * @param nodesAttributes Nodes' attributes which are used for filtering.
+     * @return Filtered data nodes.
+     */
+    public static Set<String> filterDataNodes(
+            Set<Node> dataNodes,
+            String filter,
+            Map<String, Map<String, String>> nodesAttributes
+    ) {
+        return dataNodes.stream()
+                .filter(n -> filter(nodesAttributes.get(n.nodeId()), filter))
+                .map(Node::nodeName)
+                .collect(toSet());
+    }
 }
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/Node.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/Node.java
new file mode 100644
index 0000000000..0927c2c4e3
--- /dev/null
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/Node.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import java.io.Serializable;
+
+/**
+ * Node representation that we store in data nodes.
+ * Includes {@code nodeName}, a unique identifier of the node that does not change after a restart, and
+ * {@code nodeId}, a unique identifier of the node that changes after a restart.
+ * {@code nodeId} is needed to get node's attributes from the local state of the distribution zone manager.
+ */
+public class Node implements Serializable {
+    private static final long serialVersionUID = 875461392587175703L;
+
+    private final String nodeName;
+
+    private final String nodeId;
+
+    public Node(String nodeName, String nodeId) {
+        this.nodeName = nodeName;
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public int hashCode() {
+        return nodeName.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+
+        Node that = (Node) obj;
+
+        return nodeName().equals(that.nodeName());
+    }
+
+    public String nodeName() {
+        return nodeName;
+    }
+
+    public String nodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public String toString() {
+        return nodeName;
+    }
+}
\ No newline at end of file
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributes.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributes.java
new file mode 100644
index 0000000000..eb11971d4b
--- /dev/null
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributes.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+
+/**
+ * Structure that represents a node together with its attributes; it is stored in the Meta Storage as part of the logical topology.
+ * Lightweight version of the {@link LogicalNode}.
+ */
+public class NodeWithAttributes implements Serializable {
+    private static final long serialVersionUID = -7778967985161743937L;
+
+    private final Node node;
+
+    private final Map<String, String> nodeAttributes;
+
+    public NodeWithAttributes(String nodeName, String nodeId, Map<String, String> nodeAttributes) {
+        this.node = new Node(nodeName, nodeId);
+        this.nodeAttributes = nodeAttributes;
+    }
+
+    @Override
+    public int hashCode() {
+        return node.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+
+        NodeWithAttributes that = (NodeWithAttributes) obj;
+
+        return node.equals(that.node);
+    }
+
+    public String nodeName() {
+        return node.nodeName();
+    }
+
+    public String nodeId() {
+        return node.nodeId();
+    }
+
+    public Node node() {
+        return node;
+    }
+
+    public Map<String, String> nodeAttributes() {
+        return nodeAttributes;
+    }
+
+    @Override
+    public String toString() {
+        return node.toString();
+    }
+}
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index 601ffc0944..e7c116e951 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.dataNodes;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
 import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.updatePendingAssignmentsKeys;
@@ -38,6 +39,7 @@ import org.apache.ignite.configuration.NamedConfigurationTree;
 import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
 import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.distributionzones.Node;
 import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
 import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
 import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
@@ -163,7 +165,7 @@ public class DistributionZoneRebalanceEngine {
 
                     int zoneId = extractZoneId(evt.entryEvent().newEntry().key());
 
-                    Set<String> dataNodes = dataNodes(ByteUtils.fromBytes(dataNodesBytes));
+                    Set<Node> dataNodes = dataNodes(ByteUtils.fromBytes(dataNodesBytes));
 
                     for (int i = 0; i < tables.value().size(); i++) {
                         TableView tableView = tables.value().get(i);
@@ -173,6 +175,12 @@ public class DistributionZoneRebalanceEngine {
                         DistributionZoneConfiguration distributionZoneConfiguration =
                                 getZoneById(zonesConfiguration, tableZoneId);
 
+                        Set<String> filteredDataNodes = filterDataNodes(
+                                dataNodes,
+                                distributionZoneConfiguration.filter().value(),
+                                distributionZoneManager.nodesAttributes()
+                        );
+
                         if (zoneId == tableZoneId) {
                             TableConfiguration tableCfg = tables.get(tableView.name());
 
@@ -194,7 +202,7 @@ public class DistributionZoneRebalanceEngine {
                                 updatePendingAssignmentsKeys(
                                         tableView.name(),
                                         replicaGrpId,
-                                        dataNodes,
+                                        filteredDataNodes,
                                         replicas,
                                         evt.entryEvent().newEntry().revision(),
                                         metaStorageManager,
@@ -271,7 +279,7 @@ public class DistributionZoneRebalanceEngine {
                         futs[furCur++] = updatePendingAssignmentsKeys(
                                 tblCfg.name().value(),
                                 replicaGrpId,
-                                distributionZoneManager.getDataNodesByZoneId(zoneCfg.zoneId()),
+                                distributionZoneManager.dataNodes(zoneCfg.zoneId()),
                                 newReplicas,
                                 replicasCtx.storageRevision(),
                                 metaStorageManager,
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
index b2d536e116..2fbe8a4044 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.distributionzones;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.deployWatchesAndUpdateMetaStorageRevision;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -139,4 +140,10 @@ public class BaseDistributionZoneManagerTest extends BaseIgniteAbstractTest {
         IgniteUtils.closeAll(components.stream().map(c -> c::beforeNodeStop));
         IgniteUtils.closeAll(components.stream().map(c -> c::stop));
     }
+
+    void startDistributionZoneManager() throws Exception {
+        deployWatchesAndUpdateMetaStorageRevision(metaStorageManager);
+
+        distributionZoneManager.start();
+    }
 }
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java
index 925e77ce79..cb4ef2acb4 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java
@@ -17,19 +17,20 @@
 
 package org.apache.ignite.internal.distributionzones;
 
+import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
 import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
 import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
 import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.deployWatchesAndUpdateMetaStorageRevision;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.setLogicalTopologyInMetaStorage;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.deployWatchesAndUpdateMetaStorageRevision;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
@@ -42,12 +43,12 @@ import static org.mockito.Mockito.doAnswer;
 
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters.Builder;
@@ -56,8 +57,6 @@ import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNo
 import org.apache.ignite.internal.distributionzones.exception.DistributionZoneWasRemovedException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.metastorage.dsl.Conditions;
-import org.apache.ignite.internal.metastorage.dsl.Operations;
 import org.apache.ignite.internal.metastorage.server.If;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.network.ClusterNode;
@@ -81,6 +80,12 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
 
     private static final String ZONE_NAME_2 = "zone2";
 
+    private static final LogicalNode NODE_0 = new LogicalNode("node0", "node0", new NetworkAddress("localhost", 123));
+
+    private static final LogicalNode NODE_1 = new LogicalNode("node1", "node1", new NetworkAddress("localhost", 123));
+
+    private static final LogicalNode NODE_2 = new LogicalNode("node2", "node2", new NetworkAddress("localhost", 123));
+
     /**
      * This test invokes {@link DistributionZoneManager#topologyVersionedDataNodes(int, long)} with default and non-default zone id and
      * different logical topology versions. Simulates new logical topology with new nodes and with removed nodes. Check that data nodes
@@ -119,18 +124,19 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
 
         int topVer0 = 2;
 
-        Set<String> threeNodes = Set.of("node0", "node1", "node2");
+        Set<LogicalNode> threeNodes = Set.of(NODE_0, NODE_1, NODE_2);
+        Set<String> threeNodesNames = Set.of(NODE_0.name(), NODE_1.name(), NODE_2.name());
 
-        setLogicalTopologyInMetaStorage(threeNodes, topVer0);
+        setLogicalTopologyInMetaStorage(threeNodes, topVer0, metaStorageManager);
 
-        assertEquals(threeNodes, dataNodesUpFut0.get(5, SECONDS));
-        assertEquals(threeNodes, dataNodesUpFut1.get(3, SECONDS));
-        assertEquals(threeNodes, dataNodesUpFut2.get(3, SECONDS));
+        assertEquals(threeNodesNames, dataNodesUpFut0.get(5, SECONDS));
+        assertEquals(threeNodesNames, dataNodesUpFut1.get(3, SECONDS));
+        assertEquals(threeNodesNames, dataNodesUpFut2.get(3, SECONDS));
 
-        assertEquals(threeNodes, dataNodesUpFut4.get(3, SECONDS));
-        assertEquals(threeNodes, dataNodesUpFut5.get(3, SECONDS));
-        assertEquals(threeNodes, dataNodesUpFut6.get(3, SECONDS));
-        assertEquals(threeNodes, dataNodesUpFut7.get(3, SECONDS));
+        assertEquals(threeNodesNames, dataNodesUpFut4.get(3, SECONDS));
+        assertEquals(threeNodesNames, dataNodesUpFut5.get(3, SECONDS));
+        assertEquals(threeNodesNames, dataNodesUpFut6.get(3, SECONDS));
+        assertEquals(threeNodesNames, dataNodesUpFut7.get(3, SECONDS));
         assertFalse(dataNodesUpFut3.isDone());
 
         LOG.info("Topology with removed nodes.");
@@ -146,29 +152,31 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
 
         int topVer1 = 5;
 
-        Set<String> twoNodes = Set.of("node0", "node1");
+        Set<LogicalNode> twoNodes = Set.of(NODE_0, NODE_1);
+        Set<String> twoNodesNames = Set.of(NODE_0.name(), NODE_1.name());
 
-        setLogicalTopologyInMetaStorage(twoNodes, topVer1);
+        setLogicalTopologyInMetaStorage(twoNodes, topVer1, metaStorageManager);
 
-        assertEquals(twoNodes, dataNodesDownFut0.get(3, SECONDS));
-        assertEquals(twoNodes, dataNodesDownFut1.get(3, SECONDS));
-        assertEquals(twoNodes, dataNodesDownFut2.get(3, SECONDS));
-        assertEquals(twoNodes, dataNodesDownFut4.get(3, SECONDS));
-        assertEquals(twoNodes, dataNodesDownFut5.get(3, SECONDS));
-        assertEquals(twoNodes, dataNodesDownFut6.get(3, SECONDS));
-        assertEquals(twoNodes, dataNodesDownFut7.get(3, SECONDS));
+        assertEquals(twoNodesNames, dataNodesDownFut0.get(3, SECONDS));
+        assertEquals(twoNodesNames, dataNodesDownFut1.get(3, SECONDS));
+        assertEquals(twoNodesNames, dataNodesDownFut2.get(3, SECONDS));
+        assertEquals(twoNodesNames, dataNodesDownFut4.get(3, SECONDS));
+        assertEquals(twoNodesNames, dataNodesDownFut5.get(3, SECONDS));
+        assertEquals(twoNodesNames, dataNodesDownFut6.get(3, SECONDS));
+        assertEquals(twoNodesNames, dataNodesDownFut7.get(3, SECONDS));
         assertFalse(dataNodesDownFut3.isDone());
 
         int topVer2 = 20;
 
         LOG.info("Topology with added and removed nodes.");
 
-        Set<String> dataNodes = Set.of("node0", "node2");
+        Set<LogicalNode> dataNodes = Set.of(NODE_0, NODE_2);
+        Set<String> dataNodesNames = Set.of(NODE_0.name(), NODE_2.name());
 
-        setLogicalTopologyInMetaStorage(dataNodes, topVer2);
+        setLogicalTopologyInMetaStorage(dataNodes, topVer2, metaStorageManager);
 
-        assertEquals(dataNodes, dataNodesUpFut3.get(3, SECONDS));
-        assertEquals(dataNodes, dataNodesDownFut3.get(3, SECONDS));
+        assertEquals(dataNodesNames, dataNodesUpFut3.get(3, SECONDS));
+        assertEquals(dataNodesNames, dataNodesDownFut3.get(3, SECONDS));
     }
 
     /**
@@ -184,23 +192,25 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
 
         long topVer = 100;
 
-        Set<String> dataNodes0 = Set.of("node0", "node1");
+        Set<LogicalNode> dataNodes = Set.of(NODE_0, NODE_1);
+        Set<String> dataNodesNames = Set.of(NODE_0.name(), NODE_1.name());
 
-        setLogicalTopologyInMetaStorage(dataNodes0, topVer);
+        setLogicalTopologyInMetaStorage(dataNodes, topVer, metaStorageManager);
 
         assertFalse(dataNodesFut.isDone());
 
-        assertEquals(dataNodes0, dataNodesFut.get(3, SECONDS));
+        assertEquals(dataNodesNames, dataNodesFut.get(3, SECONDS));
 
         dataNodesFut = distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 106);
 
-        Set<String> dataNodes1 = Set.of("node0");
+        Set<LogicalNode> dataNodes1 = Set.of(NODE_0);
+        Set<String> dataNodesNames1 = Set.of(NODE_0.name());
 
-        setLogicalTopologyInMetaStorage(dataNodes1, topVer + 100);
+        setLogicalTopologyInMetaStorage(dataNodes1, topVer + 100, metaStorageManager);
 
         assertFalse(dataNodesFut.isDone());
 
-        assertEquals(dataNodes1, dataNodesFut.get(3, SECONDS));
+        assertEquals(dataNodesNames1, dataNodesFut.get(3, SECONDS));
     }
 
     /**
@@ -225,19 +235,20 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
 
         CompletableFuture<Set<String>> dataNodesFut = distributionZoneManager.topologyVersionedDataNodes(zoneId, 1);
 
-        Set<String> nodes0 = Set.of("node0", "node1");
+        Set<LogicalNode> nodes = Set.of(NODE_0, NODE_1);
+        Set<String> nodesNames = Set.of(NODE_0.name(), NODE_1.name());
 
-        setLogicalTopologyInMetaStorage(nodes0, 1);
+        setLogicalTopologyInMetaStorage(nodes, 1, metaStorageManager);
 
-        assertEquals(nodes0, dataNodesFut.get(3, SECONDS));
+        assertEquals(nodesNames, dataNodesFut.get(3, SECONDS));
 
         dataNodesFut = distributionZoneManager.topologyVersionedDataNodes(zoneId, 2);
 
         assertFalse(dataNodesFut.isDone());
 
-        setLogicalTopologyInMetaStorage(Set.of("node0"), 2);
+        setLogicalTopologyInMetaStorage(Set.of(NODE_0), 2, metaStorageManager);
 
-        assertEquals(nodes0, dataNodesFut.get(3, SECONDS));
+        assertEquals(nodesNames, dataNodesFut.get(3, SECONDS));
     }
 
     /**
@@ -279,9 +290,9 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
 
         CompletableFuture<Set<String>> dataNodesFut = distributionZoneManager.topologyVersionedDataNodes(zoneId1, 1);
 
-        Set<String> nodes0 = Set.of("node0", "node1");
+        Set<LogicalNode> nodes0 = Set.of(NODE_0, NODE_1);
 
-        setLogicalTopologyInMetaStorage(nodes0, 1);
+        setLogicalTopologyInMetaStorage(nodes0, 1, metaStorageManager);
 
         dataNodesFut.get(3, SECONDS);
 
@@ -293,25 +304,26 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
         assertFalse(dataNodesFut1.isDone());
         assertFalse(dataNodesFut1Zone2.isDone());
 
-        Set<String> nodes1 = Set.of("node0");
+        Set<LogicalNode> nodes1 = Set.of(NODE_0);
+        Set<String> nodesNames1 = Set.of(NODE_0.name());
 
         distributionZoneManager.alterZone(ZONE_NAME_1, new DistributionZoneConfigurationParameters.Builder(ZONE_NAME_1)
                         .dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE).dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE).build())
                 .get(3, SECONDS);
 
-        setLogicalTopologyInMetaStorage(nodes1, 2);
+        setLogicalTopologyInMetaStorage(nodes1, 2, metaStorageManager);
 
-        assertEquals(nodes1, dataNodesFut1.get(3, SECONDS));
+        assertEquals(nodesNames1, dataNodesFut1.get(3, SECONDS));
 
         CompletableFuture<Set<String>> dataNodesFut2 = distributionZoneManager.topologyVersionedDataNodes(zoneId1, 3);
 
-        Set<String> nodes2 = Set.of("node0", "node1");
+        Set<LogicalNode> nodes2 = Set.of(NODE_0, NODE_1);
 
         assertFalse(dataNodesFut2.isDone());
 
-        setLogicalTopologyInMetaStorage(nodes2, 3);
+        setLogicalTopologyInMetaStorage(nodes2, 3, metaStorageManager);
 
-        assertEquals(nodes1, dataNodesFut2.get(3, SECONDS));
+        assertEquals(nodesNames1, dataNodesFut2.get(3, SECONDS));
     }
 
     /**
@@ -338,9 +350,9 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
 
         assertFalse(dataNodesFut.isDone());
 
-        Set<String> nodes0 = Set.of("node0", "node1");
+        Set<LogicalNode> nodes0 = Set.of(NODE_0, NODE_1);
 
-        setLogicalTopologyInMetaStorage(nodes0, 1);
+        setLogicalTopologyInMetaStorage(nodes0, 1, metaStorageManager);
 
         assertEquals(emptySet(), dataNodesFut.get(3, SECONDS));
     }
@@ -375,7 +387,7 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
             return invocation.callRealMethod();
         }).when(keyValueStorage).invoke(any(), any());
 
-        setLogicalTopologyInMetaStorage(Set.of("node0"), 200);
+        setLogicalTopologyInMetaStorage(Set.of(NODE_0), 200, metaStorageManager);
 
         assertFalse(dataNodesFut0.isDone());
 
@@ -401,7 +413,7 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
 
         assertThat(distributionZoneManager.dropZone(ZONE_NAME_0), willSucceedIn(3, SECONDS));
 
-        setLogicalTopologyInMetaStorage(Set.of("node0"), 200);
+        setLogicalTopologyInMetaStorage(Set.of(NODE_0), 200, metaStorageManager);
 
         assertFalse(dataNodesFut0.isDone());
 
@@ -416,15 +428,16 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
     void testScaleUpScaleDownAreChangedWhileAwaitingDataNodes() throws Exception {
         startZoneManager();
 
-        Set<String> nodes0 = Set.of("node0", "node1");
+        Set<LogicalNode> nodes0 = Set.of(NODE_0, NODE_1);
+        Set<String> nodesNames0 = Set.of(NODE_0.name(), NODE_1.name());
 
-        setLogicalTopologyInMetaStorage(nodes0, 1);
+        setLogicalTopologyInMetaStorage(nodes0, 1, metaStorageManager);
 
         CompletableFuture<Set<String>> dataNodesFut0 = distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 1);
 
-        assertEquals(nodes0, dataNodesFut0.get(3, SECONDS));
+        assertEquals(nodesNames0, dataNodesFut0.get(3, SECONDS));
 
-        Set<String> nodes1 = Set.of("node0", "node2");
+        Set<LogicalNode> nodes1 = Set.of(NODE_0, NODE_2);
 
         CompletableFuture<Set<String>> dataNodesFut1 = distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 2);
 
@@ -461,9 +474,9 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
             return invocation.callRealMethod();
         }).when(keyValueStorage).invoke(any(), any());
 
-        setLogicalTopologyInMetaStorage(nodes1, 2);
+        setLogicalTopologyInMetaStorage(nodes1, 2, metaStorageManager);
 
-        assertEquals(nodes0, dataNodesFut1.get(5, SECONDS));
+        assertEquals(nodes0.stream().map(ClusterNode::name).collect(Collectors.toSet()), dataNodesFut1.get(5, SECONDS));
 
         scaleUpLatch.countDown();
         scaleDownLatch.countDown();
@@ -474,7 +487,10 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
      */
     @Test
     void testInitializedDataNodesOnZoneManagerStart() throws Exception {
-        Set<String> dataNodes = Set.of("node0", "node1");
+        Set<String> dataNodes0 = Set.of("node0", "node1");
+
+        Set<NodeWithAttributes> dataNodes = Set.of(new NodeWithAttributes("node0", "id_node0", emptyMap()),
+                new NodeWithAttributes("node1", "id_node1", emptyMap()));
 
         Map<ByteArray, byte[]> valEntries = new HashMap<>();
 
@@ -483,12 +499,12 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
 
         assertThat(vaultMgr.putAll(valEntries), willCompleteSuccessfully());
 
-        topology.putNode(new LogicalNode(new ClusterNode("node0", "node0", new NetworkAddress("local", 1))));
-        topology.putNode(new LogicalNode(new ClusterNode("node1", "node1", new NetworkAddress("local", 1))));
+        topology.putNode(new LogicalNode(new ClusterNode("id_node0", "node0", new NetworkAddress("local", 1))));
+        topology.putNode(new LogicalNode(new ClusterNode("id_node1", "node1", new NetworkAddress("local", 1))));
 
         startZoneManager();
 
-        assertEquals(dataNodes, distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 2)
+        assertEquals(dataNodes0, distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 2)
                 .get(3, SECONDS));
     }
 
@@ -503,17 +519,4 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
                                 .dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE).build())
                 .get(3, SECONDS);
     }
-
-    private void setLogicalTopologyInMetaStorage(Set<String> nodes, long topVer) {
-        CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
-                Conditions.exists(zonesLogicalTopologyKey()),
-                List.of(
-                        Operations.put(zonesLogicalTopologyKey(), toBytes(nodes)),
-                        Operations.put(zonesLogicalTopologyVersionKey(), longToBytes(topVer))
-                ),
-                List.of(Operations.noop())
-        );
-
-        assertThat(invokeFuture, willBe(true));
-    }
 }
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneFiltersTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneFiltersTest.java
index e3f2898342..d2eda4d0d9 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneFiltersTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneFiltersTest.java
@@ -93,6 +93,13 @@ public class DistributionZoneFiltersTest {
 
     @Test
     void testNodeAttributesFilterScenario8() {
+        Map<String, String> newAttributesMap = Map.of();
+
+        assertTrue(filter(newAttributesMap, DEFAULT_FILTER));
+    }
+
+    @Test
+    void testNodeAttributesFilterScenario9() {
         Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
 
         String filter = "$.[?(@.newValue == 100)]";
@@ -101,7 +108,7 @@ public class DistributionZoneFiltersTest {
     }
 
     @Test
-    void testNodeAttributesFilterScenario9() {
+    void testNodeAttributesFilterScenario10() {
         Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
 
         String filter = "$.[?(@.newValue != 100)]";
@@ -110,7 +117,7 @@ public class DistributionZoneFiltersTest {
     }
 
     @Test
-    void testNodeAttributesFilterScenario10() {
+    void testNodeAttributesFilterScenario11() {
         Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
 
         String filter = "$.[?(@.newValue && @.newValue != 100)]";
@@ -119,7 +126,7 @@ public class DistributionZoneFiltersTest {
     }
 
     @Test
-    void testNodeAttributesFilterScenario11() {
+    void testNodeAttributesFilterScenario12() {
         Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
 
         String filter = "$[?(@.dataRegionSize != 10)]";
@@ -128,7 +135,7 @@ public class DistributionZoneFiltersTest {
     }
 
     @Test
-    void testNodeAttributesFilterScenario12() {
+    void testNodeAttributesFilterScenario13() {
         Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
 
         String filter = "$[?(@.region != 'EU')]";
@@ -137,7 +144,7 @@ public class DistributionZoneFiltersTest {
     }
 
     @Test
-    void testNodeAttributesFilterScenario13() {
+    void testNodeAttributesFilterScenario14() {
         Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
 
         String filter = "$[?(@.region != 'EU' && @.dataRegionSize > 5)]";
@@ -146,7 +153,7 @@ public class DistributionZoneFiltersTest {
     }
 
     @Test
-    void testNodeAttributesFilterScenario14() {
+    void testNodeAttributesFilterScenario15() {
         Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
 
         String filter = "$[?(@.region != 'EU' && @.dataRegionSize < 5)]";
@@ -155,7 +162,7 @@ public class DistributionZoneFiltersTest {
     }
 
     @Test
-    void testNodeAttributesFilterScenario15() {
+    void testNodeAttributesFilterScenario16() {
         Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
 
         String filter = "$[?(@.dataRegionSize > 5 && @.dataRegionSize < 5)]";
@@ -164,7 +171,7 @@ public class DistributionZoneFiltersTest {
     }
 
     @Test
-    void testNodeAttributesFilterScenario16() {
+    void testNodeAttributesFilterScenario17() {
         Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
 
         String filter = "$[?(@.region == 'US' && @.region == 'EU')]";
@@ -173,7 +180,7 @@ public class DistributionZoneFiltersTest {
     }
 
     @Test
-    void testNodeAttributesFilterScenario17() {
+    void testNodeAttributesFilterScenario18() {
         Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
 
         String filter = "$[?(@.region == 'EU' || @.dataRegionSize > 5)]";
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
index 07420d4750..9f70537995 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
@@ -20,15 +20,15 @@ package org.apache.ignite.internal.distributionzones;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesForZone;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesForZoneWithAttributes;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertZoneScaleUpChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertZonesChangeTriggerKey;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertDataNodesForZone;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertZoneScaleUpChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertZonesChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.deployWatchesAndUpdateMetaStorageRevision;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
@@ -41,6 +41,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
@@ -74,7 +75,7 @@ public class DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
 
     private static final String NEW_ZONE_NAME = "zone2";
 
-    private static final Set<String> nodes = Set.of("name1");
+    private static final Set<NodeWithAttributes> nodes = Set.of(new NodeWithAttributes("name1", "name1", Collections.emptyMap()));
 
     private DistributionZoneManager distributionZoneManager;
 
@@ -110,7 +111,7 @@ public class DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
         LogicalTopologyService logicalTopologyService = mock(LogicalTopologyService.class);
 
         Set<LogicalNode> logicalTopology = nodes.stream()
-                .map(n -> new LogicalNode(n, n, new NetworkAddress(n, 10_000)))
+                .map(n -> new LogicalNode(n.nodeName(), n.nodeName(), new NetworkAddress(n.nodeName(), 10_000)))
                 .collect(toSet());
 
         when(logicalTopologyService.logicalTopologyOnLeader())
@@ -135,7 +136,7 @@ public class DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
         clusterCfgMgr.start();
         metaStorageManager.start();
 
-        deployWatchesAndUpdateMetaStorageRevision(metaStorageManager);
+        DistributionZonesTestUtil.deployWatchesAndUpdateMetaStorageRevision(metaStorageManager);
 
         distributionZoneManager.start();
 
@@ -156,7 +157,7 @@ public class DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
 
         distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
-        assertDataNodesForZone(1, nodes, keyValueStorage);
+        assertDataNodesForZoneWithAttributes(1, nodes.stream().map(NodeWithAttributes::node).collect(toSet()), keyValueStorage);
 
         assertZoneScaleUpChangeTriggerKey(1L, 1, keyValueStorage);
 
@@ -167,7 +168,7 @@ public class DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
     void testZoneDeleteRemovesMetaStorageKey() throws Exception {
         distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
-        assertDataNodesForZone(1, nodes, keyValueStorage);
+        assertDataNodesForZoneWithAttributes(1, nodes.stream().map(NodeWithAttributes::node).collect(toSet()), keyValueStorage);
 
         distributionZoneManager.dropZone(ZONE_NAME).get();
 
@@ -204,12 +205,12 @@ public class DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
     void testZoneDeleteDoNotRemoveMetaStorageKey() throws Exception {
         distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
 
-        assertDataNodesForZone(1, nodes, keyValueStorage);
+        assertDataNodesForZoneWithAttributes(1, nodes.stream().map(NodeWithAttributes::node).collect(toSet()), keyValueStorage);
 
         keyValueStorage.put(zonesChangeTriggerKey(1).bytes(), longToBytes(100), HybridTimestamp.MIN_VALUE);
 
         distributionZoneManager.dropZone(ZONE_NAME).get();
 
-        assertDataNodesForZone(1, nodes, keyValueStorage);
+        assertDataNodesForZoneWithAttributes(1, nodes.stream().map(NodeWithAttributes::node).collect(toSet()), keyValueStorage);
     }
 }
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java
new file mode 100644
index 0000000000..9dc2523358
--- /dev/null
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests distribution zone manager interactions with data nodes filtering.
+ */
+public class DistributionZoneManagerFilterTest extends BaseDistributionZoneManagerTest {
+    private static final String ZONE_NAME = "zone1";
+
+    private static final LogicalNode A = new LogicalNode(
+            new ClusterNode("1", "A", new NetworkAddress("localhost", 123)),
+            Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10")
+    );
+
+    private static final LogicalNode B = new LogicalNode(
+            new ClusterNode("2", "B", new NetworkAddress("localhost", 123)),
+            Map.of("region", "EU", "storage", "HHD", "dataRegionSize", "30")
+    );
+
+    private static final LogicalNode C = new LogicalNode(
+            new ClusterNode("3", "C", new NetworkAddress("localhost", 123)),
+            Map.of("region", "CN", "storage", "SSD", "dataRegionSize", "20")
+    );
+
+    private static final LogicalNode D = new LogicalNode(
+            new ClusterNode("4", "D", new NetworkAddress("localhost", 123)),
+            Map.of("region", "CN", "storage", "SSD", "dataRegionSize", "20")
+    );
+
+    @Test
+    void testFilterOnScaleUp() throws Exception {
+        preparePrerequisites();
+
+        Set<String> nodes;
+
+        topology.putNode(D);
+
+        nodes = distributionZoneManager.topologyVersionedDataNodes(
+                1,
+                topology.getLogicalTopology().version()
+        ).get(10_000, TimeUnit.MILLISECONDS);
+
+        assertEquals(Set.of(A, C, D).stream().map(ClusterNode::name).collect(Collectors.toSet()), nodes);
+    }
+
+    @Test
+    void testFilterOnScaleDown() throws Exception {
+        preparePrerequisites();
+
+        Set<String> nodes;
+
+        topology.removeNodes(Set.of(C));
+
+        nodes = distributionZoneManager.topologyVersionedDataNodes(
+                1,
+                topology.getLogicalTopology().version()
+        ).get(10_000, TimeUnit.MILLISECONDS);
+
+        assertEquals(Set.of(A).stream().map(ClusterNode::name).collect(Collectors.toSet()), nodes);
+    }
+
+    @Test
+    void testFilterOnScaleUpWithNewAttributesAfterRestart() throws Exception {
+        preparePrerequisites();
+
+        Set<String> nodes;
+
+        topology.removeNodes(Set.of(B));
+
+        LogicalNode newB = new LogicalNode(
+                new ClusterNode("2", "newB", new NetworkAddress("localhost", 123)),
+                Map.of("region", "US", "storage", "HHD", "dataRegionSize", "30")
+        );
+
+        topology.putNode(newB);
+
+        nodes = distributionZoneManager.topologyVersionedDataNodes(
+                1,
+                topology.getLogicalTopology().version()
+        ).get(10_000, TimeUnit.MILLISECONDS);
+
+        assertEquals(Set.of(A, newB, C).stream().map(ClusterNode::name).collect(Collectors.toSet()), nodes);
+    }
+
+    /**
+     * Starts the distribution zone manager, creates a zone with a filter, and checks that only the two of the three
+     * initial nodes that match the filter are present in the zone's data nodes.
+     *
+     * @throws Exception If failed
+     */
+    private void preparePrerequisites() throws Exception {
+        String filter = "$[?(@.storage == 'SSD' || @.region == 'US')]";
+
+        topology.putNode(A);
+        topology.putNode(B);
+        topology.putNode(C);
+
+        startDistributionZoneManager();
+
+        distributionZoneManager.createZone(
+                new DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
+                        .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+                        .dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+                        .filter(filter)
+                        .build()
+        ).get(10_000, TimeUnit.MILLISECONDS);
+
+        Set<String> nodes = distributionZoneManager.topologyVersionedDataNodes(
+                1,
+                topology.getLogicalTopology().version()
+        ).get(10_000, TimeUnit.MILLISECONDS);
+
+        assertEquals(Set.of(A, C).stream().map(ClusterNode::name).collect(Collectors.toSet()), nodes);
+    }
+}
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
index afaf723452..21e14195cb 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
@@ -18,9 +18,9 @@
 package org.apache.ignite.internal.distributionzones;
 
 import static org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl.LOGICAL_TOPOLOGY_KEY;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertLogicalTopology;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertLogicalTopologyVersion;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertLogicalTopology;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertLogicalTopologyVersion;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.util.Set;
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
index 80faf12dd2..25bcfe8920 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
@@ -21,23 +21,16 @@ import static org.apache.ignite.internal.distributionzones.DistributionZoneManag
 import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
 import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
 import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesForZone;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesForZoneWithAttributes;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertLogicalTopology;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertZoneScaleDownChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertZoneScaleUpChangeTriggerKey;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertDataNodesForZone;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertLogicalTopology;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertZoneScaleDownChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertZoneScaleUpChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.deployWatchesAndUpdateMetaStorageRevision;
-import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
-import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
 import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -48,13 +41,12 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.stream.Collectors;
 import org.apache.ignite.configuration.NamedConfigurationTree;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters.Builder;
@@ -63,10 +55,7 @@ import org.apache.ignite.internal.distributionzones.configuration.DistributionZo
 import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
 import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.metastorage.dsl.Conditions;
 import org.apache.ignite.internal.metastorage.server.If;
-import org.apache.ignite.internal.util.ByteUtils;
-import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
@@ -79,11 +68,21 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
     private static final int ZONE_1_ID = 1;
 
-    private static final LogicalNode NODE_1 = new LogicalNode("1", "A", new NetworkAddress("localhost", 123));
+    private static final LogicalNode NODE_1 = new LogicalNode("1", "node1", new NetworkAddress("localhost", 123));
 
-    private static final LogicalNode NODE_2 = new LogicalNode("2", "B", new NetworkAddress("localhost", 123));
+    private static final LogicalNode NODE_2 = new LogicalNode("2", "node2", new NetworkAddress("localhost", 123));
 
-    private static final LogicalNode NODE_3 = new LogicalNode("3", "C", new NetworkAddress("localhost", 123));
+    private static final LogicalNode NODE_3 = new LogicalNode("3", "node3", new NetworkAddress("localhost", 123));
+
+    private static final Node A  = new Node("A", "id_A");
+
+    private static final Node B  = new Node("B", "id_B");
+
+    private static final Node C  = new Node("C", "id_C");
+
+    private static final Node D  = new Node("D", "id_D");
+
+    private static final Node E  = new Node("E", "id_E");
 
     private long prerequisiteRevision;
 
@@ -95,7 +94,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         startDistributionZoneManager();
 
-        assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()), keyValueStorage);
+        assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes, keyValueStorage);
 
         topology.putNode(NODE_2);
 
@@ -110,7 +109,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
                         .build()
         ).get();
 
-        assertDataNodesForZone(ZONE_1_ID, clusterNodes2.stream().map(ClusterNode::name).collect(Collectors.toSet()), keyValueStorage);
+        assertDataNodesForZone(ZONE_1_ID, clusterNodes2, keyValueStorage);
 
         topology.putNode(NODE_3);
 
@@ -118,8 +117,8 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         assertLogicalTopology(clusterNodes3, keyValueStorage);
 
-        assertDataNodesForZone(ZONE_1_ID, clusterNodes3.stream().map(ClusterNode::name).collect(Collectors.toSet()), keyValueStorage);
-        assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes3.stream().map(ClusterNode::name).collect(Collectors.toSet()), keyValueStorage);
+        assertDataNodesForZone(ZONE_1_ID, clusterNodes3, keyValueStorage);
+        assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes3, keyValueStorage);
     }
 
     @Test
@@ -139,8 +138,8 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         topology.putNode(NODE_1);
 
-        assertDataNodesForZone(DEFAULT_ZONE_ID, Set.of(NODE_1.name()), keyValueStorage);
-        assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1.name()), keyValueStorage);
+        assertDataNodesForZone(DEFAULT_ZONE_ID, Set.of(NODE_1), keyValueStorage);
+        assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
     }
 
     @Test
@@ -149,18 +148,17 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         topology.putNode(NODE_2);
 
-        Set<String> clusterNodesNames = Set.of(NODE_1.name(), NODE_2.name());
         Set<LogicalNode> clusterNodes = Set.of(NODE_1, NODE_2);
 
         startDistributionZoneManager();
 
-        assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodesNames, keyValueStorage);
+        assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes, keyValueStorage);
 
         distributionZoneManager.createZone(
                 new DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE).build()
         ).get();
 
-        assertDataNodesForZone(ZONE_1_ID, clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()), keyValueStorage);
+        assertDataNodesForZone(ZONE_1_ID, clusterNodes, keyValueStorage);
 
         topology.removeNodes(Set.of(NODE_2));
 
@@ -168,10 +166,10 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         assertLogicalTopology(clusterNodes2, keyValueStorage);
 
-        assertDataNodesForZone(ZONE_1_ID, clusterNodes2.stream().map(ClusterNode::name).collect(Collectors.toSet()), keyValueStorage);
+        assertDataNodesForZone(ZONE_1_ID, clusterNodes2, keyValueStorage);
 
         // Check that default zone still has both node1 and node2 because dafault zones' scaleDown is INF.
-        assertDataNodesForZone(DEFAULT_ZONE_ID, Set.of(NODE_1.name(), NODE_2.name()), keyValueStorage);
+        assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes, keyValueStorage);
     }
 
     @Test
@@ -182,7 +180,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         startDistributionZoneManager();
 
-        assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()), keyValueStorage);
+        assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes, keyValueStorage);
 
         distributionZoneManager.alterZone(
                 DEFAULT_ZONE_NAME,
@@ -195,7 +193,6 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
         topology.putNode(NODE_2);
 
         Set<LogicalNode> clusterNodes2 = Set.of(NODE_1, NODE_2);
-        Set<String> clusterNodesNames2 = Set.of(NODE_1.name(), NODE_2.name());
 
         assertLogicalTopology(clusterNodes2, keyValueStorage);
 
@@ -207,7 +204,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
                         .build()
         ).get();
 
-        assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodesNames2, keyValueStorage);
+        assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes2, keyValueStorage);
     }
 
     @Test
@@ -216,11 +213,11 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         topology.putNode(NODE_2);
 
-        Set<String> clusterNodesNames = Set.of(NODE_1.name(), NODE_2.name());
+        Set<LogicalNode> clusterNodes = Set.of(NODE_1, NODE_2);
 
         startDistributionZoneManager();
 
-        assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodesNames, keyValueStorage);
+        assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes, keyValueStorage);
 
         distributionZoneManager.alterZone(
                 DEFAULT_ZONE_NAME,
@@ -233,7 +230,6 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
         topology.removeNodes(Set.of(NODE_2));
 
         Set<LogicalNode> clusterNodes2 = Set.of(NODE_1);
-        Set<String> clusterNodesNames2 = Set.of(NODE_1.name());
 
         assertLogicalTopology(clusterNodes2, keyValueStorage);
 
@@ -245,7 +241,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
                         .build()
         ).get();
 
-        assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodesNames2, keyValueStorage);
+        assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes2, keyValueStorage);
     }
 
     @Test
@@ -257,7 +253,6 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
         topology.putNode(NODE_2);
 
         Set<LogicalNode> clusterNodes2 = Set.of(NODE_1, NODE_2);
-        Set<String> clusterNodesNames2 = Set.of(NODE_1.name(), NODE_2.name());
 
         assertLogicalTopology(clusterNodes2, keyValueStorage);
 
@@ -265,7 +260,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
                 new DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE).build()
         ).get();
 
-        assertDataNodesForZone(ZONE_1_ID, clusterNodesNames2, keyValueStorage);
+        assertDataNodesForZone(ZONE_1_ID, clusterNodes2, keyValueStorage);
 
         assertNotNull(keyValueStorage.get(zoneScaleUpChangeTriggerKey(ZONE_1_ID).bytes()).value(),
                 "zoneScaleUpChangeTriggerKey must be not null.");
@@ -290,7 +285,6 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
         topology.removeNodes(Set.of(NODE_2));
 
         Set<LogicalNode> clusterNodes2 = Set.of(NODE_1);
-        Set<String> clusterNodesNames2 = Set.of(NODE_1.name());
 
         assertLogicalTopology(clusterNodes2, keyValueStorage);
 
@@ -298,7 +292,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
                 new DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE).build()
         ).get();
 
-        assertDataNodesForZone(ZONE_1_ID, clusterNodesNames2, keyValueStorage);
+        assertDataNodesForZone(ZONE_1_ID, clusterNodes2, keyValueStorage);
 
         assertNotNull(keyValueStorage.get(zoneScaleUpChangeTriggerKey(ZONE_1_ID).bytes()).value(),
                 "zoneScaleUpChangeTriggerKey must be not null.");
@@ -328,7 +322,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         distributionZoneManager.scheduleTimers(
                 zoneView,
-                Set.of("D"),
+                Set.of(D),
                 Set.of(),
                 prerequisiteRevision + 1,
                 (zoneId, revision) -> {
@@ -349,7 +343,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         distributionZoneManager.scheduleTimers(
                 zoneView,
-                Set.of("E"),
+                Set.of(E),
                 Set.of(),
                 prerequisiteRevision + 2,
                 (zoneId, revision) -> {
@@ -379,7 +373,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D", "E"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E), keyValueStorage);
 
         out2.countDown();
 
@@ -391,7 +385,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
         // Assert that nothing has been changed.
         assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D", "E"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E), keyValueStorage);
     }
 
     @Test
@@ -411,7 +405,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
         distributionZoneManager.scheduleTimers(
                 zoneView,
                 Set.of(),
-                Set.of("B"),
+                Set.of(B),
                 prerequisiteRevision + 1,
                 (t1, t2) -> null,
                 (zoneId, revision) -> {
@@ -432,7 +426,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
         distributionZoneManager.scheduleTimers(
                 zoneView,
                 Set.of(),
-                Set.of("C"),
+                Set.of(C),
                 prerequisiteRevision + 2,
                 (t1, t2) -> null,
                 (zoneId, revision) -> {
@@ -461,7 +455,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A), keyValueStorage);
 
         out2.countDown();
 
@@ -473,7 +467,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
         // Assert that nothing has been changed.
         assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A), keyValueStorage);
     }
 
     @Test
@@ -491,7 +485,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         distributionZoneManager.scheduleTimers(
                 zoneView,
-                Set.of("D"),
+                Set.of(D),
                 Set.of(),
                 prerequisiteRevision + 1,
                 (zoneId, revision) -> {
@@ -514,7 +508,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         distributionZoneManager.scheduleTimers(
                 zoneView,
-                Set.of("E"),
+                Set.of(E),
                 Set.of(),
                 prerequisiteRevision + 2,
                 (zoneId, revision) -> {
@@ -535,14 +529,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 1, ZONE_1_ID, keyValueStorage);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage);
 
         // Second task is run and we await that data nodes will be changed from ["A", "B", "C", "D"] to ["A", "B", "C", "D", "E"]
         in2.countDown();
 
         assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D", "E"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E), keyValueStorage);
 
         out1.countDown();
     }
@@ -563,7 +557,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
         distributionZoneManager.scheduleTimers(
                 zoneView,
                 Set.of(),
-                Set.of("B"),
+                Set.of(B),
                 prerequisiteRevision + 1,
                 (t1, t2) -> null,
                 (zoneId, revision) -> {
@@ -586,7 +580,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
         distributionZoneManager.scheduleTimers(
                 zoneView,
                 Set.of(),
-                Set.of("C"),
+                Set.of(C),
                 prerequisiteRevision + 2,
                 (t1, t2) -> null,
                 (zoneId, revision) -> {
@@ -606,14 +600,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 1, ZONE_1_ID, keyValueStorage);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "C"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, C), keyValueStorage);
 
         // Second task is run and we await that data nodes will be changed from ["A", "C"] to ["A"]
         in2.countDown();
 
         assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A), keyValueStorage);
 
         out1.countDown();
     }
@@ -634,7 +628,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         assertLogicalTopology(Set.of(NODE_1), keyValueStorage);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1.name()), keyValueStorage);
+        assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
     }
 
     @Test
@@ -660,7 +654,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
                 new DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleUp(0).build()
         ).get();
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1.name()), keyValueStorage);
+        assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
     }
 
     @Test
@@ -675,11 +669,11 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
                 new DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleDown(100).build()
         ).get();
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1.name()), keyValueStorage);
+        assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
 
         topology.removeNodes(Set.of(NODE_1));
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1.name()), keyValueStorage);
+        assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
 
         distributionZoneManager.alterZone(
                 ZONE_1_NAME,
@@ -695,9 +689,9 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
-        zoneState.nodesToAddToDataNodes(Set.of("E"), 1004);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1007);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+        zoneState.nodesToAddToDataNodes(Set.of(E), 1004);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1007);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1003);
 
@@ -720,7 +714,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1015);
 
         assertTrue(zoneState.topologyAugmentationMap().isEmpty());
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "D", "E"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, D, E), keyValueStorage);
     }
 
     @Test
@@ -765,7 +759,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
                 new DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleDown(100).build()
         ).get();
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1.name()), keyValueStorage);
+        assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
@@ -828,13 +822,13 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         assertLogicalTopology(Set.of(NODE_1), keyValueStorage);
 
-        assertDataNodesForZone(DEFAULT_ZONE_ID, Set.of(NODE_1.name()), keyValueStorage);
+        assertDataNodesForZone(DEFAULT_ZONE_ID, Set.of(NODE_1), keyValueStorage);
 
         distributionZoneManager.createZone(
                 new DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE).build()
         ).get();
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1.name()), keyValueStorage);
+        assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
 
         assertZoneScaleDownChangeTriggerKey(4L, ZONE_1_ID, keyValueStorage);
 
@@ -854,7 +848,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         topology.removeNodes(Set.of(NODE_1));
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1.name()), keyValueStorage);
+        assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
 
         assertZoneScaleDownChangeTriggerKey(100L, ZONE_1_ID, keyValueStorage);
     }
@@ -865,13 +859,13 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1007);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1007);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1003);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "D"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, D), keyValueStorage);
     }
 
     @Test
@@ -880,13 +874,13 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1007);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1007);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1003);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "D"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, D), keyValueStorage);
     }
 
     @Test
@@ -895,13 +889,13 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1007);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1007);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1003);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "D"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, D), keyValueStorage);
     }
 
     @Test
@@ -910,13 +904,13 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1007);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1007);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1003);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "D"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, D), keyValueStorage);
     }
 
     @Test
@@ -925,14 +919,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1007);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1003);
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C), keyValueStorage);
     }
 
     @Test
@@ -941,13 +935,13 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1007);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1003);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C), keyValueStorage);
     }
 
     @Test
@@ -956,13 +950,13 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
-        zoneState.nodesToAddToDataNodes(Set.of("C"), 1007);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+        zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1003);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C), keyValueStorage);
     }
 
     @Test
@@ -971,13 +965,13 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
-        zoneState.nodesToAddToDataNodes(Set.of("C"), 1007);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+        zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1003);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C), keyValueStorage);
     }
 
     @Test
@@ -986,14 +980,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1007);
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1009);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1009);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage);
     }
 
     @Test
@@ -1002,14 +996,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1007);
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1009);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1009);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage);
     }
 
     @Test
@@ -1018,16 +1012,16 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1007);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
 
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1009);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1009);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage);
     }
 
     @Test
@@ -1036,14 +1030,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
-        zoneState.nodesToAddToDataNodes(Set.of("C"), 1007);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1009);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+        zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1009);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B), keyValueStorage);
     }
 
     @Test
@@ -1052,14 +1046,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
-        zoneState.nodesToAddToDataNodes(Set.of("C"), 1007);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1009);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+        zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1009);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B), keyValueStorage);
     }
 
     @Test
@@ -1068,14 +1062,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
-        zoneState.nodesToAddToDataNodes(Set.of("C"), 1007);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+        zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
 
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1009);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1009);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B), keyValueStorage);
     }
 
     @Test
@@ -1084,15 +1078,15 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1007);
-        zoneState.nodesToAddToDataNodes(Set.of("E"), 1009);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
+        zoneState.nodesToAddToDataNodes(Set.of(E), 1009);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "E"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, E), keyValueStorage);
     }
 
     @Test
@@ -1101,14 +1095,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1007);
-        zoneState.nodesToAddToDataNodes(Set.of("E"), 1009);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
+        zoneState.nodesToAddToDataNodes(Set.of(E), 1009);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "E"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, E), keyValueStorage);
     }
 
     @Test
@@ -1117,14 +1111,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1007);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
 
-        zoneState.nodesToAddToDataNodes(Set.of("E"), 1009);
+        zoneState.nodesToAddToDataNodes(Set.of(E), 1009);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "E"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, E), keyValueStorage);
     }
 
     @Test
@@ -1133,14 +1127,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
-        zoneState.nodesToAddToDataNodes(Set.of("C"), 1007);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("B"), 1009);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+        zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(B), 1009);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "C"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, C), keyValueStorage);
     }
 
     @Test
@@ -1149,14 +1143,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
-        zoneState.nodesToAddToDataNodes(Set.of("C"), 1007);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("B"), 1009);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+        zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(B), 1009);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "C"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, C), keyValueStorage);
     }
 
     @Test
@@ -1165,14 +1159,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
-        zoneState.nodesToAddToDataNodes(Set.of("C"), 1007);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+        zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
 
-        zoneState.nodesToRemoveFromDataNodes(Set.of("B"), 1009);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(B), 1009);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "C"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, C), keyValueStorage);
     }
 
     @Test
@@ -1181,14 +1175,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1007);
-        zoneState.nodesToAddToDataNodes(Set.of("C"), 1009);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1007);
+        zoneState.nodesToAddToDataNodes(Set.of(C), 1009);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage);
     }
 
     @Test
@@ -1197,14 +1191,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1007);
-        zoneState.nodesToAddToDataNodes(Set.of("C"), 1009);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1007);
+        zoneState.nodesToAddToDataNodes(Set.of(C), 1009);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage);
     }
 
     @Test
@@ -1213,14 +1207,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1007);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1007);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
 
-        zoneState.nodesToAddToDataNodes(Set.of("C"), 1009);
+        zoneState.nodesToAddToDataNodes(Set.of(C), 1009);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage);
     }
 
     @Test
@@ -1229,14 +1223,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1007);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1009);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1007);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1009);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B), keyValueStorage);
     }
 
     @Test
@@ -1245,14 +1239,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1007);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1009);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1007);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1009);
 
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B), keyValueStorage);
     }
 
     @Test
@@ -1261,60 +1255,79 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
 
         ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
 
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
-        zoneState.nodesToAddToDataNodes(Set.of("D"), 1007);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+        zoneState.nodesToAddToDataNodes(Set.of(D), 1007);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
 
-        zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1009);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1009);
         distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
 
-        assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B"), keyValueStorage);
+        assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B), keyValueStorage);
     }
 
     @Test
     void testZoneStateAddRemoveNodesPreservesDuplicationsOfNodes() {
         ZoneState zoneState = new ZoneState(new ScheduledThreadPoolExecutor(1));
 
-        zoneState.nodesToAddToDataNodes(Set.of("A", "B"), 1);
-        zoneState.nodesToAddToDataNodes(Set.of("A", "B"), 2);
+        zoneState.nodesToAddToDataNodes(Set.of(A, B), 1);
+        zoneState.nodesToAddToDataNodes(Set.of(A, B), 2);
+
+        List<Node> nodes = zoneState.nodesToBeAddedToDataNodes(0, 2);
 
-        List<String> nodes = zoneState.nodesToBeAddedToDataNodes(0, 2);
+        nodes.sort(Comparator.comparing(Node::nodeName));
 
-        Collections.sort(nodes);
-        assertEquals(List.of("A", "A", "B", "B"), nodes);
+        assertEquals(List.of(A, A, B, B), nodes);
 
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C", "D"), 3);
-        zoneState.nodesToRemoveFromDataNodes(Set.of("C", "D"), 4);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C, D), 3);
+        zoneState.nodesToRemoveFromDataNodes(Set.of(C, D), 4);
 
         nodes = zoneState.nodesToBeRemovedFromDataNodes(2, 4);
 
-        Collections.sort(nodes);
+        nodes.sort(Comparator.comparing(Node::nodeName));
 
-        assertEquals(List.of("C", "C", "D", "D"), nodes);
+        assertEquals(List.of(C, C, D, D), nodes);
     }
 
     /**
-     * Creates a zone with the auto adjust scale up scale down trigger equals to 0 and the data nodes equals ["A", "B", "C"].
+     * Creates a zone with the auto adjust scale up and scale down triggers equal to 0 and the data nodes equal to ["A", "B", "C"].
      *
      * @throws Exception when something goes wrong.
      */
     private void preparePrerequisites() throws Exception {
-        topology.putNode(NODE_1);
-        topology.putNode(NODE_2);
-        topology.putNode(NODE_3);
+        preparePrerequisites(null);
+    }
 
-        Set<String> clusterNodesNames = Set.of(NODE_1.name(), NODE_2.name(), NODE_3.name());
+    private void preparePrerequisites(@Nullable String filter) throws Exception {
+        LogicalNode a = new LogicalNode("1", "A", new NetworkAddress("localhost", 123));
 
-        startDistributionZoneManager();
+        LogicalNode b = new LogicalNode("2", "B", new NetworkAddress("localhost", 123));
 
-        distributionZoneManager.createZone(
-                new Builder(ZONE_1_NAME)
-                        .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
-                        .dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
-                        .build()
-        ).get();
+        LogicalNode c = new LogicalNode("3", "C", new NetworkAddress("localhost", 123));
+
+        topology.putNode(a);
+        topology.putNode(b);
+        topology.putNode(c);
 
-        assertDataNodesForZone(ZONE_1_ID, clusterNodesNames, keyValueStorage);
+        Set<LogicalNode> clusterNodes = Set.of(a, b, c);
+
+        startDistributionZoneManager();
+
+        if (filter == null) {
+            distributionZoneManager.createZone(
+                    new Builder(ZONE_1_NAME)
+                            .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+                            .dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+                            .build()).get();
+        } else {
+            distributionZoneManager.createZone(
+                    new DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME)
+                            .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+                            .dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+                            .filter(filter)
+                            .build()).get();
+        }
+
+        assertDataNodesForZone(ZONE_1_ID, clusterNodes, keyValueStorage);
 
         long scaleUpChangeTriggerKey = bytesToLong(
                 keyValueStorage.get(zoneScaleUpChangeTriggerKey(ZONE_1_ID).bytes()).value()
@@ -1338,39 +1351,6 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
         return distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(zoneId, revision);
     }
 
-    private void assertNotEqualsDataNodesForZone(int zoneId, @Nullable Set<String> clusterNodes) throws InterruptedException {
-        assertFalse(waitForCondition(
-                () -> {
-                    byte[] dataNodes = keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value();
-
-                    if (dataNodes == null) {
-                        return clusterNodes == null;
-                    }
-
-                    Set<String> res = DistributionZonesUtil.dataNodes(ByteUtils.fromBytes(dataNodes));
-
-                    return res.equals(clusterNodes);
-                },
-                1000
-        ));
-    }
-
-    private void startDistributionZoneManager() throws Exception {
-        deployWatchesAndUpdateMetaStorageRevision(metaStorageManager);
-
-        distributionZoneManager.start();
-    }
-
-    private void setLogicalTopologyInMetaStorage(Set<String> nodes) {
-        CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
-                Conditions.exists(zonesLogicalTopologyKey()),
-                put(zonesLogicalTopologyKey(), toBytes(nodes)),
-                noop()
-        );
-
-        assertThat(invokeFuture, willBe(true));
-    }
-
     private void assertThatZonesAugmentationMapContainsRevision(int zoneId, long revisionToAssert) throws InterruptedException {
         assertTrue(
                 waitForCondition(
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
index fc3e304313..03d9638aed 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
@@ -20,27 +20,24 @@ package org.apache.ignite.internal.distributionzones;
 import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
 import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
 import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesForZone;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.mockVaultZonesLogicalTopologyKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.setLogicalTopologyInMetaStorage;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
 import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertDataNodesForZone;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.deployWatchesAndUpdateMetaStorageRevision;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 
+import java.util.Collections;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.metastorage.dsl.Conditions;
-import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -48,9 +45,13 @@ import org.junit.jupiter.api.Test;
  */
 //TODO: IGNITE-18564 Add tests with not default distribution zones, when distributionZones.change.trigger per zone will be created.
 public class DistributionZoneManagerWatchListenerTest extends BaseDistributionZoneManagerTest {
+    private static final LogicalNode NODE_1 = new LogicalNode("node1", "node1", new NetworkAddress("localhost", 123));
+    private static final LogicalNode NODE_2 = new LogicalNode("node2", "node2", new NetworkAddress("localhost", 123));
+    private static final LogicalNode NODE_3 = new LogicalNode("node3", "node3", new NetworkAddress("localhost", 123));
+
     @Test
     void testStaleWatchEvent() throws Exception {
-        mockVaultZonesLogicalTopologyKey(Set.of());
+        mockVaultZonesLogicalTopologyKey(Set.of(), vaultMgr);
 
         startDistributionZoneManager();
 
@@ -66,12 +67,9 @@ public class DistributionZoneManagerWatchListenerTest extends BaseDistributionZo
 
         keyValueStorage.put(zoneScaleUpChangeTriggerKey(DEFAULT_ZONE_ID).bytes(), longToBytes(revision), HybridTimestamp.MIN_VALUE);
 
-        Set<String> nodes = Set.of("node1", "node2");
-
-        setLogicalTopologyInMetaStorage(nodes);
+        Set<LogicalNode> nodes = Set.of(NODE_1, NODE_2);
 
-        // two invokes on start, and invoke for update scale up won't be triggered, because revision == zoneScaleUpChangeTriggerKey
-        verify(keyValueStorage, timeout(1000).times(2)).invoke(any(), any());
+        setLogicalTopologyInMetaStorage(nodes, 100, metaStorageManager);
 
         assertDataNodesForZone(DEFAULT_ZONE_ID, Set.of(), keyValueStorage);
     }
@@ -82,9 +80,12 @@ public class DistributionZoneManagerWatchListenerTest extends BaseDistributionZo
 
         keyValueStorage.put(zonesChangeTriggerKey(DEFAULT_ZONE_ID).bytes(), longToBytes(revision), HybridTimestamp.MIN_VALUE);
 
-        Set<String> nodes = Set.of("node1", "node2");
+        Set<LogicalNode> nodes = Set.of(
+                new LogicalNode(new ClusterNode("node1", "node1", NetworkAddress.from("127.0.0.1:127")), Collections.emptyMap()),
+                new LogicalNode(new ClusterNode("node2", "node2", NetworkAddress.from("127.0.0.1:127")), Collections.emptyMap())
+        );
 
-        mockVaultZonesLogicalTopologyKey(nodes);
+        mockVaultZonesLogicalTopologyKey(nodes, vaultMgr);
 
         startDistributionZoneManager();
 
@@ -95,14 +96,15 @@ public class DistributionZoneManagerWatchListenerTest extends BaseDistributionZo
 
     @Test
     void testDataNodesUpdatedOnZoneManagerStart() throws Exception {
-        Set<String> nodes = Set.of("node1", "node2");
+        Set<LogicalNode> nodes = Set.of(
+                new LogicalNode(new ClusterNode("node1", "node1", NetworkAddress.from("127.0.0.1:127")), Collections.emptyMap()),
+                new LogicalNode(new ClusterNode("node2", "node2", NetworkAddress.from("127.0.0.1:127")), Collections.emptyMap())
+        );
 
-        mockVaultZonesLogicalTopologyKey(nodes);
+        mockVaultZonesLogicalTopologyKey(nodes, vaultMgr);
 
         startDistributionZoneManager();
 
-        verify(keyValueStorage, timeout(1000).times(2)).invoke(any(), any());
-
         assertDataNodesForZone(DEFAULT_ZONE_ID, nodes, keyValueStorage);
     }
 
@@ -116,26 +118,4 @@ public class DistributionZoneManagerWatchListenerTest extends BaseDistributionZo
         assertNull(keyValueStorage.get(zoneDataNodesKey(DEFAULT_ZONE_ID).bytes()).value());
         assertNull(keyValueStorage.get(zoneDataNodesKey(1).bytes()).value());
     }
-
-    private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
-        byte[] newLogicalTopology = toBytes(nodes);
-
-        assertThat(vaultMgr.put(zonesLogicalTopologyKey(), newLogicalTopology), willCompleteSuccessfully());
-    }
-
-    private void setLogicalTopologyInMetaStorage(Set<String> nodes) {
-        CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
-                Conditions.exists(zonesLogicalTopologyKey()),
-                Operations.put(zonesLogicalTopologyKey(), toBytes(nodes)),
-                Operations.noop()
-        );
-
-        assertThat(invokeFuture, willBe(true));
-    }
-
-    private void startDistributionZoneManager() throws Exception {
-        deployWatchesAndUpdateMetaStorageRevision(metaStorageManager);
-
-        distributionZoneManager.start();
-    }
 }
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index 3df413d173..969b05edcb 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -30,7 +30,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
@@ -55,6 +54,7 @@ import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.distributionzones.Node;
 import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
 import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -122,8 +122,6 @@ public class DistributionZoneRebalanceEngineTest extends IgniteAbstractTest {
 
     @BeforeEach
     public void setUp() {
-        when(distributionZoneManager.getDataNodesByZoneId(anyInt())).thenReturn(Set.of("node0"));
-
         doAnswer(invocation -> {
             ByteArray key = invocation.getArgument(0);
 
@@ -417,7 +415,9 @@ public class DistributionZoneRebalanceEngineTest extends IgniteAbstractTest {
         byte[] newLogicalTopology;
 
         if (nodes != null) {
-            newLogicalTopology = toBytes(toDataNodesMap(nodes));
+            newLogicalTopology = toBytes(toDataNodesMap(nodes.stream()
+                    .map(n -> new Node(n, n))
+                    .collect(Collectors.toSet())));
         } else {
             newLogicalTopology = null;
         }
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/util/DistributionZonesTestUtil.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/util/DistributionZonesTestUtil.java
deleted file mode 100644
index 863a28c7cb..0000000000
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/util/DistributionZonesTestUtil.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.distributionzones.util;
-
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
-import org.apache.ignite.internal.distributionzones.DistributionZonesUtil;
-import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.metastorage.dsl.Conditions;
-import org.apache.ignite.internal.metastorage.dsl.Operations;
-import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
-import org.apache.ignite.internal.util.ByteUtils;
-import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.lang.NodeStoppingException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Util class for methods for Distribution zones tests.
- */
-public class DistributionZonesTestUtil {
-    /**
-     * Asserts data nodes from {@link DistributionZonesUtil#zoneDataNodesKey(int)}.
-     *
-     * @param zoneId Zone id.
-     * @param clusterNodes Data nodes.
-     * @param keyValueStorage Key-value storage.
-     * @throws InterruptedException If thread was interrupted.
-     */
-    public static void assertDataNodesForZone(
-            int zoneId,
-            @Nullable Set<String> clusterNodes,
-            KeyValueStorage keyValueStorage
-    ) throws InterruptedException {
-        assertValueInStorage(
-                keyValueStorage,
-                zoneDataNodesKey(zoneId).bytes(),
-                value -> DistributionZonesUtil.dataNodes(fromBytes(value)),
-                clusterNodes,
-                2000
-        );
-    }
-
-    /**
-     * Asserts {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} revision.
-     *
-     * @param revision Revision.
-     * @param zoneId Zone id.
-     * @param keyValueStorage Key-value storage.
-     * @throws InterruptedException If thread was interrupted.
-     */
-    public static void assertZoneScaleUpChangeTriggerKey(
-            @Nullable Long revision,
-            int zoneId,
-            KeyValueStorage keyValueStorage
-    ) throws InterruptedException {
-        assertValueInStorage(
-                keyValueStorage,
-                zoneScaleUpChangeTriggerKey(zoneId).bytes(),
-                ByteUtils::bytesToLong,
-                revision,
-                2000
-        );
-    }
-
-    /**
-     * Asserts {@link DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)} revision.
-     *
-     * @param revision Revision.
-     * @param zoneId Zone id.
-     * @param keyValueStorage Key-value storage.
-     * @throws InterruptedException If thread was interrupted.
-     */
-    public static void assertZoneScaleDownChangeTriggerKey(
-            @Nullable Long revision,
-            int zoneId,
-            KeyValueStorage keyValueStorage
-    ) throws InterruptedException {
-        assertValueInStorage(
-                keyValueStorage,
-                zoneScaleDownChangeTriggerKey(zoneId).bytes(),
-                ByteUtils::bytesToLong,
-                revision,
-                2000
-        );
-    }
-
-    /**
-     * Asserts {@link DistributionZonesUtil#zonesChangeTriggerKey(int)} revision.
-     *
-     * @param revision Revision.
-     * @param zoneId Zone id.
-     * @param keyValueStorage Key-value storage.
-     * @throws InterruptedException If thread was interrupted.
-     */
-    public static void assertZonesChangeTriggerKey(
-            long revision,
-            int zoneId,
-            KeyValueStorage keyValueStorage
-    ) throws InterruptedException {
-        assertValueInStorage(
-                keyValueStorage,
-                zonesChangeTriggerKey(zoneId).bytes(),
-                ByteUtils::bytesToLong,
-                revision,
-                1000
-        );
-    }
-
-    /**
-     * Asserts {@link DistributionZonesUtil#zonesLogicalTopologyKey()} value.
-     *
-     * @param clusterNodes Expected cluster nodes.
-     * @param keyValueStorage Key-value storage.
-     * @throws InterruptedException If thread was interrupted.
-     */
-    public static void assertLogicalTopology(
-            @Nullable Set<LogicalNode> clusterNodes,
-            KeyValueStorage keyValueStorage
-    ) throws InterruptedException {
-        Set<String> nodes = clusterNodes == null
-                ? null
-                : clusterNodes.stream().map(LogicalNode::name).collect(Collectors.toSet());
-
-        assertValueInStorage(
-                keyValueStorage,
-                zonesLogicalTopologyKey().bytes(),
-                ByteUtils::fromBytes,
-                nodes,
-                1000
-        );
-    }
-
-    /**
-     * Asserts {@link DistributionZonesUtil#zonesLogicalTopologyVersionKey()} value.
-     *
-     * @param topVer Topology version.
-     * @param keyValueStorage Key-value storage.
-     * @throws InterruptedException If thread was interrupted.
-     */
-    public static void assertLogicalTopologyVersion(long topVer, KeyValueStorage keyValueStorage) throws InterruptedException {
-        assertValueInStorage(
-                keyValueStorage,
-                zonesLogicalTopologyVersionKey().bytes(),
-                ByteUtils::bytesToLong,
-                topVer,
-                1000
-        );
-    }
-
-    /**
-     * Asserts {@link DistributionZonesUtil#zonesLogicalTopologyKey()} value.
-     *
-     * @param clusterNodes Expected cluster nodes' names.
-     * @param keyValueStorage Key-value storage.
-     * @throws InterruptedException If thread was interrupted.
-     */
-    public static void assertLogicalTopologyWithNodeNames(@Nullable Set<String> clusterNodes, KeyValueStorage keyValueStorage)
-            throws InterruptedException {
-        assertValueInStorage(
-                keyValueStorage,
-                zonesLogicalTopologyKey().bytes(),
-                ByteUtils::fromBytes,
-                clusterNodes,
-                1000
-        );
-    }
-
-    /**
-     * This method is used to initialize the meta storage revision before starting the distribution zone manager.
-     * TODO: IGNITE-19403 Watch listeners must be deployed after the zone manager starts.
-     *
-     * @param metaStorageManager Meta storage manager.
-     * @throws NodeStoppingException If node is stopping.
-     * @throws InterruptedException If thread was interrupted.
-     */
-    public static void deployWatchesAndUpdateMetaStorageRevision(MetaStorageManager metaStorageManager)
-            throws NodeStoppingException, InterruptedException {
-        // Watches are deployed before distributionZoneManager start in order to update Meta Storage revision before
-        // distributionZoneManager's recovery.
-        metaStorageManager.deployWatches();
-
-        // Bump Meta Storage applied revision by modifying a fake key. DistributionZoneManager breaks on start if Vault is not empty, but
-        // Meta Storage revision is equal to 0.
-        var fakeKey = new ByteArray("foobar");
-
-        CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
-                Conditions.notExists(fakeKey),
-                Operations.put(fakeKey, fakeKey.bytes()),
-                Operations.noop()
-        );
-
-        assertThat(invokeFuture, willBe(true));
-
-        assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() > 0, 10_000));
-    }
-
-    private static <T> void assertValueInStorage(
-            KeyValueStorage keyValueStorage,
-            byte[] key,
-            Function<byte[], T> valueTransformer,
-            @Nullable T expectedValue,
-            long timeoutMillis
-    ) throws InterruptedException {
-        boolean success = waitForCondition(() -> {
-            byte[] storageValue = keyValueStorage.get(key).value();
-
-            T actualValue = storageValue == null ? null : valueTransformer.apply(storageValue);
-
-            return Objects.equals(actualValue, expectedValue);
-        }, timeoutMillis);
-
-        // We do a second check simply to print a nice error message in case the condition above is not achieved.
-        if (!success) {
-            byte[] storageValue = keyValueStorage.get(key).value();
-
-            assertThat(storageValue == null ? null : valueTransformer.apply(storageValue), is(expectedValue));
-        }
-    }
-}
diff --git a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
index 7db6f48943..98daab9098 100644
--- a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
+++ b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
@@ -17,12 +17,49 @@
 
 package org.apache.ignite.internal.distributionzones;
 
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_FILTER;
 import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateLogicalTopologyAndVersion;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
+import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters.Builder;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.dsl.Conditions;
+import org.apache.ignite.internal.metastorage.dsl.Iif;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.schema.configuration.storage.DataStorageChange;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
 
 /**
  * Utils to manage distribution zones inside tests.
@@ -90,4 +127,320 @@ public class DistributionZonesTestUtil {
 
         return zoneManager.alterZone(zoneName, distributionZoneCfgBuilder.build());
     }
+
+    /**
+     * Asserts that the value stored under {@link DistributionZonesUtil#zoneDataNodesKey(int)} matches the expected data nodes.
+     *
+     * @param zoneId Zone id.
+     * @param clusterNodes Expected data nodes, mapped to {@link Node} by name and id before comparison; may be {@code null}.
+     * @param keyValueStorage Key-value storage to read the data nodes from.
+     * @throws InterruptedException If thread was interrupted.
+     */
+    public static void assertDataNodesForZone(
+            int zoneId,
+            @Nullable Set<LogicalNode> clusterNodes,
+            KeyValueStorage keyValueStorage
+    ) throws InterruptedException {
+        Set<Node> nodes = clusterNodes == null
+                ? null
+                : clusterNodes.stream().map(n -> new Node(n.name(), n.id())).collect(Collectors.toSet());
+
+        assertValueInStorage(
+                keyValueStorage,
+                zoneDataNodesKey(zoneId).bytes(),
+                value -> DistributionZonesUtil.dataNodes(fromBytes(value)),
+                nodes,
+                2000
+        );
+    }
+
+    /**
+     * Asserts data nodes from {@link DistributionZonesUtil#zoneDataNodesKey(int)}, passing {@code DEFAULT_FILTER} as the filter.
+     *
+     * @param zoneId Zone id.
+     * @param nodes Expected data nodes; may be {@code null}.
+     * @param keyValueStorage Key-value storage to read the data nodes from.
+     * @throws InterruptedException If thread was interrupted.
+     */
+    public static void assertDataNodesForZoneWithAttributes(
+            int zoneId,
+            @Nullable Set<Node> nodes,
+            KeyValueStorage keyValueStorage
+    ) throws InterruptedException {
+        assertDataNodesForZoneWithAttributes(zoneId, nodes, keyValueStorage, DEFAULT_FILTER);
+    }
+
+    /**
+     * Asserts that the value stored under {@link DistributionZonesUtil#zoneDataNodesKey(int)} matches the expected data nodes.
+     *
+     * @param zoneId Zone id.
+     * @param nodes Expected data nodes; may be {@code null}.
+     * @param keyValueStorage Key-value storage to read the data nodes from.
+     * @param filter Filter for data nodes. NOTE(review): this argument is not read by the method body - confirm the intent.
+     * @throws InterruptedException If thread was interrupted.
+     */
+    public static void assertDataNodesForZoneWithAttributes(
+            int zoneId,
+            @Nullable Set<Node> nodes,
+            KeyValueStorage keyValueStorage,
+            String filter
+    ) throws InterruptedException {
+        assertValueInStorage(
+                keyValueStorage,
+                zoneDataNodesKey(zoneId).bytes(),
+                value -> DistributionZonesUtil.dataNodes(fromBytes(value)),
+                nodes,
+                2000
+        );
+    }
+
+    /**
+     * Asserts that {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} holds the expected revision, decoded as a long.
+     *
+     * @param revision Expected revision; may be {@code null}.
+     * @param zoneId Zone id.
+     * @param keyValueStorage Key-value storage to read the revision from.
+     * @throws InterruptedException If thread was interrupted.
+     */
+    public static void assertZoneScaleUpChangeTriggerKey(
+            @Nullable Long revision,
+            int zoneId,
+            KeyValueStorage keyValueStorage
+    ) throws InterruptedException {
+        assertValueInStorage(
+                keyValueStorage,
+                zoneScaleUpChangeTriggerKey(zoneId).bytes(),
+                ByteUtils::bytesToLong,
+                revision,
+                2000
+        );
+    }
+
+    /**
+     * Asserts that {@link DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)} holds the expected revision, decoded as a long.
+     *
+     * @param revision Expected revision; may be {@code null}.
+     * @param zoneId Zone id.
+     * @param keyValueStorage Key-value storage to read the revision from.
+     * @throws InterruptedException If thread was interrupted.
+     */
+    public static void assertZoneScaleDownChangeTriggerKey(
+            @Nullable Long revision,
+            int zoneId,
+            KeyValueStorage keyValueStorage
+    ) throws InterruptedException {
+        assertValueInStorage(
+                keyValueStorage,
+                zoneScaleDownChangeTriggerKey(zoneId).bytes(),
+                ByteUtils::bytesToLong,
+                revision,
+                2000
+        );
+    }
+
+    /**
+     * Asserts that {@link DistributionZonesUtil#zonesChangeTriggerKey(int)} holds the expected revision, decoded as a long.
+     *
+     * @param revision Expected revision.
+     * @param zoneId Zone id.
+     * @param keyValueStorage Key-value storage to read the revision from.
+     * @throws InterruptedException If thread was interrupted.
+     */
+    public static void assertZonesChangeTriggerKey(
+            long revision,
+            int zoneId,
+            KeyValueStorage keyValueStorage
+    ) throws InterruptedException {
+        assertValueInStorage(
+                keyValueStorage,
+                zonesChangeTriggerKey(zoneId).bytes(),
+                ByteUtils::bytesToLong,
+                revision,
+                1000
+        );
+    }
+
+    /**
+     * Asserts that the value under {@link DistributionZonesUtil#zonesLogicalTopologyKey()} equals the expected set of nodes.
+     *
+     * @param clusterNodes Expected cluster nodes, mapped to {@link NodeWithAttributes} before comparison; may be {@code null}.
+     * @param keyValueStorage Key-value storage to read the logical topology from.
+     * @throws InterruptedException If thread was interrupted.
+     */
+    public static void assertLogicalTopology(
+            @Nullable Set<LogicalNode> clusterNodes,
+            KeyValueStorage keyValueStorage
+    ) throws InterruptedException {
+        Set<NodeWithAttributes> nodes = clusterNodes == null
+                ? null
+                : clusterNodes.stream().map(n -> new NodeWithAttributes(n.name(), n.id(), n.nodeAttributes())).collect(Collectors.toSet());
+
+        assertValueInStorage(
+                keyValueStorage,
+                zonesLogicalTopologyKey().bytes(),
+                ByteUtils::fromBytes,
+                nodes,
+                1000
+        );
+    }
+
+    /**
+     * Asserts that {@link DistributionZonesUtil#zonesLogicalTopologyVersionKey()} holds the expected version, decoded as a long.
+     *
+     * @param topVer Expected topology version.
+     * @param keyValueStorage Key-value storage to read the topology version from.
+     * @throws InterruptedException If thread was interrupted.
+     */
+    public static void assertLogicalTopologyVersion(long topVer, KeyValueStorage keyValueStorage) throws InterruptedException {
+        assertValueInStorage(
+                keyValueStorage,
+                zonesLogicalTopologyVersionKey().bytes(),
+                ByteUtils::bytesToLong,
+                topVer,
+                1000
+        );
+    }
+
+    /**
+     * Deploys Meta Storage watches and bumps the applied revision above zero before the distribution zone manager is started.
+     * TODO: IGNITE-19403 Watch listeners must be deployed after the zone manager starts.
+     *
+     * @param metaStorageManager Meta storage manager.
+     * @throws NodeStoppingException If node is stopping.
+     * @throws InterruptedException If thread was interrupted.
+     */
+    public static void deployWatchesAndUpdateMetaStorageRevision(MetaStorageManager metaStorageManager)
+            throws NodeStoppingException, InterruptedException {
+        // Watches are deployed before distributionZoneManager start in order to update Meta Storage revision before
+        // distributionZoneManager's recovery.
+        metaStorageManager.deployWatches();
+
+        // Bump Meta Storage applied revision by modifying a fake key. DistributionZoneManager breaks on start if Vault is not empty, but
+        // Meta Storage revision is equal to 0.
+        var fakeKey = new ByteArray("foobar");
+
+        CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
+                Conditions.notExists(fakeKey),
+                Operations.put(fakeKey, fakeKey.bytes()),
+                Operations.noop()
+        );
+
+        assertThat(invokeFuture, willBe(true));
+
+        assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() > 0, 10_000));
+    }
+
+    /**
+     * Puts the logical topology into Vault under {@link DistributionZonesUtil#zonesLogicalTopologyKey()}.
+     *
+     * @param nodes Logical topology; each node is stored as a {@link NodeWithAttributes}.
+     * @param vaultMgr Vault manager.
+     */
+    public static void mockVaultZonesLogicalTopologyKey(Set<LogicalNode> nodes, VaultManager vaultMgr) {
+        Set<NodeWithAttributes> nodesWithAttributes = nodes.stream()
+                .map(n -> new NodeWithAttributes(n.name(), n.id(), n.nodeAttributes()))
+                .collect(Collectors.toSet());
+
+        byte[] newLogicalTopology = toBytes(nodesWithAttributes);
+
+        assertThat(vaultMgr.put(zonesLogicalTopologyKey(), newLogicalTopology), willCompleteSuccessfully());
+    }
+
+    /**
+     * Updates the logical topology and its version in Meta Storage if the logical topology key exists, and asserts the update.
+     *
+     * @param nodes Logical topology.
+     * @param topVer Topology version.
+     * @param metaStorageManager Meta Storage manager.
+     */
+    public static void setLogicalTopologyInMetaStorage(Set<LogicalNode> nodes, long topVer, MetaStorageManager metaStorageManager) {
+        Iif iff = iif(
+                Conditions.exists(zonesLogicalTopologyKey()),
+                updateLogicalTopologyAndVersion(nodes, topVer),
+                ops().yield(false)
+        );
+
+        CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(iff).thenApply(StatementResult::getAsBoolean);
+
+        assertThat(invokeFuture, willBe(true));
+    }
+
+    /**
+     * Waits until the transformed value from the storage equals the expected one; asserts equality if the wait is unsuccessful.
+     *
+     * @param keyValueStorage Key-value storage.
+     * @param key Key of value to check.
+     * @param valueTransformer Function that is applied to a non-null value from the storage.
+     * @param expectedValue Expected value; {@code null} matches a {@code null} value in the storage.
+     * @param timeoutMillis Timeout in milliseconds.
+     * @param <T> A type of value from storage.
+     * @throws InterruptedException If interrupted.
+     */
+    public static <T> void assertValueInStorage(
+            KeyValueStorage keyValueStorage,
+            byte[] key,
+            Function<byte[], T> valueTransformer,
+            @Nullable T expectedValue,
+            long timeoutMillis
+    ) throws InterruptedException {
+        boolean success = waitForCondition(() -> {
+            byte[] storageValue = keyValueStorage.get(key).value();
+
+            T actualValue = storageValue == null ? null : valueTransformer.apply(storageValue);
+
+            return Objects.equals(actualValue, expectedValue);
+        }, timeoutMillis);
+
+        // We do a second check simply to print a nice error message in case the condition above is not achieved.
+        if (!success) {
+            byte[] storageValue = keyValueStorage.get(key).value();
+
+            assertThat(storageValue == null ? null : valueTransformer.apply(storageValue), is(expectedValue));
+        }
+    }
+
+    /**
+     * Waits until the transformed value from the meta storage equals the expected one; asserts equality if the wait is unsuccessful.
+     *
+     * @param metaStorageManager Meta Storage manager.
+     * @param key Key of value to check.
+     * @param valueTransformer Function that is applied to a non-null value from the meta storage.
+     * @param expectedValue Expected value; {@code null} matches a {@code null} value in the meta storage.
+     * @param timeoutMillis Timeout in milliseconds.
+     * @param <T> A type of value from the meta storage.
+     * @throws InterruptedException If interrupted.
+     */
+    public static <T> void assertValueInStorage(
+            MetaStorageManager metaStorageManager,
+            ByteArray key,
+            Function<byte[], T> valueTransformer,
+            @Nullable T expectedValue,
+            long timeoutMillis
+    ) throws InterruptedException {
+        boolean success = waitForCondition(() -> {
+            byte[] storageValue = new byte[0];
+            try {
+                storageValue = metaStorageManager.get(key).get().value();
+            } catch (InterruptedException | ExecutionException e) {
+                fail("Failed to read the value from the meta storage [key=" + key + ']', e);
+            }
+
+            T actualValue = storageValue == null ? null : valueTransformer.apply(storageValue);
+
+            return Objects.equals(actualValue, expectedValue);
+        }, timeoutMillis);
+
+        // We do a second check simply to print a nice error message in case the condition above is not achieved.
+        if (!success) {
+            byte[] storageValue = new byte[0];
+
+            try {
+                storageValue = metaStorageManager.get(key).get().value();
+            } catch (ExecutionException e) {
+                fail("Failed to read the value from the meta storage [key=" + key + ']', e);
+            }
+
+            assertThat(storageValue == null ? null : valueTransformer.apply(storageValue), is(expectedValue));
+        }
+    }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
new file mode 100644
index 0000000000..522a6e07b3
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distribution.zones;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
+import static org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.distributionzones.Node;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.sql.Session;
+import org.intellij.lang.annotations.Language;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for the data nodes' filter functionality of distribution zones.
+ */
+public class ItDistributionZonesFilterTest extends ClusterPerTestIntegrationTest {
+    @Language("JSON")
+    private static final String NODE_ATTRIBUTES = "{region:{attribute:\"US\"},storage:{attribute:\"SSD\"}}";
+
+    private static final String COLUMN_KEY = "key";
+
+    private static final String COLUMN_VAL = "val";
+
+    private static final int TIMEOUT_MILLIS = 10_000;
+
+    @Language("JSON")
+    private static String createStartConfig(@Language("JSON") String nodeAttributes) {
+        return "{\n"
+                + "  network: {\n"
+                + "    port: {},\n"
+                + "    nodeFinder: {\n"
+                + "      netClusterNodes: [ {} ]\n"
+                + "    }\n"
+                + "  },"
+                + "  nodeAttributes: {\n"
+                + "    nodeAttributes: " + nodeAttributes
+                + "  },\n"
+                + "}";
+    }
+
+    @Override
+    protected String getNodeBootstrapConfigTemplate() {
+        return createStartConfig(NODE_ATTRIBUTES);
+    }
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    /**
+     * Tests the scenario when a filter is applied to data nodes and the stable key for rebalance is changed to the filtered value.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    void testFilteredDataNodesPropagatedToStable() throws Exception {
+        String filter = "'$[?(@.region == \"US\" && @.storage == \"SSD\")]'";
+
+        // This node does not pass the filter.
+        @Language("JSON") String firstNodeAttributes = "{region:{attribute:\"EU\"},storage:{attribute:\"SSD\"}}";
+
+        IgniteImpl node = startNode(1, createStartConfig(firstNodeAttributes));
+
+        Session session = node.sql().createSession();
+
+        session.execute(null, "CREATE ZONE \"TEST_ZONE\" WITH "
+                + "\"REPLICAS\" = 3, "
+                + "\"PARTITIONS\" = 2, "
+                + "\"DATA_NODES_FILTER\" = " + filter + ", "
+                + "\"DATA_NODES_AUTO_ADJUST_SCALE_UP\" = 0, "
+                + "\"DATA_NODES_AUTO_ADJUST_SCALE_DOWN\" = 0");
+
+        String tableName = "table1";
+
+        session.execute(null, "CREATE TABLE " + tableName + "("
+                + COLUMN_KEY + " INT PRIMARY KEY, " + COLUMN_VAL + " VARCHAR) WITH PRIMARY_ZONE='TEST_ZONE'");
+
+        MetaStorageManager metaStorageManager = (MetaStorageManager) IgniteTestUtils
+                .getFieldValue(node, IgniteImpl.class, "metaStorageMgr");
+
+        TableManager tableManager = (TableManager) IgniteTestUtils.getFieldValue(node, IgniteImpl.class, "distributedTblMgr");
+
+        TableImpl table = (TableImpl) tableManager.table(tableName);
+
+        TablePartitionId partId = new TablePartitionId(table.tableId(), 0);
+
+        assertValueInStorage(
+                metaStorageManager,
+                stablePartAssignmentsKey(partId),
+                (v) -> ((Set<Assignment>) fromBytes(v)).size(),
+                null,
+                TIMEOUT_MILLIS
+        );
+
+        @Language("JSON") String secondNodeAttributes = "{region:{attribute:\"US\"},storage:{attribute:\"SSD\"}}";
+
+        // This node passes the filter.
+        startNode(2, createStartConfig(secondNodeAttributes));
+
+        assertValueInStorage(
+                metaStorageManager,
+                zoneDataNodesKey(1),
+                (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
+                3,
+                TIMEOUT_MILLIS
+        );
+
+        Entry dataNodesEntry1 = metaStorageManager.get(zoneDataNodesKey(1)).get(5_000, MILLISECONDS);
+
+        assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() >= dataNodesEntry1.revision(), TIMEOUT_MILLIS));
+
+        // We check that the two nodes that pass the filter are present in the stable key.
+
+        assertValueInStorage(
+                metaStorageManager,
+                stablePartAssignmentsKey(partId),
+                (v) -> ((Set<Assignment>) fromBytes(v)).size(),
+                2,
+                TIMEOUT_MILLIS * 2
+        );
+
+        byte[] stableAssignments = metaStorageManager.get(stablePartAssignmentsKey(partId)).get(5_000, MILLISECONDS).value();
+
+        assertNotNull(stableAssignments);
+
+        Set<String> stable = ((Set<Assignment>) fromBytes(stableAssignments)).stream()
+                .map(Assignment::consistentId)
+                .collect(Collectors.toSet());
+
+        assertEquals(2, stable.size());
+
+        assertTrue(stable.contains(node(0).name()) && stable.contains(node(2).name()));
+    }
+
+    /**
+     * Tests the scenario when removing a node from the logical topology leads to empty data nodes, but these empty data nodes do not
+     * trigger rebalance. Data nodes become empty after applying the corresponding filter.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19443")
+    void testFilteredEmptyDataNodesDoNotTriggerRebalance() throws Exception {
+        String filter = "'$[?(@.region == \"EU\" && @.storage == \"HDD\")]'";
+
+        // This node does not pass the filter.
+        IgniteImpl node0 = node(0);
+
+        // This node passes the filter.
+        @Language("JSON") String firstNodeAttributes = "{region:{attribute:\"EU\"},storage:{attribute:\"HDD\"}}";
+
+        IgniteImpl node1 = startNode(1, createStartConfig(firstNodeAttributes));
+
+        Session session = node1.sql().createSession();
+
+        session.execute(null, "CREATE ZONE \"TEST_ZONE\" WITH "
+                + "\"REPLICAS\" = 1, "
+                + "\"PARTITIONS\" = 1, "
+                + "\"DATA_NODES_FILTER\" = " + filter + ", "
+                + "\"DATA_NODES_AUTO_ADJUST_SCALE_UP\" = 0, "
+                + "\"DATA_NODES_AUTO_ADJUST_SCALE_DOWN\" = 0");
+
+        String tableName = "table1";
+
+        session.execute(null, "CREATE TABLE " + tableName + "("
+                + COLUMN_KEY + " INT PRIMARY KEY, " + COLUMN_VAL + " VARCHAR) WITH PRIMARY_ZONE='TEST_ZONE'");
+
+        MetaStorageManager metaStorageManager = (MetaStorageManager) IgniteTestUtils
+                .getFieldValue(node0, IgniteImpl.class, "metaStorageMgr");
+
+        TableManager tableManager = (TableManager) IgniteTestUtils.getFieldValue(node0, IgniteImpl.class, "distributedTblMgr");
+
+        TableImpl table = (TableImpl) tableManager.table(tableName);
+
+        TablePartitionId partId = new TablePartitionId(table.tableId(), 0);
+
+        assertValueInStorage(
+                metaStorageManager,
+                stablePartAssignmentsKey(partId),
+                (v) -> ((Set<Assignment>) fromBytes(v)).size(),
+                null,
+                TIMEOUT_MILLIS
+        );
+
+        assertValueInStorage(
+                metaStorageManager,
+                zoneDataNodesKey(1),
+                (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
+                2,
+                TIMEOUT_MILLIS
+        );
+
+        Entry dataNodesEntry = metaStorageManager.get(zoneDataNodesKey(1)).get(5_000, MILLISECONDS);
+
+        assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() >= dataNodesEntry.revision(), 5_000));
+
+        assertValueInStorage(
+                metaStorageManager,
+                stablePartAssignmentsKey(partId),
+                (v) -> ((Set<Assignment>) fromBytes(v)).size(),
+                null,
+                TIMEOUT_MILLIS
+        );
+
+        stopNode(1);
+
+        assertValueInStorage(
+                metaStorageManager,
+                zoneDataNodesKey(1),
+                (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
+                1,
+                TIMEOUT_MILLIS
+        );
+
+        Entry newDataNodesEntry = metaStorageManager.get(zoneDataNodesKey(1)).get(5_000, MILLISECONDS);
+
+        assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() >= newDataNodesEntry.revision(), 5_000));
+
+        assertValueInStorage(
+                metaStorageManager,
+                pendingPartAssignmentsKey(partId),
+                (v) -> ((Set<Assignment>) fromBytes(v)).size(),
+                null,
+                TIMEOUT_MILLIS
+        );
+
+        assertValueInStorage(
+                metaStorageManager,
+                stablePartAssignmentsKey(partId),
+                (v) -> ((Set<Assignment>) fromBytes(v)).size(),
+                null,
+                TIMEOUT_MILLIS
+        );
+    }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index 6b0a1c27f2..63b3e4d66b 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -190,6 +190,10 @@ public class DdlCommandHandler {
             zoneCfgBuilder.partitions(cmd.partitions());
         }
 
+        if (cmd.nodeFilter() != null) {
+            zoneCfgBuilder.filter(cmd.nodeFilter());
+        }
+
         zoneCfgBuilder.dataStorageChangeConsumer(
                 dataStorageManager.zoneDataStorageConsumer(cmd.dataStorage(), cmd.dataStorageOptions()));
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index dda37dce6c..93f47ffde1 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -38,6 +38,7 @@ import static org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmen
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -2016,7 +2017,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         LOG.info("Received update on pending assignments. Check if new raft group should be started"
                         + " [key={}, partition={}, table={}, localMemberAddress={}]",
-                pendingAssignmentsEntry.key(), partId, tbl.name(), localMember.address());
+                new String(pendingAssignmentsEntry.key(), StandardCharsets.UTF_8), partId, tbl.name(), localMember.address());
 
         CompletableFuture<Void> localServicesStartFuture;