You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sa...@apache.org on 2023/05/17 17:18:28 UTC
[ignite-3] branch main updated: IGNITE-18955 The ability to use filters when data nodes are calculated added. (#1958)
This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9ca4914616 IGNITE-18955 The ability to use filters when data nodes are calculated added. (#1958)
9ca4914616 is described below
commit 9ca4914616a719f376b2aa8dc394e5cabbfd6cbd
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Wed May 17 21:18:22 2023 +0400
IGNITE-18955 The ability to use filters when data nodes are calculated added. (#1958)
---
modules/distribution-zones/build.gradle | 3 +
.../distributionzones/DistributionZoneManager.java | 286 +++++++-------
.../distributionzones/DistributionZonesUtil.java | 47 ++-
.../ignite/internal/distributionzones/Node.java | 72 ++++
.../distributionzones/NodeWithAttributes.java | 80 ++++
.../rebalance/DistributionZoneRebalanceEngine.java | 14 +-
.../BaseDistributionZoneManagerTest.java | 7 +
.../DistributionZoneAwaitDataNodesTest.java | 149 ++++----
.../DistributionZoneFiltersTest.java | 25 +-
...ibutionZoneManagerConfigurationChangesTest.java | 23 +-
.../DistributionZoneManagerFilterTest.java | 143 +++++++
...butionZoneManagerLogicalTopologyEventsTest.java | 4 +-
.../DistributionZoneManagerScaleUpTest.java | 424 ++++++++++-----------
.../DistributionZoneManagerWatchListenerTest.java | 68 ++--
.../DistributionZoneRebalanceEngineTest.java | 8 +-
.../util/DistributionZonesTestUtil.java | 251 ------------
.../DistributionZonesTestUtil.java | 353 +++++++++++++++++
.../zones/ItDistributionZonesFilterTest.java | 272 +++++++++++++
.../sql/engine/exec/ddl/DdlCommandHandler.java | 4 +
.../internal/table/distributed/TableManager.java | 3 +-
20 files changed, 1478 insertions(+), 758 deletions(-)
diff --git a/modules/distribution-zones/build.gradle b/modules/distribution-zones/build.gradle
index 9219929c8c..c0d38eff4f 100644
--- a/modules/distribution-zones/build.gradle
+++ b/modules/distribution-zones/build.gradle
@@ -64,9 +64,12 @@ dependencies {
testFixturesImplementation libs.jetbrains.annotations
testFixturesImplementation libs.mockito.core
testFixturesImplementation libs.mockito.junit
+ testFixturesImplementation libs.hamcrest.core
testFixturesImplementation project(':ignite-raft-api')
testFixturesImplementation project(':ignite-metastorage')
testFixturesImplementation project(':ignite-schema')
+ testFixturesImplementation project(':ignite-cluster-management')
+ testFixturesImplementation project(':ignite-vault')
testFixturesImplementation(testFixtures(project(':ignite-core')))
testFixturesImplementation(testFixtures(project(':ignite-metastorage')))
}
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 15debe3b5a..6ee1017f77 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -24,11 +24,11 @@ import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.dataNodes;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesAndUpdateTriggerKeys;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractChangeTriggerRevision;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractDataNodes;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.isZoneExist;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.toDataNodesMap;
@@ -129,7 +129,6 @@ import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteSystemProperties;
import org.apache.ignite.lang.NodeStoppingException;
-import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -238,7 +237,14 @@ public class DistributionZoneManager implements IgniteComponent {
* The logical topology on the last watch event.
* It's enough to mark this field by volatile because we don't update the collection after it is assigned to the field.
*/
- private volatile Set<String> logicalTopology;
+ private volatile Set<NodeWithAttributes> logicalTopology;
+
+ /**
+ * Local mapping of {@code nodeId} -> node's attributes, where {@code nodeId} is a node id, that changes between restarts.
+ * This map is updated every time we receive a topology event in a {@code topologyWatchListener}.
+ * TODO: https://issues.apache.org/jira/browse/IGNITE-19491 properly clean up this map
+ */
+ private final Map<String, Map<String, String>> nodesAttributes;
/** Watch listener for logical topology keys. */
private final WatchListener topologyWatchListener;
@@ -281,6 +287,8 @@ public class DistributionZoneManager implements IgniteComponent {
logicalTopology = emptySet();
+ nodesAttributes = new ConcurrentHashMap<>();
+
executor = new ScheduledThreadPoolExecutor(
Math.min(Runtime.getRuntime().availableProcessors() * 3, 20),
new NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG),
@@ -302,9 +310,73 @@ public class DistributionZoneManager implements IgniteComponent {
);
}
- @TestOnly
- Map<Integer, ZoneState> zonesTimers() {
- return zonesState;
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());
+ }
+
+ try {
+ ZonesConfigurationListener zonesConfigurationListener = new ZonesConfigurationListener();
+
+ zonesConfiguration.distributionZones().listenElements(zonesConfigurationListener);
+ zonesConfiguration.distributionZones().any().dataNodesAutoAdjustScaleUp().listen(onUpdateScaleUp());
+ zonesConfiguration.distributionZones().any().dataNodesAutoAdjustScaleDown().listen(onUpdateScaleDown());
+
+ zonesConfiguration.defaultDistributionZone().listen(zonesConfigurationListener);
+ zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().listen(onUpdateScaleUp());
+ zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().listen(onUpdateScaleDown());
+
+ rebalanceEngine.start();
+
+ // Init timers after restart.
+ zonesState.putIfAbsent(DEFAULT_ZONE_ID, new ZoneState(executor));
+
+ zonesConfiguration.distributionZones().value().forEach(zone -> {
+ int zoneId = zone.zoneId();
+
+ zonesState.putIfAbsent(zoneId, new ZoneState(executor));
+ });
+
+ logicalTopologyService.addEventListener(topologyEventListener);
+
+ metaStorageManager.registerPrefixWatch(zoneLogicalTopologyPrefix(), topologyWatchListener);
+ metaStorageManager.registerPrefixWatch(zonesDataNodesPrefix(), dataNodesWatchListener);
+
+ initDataNodesFromVaultManager();
+
+ initLogicalTopologyAndVersionInMetaStorageOnStart();
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() throws Exception {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ rebalanceEngine.stop();
+
+ logicalTopologyService.removeEventListener(topologyEventListener);
+
+ metaStorageManager.unregisterWatch(topologyWatchListener);
+ metaStorageManager.unregisterWatch(dataNodesWatchListener);
+
+ //Need to update trackers with max possible value to complete all futures that are waiting for trackers.
+ topVerTracker.update(Long.MAX_VALUE, null);
+
+ zonesState.values().forEach(zoneState -> {
+ zoneState.scaleUpRevisionTracker().update(Long.MAX_VALUE, null);
+ zoneState.scaleDownRevisionTracker().update(Long.MAX_VALUE, null);
+ });
+
+ shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
}
/**
@@ -611,7 +683,7 @@ public class DistributionZoneManager implements IgniteComponent {
return allOf(
timerValuesFut.thenCompose(timerValues -> scaleUpAwaiting(zoneId, timerValues.get1())),
timerValuesFut.thenCompose(timerValues -> scaleDownAwaiting(zoneId, timerValues.get2()))
- ).thenCompose(ignored -> getDataNodesFuture(zoneId));
+ ).thenApply(ignored -> dataNodes(zoneId));
}
/**
@@ -703,65 +775,24 @@ public class DistributionZoneManager implements IgniteComponent {
}
/**
- * Returns the future with data nodes of the specified zone.
+ * Returns the data nodes of the specified zone.
*
* @param zoneId Zone id.
- * @return Future.
+ * @return The latest data nodes.
*/
- private CompletableFuture<Set<String>> getDataNodesFuture(int zoneId) {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19425 Proper causality token based implementation is expected.
+ public Set<String> dataNodes(int zoneId) {
return inBusyLock(busyLock, () -> {
ZoneState zoneState = zonesState.get(zoneId);
if (zoneState != null) {
- return completedFuture(zonesState.get(zoneId).nodes());
+ return zonesState.get(zoneId).nodes();
} else {
- return failedFuture(new DistributionZoneWasRemovedException(zoneId));
+ throw new DistributionZoneWasRemovedException(zoneId);
}
});
}
- /** {@inheritDoc} */
- @Override
- public void start() {
- if (!busyLock.enterBusy()) {
- throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());
- }
-
- try {
- ZonesConfigurationListener zonesConfigurationListener = new ZonesConfigurationListener();
-
- zonesConfiguration.distributionZones().listenElements(zonesConfigurationListener);
- zonesConfiguration.distributionZones().any().dataNodesAutoAdjustScaleUp().listen(onUpdateScaleUp());
- zonesConfiguration.distributionZones().any().dataNodesAutoAdjustScaleDown().listen(onUpdateScaleDown());
-
- zonesConfiguration.defaultDistributionZone().listen(zonesConfigurationListener);
- zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleUp().listen(onUpdateScaleUp());
- zonesConfiguration.defaultDistributionZone().dataNodesAutoAdjustScaleDown().listen(onUpdateScaleDown());
-
- rebalanceEngine.start();
-
- // Init timers after restart.
- zonesState.putIfAbsent(DEFAULT_ZONE_ID, new ZoneState(executor));
-
- zonesConfiguration.distributionZones().value().forEach(zone -> {
- int zoneId = zone.zoneId();
-
- zonesState.putIfAbsent(zoneId, new ZoneState(executor));
- });
-
- logicalTopologyService.addEventListener(topologyEventListener);
-
- metaStorageManager.registerPrefixWatch(zoneLogicalTopologyPrefix(), topologyWatchListener);
- metaStorageManager.registerPrefixWatch(zonesDataNodesPrefix(), dataNodesWatchListener);
-
- initDataNodesFromVaultManager();
-
- initLogicalTopologyAndVersionInMetaStorageOnStart();
- } finally {
- busyLock.leaveBusy();
- }
- }
-
/**
* Creates configuration listener for updates of scale up value.
*
@@ -850,33 +881,6 @@ public class DistributionZoneManager implements IgniteComponent {
};
}
- /** {@inheritDoc} */
- @Override
- public void stop() throws Exception {
- if (!stopGuard.compareAndSet(false, true)) {
- return;
- }
-
- busyLock.block();
-
- rebalanceEngine.stop();
-
- logicalTopologyService.removeEventListener(topologyEventListener);
-
- metaStorageManager.unregisterWatch(topologyWatchListener);
- metaStorageManager.unregisterWatch(dataNodesWatchListener);
-
- //Need to update trackers with max possible value to complete all futures that are waiting for trackers.
- topVerTracker.update(Long.MAX_VALUE, null);
-
- zonesState.values().forEach(zoneState -> {
- zoneState.scaleUpRevisionTracker().update(Long.MAX_VALUE, null);
- zoneState.scaleDownRevisionTracker().update(Long.MAX_VALUE, null);
- });
-
- shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
- }
-
private class ZonesConfigurationListener implements ConfigurationNamedListListener<DistributionZoneView> {
@Override
public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<DistributionZoneView> ctx) {
@@ -886,7 +890,11 @@ public class DistributionZoneManager implements IgniteComponent {
zonesState.putIfAbsent(zoneId, zoneState);
- saveDataNodesAndUpdateTriggerKeysInMetaStorage(zoneId, ctx.storageRevision(), logicalTopology);
+ saveDataNodesAndUpdateTriggerKeysInMetaStorage(
+ zoneId,
+ ctx.storageRevision(),
+ logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet())
+ );
return completedFuture(null);
}
@@ -917,7 +925,7 @@ public class DistributionZoneManager implements IgniteComponent {
* @param revision Revision of an event that has triggered this method.
* @param dataNodes Data nodes.
*/
- private void saveDataNodesAndUpdateTriggerKeysInMetaStorage(int zoneId, long revision, Set<String> dataNodes) {
+ private void saveDataNodesAndUpdateTriggerKeysInMetaStorage(int zoneId, long revision, Set<Node> dataNodes) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
}
@@ -1032,7 +1040,7 @@ public class DistributionZoneManager implements IgniteComponent {
}
try {
- Set<String> topologyFromCmg = newTopology.nodes().stream().map(ClusterNode::name).collect(toSet());
+ Set<LogicalNode> logicalTopology = newTopology.nodes();
Condition updateCondition;
@@ -1045,7 +1053,7 @@ public class DistributionZoneManager implements IgniteComponent {
Iif iff = iif(
updateCondition,
- updateLogicalTopologyAndVersion(topologyFromCmg, newTopology.version()),
+ updateLogicalTopologyAndVersion(logicalTopology, newTopology.version()),
ops().yield(false)
);
@@ -1054,19 +1062,19 @@ public class DistributionZoneManager implements IgniteComponent {
LOG.error(
"Failed to update distribution zones' logical topology and version keys [topology = {}, version = {}]",
e,
- Arrays.toString(topologyFromCmg.toArray()),
+ Arrays.toString(logicalTopology.toArray()),
newTopology.version()
);
} else if (res.getAsBoolean()) {
LOG.debug(
"Distribution zones' logical topology and version keys were updated [topology = {}, version = {}]",
- Arrays.toString(topologyFromCmg.toArray()),
+ Arrays.toString(logicalTopology.toArray()),
newTopology.version()
);
} else {
LOG.debug(
"Failed to update distribution zones' logical topology and version keys [topology = {}, version = {}]",
- Arrays.toString(topologyFromCmg.toArray()),
+ Arrays.toString(logicalTopology.toArray()),
newTopology.version()
);
}
@@ -1101,7 +1109,7 @@ public class DistributionZoneManager implements IgniteComponent {
byte[] topVerFromMetaStorage = topVerEntry.value();
if (topVerFromMetaStorage == null || bytesToLong(topVerFromMetaStorage) < topologyVersionFromCmg) {
- Set<String> topologyFromCmg = snapshot.nodes().stream().map(ClusterNode::name).collect(toSet());
+ Set<LogicalNode> topologyFromCmg = snapshot.nodes();
Condition topologyVersionCondition = topVerFromMetaStorage == null
? notExists(zonesLogicalTopologyVersionKey()) :
@@ -1176,11 +1184,13 @@ public class DistributionZoneManager implements IgniteComponent {
logicalTopology = fromBytes(topologyEntry.value());
+ logicalTopology.forEach(n -> nodesAttributes.put(n.nodeId(), n.nodeAttributes()));
+
// init keys and data nodes for default zone
saveDataNodesAndUpdateTriggerKeysInMetaStorage(
DEFAULT_ZONE_ID,
appliedRevision,
- logicalTopology
+ logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet())
);
zonesConfiguration.distributionZones().value().forEach(zone -> {
@@ -1189,7 +1199,7 @@ public class DistributionZoneManager implements IgniteComponent {
saveDataNodesAndUpdateTriggerKeysInMetaStorage(
zoneId,
appliedRevision,
- logicalTopology
+ logicalTopology.stream().map(NodeWithAttributes::node).collect(toSet())
);
});
}
@@ -1199,7 +1209,7 @@ public class DistributionZoneManager implements IgniteComponent {
zoneState.scaleDownRevisionTracker().update(lastScaleDownRevision, null);
- zoneState.nodes(logicalTopology);
+ zoneState.nodes(logicalTopology.stream().map(NodeWithAttributes::nodeName).collect(toSet()));
});
assert topologyEntry == null || topologyEntry.value() == null || logicalTopology.equals(fromBytes(topologyEntry.value()))
@@ -1232,7 +1242,7 @@ public class DistributionZoneManager implements IgniteComponent {
byte[] newLogicalTopologyBytes = null;
- Set<String> newLogicalTopology = null;
+ Set<NodeWithAttributes> newLogicalTopology = null;
long revision = 0;
@@ -1253,13 +1263,19 @@ public class DistributionZoneManager implements IgniteComponent {
assert newLogicalTopology != null : "The event doesn't contain logical topology";
assert revision > 0 : "The event doesn't contain logical topology version";
- Set<String> newLogicalTopology0 = newLogicalTopology;
+ Set<NodeWithAttributes> newLogicalTopology0 = newLogicalTopology;
- Set<String> removedNodes =
- logicalTopology.stream().filter(node -> !newLogicalTopology0.contains(node)).collect(toSet());
+ Set<Node> removedNodes =
+ logicalTopology.stream()
+ .filter(node -> !newLogicalTopology0.contains(node))
+ .map(NodeWithAttributes::node)
+ .collect(toSet());
- Set<String> addedNodes =
- newLogicalTopology.stream().filter(node -> !logicalTopology.contains(node)).collect(toSet());
+ Set<Node> addedNodes =
+ newLogicalTopology.stream()
+ .filter(node -> !logicalTopology.contains(node))
+ .map(NodeWithAttributes::node)
+ .collect(toSet());
//Firstly update lastScaleUpRevision and lastScaleDownRevision then update topVerTracker to ensure thread-safety.
if (!addedNodes.isEmpty()) {
@@ -1272,6 +1288,8 @@ public class DistributionZoneManager implements IgniteComponent {
topVerTracker.update(topVer, null);
+ newLogicalTopology.forEach(n -> nodesAttributes.put(n.nodeId(), n.nodeAttributes()));
+
logicalTopology = newLogicalTopology;
NamedConfigurationTree<DistributionZoneConfiguration, DistributionZoneView, DistributionZoneChange> zones =
@@ -1316,7 +1334,7 @@ public class DistributionZoneManager implements IgniteComponent {
try {
int zoneId = 0;
- Set<String> newDataNodes = null;
+ Set<Node> newDataNodes = null;
long scaleUpRevision = 0;
@@ -1331,7 +1349,7 @@ public class DistributionZoneManager implements IgniteComponent {
byte[] dataNodesBytes = e.value();
if (dataNodesBytes != null) {
- newDataNodes = dataNodes(fromBytes(dataNodesBytes));
+ newDataNodes = DistributionZonesUtil.dataNodes(fromBytes(dataNodesBytes));
} else {
newDataNodes = emptySet();
}
@@ -1355,7 +1373,9 @@ public class DistributionZoneManager implements IgniteComponent {
assert newDataNodes != null : "Data nodes was not initialized.";
- zoneState.nodes(newDataNodes);
+ String filter = getZoneById(zonesConfiguration, zoneId).filter().value();
+
+ zoneState.nodes(filterDataNodes(newDataNodes, filter, nodesAttributes()));
//Associates scale up meta storage revision and data nodes.
if (scaleUpRevision > 0) {
@@ -1390,8 +1410,8 @@ public class DistributionZoneManager implements IgniteComponent {
*/
private void scheduleTimers(
DistributionZoneView zoneCfg,
- Set<String> addedNodes,
- Set<String> removedNodes,
+ Set<Node> addedNodes,
+ Set<Node> removedNodes,
long revision
) {
scheduleTimers(
@@ -1416,8 +1436,8 @@ public class DistributionZoneManager implements IgniteComponent {
*/
void scheduleTimers(
DistributionZoneView zoneCfg,
- Set<String> addedNodes,
- Set<String> removedNodes,
+ Set<Node> addedNodes,
+ Set<Node> removedNodes,
long revision,
BiFunction<Integer, Long, CompletableFuture<Void>> saveDataNodesOnScaleUp,
BiFunction<Integer, Long, CompletableFuture<Void>> saveDataNodesOnScaleDown
@@ -1516,7 +1536,7 @@ public class DistributionZoneManager implements IgniteComponent {
return completedFuture(null);
}
- Map<String, Integer> dataNodesFromMetaStorage = extractDataNodes(values.get(zoneDataNodesKey(zoneId)));
+ Map<Node, Integer> dataNodesFromMetaStorage = extractDataNodes(values.get(zoneDataNodesKey(zoneId)));
long scaleUpTriggerRevision = extractChangeTriggerRevision(values.get(zoneScaleUpChangeTriggerKey(zoneId)));
@@ -1526,12 +1546,16 @@ public class DistributionZoneManager implements IgniteComponent {
return completedFuture(null);
}
- List<String> deltaToAdd = zoneState.nodesToBeAddedToDataNodes(scaleUpTriggerRevision, revision);
+ List<Node> deltaToAdd = zoneState.nodesToBeAddedToDataNodes(scaleUpTriggerRevision, revision);
- Map<String, Integer> newDataNodes = new HashMap<>(dataNodesFromMetaStorage);
+ Map<Node, Integer> newDataNodes = new HashMap<>(dataNodesFromMetaStorage);
deltaToAdd.forEach(n -> newDataNodes.merge(n, 1, Integer::sum));
+ // Update dataNodes, so nodeId will be updated with the latest seen data on the node.
+ // For example, node could be restarted with new nodeId, we need to update it in the data nodes.
+ deltaToAdd.forEach(n -> newDataNodes.put(n, newDataNodes.remove(n)));
+
// Remove redundant nodes that are not presented in the data nodes.
newDataNodes.entrySet().removeIf(e -> e.getValue() == 0);
@@ -1547,6 +1571,8 @@ public class DistributionZoneManager implements IgniteComponent {
.thenApply(StatementResult::getAsBoolean)
.thenCompose(invokeResult -> inBusyLock(busyLock, () -> {
if (invokeResult) {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19491 Properly utilise this map
+ // Currently we call clean up only on a node that successfully writes data nodes.
zoneState.cleanUp(Math.min(scaleDownTriggerRevision, revision));
} else {
LOG.debug("Updating data nodes for a zone has not succeeded [zoneId = {}]", zoneId);
@@ -1599,7 +1625,7 @@ public class DistributionZoneManager implements IgniteComponent {
return completedFuture(null);
}
- Map<String, Integer> dataNodesFromMetaStorage = extractDataNodes(values.get(zoneDataNodesKey(zoneId)));
+ Map<Node, Integer> dataNodesFromMetaStorage = extractDataNodes(values.get(zoneDataNodesKey(zoneId)));
long scaleUpTriggerRevision = extractChangeTriggerRevision(values.get(zoneScaleUpChangeTriggerKey(zoneId)));
@@ -1609,9 +1635,9 @@ public class DistributionZoneManager implements IgniteComponent {
return completedFuture(null);
}
- List<String> deltaToRemove = zoneState.nodesToBeRemovedFromDataNodes(scaleDownTriggerRevision, revision);
+ List<Node> deltaToRemove = zoneState.nodesToBeRemovedFromDataNodes(scaleDownTriggerRevision, revision);
- Map<String, Integer> newDataNodes = new HashMap<>(dataNodesFromMetaStorage);
+ Map<Node, Integer> newDataNodes = new HashMap<>(dataNodesFromMetaStorage);
deltaToRemove.forEach(n -> newDataNodes.merge(n, -1, Integer::sum));
@@ -1630,6 +1656,8 @@ public class DistributionZoneManager implements IgniteComponent {
.thenApply(StatementResult::getAsBoolean)
.thenCompose(invokeResult -> inBusyLock(busyLock, () -> {
if (invokeResult) {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19491 Properly utilise this map
+ // Currently we call clean up only on a node that successfully writes data nodes.
zoneState.cleanUp(Math.min(scaleUpTriggerRevision, revision));
} else {
LOG.debug("Updating data nodes for a zone has not succeeded [zoneId = {}]", zoneId);
@@ -1884,7 +1912,7 @@ public class DistributionZoneManager implements IgniteComponent {
* Nodes that were associated with this event will be included.
* @return List of nodes that should be added to zone's data nodes.
*/
- List<String> nodesToBeAddedToDataNodes(long scaleUpRevision, long revision) {
+ List<Node> nodesToBeAddedToDataNodes(long scaleUpRevision, long revision) {
return accumulateNodes(scaleUpRevision, revision, true);
}
@@ -1897,7 +1925,7 @@ public class DistributionZoneManager implements IgniteComponent {
* Nodes that were associated with this event will be included.
* @return List of nodes that should be removed from zone's data nodes.
*/
- List<String> nodesToBeRemovedFromDataNodes(long scaleDownRevision, long revision) {
+ List<Node> nodesToBeRemovedFromDataNodes(long scaleDownRevision, long revision) {
return accumulateNodes(scaleDownRevision, revision, false);
}
@@ -1907,7 +1935,7 @@ public class DistributionZoneManager implements IgniteComponent {
* @param nodes Nodes to add to zone's data nodes.
* @param revision Revision of the event that triggered this addition.
*/
- void nodesToAddToDataNodes(Set<String> nodes, long revision) {
+ void nodesToAddToDataNodes(Set<Node> nodes, long revision) {
topologyAugmentationMap.put(revision, new Augmentation(nodes, true));
}
@@ -1917,7 +1945,7 @@ public class DistributionZoneManager implements IgniteComponent {
* @param nodes Nodes to remove from zone's data nodes.
* @param revision Revision of the event that triggered this addition.
*/
- void nodesToRemoveFromDataNodes(Set<String> nodes, long revision) {
+ void nodesToRemoveFromDataNodes(Set<Node> nodes, long revision) {
topologyAugmentationMap.put(revision, new Augmentation(nodes, false));
}
@@ -1931,11 +1959,11 @@ public class DistributionZoneManager implements IgniteComponent {
* @param addition Indicates whether we should accumulate nodes that should be added to data nodes, or removed.
* @return Accumulated nodes.
*/
- private List<String> accumulateNodes(long fromKey, long toKey, boolean addition) {
+ private List<Node> accumulateNodes(long fromKey, long toKey, boolean addition) {
return topologyAugmentationMap.subMap(fromKey, false, toKey, true).values()
.stream()
.filter(a -> a.addition == addition)
- .flatMap(a -> a.nodeNames.stream())
+ .flatMap(a -> a.nodes.stream())
.collect(toList());
}
@@ -2018,29 +2046,29 @@ public class DistributionZoneManager implements IgniteComponent {
*/
private static class Augmentation {
/** Names of the node. */
- Set<String> nodeNames;
+ Set<Node> nodes;
/** Flag that indicates whether {@code nodeNames} should be added or removed. */
boolean addition;
- Augmentation(Set<String> nodeNames, boolean addition) {
- this.nodeNames = nodeNames;
+ Augmentation(Set<Node> nodes, boolean addition) {
+ this.nodes = nodes;
this.addition = addition;
}
}
/**
- * Returns a data nodes set for the specified zone.
+ * Returns local mapping of {@code nodeId} -> node's attributes, where {@code nodeId} is a node id, that changes between restarts.
+ * This map is updated every time we receive a topology event in a {@code topologyWatchListener}.
*
- * @param zoneId Zone id.
- * @return Data nodes set.
+ * @return Mapping {@code nodeId} -> node's attributes.
*/
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19425 Proper causality token based implementation is expected.
- public Set<String> getDataNodesByZoneId(int zoneId) {
- return inBusyLock(busyLock, () -> {
- ZoneState zoneState = zonesState.get(zoneId);
+ public Map<String, Map<String, String>> nodesAttributes() {
+ return nodesAttributes;
+ }
- return zoneState.nodes;
- });
+ @TestOnly
+ Map<Integer, ZoneState> zonesTimers() {
+ return zonesState;
}
}
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
index 371b6c337a..0a4a662922 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.distributionzones;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_FILTER;
import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.and;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
@@ -38,7 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.stream.Collectors;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
@@ -307,10 +309,14 @@ public class DistributionZonesUtil {
* @param topologyVersion Logical topology version.
* @return Update command for the meta storage.
*/
- static Update updateLogicalTopologyAndVersion(Set<String> logicalTopology, long topologyVersion) {
+ public static Update updateLogicalTopologyAndVersion(Set<LogicalNode> logicalTopology, long topologyVersion) {
+ Set<NodeWithAttributes> topologyFromCmg = logicalTopology.stream()
+ .map(n -> new NodeWithAttributes(n.name(), n.id(), n.nodeAttributes()))
+ .collect(toSet());
+
return ops(
put(zonesLogicalTopologyVersionKey(), ByteUtils.longToBytes(topologyVersion)),
- put(zonesLogicalTopologyKey(), ByteUtils.toBytes(logicalTopology))
+ put(zonesLogicalTopologyKey(), ByteUtils.toBytes(topologyFromCmg))
).yield(true);
}
@@ -322,20 +328,20 @@ public class DistributionZonesUtil {
* Joining increases the counter, leaving decreases.
* @return Returns a set of data nodes retrieved from data nodes map, which value is more than 0.
*/
- public static Set<String> dataNodes(Map<String, Integer> dataNodesMap) {
- return dataNodesMap.entrySet().stream().filter(e -> e.getValue() > 0).map(Map.Entry::getKey).collect(Collectors.toSet());
+ public static Set<Node> dataNodes(Map<Node, Integer> dataNodesMap) {
+ return dataNodesMap.entrySet().stream().filter(e -> e.getValue() > 0).map(Map.Entry::getKey).collect(toSet());
}
/**
- * Returns a map from a set of data nodes. This map has the following structure: node name is mapped to integer,
+ * Returns a map from a set of data nodes. This map has the following structure: node is mapped to integer,
* integer represents how often node joined or leaved topology. In this case, set of nodes is interpreted as nodes
* that joined topology, so all mappings will be node -> 1.
*
* @param dataNodes Set of data nodes.
* @return Returns a map from a set of data nodes.
*/
- public static Map<String, Integer> toDataNodesMap(Set<String> dataNodes) {
- Map<String, Integer> dataNodesMap = new HashMap<>();
+ public static Map<Node, Integer> toDataNodesMap(Set<Node> dataNodes) {
+ Map<Node, Integer> dataNodesMap = new HashMap<>();
dataNodes.forEach(n -> dataNodesMap.merge(n, 1, Integer::sum));
@@ -348,7 +354,7 @@ public class DistributionZonesUtil {
* @param dataNodesEntry Meta storage entry with data nodes.
* @return Data nodes.
*/
- static Map<String, Integer> extractDataNodes(Entry dataNodesEntry) {
+ static Map<Node, Integer> extractDataNodes(Entry dataNodesEntry) {
if (!dataNodesEntry.empty()) {
return fromBytes(dataNodesEntry.value());
} else {
@@ -453,6 +459,9 @@ public class DistributionZonesUtil {
* @return True if {@code nodeAttributes} satisfy {@code filter}, false otherwise. Returns true if {@code nodeAttributes} is empty.
*/
public static boolean filter(Map<String, String> nodeAttributes, String filter) {
+ if (filter.equals(DEFAULT_FILTER)) {
+ return true;
+ }
// We need to convert numbers to Long objects, so they could be parsed to numbers in JSON.
// nodeAttributes has String values of numbers because nodeAttributes come from configuration,
// but configuration does not support Object as a configuration value.
@@ -476,4 +485,24 @@ public class DistributionZonesUtil {
return !res.isEmpty();
}
+
+ /**
+ * Filters {@code dataNodes} according to the provided {@code filter}.
+ * Nodes' attributes are taken from {@code nodesAttributes} map.
+ *
+ * @param dataNodes Data nodes.
+ * @param filter Filter for data nodes.
+ * @param nodesAttributes Nodes' attributes which used for filtering.
+ * @return Filtered data nodes.
+ */
+ public static Set<String> filterDataNodes(
+ Set<Node> dataNodes,
+ String filter,
+ Map<String, Map<String, String>> nodesAttributes
+ ) {
+ return dataNodes.stream()
+ .filter(n -> filter(nodesAttributes.get(n.nodeId()), filter))
+ .map(Node::nodeName)
+ .collect(toSet());
+ }
}
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/Node.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/Node.java
new file mode 100644
index 0000000000..0927c2c4e3
--- /dev/null
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/Node.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import java.io.Serializable;
+
+/**
+ * Node representation that we store in data nodes.
+ * Includes {@code nodeName} that is unique identifier of the node, that is not changing after a restart, and
+ * {@code nodeId} that is unique identifier of a node, that changes after a restart.
+ * {@code nodeId} is needed to get node's attributes from the local state of the distribution zone manager.
+ */
+public class Node implements Serializable {
+ private static final long serialVersionUID = 875461392587175703L;
+
+ private final String nodeName;
+
+ private final String nodeId;
+
+ public Node(String nodeName, String nodeId) {
+ this.nodeName = nodeName;
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public int hashCode() {
+ return nodeName.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ Node that = (Node) obj;
+
+ return nodeName().equals(that.nodeName());
+ }
+
+ public String nodeName() {
+ return nodeName;
+ }
+
+ public String nodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public String toString() {
+ return nodeName;
+ }
+}
\ No newline at end of file
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributes.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributes.java
new file mode 100644
index 0000000000..eb11971d4b
--- /dev/null
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/NodeWithAttributes.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+
+/**
+ * Structure that represents node with the attributes and which we store in Meta Storage when we store logical topology.
+ * Light-weighted version of the {@link LogicalNode}.
+ */
+public class NodeWithAttributes implements Serializable {
+ private static final long serialVersionUID = -7778967985161743937L;
+
+ private final Node node;
+
+ private final Map<String, String> nodeAttributes;
+
+ public NodeWithAttributes(String nodeName, String nodeId, Map<String, String> nodeAttributes) {
+ this.node = new Node(nodeName, nodeId);
+ this.nodeAttributes = nodeAttributes;
+ }
+
+ @Override
+ public int hashCode() {
+ return node.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ NodeWithAttributes that = (NodeWithAttributes) obj;
+
+ return node.equals(that.node);
+ }
+
+ public String nodeName() {
+ return node.nodeName();
+ }
+
+ public String nodeId() {
+ return node.nodeId();
+ }
+
+ public Node node() {
+ return node;
+ }
+
+ public Map<String, String> nodeAttributes() {
+ return nodeAttributes;
+ }
+
+ @Override
+ public String toString() {
+ return node.toString();
+ }
+}
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index 601ffc0944..e7c116e951 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.dataNodes;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractZoneId;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.filterDataNodes;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.updatePendingAssignmentsKeys;
@@ -38,6 +39,7 @@ import org.apache.ignite.configuration.NamedConfigurationTree;
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.distributionzones.Node;
import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
@@ -163,7 +165,7 @@ public class DistributionZoneRebalanceEngine {
int zoneId = extractZoneId(evt.entryEvent().newEntry().key());
- Set<String> dataNodes = dataNodes(ByteUtils.fromBytes(dataNodesBytes));
+ Set<Node> dataNodes = dataNodes(ByteUtils.fromBytes(dataNodesBytes));
for (int i = 0; i < tables.value().size(); i++) {
TableView tableView = tables.value().get(i);
@@ -173,6 +175,12 @@ public class DistributionZoneRebalanceEngine {
DistributionZoneConfiguration distributionZoneConfiguration =
getZoneById(zonesConfiguration, tableZoneId);
+ Set<String> filteredDataNodes = filterDataNodes(
+ dataNodes,
+ distributionZoneConfiguration.filter().value(),
+ distributionZoneManager.nodesAttributes()
+ );
+
if (zoneId == tableZoneId) {
TableConfiguration tableCfg = tables.get(tableView.name());
@@ -194,7 +202,7 @@ public class DistributionZoneRebalanceEngine {
updatePendingAssignmentsKeys(
tableView.name(),
replicaGrpId,
- dataNodes,
+ filteredDataNodes,
replicas,
evt.entryEvent().newEntry().revision(),
metaStorageManager,
@@ -271,7 +279,7 @@ public class DistributionZoneRebalanceEngine {
futs[furCur++] = updatePendingAssignmentsKeys(
tblCfg.name().value(),
replicaGrpId,
- distributionZoneManager.getDataNodesByZoneId(zoneCfg.zoneId()),
+ distributionZoneManager.dataNodes(zoneCfg.zoneId()),
newReplicas,
replicasCtx.storageRevision(),
metaStorageManager,
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
index b2d536e116..2fbe8a4044 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.distributionzones;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.deployWatchesAndUpdateMetaStorageRevision;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -139,4 +140,10 @@ public class BaseDistributionZoneManagerTest extends BaseIgniteAbstractTest {
IgniteUtils.closeAll(components.stream().map(c -> c::beforeNodeStop));
IgniteUtils.closeAll(components.stream().map(c -> c::stop));
}
+
+ void startDistributionZoneManager() throws Exception {
+ deployWatchesAndUpdateMetaStorageRevision(metaStorageManager);
+
+ distributionZoneManager.start();
+ }
}
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java
index 925e77ce79..cb4ef2acb4 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneAwaitDataNodesTest.java
@@ -17,19 +17,20 @@
package org.apache.ignite.internal.distributionzones;
+import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.deployWatchesAndUpdateMetaStorageRevision;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.setLogicalTopologyInMetaStorage;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.deployWatchesAndUpdateMetaStorageRevision;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
@@ -42,12 +43,12 @@ import static org.mockito.Mockito.doAnswer;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters.Builder;
@@ -56,8 +57,6 @@ import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNo
import org.apache.ignite.internal.distributionzones.exception.DistributionZoneWasRemovedException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.metastorage.dsl.Conditions;
-import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.metastorage.server.If;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.network.ClusterNode;
@@ -81,6 +80,12 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
private static final String ZONE_NAME_2 = "zone2";
+ private static final LogicalNode NODE_0 = new LogicalNode("node0", "node0", new NetworkAddress("localhost", 123));
+
+ private static final LogicalNode NODE_1 = new LogicalNode("node1", "node1", new NetworkAddress("localhost", 123));
+
+ private static final LogicalNode NODE_2 = new LogicalNode("node2", "node2", new NetworkAddress("localhost", 123));
+
/**
* This test invokes {@link DistributionZoneManager#topologyVersionedDataNodes(int, long)} with default and non-default zone id and
* different logical topology versions. Simulates new logical topology with new nodes and with removed nodes. Check that data nodes
@@ -119,18 +124,19 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
int topVer0 = 2;
- Set<String> threeNodes = Set.of("node0", "node1", "node2");
+ Set<LogicalNode> threeNodes = Set.of(NODE_0, NODE_1, NODE_2);
+ Set<String> threeNodesNames = Set.of(NODE_0.name(), NODE_1.name(), NODE_2.name());
- setLogicalTopologyInMetaStorage(threeNodes, topVer0);
+ setLogicalTopologyInMetaStorage(threeNodes, topVer0, metaStorageManager);
- assertEquals(threeNodes, dataNodesUpFut0.get(5, SECONDS));
- assertEquals(threeNodes, dataNodesUpFut1.get(3, SECONDS));
- assertEquals(threeNodes, dataNodesUpFut2.get(3, SECONDS));
+ assertEquals(threeNodesNames, dataNodesUpFut0.get(5, SECONDS));
+ assertEquals(threeNodesNames, dataNodesUpFut1.get(3, SECONDS));
+ assertEquals(threeNodesNames, dataNodesUpFut2.get(3, SECONDS));
- assertEquals(threeNodes, dataNodesUpFut4.get(3, SECONDS));
- assertEquals(threeNodes, dataNodesUpFut5.get(3, SECONDS));
- assertEquals(threeNodes, dataNodesUpFut6.get(3, SECONDS));
- assertEquals(threeNodes, dataNodesUpFut7.get(3, SECONDS));
+ assertEquals(threeNodesNames, dataNodesUpFut4.get(3, SECONDS));
+ assertEquals(threeNodesNames, dataNodesUpFut5.get(3, SECONDS));
+ assertEquals(threeNodesNames, dataNodesUpFut6.get(3, SECONDS));
+ assertEquals(threeNodesNames, dataNodesUpFut7.get(3, SECONDS));
assertFalse(dataNodesUpFut3.isDone());
LOG.info("Topology with removed nodes.");
@@ -146,29 +152,31 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
int topVer1 = 5;
- Set<String> twoNodes = Set.of("node0", "node1");
+ Set<LogicalNode> twoNodes = Set.of(NODE_0, NODE_1);
+ Set<String> twoNodesNames = Set.of(NODE_0.name(), NODE_1.name());
- setLogicalTopologyInMetaStorage(twoNodes, topVer1);
+ setLogicalTopologyInMetaStorage(twoNodes, topVer1, metaStorageManager);
- assertEquals(twoNodes, dataNodesDownFut0.get(3, SECONDS));
- assertEquals(twoNodes, dataNodesDownFut1.get(3, SECONDS));
- assertEquals(twoNodes, dataNodesDownFut2.get(3, SECONDS));
- assertEquals(twoNodes, dataNodesDownFut4.get(3, SECONDS));
- assertEquals(twoNodes, dataNodesDownFut5.get(3, SECONDS));
- assertEquals(twoNodes, dataNodesDownFut6.get(3, SECONDS));
- assertEquals(twoNodes, dataNodesDownFut7.get(3, SECONDS));
+ assertEquals(twoNodesNames, dataNodesDownFut0.get(3, SECONDS));
+ assertEquals(twoNodesNames, dataNodesDownFut1.get(3, SECONDS));
+ assertEquals(twoNodesNames, dataNodesDownFut2.get(3, SECONDS));
+ assertEquals(twoNodesNames, dataNodesDownFut4.get(3, SECONDS));
+ assertEquals(twoNodesNames, dataNodesDownFut5.get(3, SECONDS));
+ assertEquals(twoNodesNames, dataNodesDownFut6.get(3, SECONDS));
+ assertEquals(twoNodesNames, dataNodesDownFut7.get(3, SECONDS));
assertFalse(dataNodesDownFut3.isDone());
int topVer2 = 20;
LOG.info("Topology with added and removed nodes.");
- Set<String> dataNodes = Set.of("node0", "node2");
+ Set<LogicalNode> dataNodes = Set.of(NODE_0, NODE_1);
+ Set<String> dataNodesNames = Set.of(NODE_0.name(), NODE_1.name());
- setLogicalTopologyInMetaStorage(dataNodes, topVer2);
+ setLogicalTopologyInMetaStorage(dataNodes, topVer2, metaStorageManager);
- assertEquals(dataNodes, dataNodesUpFut3.get(3, SECONDS));
- assertEquals(dataNodes, dataNodesDownFut3.get(3, SECONDS));
+ assertEquals(dataNodesNames, dataNodesUpFut3.get(3, SECONDS));
+ assertEquals(dataNodesNames, dataNodesDownFut3.get(3, SECONDS));
}
/**
@@ -184,23 +192,25 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
long topVer = 100;
- Set<String> dataNodes0 = Set.of("node0", "node1");
+ Set<LogicalNode> dataNodes = Set.of(NODE_0, NODE_1);
+ Set<String> dataNodesNames = Set.of(NODE_0.name(), NODE_1.name());
- setLogicalTopologyInMetaStorage(dataNodes0, topVer);
+ setLogicalTopologyInMetaStorage(dataNodes, topVer, metaStorageManager);
assertFalse(dataNodesFut.isDone());
- assertEquals(dataNodes0, dataNodesFut.get(3, SECONDS));
+ assertEquals(dataNodesNames, dataNodesFut.get(3, SECONDS));
dataNodesFut = distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 106);
- Set<String> dataNodes1 = Set.of("node0");
+ Set<LogicalNode> dataNodes1 = Set.of(NODE_0);
+ Set<String> dataNodesNames1 = Set.of(NODE_0.name());
- setLogicalTopologyInMetaStorage(dataNodes1, topVer + 100);
+ setLogicalTopologyInMetaStorage(dataNodes1, topVer + 100, metaStorageManager);
assertFalse(dataNodesFut.isDone());
- assertEquals(dataNodes1, dataNodesFut.get(3, SECONDS));
+ assertEquals(dataNodesNames1, dataNodesFut.get(3, SECONDS));
}
/**
@@ -225,19 +235,20 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
CompletableFuture<Set<String>> dataNodesFut = distributionZoneManager.topologyVersionedDataNodes(zoneId, 1);
- Set<String> nodes0 = Set.of("node0", "node1");
+ Set<LogicalNode> nodes = Set.of(NODE_0, NODE_1);
+ Set<String> nodesNames = Set.of(NODE_0.name(), NODE_1.name());
- setLogicalTopologyInMetaStorage(nodes0, 1);
+ setLogicalTopologyInMetaStorage(nodes, 1, metaStorageManager);
- assertEquals(nodes0, dataNodesFut.get(3, SECONDS));
+ assertEquals(nodesNames, dataNodesFut.get(3, SECONDS));
dataNodesFut = distributionZoneManager.topologyVersionedDataNodes(zoneId, 2);
assertFalse(dataNodesFut.isDone());
- setLogicalTopologyInMetaStorage(Set.of("node0"), 2);
+ setLogicalTopologyInMetaStorage(Set.of(NODE_0), 2, metaStorageManager);
- assertEquals(nodes0, dataNodesFut.get(3, SECONDS));
+ assertEquals(nodesNames, dataNodesFut.get(3, SECONDS));
}
/**
@@ -279,9 +290,9 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
CompletableFuture<Set<String>> dataNodesFut = distributionZoneManager.topologyVersionedDataNodes(zoneId1, 1);
- Set<String> nodes0 = Set.of("node0", "node1");
+ Set<LogicalNode> nodes0 = Set.of(NODE_0, NODE_1);
- setLogicalTopologyInMetaStorage(nodes0, 1);
+ setLogicalTopologyInMetaStorage(nodes0, 1, metaStorageManager);
dataNodesFut.get(3, SECONDS);
@@ -293,25 +304,26 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
assertFalse(dataNodesFut1.isDone());
assertFalse(dataNodesFut1Zone2.isDone());
- Set<String> nodes1 = Set.of("node0");
+ Set<LogicalNode> nodes1 = Set.of(NODE_0);
+ Set<String> nodesNames1 = Set.of(NODE_0.name());
distributionZoneManager.alterZone(ZONE_NAME_1, new DistributionZoneConfigurationParameters.Builder(ZONE_NAME_1)
.dataNodesAutoAdjustScaleUp(INFINITE_TIMER_VALUE).dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE).build())
.get(3, SECONDS);
- setLogicalTopologyInMetaStorage(nodes1, 2);
+ setLogicalTopologyInMetaStorage(nodes1, 2, metaStorageManager);
- assertEquals(nodes1, dataNodesFut1.get(3, SECONDS));
+ assertEquals(nodesNames1, dataNodesFut1.get(3, SECONDS));
CompletableFuture<Set<String>> dataNodesFut2 = distributionZoneManager.topologyVersionedDataNodes(zoneId1, 3);
- Set<String> nodes2 = Set.of("node0", "node1");
+ Set<LogicalNode> nodes2 = Set.of(NODE_0, NODE_1);
assertFalse(dataNodesFut2.isDone());
- setLogicalTopologyInMetaStorage(nodes2, 3);
+ setLogicalTopologyInMetaStorage(nodes2, 3, metaStorageManager);
- assertEquals(nodes1, dataNodesFut2.get(3, SECONDS));
+ assertEquals(nodesNames1, dataNodesFut2.get(3, SECONDS));
}
/**
@@ -338,9 +350,9 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
assertFalse(dataNodesFut.isDone());
- Set<String> nodes0 = Set.of("node0", "node1");
+ Set<LogicalNode> nodes0 = Set.of(NODE_0, NODE_1);
- setLogicalTopologyInMetaStorage(nodes0, 1);
+ setLogicalTopologyInMetaStorage(nodes0, 1, metaStorageManager);
assertEquals(emptySet(), dataNodesFut.get(3, SECONDS));
}
@@ -375,7 +387,7 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
return invocation.callRealMethod();
}).when(keyValueStorage).invoke(any(), any());
- setLogicalTopologyInMetaStorage(Set.of("node0"), 200);
+ setLogicalTopologyInMetaStorage(Set.of(NODE_0), 200, metaStorageManager);
assertFalse(dataNodesFut0.isDone());
@@ -401,7 +413,7 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
assertThat(distributionZoneManager.dropZone(ZONE_NAME_0), willSucceedIn(3, SECONDS));
- setLogicalTopologyInMetaStorage(Set.of("node0"), 200);
+ setLogicalTopologyInMetaStorage(Set.of(NODE_0), 200, metaStorageManager);
assertFalse(dataNodesFut0.isDone());
@@ -416,15 +428,16 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
void testScaleUpScaleDownAreChangedWhileAwaitingDataNodes() throws Exception {
startZoneManager();
- Set<String> nodes0 = Set.of("node0", "node1");
+ Set<LogicalNode> nodes0 = Set.of(NODE_0, NODE_1);
+ Set<String> nodesNames0 = Set.of(NODE_0.name(), NODE_1.name());
- setLogicalTopologyInMetaStorage(nodes0, 1);
+ setLogicalTopologyInMetaStorage(nodes0, 1, metaStorageManager);
CompletableFuture<Set<String>> dataNodesFut0 = distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 1);
- assertEquals(nodes0, dataNodesFut0.get(3, SECONDS));
+ assertEquals(nodesNames0, dataNodesFut0.get(3, SECONDS));
- Set<String> nodes1 = Set.of("node0", "node2");
+ Set<LogicalNode> nodes1 = Set.of(NODE_0, NODE_2);
CompletableFuture<Set<String>> dataNodesFut1 = distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 2);
@@ -461,9 +474,9 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
return invocation.callRealMethod();
}).when(keyValueStorage).invoke(any(), any());
- setLogicalTopologyInMetaStorage(nodes1, 2);
+ setLogicalTopologyInMetaStorage(nodes1, 2, metaStorageManager);
- assertEquals(nodes0, dataNodesFut1.get(5, SECONDS));
+ assertEquals(nodes0.stream().map(ClusterNode::name).collect(Collectors.toSet()), dataNodesFut1.get(5, SECONDS));
scaleUpLatch.countDown();
scaleDownLatch.countDown();
@@ -474,7 +487,10 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
*/
@Test
void testInitializedDataNodesOnZoneManagerStart() throws Exception {
- Set<String> dataNodes = Set.of("node0", "node1");
+ Set<String> dataNodes0 = Set.of("node0", "node1");
+
+ Set<NodeWithAttributes> dataNodes = Set.of(new NodeWithAttributes("node0", "id_node0", emptyMap()),
+ new NodeWithAttributes("node1", "id_node1", emptyMap()));
Map<ByteArray, byte[]> valEntries = new HashMap<>();
@@ -483,12 +499,12 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
assertThat(vaultMgr.putAll(valEntries), willCompleteSuccessfully());
- topology.putNode(new LogicalNode(new ClusterNode("node0", "node0", new NetworkAddress("local", 1))));
- topology.putNode(new LogicalNode(new ClusterNode("node1", "node1", new NetworkAddress("local", 1))));
+ topology.putNode(new LogicalNode(new ClusterNode("id_node0", "node0", new NetworkAddress("local", 1))));
+ topology.putNode(new LogicalNode(new ClusterNode("id_node1", "node1", new NetworkAddress("local", 1))));
startZoneManager();
- assertEquals(dataNodes, distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 2)
+ assertEquals(dataNodes0, distributionZoneManager.topologyVersionedDataNodes(DEFAULT_ZONE_ID, 2)
.get(3, SECONDS));
}
@@ -503,17 +519,4 @@ public class DistributionZoneAwaitDataNodesTest extends BaseDistributionZoneMana
.dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE).build())
.get(3, SECONDS);
}
-
- private void setLogicalTopologyInMetaStorage(Set<String> nodes, long topVer) {
- CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
- Conditions.exists(zonesLogicalTopologyKey()),
- List.of(
- Operations.put(zonesLogicalTopologyKey(), toBytes(nodes)),
- Operations.put(zonesLogicalTopologyVersionKey(), longToBytes(topVer))
- ),
- List.of(Operations.noop())
- );
-
- assertThat(invokeFuture, willBe(true));
- }
}
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneFiltersTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneFiltersTest.java
index e3f2898342..d2eda4d0d9 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneFiltersTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneFiltersTest.java
@@ -93,6 +93,13 @@ public class DistributionZoneFiltersTest {
@Test
void testNodeAttributesFilterScenario8() {
+ Map<String, String> newAttributesMap = Map.of();
+
+ assertTrue(filter(newAttributesMap, DEFAULT_FILTER));
+ }
+
+ @Test
+ void testNodeAttributesFilterScenario9() {
Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
String filter = "$.[?(@.newValue == 100)]";
@@ -101,7 +108,7 @@ public class DistributionZoneFiltersTest {
}
@Test
- void testNodeAttributesFilterScenario9() {
+ void testNodeAttributesFilterScenario10() {
Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
String filter = "$.[?(@.newValue != 100)]";
@@ -110,7 +117,7 @@ public class DistributionZoneFiltersTest {
}
@Test
- void testNodeAttributesFilterScenario10() {
+ void testNodeAttributesFilterScenario11() {
Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
String filter = "$.[?(@.newValue && @.newValue != 100)]";
@@ -119,7 +126,7 @@ public class DistributionZoneFiltersTest {
}
@Test
- void testNodeAttributesFilterScenario11() {
+ void testNodeAttributesFilterScenario12() {
Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
String filter = "$[?(@.dataRegionSize != 10)]";
@@ -128,7 +135,7 @@ public class DistributionZoneFiltersTest {
}
@Test
- void testNodeAttributesFilterScenario12() {
+ void testNodeAttributesFilterScenario13() {
Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
String filter = "$[?(@.region != 'EU')]";
@@ -137,7 +144,7 @@ public class DistributionZoneFiltersTest {
}
@Test
- void testNodeAttributesFilterScenario13() {
+ void testNodeAttributesFilterScenario14() {
Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
String filter = "$[?(@.region != 'EU' && @.dataRegionSize > 5)]";
@@ -146,7 +153,7 @@ public class DistributionZoneFiltersTest {
}
@Test
- void testNodeAttributesFilterScenario14() {
+ void testNodeAttributesFilterScenario15() {
Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
String filter = "$[?(@.region != 'EU' && @.dataRegionSize < 5)]";
@@ -155,7 +162,7 @@ public class DistributionZoneFiltersTest {
}
@Test
- void testNodeAttributesFilterScenario15() {
+ void testNodeAttributesFilterScenario16() {
Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
String filter = "$[?(@.dataRegionSize > 5 && @.dataRegionSize < 5)]";
@@ -164,7 +171,7 @@ public class DistributionZoneFiltersTest {
}
@Test
- void testNodeAttributesFilterScenario16() {
+ void testNodeAttributesFilterScenario17() {
Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
String filter = "$[?(@.region == 'US' && @.region == 'EU')]";
@@ -173,7 +180,7 @@ public class DistributionZoneFiltersTest {
}
@Test
- void testNodeAttributesFilterScenario17() {
+ void testNodeAttributesFilterScenario18() {
Map<String, String> newAttributesMap = Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10");
String filter = "$[?(@.region == 'EU' || @.dataRegionSize > 5)]";
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
index 07420d4750..9f70537995 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
@@ -20,15 +20,15 @@ package org.apache.ignite.internal.distributionzones;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesForZone;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesForZoneWithAttributes;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertZoneScaleUpChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertZonesChangeTriggerKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertDataNodesForZone;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertZoneScaleUpChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertZonesChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.deployWatchesAndUpdateMetaStorageRevision;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
@@ -41,6 +41,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
@@ -74,7 +75,7 @@ public class DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
private static final String NEW_ZONE_NAME = "zone2";
- private static final Set<String> nodes = Set.of("name1");
+ private static final Set<NodeWithAttributes> nodes = Set.of(new NodeWithAttributes("name1", "name1", Collections.emptyMap()));
private DistributionZoneManager distributionZoneManager;
@@ -110,7 +111,7 @@ public class DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
LogicalTopologyService logicalTopologyService = mock(LogicalTopologyService.class);
Set<LogicalNode> logicalTopology = nodes.stream()
- .map(n -> new LogicalNode(n, n, new NetworkAddress(n, 10_000)))
+ .map(n -> new LogicalNode(n.nodeName(), n.nodeName(), new NetworkAddress(n.nodeName(), 10_000)))
.collect(toSet());
when(logicalTopologyService.logicalTopologyOnLeader())
@@ -135,7 +136,7 @@ public class DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
clusterCfgMgr.start();
metaStorageManager.start();
- deployWatchesAndUpdateMetaStorageRevision(metaStorageManager);
+ DistributionZonesTestUtil.deployWatchesAndUpdateMetaStorageRevision(metaStorageManager);
distributionZoneManager.start();
@@ -156,7 +157,7 @@ public class DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
- assertDataNodesForZone(1, nodes, keyValueStorage);
+ assertDataNodesForZoneWithAttributes(1, nodes.stream().map(NodeWithAttributes::node).collect(toSet()), keyValueStorage);
assertZoneScaleUpChangeTriggerKey(1L, 1, keyValueStorage);
@@ -167,7 +168,7 @@ public class DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
void testZoneDeleteRemovesMetaStorageKey() throws Exception {
distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
- assertDataNodesForZone(1, nodes, keyValueStorage);
+ assertDataNodesForZoneWithAttributes(1, nodes.stream().map(NodeWithAttributes::node).collect(toSet()), keyValueStorage);
distributionZoneManager.dropZone(ZONE_NAME).get();
@@ -204,12 +205,12 @@ public class DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr
void testZoneDeleteDoNotRemoveMetaStorageKey() throws Exception {
distributionZoneManager.createZone(new DistributionZoneConfigurationParameters.Builder(ZONE_NAME).build()).get();
- assertDataNodesForZone(1, nodes, keyValueStorage);
+ assertDataNodesForZoneWithAttributes(1, nodes.stream().map(NodeWithAttributes::node).collect(toSet()), keyValueStorage);
keyValueStorage.put(zonesChangeTriggerKey(1).bytes(), longToBytes(100), HybridTimestamp.MIN_VALUE);
distributionZoneManager.dropZone(ZONE_NAME).get();
- assertDataNodesForZone(1, nodes, keyValueStorage);
+ assertDataNodesForZoneWithAttributes(1, nodes.stream().map(NodeWithAttributes::node).collect(toSet()), keyValueStorage);
}
}
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java
new file mode 100644
index 0000000000..9dc2523358
--- /dev/null
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerFilterTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones;
+
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests distribution zone manager interactions with data nodes filtering.
+ */
+public class DistributionZoneManagerFilterTest extends BaseDistributionZoneManagerTest {
+ private static final String ZONE_NAME = "zone1";
+
+ private static final LogicalNode A = new LogicalNode(
+ new ClusterNode("1", "A", new NetworkAddress("localhost", 123)),
+ Map.of("region", "US", "storage", "SSD", "dataRegionSize", "10")
+ );
+
+ private static final LogicalNode B = new LogicalNode(
+ new ClusterNode("2", "B", new NetworkAddress("localhost", 123)),
+ Map.of("region", "EU", "storage", "HHD", "dataRegionSize", "30")
+ );
+
+ private static final LogicalNode C = new LogicalNode(
+ new ClusterNode("3", "C", new NetworkAddress("localhost", 123)),
+ Map.of("region", "CN", "storage", "SSD", "dataRegionSize", "20")
+ );
+
+ private static final LogicalNode D = new LogicalNode(
+ new ClusterNode("4", "D", new NetworkAddress("localhost", 123)),
+ Map.of("region", "CN", "storage", "SSD", "dataRegionSize", "20")
+ );
+
+ @Test
+ void testFilterOnScaleUp() throws Exception {
+ preparePrerequisites();
+
+ Set<String> nodes;
+
+ topology.putNode(D);
+
+ nodes = distributionZoneManager.topologyVersionedDataNodes(
+ 1,
+ topology.getLogicalTopology().version()
+ ).get(10_000, TimeUnit.MILLISECONDS);
+
+ assertEquals(Set.of(A, C, D).stream().map(ClusterNode::name).collect(Collectors.toSet()), nodes);
+ }
+
+ @Test
+ void testFilterOnScaleDown() throws Exception {
+ preparePrerequisites();
+
+ Set<String> nodes;
+
+ topology.removeNodes(Set.of(C));
+
+ nodes = distributionZoneManager.topologyVersionedDataNodes(
+ 1,
+ topology.getLogicalTopology().version()
+ ).get(10_000, TimeUnit.MILLISECONDS);
+
+ assertEquals(Set.of(A).stream().map(ClusterNode::name).collect(Collectors.toSet()), nodes);
+ }
+
+ @Test
+ void testFilterOnScaleUpWithNewAttributesAfterRestart() throws Exception {
+ preparePrerequisites();
+
+ Set<String> nodes;
+
+ topology.removeNodes(Set.of(B));
+
+ LogicalNode newB = new LogicalNode(
+ new ClusterNode("2", "newB", new NetworkAddress("localhost", 123)),
+ Map.of("region", "US", "storage", "HHD", "dataRegionSize", "30")
+ );
+
+ topology.putNode(newB);
+
+ nodes = distributionZoneManager.topologyVersionedDataNodes(
+ 1,
+ topology.getLogicalTopology().version()
+ ).get(10_000, TimeUnit.MILLISECONDS);
+
+ assertEquals(Set.of(A, newB, C).stream().map(ClusterNode::name).collect(Collectors.toSet()), nodes);
+ }
+
+ /**
+ * Starts distribution zone manager with a zone and checks that two out of three nodes, which match filter,
+ * are presented in the zones data nodes.
+ *
+ * @throws Exception If failed
+ */
+ private void preparePrerequisites() throws Exception {
+ String filter = "$[?(@.storage == 'SSD' || @.region == 'US')]";
+
+ topology.putNode(A);
+ topology.putNode(B);
+ topology.putNode(C);
+
+ startDistributionZoneManager();
+
+ distributionZoneManager.createZone(
+ new DistributionZoneConfigurationParameters.Builder(ZONE_NAME)
+ .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+ .dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .filter(filter)
+ .build()
+ ).get(10_000, TimeUnit.MILLISECONDS);
+
+ Set<String> nodes = distributionZoneManager.topologyVersionedDataNodes(
+ 1,
+ topology.getLogicalTopology().version()
+ ).get(10_000, TimeUnit.MILLISECONDS);
+
+ assertEquals(Set.of(A, C).stream().map(ClusterNode::name).collect(Collectors.toSet()), nodes);
+ }
+}
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
index afaf723452..21e14195cb 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
@@ -18,9 +18,9 @@
package org.apache.ignite.internal.distributionzones;
import static org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl.LOGICAL_TOPOLOGY_KEY;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertLogicalTopology;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertLogicalTopologyVersion;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertLogicalTopology;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertLogicalTopologyVersion;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.Set;
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
index 80faf12dd2..25bcfe8920 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
@@ -21,23 +21,16 @@ import static org.apache.ignite.internal.distributionzones.DistributionZoneManag
import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesForZone;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesForZoneWithAttributes;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertLogicalTopology;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertZoneScaleDownChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertZoneScaleUpChangeTriggerKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertDataNodesForZone;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertLogicalTopology;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertZoneScaleDownChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertZoneScaleUpChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.deployWatchesAndUpdateMetaStorageRevision;
-import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
-import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
-import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -48,13 +41,12 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import java.util.Arrays;
-import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.stream.Collectors;
import org.apache.ignite.configuration.NamedConfigurationTree;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters.Builder;
@@ -63,10 +55,7 @@ import org.apache.ignite.internal.distributionzones.configuration.DistributionZo
import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.metastorage.dsl.Conditions;
import org.apache.ignite.internal.metastorage.server.If;
-import org.apache.ignite.internal.util.ByteUtils;
-import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
@@ -79,11 +68,21 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
private static final int ZONE_1_ID = 1;
- private static final LogicalNode NODE_1 = new LogicalNode("1", "A", new NetworkAddress("localhost", 123));
+ private static final LogicalNode NODE_1 = new LogicalNode("1", "node1", new NetworkAddress("localhost", 123));
- private static final LogicalNode NODE_2 = new LogicalNode("2", "B", new NetworkAddress("localhost", 123));
+ private static final LogicalNode NODE_2 = new LogicalNode("2", "node2", new NetworkAddress("localhost", 123));
- private static final LogicalNode NODE_3 = new LogicalNode("3", "C", new NetworkAddress("localhost", 123));
+ private static final LogicalNode NODE_3 = new LogicalNode("3", "node3", new NetworkAddress("localhost", 123));
+
+ private static final Node A = new Node("A", "id_A");
+
+ private static final Node B = new Node("B", "id_B");
+
+ private static final Node C = new Node("C", "id_C");
+
+ private static final Node D = new Node("D", "id_D");
+
+ private static final Node E = new Node("E", "id_E");
private long prerequisiteRevision;
@@ -95,7 +94,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
startDistributionZoneManager();
- assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()), keyValueStorage);
+ assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes, keyValueStorage);
topology.putNode(NODE_2);
@@ -110,7 +109,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
.build()
).get();
- assertDataNodesForZone(ZONE_1_ID, clusterNodes2.stream().map(ClusterNode::name).collect(Collectors.toSet()), keyValueStorage);
+ assertDataNodesForZone(ZONE_1_ID, clusterNodes2, keyValueStorage);
topology.putNode(NODE_3);
@@ -118,8 +117,8 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
assertLogicalTopology(clusterNodes3, keyValueStorage);
- assertDataNodesForZone(ZONE_1_ID, clusterNodes3.stream().map(ClusterNode::name).collect(Collectors.toSet()), keyValueStorage);
- assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes3.stream().map(ClusterNode::name).collect(Collectors.toSet()), keyValueStorage);
+ assertDataNodesForZone(ZONE_1_ID, clusterNodes3, keyValueStorage);
+ assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes3, keyValueStorage);
}
@Test
@@ -139,8 +138,8 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
topology.putNode(NODE_1);
- assertDataNodesForZone(DEFAULT_ZONE_ID, Set.of(NODE_1.name()), keyValueStorage);
- assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1.name()), keyValueStorage);
+ assertDataNodesForZone(DEFAULT_ZONE_ID, Set.of(NODE_1), keyValueStorage);
+ assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
}
@Test
@@ -149,18 +148,17 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
topology.putNode(NODE_2);
- Set<String> clusterNodesNames = Set.of(NODE_1.name(), NODE_2.name());
Set<LogicalNode> clusterNodes = Set.of(NODE_1, NODE_2);
startDistributionZoneManager();
- assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodesNames, keyValueStorage);
+ assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes, keyValueStorage);
distributionZoneManager.createZone(
new DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE).build()
).get();
- assertDataNodesForZone(ZONE_1_ID, clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()), keyValueStorage);
+ assertDataNodesForZone(ZONE_1_ID, clusterNodes, keyValueStorage);
topology.removeNodes(Set.of(NODE_2));
@@ -168,10 +166,10 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
assertLogicalTopology(clusterNodes2, keyValueStorage);
- assertDataNodesForZone(ZONE_1_ID, clusterNodes2.stream().map(ClusterNode::name).collect(Collectors.toSet()), keyValueStorage);
+ assertDataNodesForZone(ZONE_1_ID, clusterNodes2, keyValueStorage);
// Check that default zone still has both node1 and node2 because dafault zones' scaleDown is INF.
- assertDataNodesForZone(DEFAULT_ZONE_ID, Set.of(NODE_1.name(), NODE_2.name()), keyValueStorage);
+ assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes, keyValueStorage);
}
@Test
@@ -182,7 +180,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
startDistributionZoneManager();
- assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes.stream().map(ClusterNode::name).collect(Collectors.toSet()), keyValueStorage);
+ assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes, keyValueStorage);
distributionZoneManager.alterZone(
DEFAULT_ZONE_NAME,
@@ -195,7 +193,6 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
topology.putNode(NODE_2);
Set<LogicalNode> clusterNodes2 = Set.of(NODE_1, NODE_2);
- Set<String> clusterNodesNames2 = Set.of(NODE_1.name(), NODE_2.name());
assertLogicalTopology(clusterNodes2, keyValueStorage);
@@ -207,7 +204,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
.build()
).get();
- assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodesNames2, keyValueStorage);
+ assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes2, keyValueStorage);
}
@Test
@@ -216,11 +213,11 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
topology.putNode(NODE_2);
- Set<String> clusterNodesNames = Set.of(NODE_1.name(), NODE_2.name());
+ Set<LogicalNode> clusterNodes = Set.of(NODE_1, NODE_2);
startDistributionZoneManager();
- assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodesNames, keyValueStorage);
+ assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes, keyValueStorage);
distributionZoneManager.alterZone(
DEFAULT_ZONE_NAME,
@@ -233,7 +230,6 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
topology.removeNodes(Set.of(NODE_2));
Set<LogicalNode> clusterNodes2 = Set.of(NODE_1);
- Set<String> clusterNodesNames2 = Set.of(NODE_1.name());
assertLogicalTopology(clusterNodes2, keyValueStorage);
@@ -245,7 +241,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
.build()
).get();
- assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodesNames2, keyValueStorage);
+ assertDataNodesForZone(DEFAULT_ZONE_ID, clusterNodes2, keyValueStorage);
}
@Test
@@ -257,7 +253,6 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
topology.putNode(NODE_2);
Set<LogicalNode> clusterNodes2 = Set.of(NODE_1, NODE_2);
- Set<String> clusterNodesNames2 = Set.of(NODE_1.name(), NODE_2.name());
assertLogicalTopology(clusterNodes2, keyValueStorage);
@@ -265,7 +260,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
new DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE).build()
).get();
- assertDataNodesForZone(ZONE_1_ID, clusterNodesNames2, keyValueStorage);
+ assertDataNodesForZone(ZONE_1_ID, clusterNodes2, keyValueStorage);
assertNotNull(keyValueStorage.get(zoneScaleUpChangeTriggerKey(ZONE_1_ID).bytes()).value(),
"zoneScaleUpChangeTriggerKey must be not null.");
@@ -290,7 +285,6 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
topology.removeNodes(Set.of(NODE_2));
Set<LogicalNode> clusterNodes2 = Set.of(NODE_1);
- Set<String> clusterNodesNames2 = Set.of(NODE_1.name());
assertLogicalTopology(clusterNodes2, keyValueStorage);
@@ -298,7 +292,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
new DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE).build()
).get();
- assertDataNodesForZone(ZONE_1_ID, clusterNodesNames2, keyValueStorage);
+ assertDataNodesForZone(ZONE_1_ID, clusterNodes2, keyValueStorage);
assertNotNull(keyValueStorage.get(zoneScaleUpChangeTriggerKey(ZONE_1_ID).bytes()).value(),
"zoneScaleUpChangeTriggerKey must be not null.");
@@ -328,7 +322,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
distributionZoneManager.scheduleTimers(
zoneView,
- Set.of("D"),
+ Set.of(D),
Set.of(),
prerequisiteRevision + 1,
(zoneId, revision) -> {
@@ -349,7 +343,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
distributionZoneManager.scheduleTimers(
zoneView,
- Set.of("E"),
+ Set.of(E),
Set.of(),
prerequisiteRevision + 2,
(zoneId, revision) -> {
@@ -379,7 +373,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D", "E"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E), keyValueStorage);
out2.countDown();
@@ -391,7 +385,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
// Assert that nothing has been changed.
assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D", "E"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E), keyValueStorage);
}
@Test
@@ -411,7 +405,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
distributionZoneManager.scheduleTimers(
zoneView,
Set.of(),
- Set.of("B"),
+ Set.of(B),
prerequisiteRevision + 1,
(t1, t2) -> null,
(zoneId, revision) -> {
@@ -432,7 +426,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
distributionZoneManager.scheduleTimers(
zoneView,
Set.of(),
- Set.of("C"),
+ Set.of(C),
prerequisiteRevision + 2,
(t1, t2) -> null,
(zoneId, revision) -> {
@@ -461,7 +455,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A), keyValueStorage);
out2.countDown();
@@ -473,7 +467,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
// Assert that nothing has been changed.
assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A), keyValueStorage);
}
@Test
@@ -491,7 +485,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
distributionZoneManager.scheduleTimers(
zoneView,
- Set.of("D"),
+ Set.of(D),
Set.of(),
prerequisiteRevision + 1,
(zoneId, revision) -> {
@@ -514,7 +508,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
distributionZoneManager.scheduleTimers(
zoneView,
- Set.of("E"),
+ Set.of(E),
Set.of(),
prerequisiteRevision + 2,
(zoneId, revision) -> {
@@ -535,14 +529,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 1, ZONE_1_ID, keyValueStorage);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage);
// Second task is run and we await that data nodes will be changed from ["A", "B", "C", "D"] to ["A", "B", "C", "D", "E"]
in2.countDown();
assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D", "E"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E), keyValueStorage);
out1.countDown();
}
@@ -563,7 +557,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
distributionZoneManager.scheduleTimers(
zoneView,
Set.of(),
- Set.of("B"),
+ Set.of(B),
prerequisiteRevision + 1,
(t1, t2) -> null,
(zoneId, revision) -> {
@@ -586,7 +580,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
distributionZoneManager.scheduleTimers(
zoneView,
Set.of(),
- Set.of("C"),
+ Set.of(C),
prerequisiteRevision + 2,
(t1, t2) -> null,
(zoneId, revision) -> {
@@ -606,14 +600,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 1, ZONE_1_ID, keyValueStorage);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "C"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, C), keyValueStorage);
// Second task is run and we await that data nodes will be changed from ["A", "C"] to ["A"]
in2.countDown();
assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A), keyValueStorage);
out1.countDown();
}
@@ -634,7 +628,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
assertLogicalTopology(Set.of(NODE_1), keyValueStorage);
- assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1.name()), keyValueStorage);
+ assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
}
@Test
@@ -660,7 +654,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
new DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleUp(0).build()
).get();
- assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1.name()), keyValueStorage);
+ assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
}
@Test
@@ -675,11 +669,11 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
new DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleDown(100).build()
).get();
- assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1.name()), keyValueStorage);
+ assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
topology.removeNodes(Set.of(NODE_1));
- assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1.name()), keyValueStorage);
+ assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
distributionZoneManager.alterZone(
ZONE_1_NAME,
@@ -695,9 +689,9 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
- zoneState.nodesToAddToDataNodes(Set.of("E"), 1004);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1007);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+ zoneState.nodesToAddToDataNodes(Set.of(E), 1004);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1003);
@@ -720,7 +714,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1015);
assertTrue(zoneState.topologyAugmentationMap().isEmpty());
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "D", "E"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, D, E), keyValueStorage);
}
@Test
@@ -765,7 +759,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
new DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleDown(100).build()
).get();
- assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1.name()), keyValueStorage);
+ assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
@@ -828,13 +822,13 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
assertLogicalTopology(Set.of(NODE_1), keyValueStorage);
- assertDataNodesForZone(DEFAULT_ZONE_ID, Set.of(NODE_1.name()), keyValueStorage);
+ assertDataNodesForZone(DEFAULT_ZONE_ID, Set.of(NODE_1), keyValueStorage);
distributionZoneManager.createZone(
new DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME).dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE).build()
).get();
- assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1.name()), keyValueStorage);
+ assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
assertZoneScaleDownChangeTriggerKey(4L, ZONE_1_ID, keyValueStorage);
@@ -854,7 +848,7 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
topology.removeNodes(Set.of(NODE_1));
- assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1.name()), keyValueStorage);
+ assertDataNodesForZone(ZONE_1_ID, Set.of(NODE_1), keyValueStorage);
assertZoneScaleDownChangeTriggerKey(100L, ZONE_1_ID, keyValueStorage);
}
@@ -865,13 +859,13 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1007);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1003);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "D"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, D), keyValueStorage);
}
@Test
@@ -880,13 +874,13 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1007);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1003);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "D"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, D), keyValueStorage);
}
@Test
@@ -895,13 +889,13 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1007);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1003);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "D"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, D), keyValueStorage);
}
@Test
@@ -910,13 +904,13 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1007);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1003);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "D"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, D), keyValueStorage);
}
@Test
@@ -925,14 +919,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
- zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1007);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1003);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C), keyValueStorage);
}
@Test
@@ -941,13 +935,13 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
- zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1007);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1003);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C), keyValueStorage);
}
@Test
@@ -956,13 +950,13 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
- zoneState.nodesToAddToDataNodes(Set.of("C"), 1007);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+ zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1003);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C), keyValueStorage);
}
@Test
@@ -971,13 +965,13 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
- zoneState.nodesToAddToDataNodes(Set.of("C"), 1007);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+ zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1003);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C), keyValueStorage);
}
@Test
@@ -986,14 +980,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
- zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1007);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1009);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage);
}
@Test
@@ -1002,14 +996,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
- zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1007);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1009);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage);
}
@Test
@@ -1018,16 +1012,16 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
- zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1007);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1009);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage);
}
@Test
@@ -1036,14 +1030,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
- zoneState.nodesToAddToDataNodes(Set.of("C"), 1007);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1009);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+ zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B), keyValueStorage);
}
@Test
@@ -1052,14 +1046,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
- zoneState.nodesToAddToDataNodes(Set.of("C"), 1007);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1009);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+ zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B), keyValueStorage);
}
@Test
@@ -1068,14 +1062,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
- zoneState.nodesToAddToDataNodes(Set.of("C"), 1007);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+ zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1009);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B), keyValueStorage);
}
@Test
@@ -1084,15 +1078,15 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
- zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1007);
- zoneState.nodesToAddToDataNodes(Set.of("E"), 1009);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
+ zoneState.nodesToAddToDataNodes(Set.of(E), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "E"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, E), keyValueStorage);
}
@Test
@@ -1101,14 +1095,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
- zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1007);
- zoneState.nodesToAddToDataNodes(Set.of("E"), 1009);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
+ zoneState.nodesToAddToDataNodes(Set.of(E), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "E"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, E), keyValueStorage);
}
@Test
@@ -1117,14 +1111,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
- zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1007);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
- zoneState.nodesToAddToDataNodes(Set.of("E"), 1009);
+ zoneState.nodesToAddToDataNodes(Set.of(E), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "E"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, E), keyValueStorage);
}
@Test
@@ -1133,14 +1127,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
- zoneState.nodesToAddToDataNodes(Set.of("C"), 1007);
- zoneState.nodesToRemoveFromDataNodes(Set.of("B"), 1009);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+ zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(B), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "C"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, C), keyValueStorage);
}
@Test
@@ -1149,14 +1143,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
- zoneState.nodesToAddToDataNodes(Set.of("C"), 1007);
- zoneState.nodesToRemoveFromDataNodes(Set.of("B"), 1009);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+ zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(B), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "C"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, C), keyValueStorage);
}
@Test
@@ -1165,14 +1159,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
- zoneState.nodesToAddToDataNodes(Set.of("C"), 1007);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+ zoneState.nodesToAddToDataNodes(Set.of(C), 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
- zoneState.nodesToRemoveFromDataNodes(Set.of("B"), 1009);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(B), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "C"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, C), keyValueStorage);
}
@Test
@@ -1181,14 +1175,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1007);
- zoneState.nodesToAddToDataNodes(Set.of("C"), 1009);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1007);
+ zoneState.nodesToAddToDataNodes(Set.of(C), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage);
}
@Test
@@ -1197,14 +1191,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1007);
- zoneState.nodesToAddToDataNodes(Set.of("C"), 1009);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1007);
+ zoneState.nodesToAddToDataNodes(Set.of(C), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage);
}
@Test
@@ -1213,14 +1207,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1003);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1007);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1003);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1007);
- zoneState.nodesToAddToDataNodes(Set.of("C"), 1009);
+ zoneState.nodesToAddToDataNodes(Set.of(C), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1009);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B", "C", "D"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage);
}
@Test
@@ -1229,14 +1223,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1007);
- zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1009);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1007);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B), keyValueStorage);
}
@Test
@@ -1245,14 +1239,14 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1007);
- zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1009);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1007);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B), keyValueStorage);
}
@Test
@@ -1261,60 +1255,79 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
ZoneState zoneState = distributionZoneManager.zonesTimers().get(ZONE_1_ID);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C"), 1003);
- zoneState.nodesToAddToDataNodes(Set.of("D"), 1007);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C), 1003);
+ zoneState.nodesToAddToDataNodes(Set.of(D), 1007);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleUp(ZONE_1_ID, 1007);
- zoneState.nodesToRemoveFromDataNodes(Set.of("D"), 1009);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(D), 1009);
distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(ZONE_1_ID, 1009);
- assertDataNodesForZone(ZONE_1_ID, Set.of("A", "B"), keyValueStorage);
+ assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B), keyValueStorage);
}
@Test
void testZoneStateAddRemoveNodesPreservesDuplicationsOfNodes() {
ZoneState zoneState = new ZoneState(new ScheduledThreadPoolExecutor(1));
- zoneState.nodesToAddToDataNodes(Set.of("A", "B"), 1);
- zoneState.nodesToAddToDataNodes(Set.of("A", "B"), 2);
+ zoneState.nodesToAddToDataNodes(Set.of(A, B), 1);
+ zoneState.nodesToAddToDataNodes(Set.of(A, B), 2);
+
+ List<Node> nodes = zoneState.nodesToBeAddedToDataNodes(0, 2);
- List<String> nodes = zoneState.nodesToBeAddedToDataNodes(0, 2);
+ nodes.sort(Comparator.comparing(Node::nodeName));
- Collections.sort(nodes);
- assertEquals(List.of("A", "A", "B", "B"), nodes);
+ assertEquals(List.of(A, A, B, B), nodes);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C", "D"), 3);
- zoneState.nodesToRemoveFromDataNodes(Set.of("C", "D"), 4);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C, D), 3);
+ zoneState.nodesToRemoveFromDataNodes(Set.of(C, D), 4);
nodes = zoneState.nodesToBeRemovedFromDataNodes(2, 4);
- Collections.sort(nodes);
+ nodes.sort(Comparator.comparing(Node::nodeName));
- assertEquals(List.of("C", "C", "D", "D"), nodes);
+ assertEquals(List.of(C, C, D, D), nodes);
}
/**
- * Creates a zone with the auto adjust scale up scale down trigger equals to 0 and the data nodes equals ["A", "B", "C"].
+ * Creates a zone with the auto adjust scale up scale down trigger equals to 0 and the data nodes equals ["A", B, "C"].
*
* @throws Exception when something goes wrong.
*/
private void preparePrerequisites() throws Exception {
- topology.putNode(NODE_1);
- topology.putNode(NODE_2);
- topology.putNode(NODE_3);
+ preparePrerequisites(null);
+ }
- Set<String> clusterNodesNames = Set.of(NODE_1.name(), NODE_2.name(), NODE_3.name());
+ private void preparePrerequisites(@Nullable String filter) throws Exception {
+ LogicalNode a = new LogicalNode("1", "A", new NetworkAddress("localhost", 123));
- startDistributionZoneManager();
+ LogicalNode b = new LogicalNode("2", "B", new NetworkAddress("localhost", 123));
- distributionZoneManager.createZone(
- new Builder(ZONE_1_NAME)
- .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
- .dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
- .build()
- ).get();
+ LogicalNode c = new LogicalNode("3", "C", new NetworkAddress("localhost", 123));
+
+ topology.putNode(a);
+ topology.putNode(b);
+ topology.putNode(c);
- assertDataNodesForZone(ZONE_1_ID, clusterNodesNames, keyValueStorage);
+ Set<LogicalNode> clusterNodes = Set.of(a, b, c);
+
+ startDistributionZoneManager();
+
+ if (filter == null) {
+ distributionZoneManager.createZone(
+ new Builder(ZONE_1_NAME)
+ .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+ .dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .build()).get();
+ } else {
+ distributionZoneManager.createZone(
+ new DistributionZoneConfigurationParameters.Builder(ZONE_1_NAME)
+ .dataNodesAutoAdjustScaleUp(IMMEDIATE_TIMER_VALUE)
+ .dataNodesAutoAdjustScaleDown(IMMEDIATE_TIMER_VALUE)
+ .filter(filter)
+ .build()).get();
+ }
+
+ assertDataNodesForZone(ZONE_1_ID, clusterNodes, keyValueStorage);
long scaleUpChangeTriggerKey = bytesToLong(
keyValueStorage.get(zoneScaleUpChangeTriggerKey(ZONE_1_ID).bytes()).value()
@@ -1338,39 +1351,6 @@ public class DistributionZoneManagerScaleUpTest extends BaseDistributionZoneMana
return distributionZoneManager.saveDataNodesToMetaStorageOnScaleDown(zoneId, revision);
}
- private void assertNotEqualsDataNodesForZone(int zoneId, @Nullable Set<String> clusterNodes) throws InterruptedException {
- assertFalse(waitForCondition(
- () -> {
- byte[] dataNodes = keyValueStorage.get(zoneDataNodesKey(zoneId).bytes()).value();
-
- if (dataNodes == null) {
- return clusterNodes == null;
- }
-
- Set<String> res = DistributionZonesUtil.dataNodes(ByteUtils.fromBytes(dataNodes));
-
- return res.equals(clusterNodes);
- },
- 1000
- ));
- }
-
- private void startDistributionZoneManager() throws Exception {
- deployWatchesAndUpdateMetaStorageRevision(metaStorageManager);
-
- distributionZoneManager.start();
- }
-
- private void setLogicalTopologyInMetaStorage(Set<String> nodes) {
- CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
- Conditions.exists(zonesLogicalTopologyKey()),
- put(zonesLogicalTopologyKey(), toBytes(nodes)),
- noop()
- );
-
- assertThat(invokeFuture, willBe(true));
- }
-
private void assertThatZonesAugmentationMapContainsRevision(int zoneId, long revisionToAssert) throws InterruptedException {
assertTrue(
waitForCondition(
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
index fc3e304313..03d9638aed 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
@@ -20,27 +20,24 @@ package org.apache.ignite.internal.distributionzones;
import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_ID;
import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.INFINITE_TIMER_VALUE;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesForZone;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.mockVaultZonesLogicalTopologyKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.setLogicalTopologyInMetaStorage;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.assertDataNodesForZone;
-import static org.apache.ignite.internal.distributionzones.util.DistributionZonesTestUtil.deployWatchesAndUpdateMetaStorageRevision;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
-import static org.apache.ignite.internal.util.ByteUtils.toBytes;
-import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
+import java.util.Collections;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.metastorage.dsl.Conditions;
-import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.Test;
/**
@@ -48,9 +45,13 @@ import org.junit.jupiter.api.Test;
*/
//TODO: IGNITE-18564 Add tests with not default distribution zones, when distributionZones.change.trigger per zone will be created.
public class DistributionZoneManagerWatchListenerTest extends BaseDistributionZoneManagerTest {
+ private static final LogicalNode NODE_1 = new LogicalNode("node1", "node1", new NetworkAddress("localhost", 123));
+ private static final LogicalNode NODE_2 = new LogicalNode("node2", "node2", new NetworkAddress("localhost", 123));
+ private static final LogicalNode NODE_3 = new LogicalNode("node3", "node3", new NetworkAddress("localhost", 123));
+
@Test
void testStaleWatchEvent() throws Exception {
- mockVaultZonesLogicalTopologyKey(Set.of());
+ mockVaultZonesLogicalTopologyKey(Set.of(), vaultMgr);
startDistributionZoneManager();
@@ -66,12 +67,9 @@ public class DistributionZoneManagerWatchListenerTest extends BaseDistributionZo
keyValueStorage.put(zoneScaleUpChangeTriggerKey(DEFAULT_ZONE_ID).bytes(), longToBytes(revision), HybridTimestamp.MIN_VALUE);
- Set<String> nodes = Set.of("node1", "node2");
-
- setLogicalTopologyInMetaStorage(nodes);
+ Set<LogicalNode> nodes = Set.of(NODE_1, NODE_2);
- // two invokes on start, and invoke for update scale up won't be triggered, because revision == zoneScaleUpChangeTriggerKey
- verify(keyValueStorage, timeout(1000).times(2)).invoke(any(), any());
+ setLogicalTopologyInMetaStorage(nodes, 100, metaStorageManager);
assertDataNodesForZone(DEFAULT_ZONE_ID, Set.of(), keyValueStorage);
}
@@ -82,9 +80,12 @@ public class DistributionZoneManagerWatchListenerTest extends BaseDistributionZo
keyValueStorage.put(zonesChangeTriggerKey(DEFAULT_ZONE_ID).bytes(), longToBytes(revision), HybridTimestamp.MIN_VALUE);
- Set<String> nodes = Set.of("node1", "node2");
+ Set<LogicalNode> nodes = Set.of(
+ new LogicalNode(new ClusterNode("node1", "node1", NetworkAddress.from("127.0.0.1:127")), Collections.emptyMap()),
+ new LogicalNode(new ClusterNode("node2", "node2", NetworkAddress.from("127.0.0.1:127")), Collections.emptyMap())
+ );
- mockVaultZonesLogicalTopologyKey(nodes);
+ mockVaultZonesLogicalTopologyKey(nodes, vaultMgr);
startDistributionZoneManager();
@@ -95,14 +96,15 @@ public class DistributionZoneManagerWatchListenerTest extends BaseDistributionZo
@Test
void testDataNodesUpdatedOnZoneManagerStart() throws Exception {
- Set<String> nodes = Set.of("node1", "node2");
+ Set<LogicalNode> nodes = Set.of(
+ new LogicalNode(new ClusterNode("node1", "node1", NetworkAddress.from("127.0.0.1:127")), Collections.emptyMap()),
+ new LogicalNode(new ClusterNode("node2", "node2", NetworkAddress.from("127.0.0.1:127")), Collections.emptyMap())
+ );
- mockVaultZonesLogicalTopologyKey(nodes);
+ mockVaultZonesLogicalTopologyKey(nodes, vaultMgr);
startDistributionZoneManager();
- verify(keyValueStorage, timeout(1000).times(2)).invoke(any(), any());
-
assertDataNodesForZone(DEFAULT_ZONE_ID, nodes, keyValueStorage);
}
@@ -116,26 +118,4 @@ public class DistributionZoneManagerWatchListenerTest extends BaseDistributionZo
assertNull(keyValueStorage.get(zoneDataNodesKey(DEFAULT_ZONE_ID).bytes()).value());
assertNull(keyValueStorage.get(zoneDataNodesKey(1).bytes()).value());
}
-
- private void mockVaultZonesLogicalTopologyKey(Set<String> nodes) {
- byte[] newLogicalTopology = toBytes(nodes);
-
- assertThat(vaultMgr.put(zonesLogicalTopologyKey(), newLogicalTopology), willCompleteSuccessfully());
- }
-
- private void setLogicalTopologyInMetaStorage(Set<String> nodes) {
- CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
- Conditions.exists(zonesLogicalTopologyKey()),
- Operations.put(zonesLogicalTopologyKey(), toBytes(nodes)),
- Operations.noop()
- );
-
- assertThat(invokeFuture, willBe(true));
- }
-
- private void startDistributionZoneManager() throws Exception {
- deployWatchesAndUpdateMetaStorageRevision(metaStorageManager);
-
- distributionZoneManager.start();
- }
}
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
index 3df413d173..969b05edcb 100644
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
+++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java
@@ -30,7 +30,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
@@ -55,6 +54,7 @@ import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
+import org.apache.ignite.internal.distributionzones.Node;
import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration;
import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration;
import org.apache.ignite.internal.metastorage.Entry;
@@ -122,8 +122,6 @@ public class DistributionZoneRebalanceEngineTest extends IgniteAbstractTest {
@BeforeEach
public void setUp() {
- when(distributionZoneManager.getDataNodesByZoneId(anyInt())).thenReturn(Set.of("node0"));
-
doAnswer(invocation -> {
ByteArray key = invocation.getArgument(0);
@@ -417,7 +415,9 @@ public class DistributionZoneRebalanceEngineTest extends IgniteAbstractTest {
byte[] newLogicalTopology;
if (nodes != null) {
- newLogicalTopology = toBytes(toDataNodesMap(nodes));
+ newLogicalTopology = toBytes(toDataNodesMap(nodes.stream()
+ .map(n -> new Node(n, n))
+ .collect(Collectors.toSet())));
} else {
newLogicalTopology = null;
}
diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/util/DistributionZonesTestUtil.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/util/DistributionZonesTestUtil.java
deleted file mode 100644
index 863a28c7cb..0000000000
--- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/util/DistributionZonesTestUtil.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.distributionzones.util;
-
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
-import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
-import org.apache.ignite.internal.distributionzones.DistributionZonesUtil;
-import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.metastorage.dsl.Conditions;
-import org.apache.ignite.internal.metastorage.dsl.Operations;
-import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
-import org.apache.ignite.internal.util.ByteUtils;
-import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.lang.NodeStoppingException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Util class for methods for Distribution zones tests.
- */
-public class DistributionZonesTestUtil {
- /**
- * Asserts data nodes from {@link DistributionZonesUtil#zoneDataNodesKey(int)}.
- *
- * @param zoneId Zone id.
- * @param clusterNodes Data nodes.
- * @param keyValueStorage Key-value storage.
- * @throws InterruptedException If thread was interrupted.
- */
- public static void assertDataNodesForZone(
- int zoneId,
- @Nullable Set<String> clusterNodes,
- KeyValueStorage keyValueStorage
- ) throws InterruptedException {
- assertValueInStorage(
- keyValueStorage,
- zoneDataNodesKey(zoneId).bytes(),
- value -> DistributionZonesUtil.dataNodes(fromBytes(value)),
- clusterNodes,
- 2000
- );
- }
-
- /**
- * Asserts {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} revision.
- *
- * @param revision Revision.
- * @param zoneId Zone id.
- * @param keyValueStorage Key-value storage.
- * @throws InterruptedException If thread was interrupted.
- */
- public static void assertZoneScaleUpChangeTriggerKey(
- @Nullable Long revision,
- int zoneId,
- KeyValueStorage keyValueStorage
- ) throws InterruptedException {
- assertValueInStorage(
- keyValueStorage,
- zoneScaleUpChangeTriggerKey(zoneId).bytes(),
- ByteUtils::bytesToLong,
- revision,
- 2000
- );
- }
-
- /**
- * Asserts {@link DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)} revision.
- *
- * @param revision Revision.
- * @param zoneId Zone id.
- * @param keyValueStorage Key-value storage.
- * @throws InterruptedException If thread was interrupted.
- */
- public static void assertZoneScaleDownChangeTriggerKey(
- @Nullable Long revision,
- int zoneId,
- KeyValueStorage keyValueStorage
- ) throws InterruptedException {
- assertValueInStorage(
- keyValueStorage,
- zoneScaleDownChangeTriggerKey(zoneId).bytes(),
- ByteUtils::bytesToLong,
- revision,
- 2000
- );
- }
-
- /**
- * Asserts {@link DistributionZonesUtil#zonesChangeTriggerKey(int)} revision.
- *
- * @param revision Revision.
- * @param zoneId Zone id.
- * @param keyValueStorage Key-value storage.
- * @throws InterruptedException If thread was interrupted.
- */
- public static void assertZonesChangeTriggerKey(
- long revision,
- int zoneId,
- KeyValueStorage keyValueStorage
- ) throws InterruptedException {
- assertValueInStorage(
- keyValueStorage,
- zonesChangeTriggerKey(zoneId).bytes(),
- ByteUtils::bytesToLong,
- revision,
- 1000
- );
- }
-
- /**
- * Asserts {@link DistributionZonesUtil#zonesLogicalTopologyKey()} value.
- *
- * @param clusterNodes Expected cluster nodes.
- * @param keyValueStorage Key-value storage.
- * @throws InterruptedException If thread was interrupted.
- */
- public static void assertLogicalTopology(
- @Nullable Set<LogicalNode> clusterNodes,
- KeyValueStorage keyValueStorage
- ) throws InterruptedException {
- Set<String> nodes = clusterNodes == null
- ? null
- : clusterNodes.stream().map(LogicalNode::name).collect(Collectors.toSet());
-
- assertValueInStorage(
- keyValueStorage,
- zonesLogicalTopologyKey().bytes(),
- ByteUtils::fromBytes,
- nodes,
- 1000
- );
- }
-
- /**
- * Asserts {@link DistributionZonesUtil#zonesLogicalTopologyVersionKey()} value.
- *
- * @param topVer Topology version.
- * @param keyValueStorage Key-value storage.
- * @throws InterruptedException If thread was interrupted.
- */
- public static void assertLogicalTopologyVersion(long topVer, KeyValueStorage keyValueStorage) throws InterruptedException {
- assertValueInStorage(
- keyValueStorage,
- zonesLogicalTopologyVersionKey().bytes(),
- ByteUtils::bytesToLong,
- topVer,
- 1000
- );
- }
-
- /**
- * Asserts {@link DistributionZonesUtil#zonesLogicalTopologyKey()} value.
- *
- * @param clusterNodes Expected cluster nodes' names.
- * @param keyValueStorage Key-value storage.
- * @throws InterruptedException If thread was interrupted.
- */
- public static void assertLogicalTopologyWithNodeNames(@Nullable Set<String> clusterNodes, KeyValueStorage keyValueStorage)
- throws InterruptedException {
- assertValueInStorage(
- keyValueStorage,
- zonesLogicalTopologyKey().bytes(),
- ByteUtils::fromBytes,
- clusterNodes,
- 1000
- );
- }
-
- /**
- * This method is used to initialize the meta storage revision before starting the distribution zone manager.
- * TODO: IGNITE-19403 Watch listeners must be deployed after the zone manager starts.
- *
- * @param metaStorageManager Meta storage manager.
- * @throws NodeStoppingException If node is stopping.
- * @throws InterruptedException If thread was interrupted.
- */
- public static void deployWatchesAndUpdateMetaStorageRevision(MetaStorageManager metaStorageManager)
- throws NodeStoppingException, InterruptedException {
- // Watches are deployed before distributionZoneManager start in order to update Meta Storage revision before
- // distributionZoneManager's recovery.
- metaStorageManager.deployWatches();
-
- // Bump Meta Storage applied revision by modifying a fake key. DistributionZoneManager breaks on start if Vault is not empty, but
- // Meta Storage revision is equal to 0.
- var fakeKey = new ByteArray("foobar");
-
- CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
- Conditions.notExists(fakeKey),
- Operations.put(fakeKey, fakeKey.bytes()),
- Operations.noop()
- );
-
- assertThat(invokeFuture, willBe(true));
-
- assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() > 0, 10_000));
- }
-
- private static <T> void assertValueInStorage(
- KeyValueStorage keyValueStorage,
- byte[] key,
- Function<byte[], T> valueTransformer,
- @Nullable T expectedValue,
- long timeoutMillis
- ) throws InterruptedException {
- boolean success = waitForCondition(() -> {
- byte[] storageValue = keyValueStorage.get(key).value();
-
- T actualValue = storageValue == null ? null : valueTransformer.apply(storageValue);
-
- return Objects.equals(actualValue, expectedValue);
- }, timeoutMillis);
-
- // We do a second check simply to print a nice error message in case the condition above is not achieved.
- if (!success) {
- byte[] storageValue = keyValueStorage.get(key).value();
-
- assertThat(storageValue == null ? null : valueTransformer.apply(storageValue), is(expectedValue));
- }
- }
-}
diff --git a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
index 7db6f48943..98daab9098 100644
--- a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
+++ b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java
@@ -17,12 +17,49 @@
package org.apache.ignite.internal.distributionzones;
+import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_FILTER;
import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateLogicalTopologyAndVersion;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesChangeTriggerKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyVersionKey;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
+import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters.Builder;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.dsl.Conditions;
+import org.apache.ignite.internal.metastorage.dsl.Iif;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.schema.configuration.storage.DataStorageChange;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
/**
* Utils to manage distribution zones inside tests.
@@ -90,4 +127,320 @@ public class DistributionZonesTestUtil {
return zoneManager.alterZone(zoneName, distributionZoneCfgBuilder.build());
}
+
+ /**
+ * Asserts data nodes from {@link DistributionZonesUtil#zoneDataNodesKey(int)}.
+ *
+ * @param zoneId Zone id.
+ * @param clusterNodes Data nodes.
+ * @param keyValueStorage Key-value storage.
+ * @throws InterruptedException If thread was interrupted.
+ */
+ public static void assertDataNodesForZone(
+ int zoneId,
+ @Nullable Set<LogicalNode> clusterNodes,
+ KeyValueStorage keyValueStorage
+ ) throws InterruptedException {
+ Set<Node> nodes = clusterNodes == null
+ ? null
+ : clusterNodes.stream().map(n -> new Node(n.name(), n.id())).collect(Collectors.toSet());
+
+ assertValueInStorage(
+ keyValueStorage,
+ zoneDataNodesKey(zoneId).bytes(),
+ value -> DistributionZonesUtil.dataNodes(fromBytes(value)),
+ nodes,
+ 2000
+ );
+ }
+
+ /**
+ * Asserts data nodes from {@link DistributionZonesUtil#zoneDataNodesKey(int)}.
+ *
+ * @param zoneId Zone id.
+ * @param nodes Data nodes.
+ * @param keyValueStorage Key-value storage.
+ * @throws InterruptedException If thread was interrupted.
+ */
+ public static void assertDataNodesForZoneWithAttributes(
+ int zoneId,
+ @Nullable Set<Node> nodes,
+ KeyValueStorage keyValueStorage
+ ) throws InterruptedException {
+ assertDataNodesForZoneWithAttributes(zoneId, nodes, keyValueStorage, DEFAULT_FILTER);
+ }
+
+ /**
+ * Asserts data nodes from {@link DistributionZonesUtil#zoneDataNodesKey(int)} and with provided {@code filter}.
+ *
+ * @param zoneId Zone id.
+ * @param nodes Data nodes.
+ * @param keyValueStorage Key-value storage.
+ * @param filter Filter for data nodes.
+ * @throws InterruptedException If thread was interrupted.
+ */
+ public static void assertDataNodesForZoneWithAttributes(
+ int zoneId,
+ @Nullable Set<Node> nodes,
+ KeyValueStorage keyValueStorage,
+ String filter
+ ) throws InterruptedException {
+ assertValueInStorage(
+ keyValueStorage,
+ zoneDataNodesKey(zoneId).bytes(),
+ value -> DistributionZonesUtil.dataNodes(fromBytes(value)),
+ nodes,
+ 2000
+ );
+ }
+
+ /**
+ * Asserts {@link DistributionZonesUtil#zoneScaleUpChangeTriggerKey(int)} revision.
+ *
+ * @param revision Revision.
+ * @param zoneId Zone id.
+ * @param keyValueStorage Key-value storage.
+ * @throws InterruptedException If thread was interrupted.
+ */
+ public static void assertZoneScaleUpChangeTriggerKey(
+ @Nullable Long revision,
+ int zoneId,
+ KeyValueStorage keyValueStorage
+ ) throws InterruptedException {
+ assertValueInStorage(
+ keyValueStorage,
+ zoneScaleUpChangeTriggerKey(zoneId).bytes(),
+ ByteUtils::bytesToLong,
+ revision,
+ 2000
+ );
+ }
+
+ /**
+ * Asserts {@link DistributionZonesUtil#zoneScaleDownChangeTriggerKey(int)} revision.
+ *
+ * @param revision Revision.
+ * @param zoneId Zone id.
+ * @param keyValueStorage Key-value storage.
+ * @throws InterruptedException If thread was interrupted.
+ */
+ public static void assertZoneScaleDownChangeTriggerKey(
+ @Nullable Long revision,
+ int zoneId,
+ KeyValueStorage keyValueStorage
+ ) throws InterruptedException {
+ assertValueInStorage(
+ keyValueStorage,
+ zoneScaleDownChangeTriggerKey(zoneId).bytes(),
+ ByteUtils::bytesToLong,
+ revision,
+ 2000
+ );
+ }
+
+ /**
+ * Asserts {@link DistributionZonesUtil#zonesChangeTriggerKey(int)} revision.
+ *
+ * @param revision Revision.
+ * @param zoneId Zone id.
+ * @param keyValueStorage Key-value storage.
+ * @throws InterruptedException If thread was interrupted.
+ */
+ public static void assertZonesChangeTriggerKey(
+ long revision,
+ int zoneId,
+ KeyValueStorage keyValueStorage
+ ) throws InterruptedException {
+ assertValueInStorage(
+ keyValueStorage,
+ zonesChangeTriggerKey(zoneId).bytes(),
+ ByteUtils::bytesToLong,
+ revision,
+ 1000
+ );
+ }
+
+ /**
+ * Asserts {@link DistributionZonesUtil#zonesLogicalTopologyKey()} value.
+ *
+ * @param clusterNodes Expected cluster nodes.
+ * @param keyValueStorage Key-value storage.
+ * @throws InterruptedException If thread was interrupted.
+ */
+ public static void assertLogicalTopology(
+ @Nullable Set<LogicalNode> clusterNodes,
+ KeyValueStorage keyValueStorage
+ ) throws InterruptedException {
+ Set<NodeWithAttributes> nodes = clusterNodes == null
+ ? null
+ : clusterNodes.stream().map(n -> new NodeWithAttributes(n.name(), n.id(), n.nodeAttributes())).collect(Collectors.toSet());
+
+ assertValueInStorage(
+ keyValueStorage,
+ zonesLogicalTopologyKey().bytes(),
+ ByteUtils::fromBytes,
+ nodes,
+ 1000
+ );
+ }
+
+ /**
+ * Asserts {@link DistributionZonesUtil#zonesLogicalTopologyVersionKey()} value.
+ *
+ * @param topVer Topology version.
+ * @param keyValueStorage Key-value storage.
+ * @throws InterruptedException If thread was interrupted.
+ */
+ public static void assertLogicalTopologyVersion(long topVer, KeyValueStorage keyValueStorage) throws InterruptedException {
+ assertValueInStorage(
+ keyValueStorage,
+ zonesLogicalTopologyVersionKey().bytes(),
+ ByteUtils::bytesToLong,
+ topVer,
+ 1000
+ );
+ }
+
+ /**
+ * This method is used to initialize the meta storage revision before starting the distribution zone manager.
+ * TODO: IGNITE-19403 Watch listeners must be deployed after the zone manager starts.
+ *
+ * @param metaStorageManager Meta storage manager.
+ * @throws NodeStoppingException If node is stopping.
+ * @throws InterruptedException If thread was interrupted.
+ */
+ public static void deployWatchesAndUpdateMetaStorageRevision(MetaStorageManager metaStorageManager)
+ throws NodeStoppingException, InterruptedException {
+ // Watches are deployed before distributionZoneManager start in order to update Meta Storage revision before
+ // distributionZoneManager's recovery.
+ metaStorageManager.deployWatches();
+
+ // Bump Meta Storage applied revision by modifying a fake key. DistributionZoneManager breaks on start if Vault is not empty, but
+ // Meta Storage revision is equal to 0.
+ var fakeKey = new ByteArray("foobar");
+
+ CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(
+ Conditions.notExists(fakeKey),
+ Operations.put(fakeKey, fakeKey.bytes()),
+ Operations.noop()
+ );
+
+ assertThat(invokeFuture, willBe(true));
+
+ assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() > 0, 10_000));
+ }
+
+ /**
+ * Sets logical topology to Vault.
+ *
+ * @param nodes Logical topology
+ * @param vaultMgr Vault manager
+ */
+ public static void mockVaultZonesLogicalTopologyKey(Set<LogicalNode> nodes, VaultManager vaultMgr) {
+ Set<NodeWithAttributes> nodesWithAttributes = nodes.stream()
+ .map(n -> new NodeWithAttributes(n.name(), n.id(), n.nodeAttributes()))
+ .collect(Collectors.toSet());
+
+ byte[] newLogicalTopology = toBytes(nodesWithAttributes);
+
+ assertThat(vaultMgr.put(zonesLogicalTopologyKey(), newLogicalTopology), willCompleteSuccessfully());
+ }
+
+ /**
+ * Sets logical topology to Meta Storage.
+ *
+ * @param nodes Logical topology
+ * @param topVer Topology version
+ * @param metaStorageManager Meta Storage manager.
+ */
+ public static void setLogicalTopologyInMetaStorage(Set<LogicalNode> nodes, long topVer, MetaStorageManager metaStorageManager) {
+ Iif iff = iif(
+ Conditions.exists(zonesLogicalTopologyKey()),
+ updateLogicalTopologyAndVersion(nodes, topVer),
+ ops().yield(false)
+ );
+
+ CompletableFuture<Boolean> invokeFuture = metaStorageManager.invoke(iff).thenApply(StatementResult::getAsBoolean);
+
+ assertThat(invokeFuture, willBe(true));
+ }
+
+ /**
+ * Asserts value from the storage.
+ *
+ * @param keyValueStorage Key-value storage.
+ * @param key Key of value to check.
+ * @param valueTransformer Function that is applied to value from the storage.
+ * @param expectedValue Expected value.
+ * @param timeoutMillis Timeout in milliseconds.
+ * @param <T> A type of value from storage.
+ * @throws InterruptedException If interrupted.
+ */
+ public static <T> void assertValueInStorage(
+ KeyValueStorage keyValueStorage,
+ byte[] key,
+ Function<byte[], T> valueTransformer,
+ @Nullable T expectedValue,
+ long timeoutMillis
+ ) throws InterruptedException {
+ boolean success = waitForCondition(() -> {
+ byte[] storageValue = keyValueStorage.get(key).value();
+
+ T actualValue = storageValue == null ? null : valueTransformer.apply(storageValue);
+
+ return Objects.equals(actualValue, expectedValue);
+ }, timeoutMillis);
+
+ // We do a second check simply to print a nice error message in case the condition above is not achieved.
+ if (!success) {
+ byte[] storageValue = keyValueStorage.get(key).value();
+
+ assertThat(storageValue == null ? null : valueTransformer.apply(storageValue), is(expectedValue));
+ }
+ }
+
+ /**
+ * Asserts value from the meta storage.
+ *
+ * @param metaStorageManager Meta Storage manager.
+ * @param key Key of value to check.
+ * @param valueTransformer Function that is applied to value from the meta storage.
+ * @param expectedValue Expected value.
+ * @param timeoutMillis Timeout in milliseconds.
+ * @param <T> A type of value from the meta storage.
+ * @throws InterruptedException If interrupted.
+ */
+ public static <T> void assertValueInStorage(
+ MetaStorageManager metaStorageManager,
+ ByteArray key,
+ Function<byte[], T> valueTransformer,
+ @Nullable T expectedValue,
+ long timeoutMillis
+ ) throws InterruptedException {
+ boolean success = waitForCondition(() -> {
+ byte[] storageValue = new byte[0];
+ try {
+ storageValue = metaStorageManager.get(key).get().value();
+ } catch (InterruptedException | ExecutionException e) {
+ fail();
+ }
+
+ T actualValue = storageValue == null ? null : valueTransformer.apply(storageValue);
+
+ return Objects.equals(actualValue, expectedValue);
+ }, timeoutMillis);
+
+ // We do a second check simply to print a nice error message in case the condition above is not achieved.
+ if (!success) {
+ byte[] storageValue = new byte[0];
+
+ try {
+ storageValue = metaStorageManager.get(key).get().value();
+ } catch (ExecutionException e) {
+ fail();
+ }
+
+ assertThat(storageValue == null ? null : valueTransformer.apply(storageValue), is(expectedValue));
+ }
+ }
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
new file mode 100644
index 0000000000..522a6e07b3
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItDistributionZonesFilterTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distribution.zones;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertValueInStorage;
+import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesKey;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
+import static org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
+import static org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.distributionzones.Node;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.sql.Session;
+import org.intellij.lang.annotations.Language;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration test for data nodes' filters functionality.
+ */
+public class ItDistributionZonesFilterTest extends ClusterPerTestIntegrationTest {
+ @Language("JSON")
+ private static final String NODE_ATTRIBUTES = "{region:{attribute:\"US\"},storage:{attribute:\"SSD\"}}";
+
+ private static final String COLUMN_KEY = "key";
+
+ private static final String COLUMN_VAL = "val";
+
+ private static final int TIMEOUT_MILLIS = 10_000;
+
+ @Language("JSON")
+ private static String createStartConfig(@Language("JSON") String nodeAttributes) {
+ return "{\n"
+ + " network: {\n"
+ + " port: {},\n"
+ + " nodeFinder: {\n"
+ + " netClusterNodes: [ {} ]\n"
+ + " }\n"
+ + " },"
+ + " nodeAttributes: {\n"
+ + " nodeAttributes: " + nodeAttributes
+ + " },\n"
+ + "}";
+ }
+
+ @Override
+ protected String getNodeBootstrapConfigTemplate() {
+ return createStartConfig(NODE_ATTRIBUTES);
+ }
+
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ /**
+ * Tests the scenario when filter was applied to data nodes and stable key for rebalance was changed to that filtered value.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ void testFilteredDataNodesPropagatedToStable() throws Exception {
+ String filter = "'$[?(@.region == \"US\" && @.storage == \"SSD\")]'";
+
+ // This node do not pass the filter
+ @Language("JSON") String firstNodeAttributes = "{region:{attribute:\"EU\"},storage:{attribute:\"SSD\"}}";
+
+ IgniteImpl node = startNode(1, createStartConfig(firstNodeAttributes));
+
+ Session session = node.sql().createSession();
+
+ session.execute(null, "CREATE ZONE \"TEST_ZONE\" WITH "
+ + "\"REPLICAS\" = 3, "
+ + "\"PARTITIONS\" = 2, "
+ + "\"DATA_NODES_FILTER\" = " + filter + ", "
+ + "\"DATA_NODES_AUTO_ADJUST_SCALE_UP\" = 0, "
+ + "\"DATA_NODES_AUTO_ADJUST_SCALE_DOWN\" = 0");
+
+ String tableName = "table1";
+
+ session.execute(null, "CREATE TABLE " + tableName + "("
+ + COLUMN_KEY + " INT PRIMARY KEY, " + COLUMN_VAL + " VARCHAR) WITH PRIMARY_ZONE='TEST_ZONE'");
+
+ MetaStorageManager metaStorageManager = (MetaStorageManager) IgniteTestUtils
+ .getFieldValue(node, IgniteImpl.class, "metaStorageMgr");
+
+ TableManager tableManager = (TableManager) IgniteTestUtils.getFieldValue(node, IgniteImpl.class, "distributedTblMgr");
+
+ TableImpl table = (TableImpl) tableManager.table(tableName);
+
+ TablePartitionId partId = new TablePartitionId(table.tableId(), 0);
+
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ (v) -> ((Set<Assignment>) fromBytes(v)).size(),
+ null,
+ TIMEOUT_MILLIS
+ );
+
+ @Language("JSON") String secondNodeAttributes = "{region:{attribute:\"US\"},storage:{attribute:\"SSD\"}}";
+
+ // This node pass the filter
+ startNode(2, createStartConfig(secondNodeAttributes));
+
+ assertValueInStorage(
+ metaStorageManager,
+ zoneDataNodesKey(1),
+ (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
+ 3,
+ TIMEOUT_MILLIS
+ );
+
+ Entry dataNodesEntry1 = metaStorageManager.get(zoneDataNodesKey(1)).get(5_000, MILLISECONDS);
+
+ assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() >= dataNodesEntry1.revision(), TIMEOUT_MILLIS));
+
+ // We check that two nodes that pass the filter are presented in the stable key.
+
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ (v) -> ((Set<Assignment>) fromBytes(v)).size(),
+ 2,
+ TIMEOUT_MILLIS * 2
+ );
+
+ byte[] stableAssignments = metaStorageManager.get(stablePartAssignmentsKey(partId)).get(5_000, MILLISECONDS).value();
+
+ assertNotNull(stableAssignments);
+
+ Set<String> stable = ((Set<Assignment>) fromBytes(stableAssignments)).stream()
+ .map(Assignment::consistentId)
+ .collect(Collectors.toSet());
+
+ assertEquals(2, stable.size());
+
+ assertTrue(stable.contains(node(0).name()) && stable.contains(node(2).name()));
+ }
+
+ /**
+ * Tests the scenario when removal node from the logical topology leads to empty data nodes, but this empty data nodes do not trigger
+ * rebalance. Data nodes become empty after applying corresponding filter.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19443")
+ void testFilteredEmptyDataNodesDoNotTriggerRebalance() throws Exception {
+ String filter = "'$[?(@.region == \"EU\" && @.storage == \"HDD\")]'";
+
+ // This node do not pass the filter.
+ IgniteImpl node0 = node(0);
+
+ // This node pass the filter
+ @Language("JSON") String firstNodeAttributes = "{region:{attribute:\"EU\"},storage:{attribute:\"HDD\"}}";
+
+ IgniteImpl node1 = startNode(1, createStartConfig(firstNodeAttributes));
+
+ Session session = node1.sql().createSession();
+
+ session.execute(null, "CREATE ZONE \"TEST_ZONE\" WITH "
+ + "\"REPLICAS\" = 1, "
+ + "\"PARTITIONS\" = 1, "
+ + "\"DATA_NODES_FILTER\" = " + filter + ", "
+ + "\"DATA_NODES_AUTO_ADJUST_SCALE_UP\" = 0, "
+ + "\"DATA_NODES_AUTO_ADJUST_SCALE_DOWN\" = 0");
+
+ String tableName = "table1";
+
+ session.execute(null, "CREATE TABLE " + tableName + "("
+ + COLUMN_KEY + " INT PRIMARY KEY, " + COLUMN_VAL + " VARCHAR) WITH PRIMARY_ZONE='TEST_ZONE'");
+
+ MetaStorageManager metaStorageManager = (MetaStorageManager) IgniteTestUtils
+ .getFieldValue(node0, IgniteImpl.class, "metaStorageMgr");
+
+ TableManager tableManager = (TableManager) IgniteTestUtils.getFieldValue(node0, IgniteImpl.class, "distributedTblMgr");
+
+ TableImpl table = (TableImpl) tableManager.table(tableName);
+
+ TablePartitionId partId = new TablePartitionId(table.tableId(), 0);
+
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ (v) -> ((Set<Assignment>) fromBytes(v)).size(),
+ null,
+ TIMEOUT_MILLIS
+ );
+
+ assertValueInStorage(
+ metaStorageManager,
+ zoneDataNodesKey(1),
+ (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
+ 2,
+ TIMEOUT_MILLIS
+ );
+
+ Entry dataNodesEntry = metaStorageManager.get(zoneDataNodesKey(1)).get(5_000, MILLISECONDS);
+
+ assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() >= dataNodesEntry.revision(), 5_000));
+
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ (v) -> ((Set<Assignment>) fromBytes(v)).size(),
+ null,
+ TIMEOUT_MILLIS
+ );
+
+ stopNode(1);
+
+ assertValueInStorage(
+ metaStorageManager,
+ zoneDataNodesKey(1),
+ (v) -> ((Map<Node, Integer>) fromBytes(v)).size(),
+ 1,
+ TIMEOUT_MILLIS
+ );
+
+ Entry newDataNodesEntry = metaStorageManager.get(zoneDataNodesKey(1)).get(5_000, MILLISECONDS);
+
+ assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision() >= newDataNodesEntry.revision(), 5_000));
+
+ assertValueInStorage(
+ metaStorageManager,
+ pendingPartAssignmentsKey(partId),
+ (v) -> ((Set<Assignment>) fromBytes(v)).size(),
+ null,
+ TIMEOUT_MILLIS
+ );
+
+ assertValueInStorage(
+ metaStorageManager,
+ stablePartAssignmentsKey(partId),
+ (v) -> ((Set<Assignment>) fromBytes(v)).size(),
+ null,
+ TIMEOUT_MILLIS
+ );
+ }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index 6b0a1c27f2..63b3e4d66b 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -190,6 +190,10 @@ public class DdlCommandHandler {
zoneCfgBuilder.partitions(cmd.partitions());
}
+ if (cmd.nodeFilter() != null) {
+ zoneCfgBuilder.filter(cmd.nodeFilter());
+ }
+
zoneCfgBuilder.dataStorageChangeConsumer(
dataStorageManager.zoneDataStorageConsumer(cmd.dataStorage(), cmd.dataStorageOptions()));
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index dda37dce6c..93f47ffde1 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -38,6 +38,7 @@ import static org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmen
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -2016,7 +2017,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
LOG.info("Received update on pending assignments. Check if new raft group should be started"
+ " [key={}, partition={}, table={}, localMemberAddress={}]",
- pendingAssignmentsEntry.key(), partId, tbl.name(), localMember.address());
+ new String(pendingAssignmentsEntry.key(), StandardCharsets.UTF_8), partId, tbl.name(), localMember.address());
CompletableFuture<Void> localServicesStartFuture;