You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sa...@apache.org on 2023/06/01 09:40:00 UTC
[ignite-3] branch main updated: IGNITE-18963 Altering filters must recalculate data nodes (#2122)
This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a5586fff99 IGNITE-18963 Altering filters must recalculate data nodes (#2122)
a5586fff99 is described below
commit a5586fff99964c8a1a74b3077e5c86e83182fa37
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Thu Jun 1 13:39:54 2023 +0400
IGNITE-18963 Altering filters must recalculate data nodes (#2122)
---
.../distributionzones/DistributionZoneManager.java | 28 +-
.../distributionzones/DistributionZonesUtil.java | 2 +-
.../DistributionZoneManagerAlterFilterTest.java | 373 +++++++++++++++++++++
.../DistributionZoneManagerScaleUpTest.java | 60 ++--
.../DistributionZonesTestUtil.java | 30 ++
.../zones/ItDistributionZonesFilterTest.java | 136 ++++++++
6 files changed, 595 insertions(+), 34 deletions(-)
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 6ee1017f77..5a65d0629e 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -39,11 +39,11 @@ import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateDataNodesAndTriggerKeys;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateLogicalTopologyAndVersion;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneLogicalTopologyPrefix;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesDataNodesPrefix;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyPrefix;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
@@ -323,10 +323,12 @@ public class DistributionZoneManager implements IgniteComponent {
zonesConfiguration.distributionZones().listenElements(zonesConfigurationListener);
zonesConfiguration.distributionZones().any().dataNodesAutoAdjustScaleUp().listen(onUpdateScaleUp());
zonesConfiguration.distributionZones().any().dataNodesAutoAdjustScaleDown().listen(onUpdateScaleDown());
+ zonesConfiguration.distributionZones().any().filter().listen(onUpdateFilter());
zonesConfiguration.defaultDistributionZone().listen(zonesConfigurationListener);
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().listen(onUpdateScaleUp());
zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().listen(onUpdateScaleDown());
+ zonesConfiguration.defaultDistributionZone().filter().listen(onUpdateFilter());
rebalanceEngine.start();
@@ -341,7 +343,7 @@ public class DistributionZoneManager implements IgniteComponent {
logicalTopologyService.addEventListener(topologyEventListener);
- metaStorageManager.registerPrefixWatch(zoneLogicalTopologyPrefix(), topologyWatchListener);
+ metaStorageManager.registerPrefixWatch(zonesLogicalTopologyPrefix(), topologyWatchListener);
metaStorageManager.registerPrefixWatch(zonesDataNodesPrefix(), dataNodesWatchListener);
initDataNodesFromVaultManager();
@@ -881,6 +883,26 @@ public class DistributionZoneManager implements IgniteComponent {
};
}
+ /**
+ * Creates a configuration listener that triggers the scale up data nodes update of a zone when the zone's filter is altered.
+ *
+ * @return Configuration listener for updates of zone's filter value.
+ */
+ private ConfigurationListener<String> onUpdateFilter() {
+ return ctx -> {
+ if (ctx.oldValue() == null) {
+ // No old value means the zone is being created; creation is already handled by a separate listener.
+ return completedFuture(null);
+ }
+
+ int zoneId = ctx.newValue(DistributionZoneView.class).zoneId();
+
+ saveDataNodesToMetaStorageOnScaleUp(zoneId, ctx.storageRevision());
+ // NOTE(review): whatever the call above returns is not chained into the listener's future - confirm this is intended.
+ return completedFuture(null);
+ };
+ }
+
private class ZonesConfigurationListener implements ConfigurationNamedListListener<DistributionZoneView> {
@Override
public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
@@ -2068,7 +2090,7 @@ public class DistributionZoneManager implements IgniteComponent {
}
@TestOnly
- Map<Integer, ZoneState> zonesTimers() {
+ Map<Integer, ZoneState> zonesState() {
return zonesState;
}
}
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 0a4a662922..d52acbea9a 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -123,7 +123,7 @@ public class DistributionZonesUtil {
*
* @return ByteArray representation.
*/
- public static ByteArray zoneLogicalTopologyPrefix() {
+ public static ByteArray zonesLogicalTopologyPrefix() {
return new ByteArray(DISTRIBUTION_ZONES_LOGICAL_TOPOLOGY_PREFIX);
}
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
new file mode 100644
index 0000000000..844bd0ec37
--- /dev/null
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerAlterFilterTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesFromManager;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import org.apache.ignite.internal.metastorage.server.If;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Test scenarios when filter of a zone is altered and immediate scale up is triggered.
+ */
+public class DistributionZoneManagerAlterFilterTest extends BaseDistributionZoneManagerTest {
+ private static final String ZONE_NAME = "zone1";
+
+ private static final int ZONE_ID = 1;
+
+ private static final String FILTER = "$[?(@.storage == 'SSD' || @.region == 'US')]";
+
+ private static final long TIMEOUT_MILLIS = 10_000L;
+
+ private static final int TIMER_SECONDS = 10_000;
+
+ private static final LogicalNode A = new LogicalNode(
+ new ClusterNode("1", "A", new NetworkAddress("localhost", 123)),
+ Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10")
+ );
+
+ private static final LogicalNode B = new LogicalNode(
+ new ClusterNode("2", "B", new NetworkAddress("localhost", 123)),
+ Map.of("region", "EU", "storage", "HHD", "dataRegionSize", "30")
+ );
+
+ private static final LogicalNode C = new LogicalNode(
+ new ClusterNode("3", "C", new NetworkAddress("localhost", 123)),
+ Map.of("region", "CN", "storage", "SSD", "dataRegionSize", "20")
+ );
+
+ private static final LogicalNode D = new LogicalNode(
+ new ClusterNode("4", "D", new NetworkAddress("localhost", 123)),
+ Map.of("region", "CN", "storage", "HDD", "dataRegionSize", "20")
+ );
+
+ /**
+ * Tests that a node added before the filter is altered is taken into account after the filter alteration and the corresponding
+ * immediate scale up.
+ *
+ * @throws Exception If failed.
+ */
+ @ParameterizedTest
+ @MethodSource("provideArgumentsForFilterAlteringTests")
+ void testAlterFilter(int zoneId, String zoneName) throws Exception {
+ preparePrerequisites(zoneId);
+
+ // Change timers to infinite, add a new node, alter the filter and check that data nodes were changed.
+ distributionZoneManager.alterZone(
+ zoneName,
+ new DistributionZoneConfigurationParameters.Builder(zoneName)
+ .dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE)
+ .dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE)
+ .filter(FILTER)
+ .build()
+ ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ topology.putNode(D);
+
+ // Nodes C and D match the filter.
+ String newFilter = "$[?(@.region == 'CN')]";
+
+ distributionZoneManager.alterZone(
+ zoneName,
+ new DistributionZoneConfigurationParameters.Builder(zoneName)
+ .filter(newFilter)
+ .build()
+ ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change this to the causality versioned call to dataNodes.
+ assertDataNodesFromManager(
+ distributionZoneManager,
+ zoneId,
+ Set.of(C, D).stream().map(ClusterNode::name).collect(Collectors.toSet()),
+ TIMEOUT_MILLIS
+ );
+ }
+
+ /**
+ * Tests that an empty set of data nodes is propagated after altering of a filter and the corresponding immediate scale up.
+ *
+ * @throws Exception If failed.
+ */
+ @ParameterizedTest
+ @MethodSource("provideArgumentsForFilterAlteringTests")
+ void testAlterFilterToEmtpyNodes(int zoneId, String zoneName) throws Exception {
+ preparePrerequisites(zoneId);
+
+ // Change timers to infinite, add a new node, alter the filter and check that data nodes were changed.
+ distributionZoneManager.alterZone(
+ zoneName,
+ new DistributionZoneConfigurationParameters.Builder(zoneName)
+ .dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE)
+ .dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE)
+ .filter(FILTER)
+ .build()
+ ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ topology.putNode(D);
+
+ // No nodes match the new filter.
+ String newFilter = "$[?(@.region == 'JP')]";
+
+ distributionZoneManager.alterZone(
+ zoneName,
+ new DistributionZoneConfigurationParameters.Builder(zoneName)
+ .filter(newFilter)
+ .build()
+ ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change this to the causality versioned call to dataNodes.
+ assertDataNodesFromManager(
+ distributionZoneManager,
+ zoneId,
+ Set.of(),
+ TIMEOUT_MILLIS
+ );
+ }
+
+ /**
+ * Tests that altering of a filter triggers only an immediate scale up: a pending scale down is not applied and stays scheduled.
+ *
+ * @throws Exception If failed.
+ */
+ @ParameterizedTest
+ @MethodSource("provideArgumentsForFilterAlteringTests")
+ void testAlterFilterDoNotAffectScaleDown(int zoneId, String zoneName) throws Exception {
+ preparePrerequisites(IMMEDIATE_TIMER_VALUE, TIMER_SECONDS, zoneId);
+
+ topology.putNode(D);
+
+ if (zoneId == ZONE_ID) {
+ assertNull(distributionZoneManager.zonesState().get(ZONE_ID).scaleDownTask());
+ }
+
+ topology.removeNodes(Set.of(C));
+
+ // Check that scale down task was scheduled.
+ assertTrue(
+ waitForCondition(
+ () -> distributionZoneManager.zonesState().get(zoneId).scaleDownTask() != null,
+ TIMEOUT_MILLIS
+ )
+ );
+
+ // Nodes C and D match the filter.
+ String newFilter = "$[?(@.region == 'CN')]";
+
+ distributionZoneManager.alterZone(
+ zoneName,
+ new DistributionZoneConfigurationParameters.Builder(zoneName)
+ .filter(newFilter)
+ .build()
+ ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change this to the causality versioned call to dataNodes.
+ // Node C is still in data nodes because altering a filter triggers only immediate scale up.
+ assertDataNodesFromManager(
+ distributionZoneManager,
+ zoneId,
+ Set.of(C, D).stream().map(ClusterNode::name).collect(Collectors.toSet()),
+ TIMEOUT_MILLIS
+ );
+
+ // Check that the scale down task (scheduled by the removal of node C above) is still scheduled.
+ assertNotNull(distributionZoneManager.zonesState().get(zoneId).scaleDownTask());
+
+ // Alter zone so we could check that node C is removed from data nodes.
+ distributionZoneManager.alterZone(
+ zoneName,
+ new DistributionZoneConfigurationParameters.Builder(zoneName)
+ .dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE)
+ .dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .filter(newFilter)
+ .build()
+ ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ assertDataNodesFromManager(
+ distributionZoneManager,
+ zoneId,
+ Set.of(D).stream().map(ClusterNode::name).collect(Collectors.toSet()),
+ TIMEOUT_MILLIS
+ );
+ }
+
+ /**
+ * Tests that a node added after altering of a filter (i.e. after the immediate scale up was triggered) is added to data nodes
+ * only after a later alteration of the timers and the corresponding scale up.
+ *
+ * @throws Exception If failed.
+ */
+ @ParameterizedTest
+ @MethodSource("provideArgumentsForFilterAlteringTests")
+ void testNodeAddedWhileAlteringFilter(int zoneId, String zoneName) throws Exception {
+ preparePrerequisites(TIMER_SECONDS, INFINITE_TIMER_VALUE, zoneId);
+
+ if (zoneId == ZONE_ID) {
+ assertNull(distributionZoneManager.zonesState().get(ZONE_ID).scaleUpTask());
+ }
+
+ topology.putNode(D);
+
+ // Check that scale up task was scheduled.
+ assertTrue(
+ waitForCondition(
+ () -> distributionZoneManager.zonesState().get(zoneId).scaleUpTask() != null,
+ TIMEOUT_MILLIS
+ )
+ );
+
+ // Nodes C, D and E match the filter.
+ String newFilter = "$[?(@.region == 'CN')]";
+
+ distributionZoneManager.alterZone(zoneName,
+ new DistributionZoneConfigurationParameters.Builder(zoneName)
+ .filter(newFilter)
+ .build()
+ ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ LogicalNode e = new LogicalNode(
+ new ClusterNode("5", "E", new NetworkAddress("localhost", 123)),
+ Map.of("region", "CN", "storage", "HDD", "dataRegionSize", "20")
+ );
+
+ doAnswer(invocation -> {
+ If iif = invocation.getArgument(0);
+
+ // Emulate a situation when immediate timer was run after filter altering and new node was added, so timer was scheduled.
+ byte[] key = zoneScaleUpChangeTriggerKey(zoneId).bytes();
+
+ if (Arrays.stream(iif.cond().keys()).anyMatch(k -> Arrays.equals(key, k))) {
+ assertNotNull(distributionZoneManager.zonesState().get(zoneId).scaleUpTask());
+
+ topology.putNode(e);
+ }
+ return invocation.callRealMethod();
+ }).when(keyValueStorage).invoke(any(), any());
+
+ // Check that node E, which was added while the filter was being altered, is not propagated to data nodes.
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change this to the causality versioned call to dataNodes.
+ assertDataNodesFromManager(
+ distributionZoneManager,
+ zoneId,
+ Set.of(C, D).stream().map(ClusterNode::name).collect(Collectors.toSet()),
+ TIMEOUT_MILLIS
+ );
+
+ // Assert that scheduled timer was not canceled because of immediate scale up after filter altering.
+ assertNotNull(distributionZoneManager.zonesState().get(zoneId).scaleUpTask());
+
+ distributionZoneManager.alterZone(
+ zoneName,
+ new DistributionZoneConfigurationParameters.Builder(zoneName)
+ .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+ .dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .filter(newFilter)
+ .build()
+ ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ // Check that node E, which was added after the filter was altered, gets into data nodes once the timers are set to immediate.
+ assertDataNodesFromManager(
+ distributionZoneManager,
+ zoneId,
+ Set.of(C, D, e).stream().map(ClusterNode::name).collect(Collectors.toSet()),
+ TIMEOUT_MILLIS
+ );
+ }
+
+ /**
+ * Starts distribution zone manager with a zone whose scale up and scale down timers are immediate and checks that two out of
+ * three nodes, which match the filter, are present in the zone's data nodes.
+ *
+ * @throws Exception If failed
+ */
+ private void preparePrerequisites(int zoneId) throws Exception {
+ preparePrerequisites(IMMEDIATE_TIMER_VALUE, IMMEDIATE_TIMER_VALUE, zoneId);
+ }
+
+ /**
+ * Adds nodes A, B and C to the topology, starts distribution zone manager, sets up the zone (alters the default zone or creates
+ * a new one, depending on the zone id) with the given timers and the filter, and checks that its data nodes are A and C.
+ *
+ * @throws Exception If failed
+ */
+ private void preparePrerequisites(int scaleUpTimer, int scaleDownTimer, int zoneId) throws Exception {
+ topology.putNode(A);
+ topology.putNode(B);
+ topology.putNode(C);
+
+ startDistributionZoneManager();
+
+ if (zoneId == DEFAULT_ZONE_ID) {
+ distributionZoneManager.alterZone(
+ DEFAULT_ZONE_NAME,
+ new DistributionZoneConfigurationParameters.Builder(DEFAULT_ZONE_NAME)
+ .dataNodesAutoAdjustScaleUp(scaleUpTimer)
+ .dataNodesAutoAdjustScaleDown(scaleDownTimer)
+ .filter(FILTER)
+ .build()
+ ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } else {
+ distributionZoneManager.createZone(
+ new DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
+ .dataNodesAutoAdjustScaleUp(scaleUpTimer)
+ .dataNodesAutoAdjustScaleDown(scaleDownTimer)
+ .filter(FILTER)
+ .build()
+ ).get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ }
+
+ assertDataNodesFromManager(
+ distributionZoneManager,
+ zoneId,
+ Set.of(A, C).stream().map(ClusterNode::name).collect(Collectors.toSet()),
+ TIMEOUT_MILLIS
+ );
+ }
+
+ private static Stream<Arguments> provideArgumentsForFilterAlteringTests() {
+ List<Arguments> args = new ArrayList<>();
+ // Each scenario is run for the default zone and for a zone created by the test.
+ args.add(Arguments.of(DEFAULT_ZONE_ID, DEFAULT_ZONE_NAME));
+ args.add(Arguments.of(ZONE_ID, ZONE_NAME));
+
+ return args.stream();
+ }
+}
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
index 25bcfe8920..18eabf32b4 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
@@ -687,7 +687,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testCleanUpAfterSchedulers() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
zoneState.nodesToAddToDataNodes(Set.of(E), 1004);
@@ -729,7 +729,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
assertDataNodesForZone(ZONE_1_ID, Set.of(), keyValueStorage);
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
assertNull(zoneState.scaleUpTask());
@@ -761,7 +761,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
assertNull(zoneState.scaleDownTask());
@@ -857,7 +857,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios1_1() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1007);
@@ -872,7 +872,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios1_2() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1007);
@@ -887,7 +887,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios1_3() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
zoneState.nodesToAddToDataNodes(Set.of(D), 1007);
@@ -902,7 +902,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios1_4() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
zoneState.nodesToAddToDataNodes(Set.of(D), 1007);
@@ -917,7 +917,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios2_1() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
@@ -933,7 +933,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios2_2() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
@@ -948,7 +948,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios2_3() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
@@ -963,7 +963,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios2_4() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
@@ -978,7 +978,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios3_1() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
@@ -994,7 +994,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios3_2() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
@@ -1010,7 +1010,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios3_3() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
@@ -1028,7 +1028,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios3_4() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
@@ -1044,7 +1044,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios3_5() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
@@ -1060,7 +1060,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios3_6() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
@@ -1076,7 +1076,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios4_1() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
@@ -1093,7 +1093,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios4_2() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
@@ -1109,7 +1109,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios4_3() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
@@ -1125,7 +1125,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios4_4() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
@@ -1141,7 +1141,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios4_5() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
@@ -1157,7 +1157,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios4_6() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
@@ -1173,7 +1173,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios5_1() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1007);
@@ -1189,7 +1189,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios5_2() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1007);
@@ -1205,7 +1205,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios5_3() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1007);
@@ -1221,7 +1221,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios5_4() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
zoneState.nodesToAddToDataNodes(Set.of(D), 1007);
@@ -1237,7 +1237,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios5_5() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
zoneState.nodesToAddToDataNodes(Set.of(D), 1007);
@@ -1253,7 +1253,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
void testVariousScaleUpScaleDownScenarios5_6() throws Exception {
preparePrerequisites();
- ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
+ ZoneState zoneState = distributionZoneManager.zonesState().get(ZONE_1_ID);
zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
zoneState.nodesToAddToDataNodes(Set.of(D), 1007);
@@ -1354,7 +1354,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
private void assertThatZonesAugmentationMapContainsRevision(int zoneId, long revisionToAssert) throws InterruptedException {
assertTrue(
waitForCondition(
- () -> distributionZoneManager.zonesTimers().get(zoneId).topologyAugmentationMap().containsKey(revisionToAssert),
+ () -> distributionZoneManager.zonesState().get(zoneId).topologyAugmentationMap().containsKey(revisionToAssert),
1000
)
);
diff --git a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
index 98daab9098..132795d09f 100644
--- a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
+++ b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
@@ -443,4 +443,34 @@ public class DistributionZonesTestUtil {
assertThat(storageValue == null ? null : valueTransformer.apply(storageValue), is(expectedValue));
}
}
+
+ /**
+ * Asserts data nodes from the distribution zone manager.
+ *
+ * @param distributionZoneManager Distribution zone manager.
+ * @param zoneId Zone id.
+ * @param expectedValue Expected value.
+ * @param timeoutMillis Timeout in milliseconds.
+ * @throws InterruptedException If interrupted.
+ */
+ public static void assertDataNodesFromManager(
+ DistributionZoneManager distributionZoneManager,
+ int zoneId,
+ @Nullable Set<String> expectedValue,
+ long timeoutMillis
+ ) throws InterruptedException {
+ boolean success = waitForCondition(() -> {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change this to the causality versioned call to dataNodes.
+ Set<String> dataNodes = distributionZoneManager.dataNodes(zoneId);
+
+ return Objects.equals(dataNodes, expectedValue);
+ }, timeoutMillis);
+
+ // We do a second check simply to print a nice error message in case the condition above is not achieved.
+ if (!success) {
+ Set<String> dataNodes = distributionZoneManager.dataNodes(zoneId);
+
+ assertThat(dataNodes, is(expectedValue));
+ }
+ }
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
index 57ade2725a..94a49aba7f 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.distribution.zones;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_FILTER;
import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -173,6 +174,133 @@ public class ItDistributionZonesFilterTest extends ClusterPerTestIntegrationTest
assertTrue(stable.contains(node(0).name()) && stable.contains(node(2).name()));
}
+ /**
+ * Tests the scenario when altering the filter triggers an immediate scale up, so data nodes
+ * and the stable key for rebalance are changed to the new value even if the scale up timer is big enough.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void testAlteringFiltersPropagatedDataNodesToStableImmediately() throws Exception {
+ String filter = "'$[?(@.region == \"US\" && @.storage == \"SSD\")]'";
+
+ IgniteImpl node0 = node(0);
+
+ Session session = node0.sql().createSession();
+
+ session.execute(null, "CREATE ZONE \"TEST_ZONE\" WITH "
+ + "\"REPLICAS\" = 3, "
+ + "\"PARTITIONS\" = 2, "
+ + "\"DATA_NODES_FILTER\" = " + filter + ", "
+ + "\"DATA_NODES_AUTO_ADJUST_SCALE_UP\" = 10000, "
+ + "\"DATA_NODES_AUTO_ADJUST_SCALE_DOWN\" = 10000");
+
+ String tableName = "table1";
+
+ session.execute(null, "CREATE TABLE " + tableName + "("
+ + COLUMN_KEY + " INT PRIMARY KEY, " + COLUMN_VAL + " VARCHAR) WITH PRIMARY_ZONE='TEST_ZONE'");
+
+ MetaStorageManager metaStorageManager = (MetaStorageManager) IgniteTestUtils
+ .getFieldValue(node0, IgniteImpl.class, "metaStorageMgr");
+
+ TableManager tableManager = (TableManager) IgniteTestUtils.getFieldValue(node0, IgniteImpl.class, "distributedTblMgr");
+
+ TableImpl table = (TableImpl) tableManager.table(tableName);
+
+ TablePartitionId partId = new TablePartitionId(table.tableId(), 0);
+
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ (v) -> ((Set<Assignment>) fromBytes(v)).size(),
+ null,
+ TIMEOUT_MILLIS
+ );
+
+ @Language("JSON") String firstNodeAttributes = "{region:{attribute:\"US\"},storage:{attribute:\"SSD\"}}";
+
+ // This node passes the filter.
+ startNode(1, createStartConfig(firstNodeAttributes));
+
+ // Expected size is 1 because we have timers equal to 10000, so no scale up will be propagated.
+ waitDataNodeAndListenersAreHandled(metaStorageManager, 1);
+
+ session.execute(null, "ALTER ZONE \"TEST_ZONE\" SET "
+ + "\"DATA_NODES_FILTER\" = '" + DEFAULT_FILTER + "'");
+
+ // We check that all nodes that pass the filter are present in the stable key because altering filter triggers immediate scale up.
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ (v) -> ((Set<Assignment>) fromBytes(v))
+ .stream().map(Assignment::consistentId).collect(Collectors.toSet()),
+ Set.of(node(0).name(), node(1).name()),
+ TIMEOUT_MILLIS * 2
+ );
+ }
+
+ /**
+ * Tests the scenario when empty data nodes are not propagated to stable after filter is altered, because there is no node that
+ * matches the new filter.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void testEmptyDataNodesDoNotPropagatedToStableAfterAlteringFilter() throws Exception {
+ String filter = "'$[?(@.region == \"US\" && @.storage == \"SSD\")]'";
+
+ IgniteImpl node0 = node(0);
+
+ Session session = node0.sql().createSession();
+
+ session.execute(null, "CREATE ZONE \"TEST_ZONE\" WITH "
+ + "\"REPLICAS\" = 3, "
+ + "\"PARTITIONS\" = 2, "
+ + "\"DATA_NODES_FILTER\" = " + filter + ", "
+ + "\"DATA_NODES_AUTO_ADJUST_SCALE_UP\" = 10000, "
+ + "\"DATA_NODES_AUTO_ADJUST_SCALE_DOWN\" = 10000");
+
+ String tableName = "table1";
+
+ session.execute(null, "CREATE TABLE " + tableName + "("
+ + COLUMN_KEY + " INT PRIMARY KEY, " + COLUMN_VAL + " VARCHAR) WITH PRIMARY_ZONE='TEST_ZONE'");
+
+ MetaStorageManager metaStorageManager = (MetaStorageManager) IgniteTestUtils
+ .getFieldValue(node0, IgniteImpl.class, "metaStorageMgr");
+
+ TableManager tableManager = (TableManager) IgniteTestUtils.getFieldValue(node0, IgniteImpl.class, "distributedTblMgr");
+
+ TableImpl table = (TableImpl) tableManager.table(tableName);
+
+ TablePartitionId partId = new TablePartitionId(table.tableId(), 0);
+
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ (v) -> ((Set<Assignment>) fromBytes(v)).size(),
+ null,
+ TIMEOUT_MILLIS
+ );
+
+ @Language("JSON") String firstNodeAttributes = "{region:{attribute:\"US\"},storage:{attribute:\"SSD\"}}";
+
+ // This node passes the filter.
+ startNode(1, createStartConfig(firstNodeAttributes));
+
+ // Expected size is 1 because we have timers equal to 10000, so no scale up will be propagated.
+ waitDataNodeAndListenersAreHandled(metaStorageManager, 1);
+
+ // There is no node that matches the filter.
+ String newFilter = "'$[?(@.region == \"FOO\" && @.storage == \"BAR\")]'";
+
+ session.execute(null, "ALTER ZONE \"TEST_ZONE\" SET "
+ + "\"DATA_NODES_FILTER\" = " + newFilter);
+
+ waitDataNodeAndListenersAreHandled(metaStorageManager, 2);
+
+ assertPendingStableAreNull(metaStorageManager, partId);
+ }
+
/**
* Tests the scenario when removal node from the logical topology leads to empty data nodes, but this empty data nodes do not trigger
* rebalance. Data nodes become empty after applying corresponding filter.
@@ -336,6 +464,14 @@ public class ItDistributionZonesFilterTest extends ClusterPerTestIntegrationTest
// We wait for all data nodes listeners are triggered and all their meta storage activity is done.
assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() >= fakeEntry.revision(), 5_000));
+
+ assertValueInStorage(
+ metaStorageManager,
+ zoneDataNodesKey(1),
+ (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
+ expectedDataNodesSize,
+ TIMEOUT_MILLIS
+ );
}
private static void assertPendingStableAreNull(