You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/12/13 20:33:06 UTC
(helix) branch ApplicationClusterManager updated: Change all rebalancer strategies to create Topology without additional non-FaultZone or EndNode levels of the tree. (#2712)
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/ApplicationClusterManager by this push:
new 5d6396e32 Change all rebalancer strategies to create Topology without additional non-FaultZone or EndNode levels of the tree. (#2712)
5d6396e32 is described below
commit 5d6396e328492304a2f1032a7165ed73ed94f812
Author: Zachary Pinto <za...@linkedin.com>
AuthorDate: Wed Dec 13 12:33:01 2023 -0800
Change all rebalancer strategies to create Topology without additional non-FaultZone or EndNode levels of the tree. (#2712)
* Change all rebalancer strategies to create Topology without tree levels other than the FaultZone and EndNode levels. This allows swap to work in clusters where the domain key-value pairs for levels other than FaultZone and EndNode don't exactly match those of the swapping node.
---
.../AbstractEvenDistributionRebalanceStrategy.java | 2 +-
.../strategy/CrushRebalanceStrategy.java | 2 +-
.../strategy/MultiRoundCrushRebalanceStrategy.java | 2 +-
.../controller/rebalancer/topology/Topology.java | 42 +++++++++++++++++--
.../waged/model/ClusterModelProvider.java | 5 ++-
.../rebalancer/TestInstanceOperation.java | 48 ++++++++++++----------
6 files changed, 72 insertions(+), 29 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
index 7750bd70b..c10a6a9ab 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
@@ -117,7 +117,7 @@ public abstract class AbstractEvenDistributionRebalanceStrategy
Map<String, List<Node>> finalPartitionMap = null;
Topology allNodeTopo =
new Topology(allNodes, allNodes, clusterData.getAssignableInstanceConfigMap(),
- clusterData.getClusterConfig());
+ clusterData.getClusterConfig(), true);
// Transform current assignment to instance->partitions map, and get total partitions
Map<Node, List<String>> nodeToPartitionMap =
convertPartitionMap(origPartitionMap, allNodeTopo);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
index 08bbaaffa..011da6777 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
@@ -77,7 +77,7 @@ public class CrushRebalanceStrategy implements RebalanceStrategy<ResourceControl
ResourceControllerDataProvider clusterData) throws HelixException {
Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
_clusterTopo =
- new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
+ new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig(), true);
Node topNode = _clusterTopo.getRootNode();
// for log only
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
index a53257f3d..96ddfa485 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
@@ -84,7 +84,7 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy<Resou
ResourceControllerDataProvider clusterData) throws HelixException {
Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
_clusterTopo =
- new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
+ new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig(), true);
Node root = _clusterTopo.getRootNode();
Map<String, List<Node>> zoneMapping = new HashMap<>();
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
index 4d4ebabd1..335c30fdf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
@@ -23,6 +23,7 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -57,8 +58,19 @@ public class Topology {
private final Map<String, InstanceConfig> _instanceConfigMap;
private final ClusterTopologyConfig _clusterTopologyConfig;
+ /**
+ * Create a Topology for a cluster.
+ *
+ * @param allNodes allNodes of the given cluster.
+ * @param liveNodes liveNodes of the given cluster.
+ * @param instanceConfigMap instanceConfigMap of the given cluster.
+ * @param clusterConfig clusterConfig of the given cluster.
+ * @param faultZoneLevelOnly if true, omit topology levels other than the fault zone and end
+ * node from the tree; if false, build a level for every configured topology key.
+ */
public Topology(final List<String> allNodes, final List<String> liveNodes,
- final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
+ final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig,
+ boolean faultZoneLevelOnly) {
try {
_md = MessageDigest.getInstance("SHA-1");
} catch (NoSuchAlgorithmException ex) {
@@ -73,7 +85,20 @@ public class Topology {
_allInstances.removeAll(_instanceConfigMap.keySet())));
}
_clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
- _root = createClusterTree(clusterConfig);
+ _root = createClusterTree(clusterConfig, faultZoneLevelOnly);
+ }
+
+ /**
+ * Create a Topology for a cluster. faultZoneLevelOnly is set to false by default.
+ *
+ * @param allNodes allNodes of the given cluster.
+ * @param liveNodes liveNodes of the given cluster.
+ * @param instanceConfigMap instanceConfigMap of the given cluster.
+ * @param clusterConfig clusterConfig of the given cluster.
+ */
+ public Topology(final List<String> allNodes, final List<String> liveNodes,
+ final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
+ this(allNodes, liveNodes, instanceConfigMap, clusterConfig, false);
}
public String getEndNodeType() {
@@ -149,13 +174,18 @@ public class Topology {
return newRoot;
}
- private Node createClusterTree(ClusterConfig clusterConfig) {
+ private Node createClusterTree(ClusterConfig clusterConfig, boolean faultZoneLevelOnly) {
// root
Node root = new Node();
root.setName("root");
root.setId(computeId("root"));
root.setType(Types.ROOT.name());
+ Set<String> unnecessaryTopoKeys =
+ new HashSet<>(_clusterTopologyConfig.getTopologyKeyDefaultValue().keySet());
+ unnecessaryTopoKeys.remove(_clusterTopologyConfig.getFaultZoneType());
+ unnecessaryTopoKeys.remove(_clusterTopologyConfig.getEndNodeType());
+
// TODO: Currently we add disabled instance to the topology tree. Since they are not considered
// TODO: in rebalance, maybe we should skip adding them to the tree for consistence.
for (String instanceName : _allInstances) {
@@ -167,6 +197,12 @@ public class Topology {
if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) {
weight = DEFAULT_NODE_WEIGHT;
}
+
+ if (faultZoneLevelOnly) {
+ // Remove unnecessary keys from the topology map. We do not need to use these to build more layers in
+ // the topology tree. The topology tree only requires FaultZoneType and EndNodeType.
+ unnecessaryTopoKeys.forEach(instanceTopologyMap::remove);
+ }
addEndNode(root, instanceName, instanceTopologyMap, weight, _liveInstances);
} catch (IllegalArgumentException e) {
if (InstanceValidationUtil.isInstanceEnabled(insConfig, clusterConfig)) {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 69fec9b2c..dc825f2f7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -213,7 +213,10 @@ public class ClusterModelProvider {
new InstanceConfig(instanceName))
.getLogicalId(clusterTopologyConfig.getEndNodeType())).collect(Collectors.toSet());
- Set<String> assignableLiveInstanceNames = dataProvider.getAssignableLiveInstances().keySet();
+ // TODO: Figure out why streaming the keySet directly in rare cases causes ConcurrentModificationException
+ // In theory, this should not be happening since cache refresh is at beginning of the pipeline, so could be some other reason.
+ // For now, we just copy the keySet to a new HashSet to avoid the exception.
+ Set<String> assignableLiveInstanceNames = new HashSet<>(dataProvider.getAssignableLiveInstances().keySet());
Set<String> assignableLiveInstanceLogicalIds =
assignableLiveInstanceNames.stream().map(
instanceName -> assignableInstanceConfigMap.getOrDefault(instanceName,
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 3f0aa5d9e..7cd08d86f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -10,6 +10,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -58,7 +59,8 @@ import org.testng.annotations.Test;
public class TestInstanceOperation extends ZkTestBase {
- protected final int NUM_NODE = 6;
+ private final int ZONE_COUNT = 4;
+ protected final int NUM_NODE = 10;
protected static final int START_PORT = 12918;
protected static final int PARTITIONS = 20;
@@ -145,6 +147,13 @@ public class TestInstanceOperation extends ZkTestBase {
@AfterClass
public void afterClass() {
+ // Drop all DBs
+ for (String db : _allDBs) {
+ _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db);
+ }
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
for (MockParticipantManager p : _participants) {
p.syncStop();
}
@@ -208,13 +217,9 @@ public class TestInstanceOperation extends ZkTestBase {
for (int i = 0; i < _participants.size(); i++) {
// If instance is not connected to ZK, replace it
if (!_participants.get(i).isConnected()) {
- // Drop bad instance from the cluster.
- _gSetupTool.getClusterManagementTool()
- .dropInstance(CLUSTER_NAME, _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, _participantNames.get(i)));
- _participants.set(i, createParticipant(_participantNames.get(i), Integer.toString(i),
- "zone_" + i, null, true, -1));
+ // Replace the stopped participant with a new one and inherit the old instance config.
+ _participants.set(i, createParticipant(_participantNames.get(i)));
_participants.get(i).syncStart();
- continue;
}
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, _participantNames.get(i), null);
@@ -1205,17 +1210,7 @@ public class TestInstanceOperation extends ZkTestBase {
Collections.emptySet(), Set.of(instanceToSwapInName));
}
- private MockParticipantManager createParticipant(String participantName, String logicalId, String zone,
- InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity) {
- InstanceConfig config = new InstanceConfig.Builder().setDomain(
- String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID,
- logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation)
- .build(participantName);
- if (capacity >= 0) {
- config.setInstanceCapacityMap(Map.of(TEST_CAPACITY_KEY, capacity));
- }
- _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config);
-
+ private MockParticipantManager createParticipant(String participantName) {
// start dummy participants
MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName);
StateMachineEngine stateMachine = participant.getStateMachineEngine();
@@ -1227,8 +1222,17 @@ public class TestInstanceOperation extends ZkTestBase {
private void addParticipant(String participantName, String logicalId, String zone,
InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity) {
- MockParticipantManager participant = createParticipant(participantName, logicalId, zone,
- instanceOperation, enabled, capacity);
+ InstanceConfig config = new InstanceConfig.Builder().setDomain(
+ String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID,
+ logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation)
+ .build(participantName);
+
+ if (capacity >= 0) {
+ config.setInstanceCapacityMap(Map.of(TEST_CAPACITY_KEY, capacity));
+ }
+ _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config);
+
+ MockParticipantManager participant = createParticipant(participantName);
participant.syncStart();
_participants.add(participant);
@@ -1236,8 +1240,8 @@ public class TestInstanceOperation extends ZkTestBase {
}
private void addParticipant(String participantName) {
- addParticipant(participantName, Integer.toString(_participants.size()),
- "zone_" + _participants.size(), null, true, -1);
+ addParticipant(participantName, UUID.randomUUID().toString(),
+ "zone_" + _participants.size() % ZONE_COUNT, null, true, -1);
}
private void createTestDBs(long delayTime) throws InterruptedException {