You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/12/13 20:33:06 UTC

(helix) branch ApplicationClusterManager updated: Change all rebalancer strategies to create Topology without additional non-FaultZone or EndNode levels of the tree. (#2712)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/ApplicationClusterManager by this push:
     new 5d6396e32 Change all rebalancer strategies to create Topology without additional non-FaultZone or EndNode levels of the tree. (#2712)
5d6396e32 is described below

commit 5d6396e328492304a2f1032a7165ed73ed94f812
Author: Zachary Pinto <za...@linkedin.com>
AuthorDate: Wed Dec 13 12:33:01 2023 -0800

    Change all rebalancer strategies to create Topology without additional non-FaultZone or EndNode levels of the tree. (#2712)
    
    * Change all rebalancer strategies to create Topology without additional non-FaultZone or EndNode levels of the tree. This allows swap to work in clusters where the domain key-value pairs for levels other than FaultZone and EndNode do not directly match those of the swapping node.
---
 .../AbstractEvenDistributionRebalanceStrategy.java |  2 +-
 .../strategy/CrushRebalanceStrategy.java           |  2 +-
 .../strategy/MultiRoundCrushRebalanceStrategy.java |  2 +-
 .../controller/rebalancer/topology/Topology.java   | 42 +++++++++++++++++--
 .../waged/model/ClusterModelProvider.java          |  5 ++-
 .../rebalancer/TestInstanceOperation.java          | 48 ++++++++++++----------
 6 files changed, 72 insertions(+), 29 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
index 7750bd70b..c10a6a9ab 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
@@ -117,7 +117,7 @@ public abstract class AbstractEvenDistributionRebalanceStrategy
       Map<String, List<Node>> finalPartitionMap = null;
       Topology allNodeTopo =
           new Topology(allNodes, allNodes, clusterData.getAssignableInstanceConfigMap(),
-              clusterData.getClusterConfig());
+              clusterData.getClusterConfig(), true);
       // Transform current assignment to instance->partitions map, and get total partitions
       Map<Node, List<String>> nodeToPartitionMap =
           convertPartitionMap(origPartitionMap, allNodeTopo);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
index 08bbaaffa..011da6777 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
@@ -77,7 +77,7 @@ public class CrushRebalanceStrategy implements RebalanceStrategy<ResourceControl
       ResourceControllerDataProvider clusterData) throws HelixException {
     Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
     _clusterTopo =
-        new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
+        new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig(), true);
     Node topNode = _clusterTopo.getRootNode();
 
     // for log only
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
index a53257f3d..96ddfa485 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java
@@ -84,7 +84,7 @@ public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy<Resou
       ResourceControllerDataProvider clusterData) throws HelixException {
     Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
     _clusterTopo =
-        new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
+        new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig(), true);
     Node root = _clusterTopo.getRootNode();
 
     Map<String, List<Node>> zoneMapping = new HashMap<>();
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
index 4d4ebabd1..335c30fdf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
@@ -23,6 +23,7 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -57,8 +58,19 @@ public class Topology {
   private final Map<String, InstanceConfig> _instanceConfigMap;
   private final ClusterTopologyConfig _clusterTopologyConfig;
 
+  /**
+   * Create a Topology for a cluster.
+   *
+   * @param allNodes           allNodes of the given cluster.
+   * @param liveNodes          liveNodes of the given cluster.
+   * @param instanceConfigMap  instanceConfigMap of the given cluster.
+   * @param clusterConfig      clusterConfig of the given cluster.
+   * @param faultZoneLevelOnly whether to include additional non-faultZone level nodes in the
+   *                           topology tree above the end-nodes.
+   */
   public Topology(final List<String> allNodes, final List<String> liveNodes,
-      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig,
+      boolean faultZoneLevelOnly) {
     try {
       _md = MessageDigest.getInstance("SHA-1");
     } catch (NoSuchAlgorithmException ex) {
@@ -73,7 +85,20 @@ public class Topology {
           _allInstances.removeAll(_instanceConfigMap.keySet())));
     }
     _clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
-    _root = createClusterTree(clusterConfig);
+    _root = createClusterTree(clusterConfig, faultZoneLevelOnly);
+  }
+
+  /**
+   * Create a Topology for a cluster. faultZoneLevelOnly is set to false by default.
+   *
+   * @param allNodes          allNodes of the given cluster.
+   * @param liveNodes         liveNodes of the given cluster.
+   * @param instanceConfigMap instanceConfigMap of the given cluster.
+   * @param clusterConfig     clusterConfig of the given cluster.
+   */
+  public Topology(final List<String> allNodes, final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
+    this(allNodes, liveNodes, instanceConfigMap, clusterConfig, false);
   }
 
   public String getEndNodeType() {
@@ -149,13 +174,18 @@ public class Topology {
     return newRoot;
   }
 
-  private Node createClusterTree(ClusterConfig clusterConfig) {
+  private Node createClusterTree(ClusterConfig clusterConfig, boolean faultZoneLevelOnly) {
     // root
     Node root = new Node();
     root.setName("root");
     root.setId(computeId("root"));
     root.setType(Types.ROOT.name());
 
+    Set<String> unnecessaryTopoKeys =
+        new HashSet<>(_clusterTopologyConfig.getTopologyKeyDefaultValue().keySet());
+    unnecessaryTopoKeys.remove(_clusterTopologyConfig.getFaultZoneType());
+    unnecessaryTopoKeys.remove(_clusterTopologyConfig.getEndNodeType());
+
     // TODO: Currently we add disabled instance to the topology tree. Since they are not considered
     // TODO: in rebalance, maybe we should skip adding them to the tree for consistence.
     for (String instanceName : _allInstances) {
@@ -167,6 +197,12 @@ public class Topology {
         if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) {
           weight = DEFAULT_NODE_WEIGHT;
         }
+
+        if (faultZoneLevelOnly) {
+          // Remove unnecessary keys from the topology map. We do not need to use these to build more layers in
+          // the topology tree. The topology tree only requires FaultZoneType and EndNodeType.
+          unnecessaryTopoKeys.forEach(instanceTopologyMap::remove);
+        }
         addEndNode(root, instanceName, instanceTopologyMap, weight, _liveInstances);
       } catch (IllegalArgumentException e) {
         if (InstanceValidationUtil.isInstanceEnabled(insConfig, clusterConfig)) {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 69fec9b2c..dc825f2f7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -213,7 +213,10 @@ public class ClusterModelProvider {
                 new InstanceConfig(instanceName))
             .getLogicalId(clusterTopologyConfig.getEndNodeType())).collect(Collectors.toSet());
 
-    Set<String> assignableLiveInstanceNames = dataProvider.getAssignableLiveInstances().keySet();
+    // TODO: Figure out why streaming the keySet directly in rare cases causes ConcurrentModificationException
+    //  In theory, this should not be happening since cache refresh is at beginning of the pipeline, so could be some other reason.
+    //  For now, we just copy the keySet to a new HashSet to avoid the exception.
+    Set<String> assignableLiveInstanceNames = new HashSet<>(dataProvider.getAssignableLiveInstances().keySet());
     Set<String> assignableLiveInstanceLogicalIds =
         assignableLiveInstanceNames.stream().map(
             instanceName -> assignableInstanceConfigMap.getOrDefault(instanceName,
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 3f0aa5d9e..7cd08d86f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -10,6 +10,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -58,7 +59,8 @@ import org.testng.annotations.Test;
 
 
 public class TestInstanceOperation extends ZkTestBase {
-  protected final int NUM_NODE = 6;
+  private final int ZONE_COUNT = 4;
+  protected final int NUM_NODE = 10;
   protected static final int START_PORT = 12918;
   protected static final int PARTITIONS = 20;
 
@@ -145,6 +147,13 @@ public class TestInstanceOperation extends ZkTestBase {
 
   @AfterClass
   public void afterClass() {
+    // Drop all DBs
+    for (String db : _allDBs) {
+      _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db);
+    }
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
     for (MockParticipantManager p : _participants) {
       p.syncStop();
     }
@@ -208,13 +217,9 @@ public class TestInstanceOperation extends ZkTestBase {
     for (int i = 0; i < _participants.size(); i++) {
       // If instance is not connected to ZK, replace it
       if (!_participants.get(i).isConnected()) {
-        // Drop bad instance from the cluster.
-        _gSetupTool.getClusterManagementTool()
-            .dropInstance(CLUSTER_NAME, _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, _participantNames.get(i)));
-        _participants.set(i, createParticipant(_participantNames.get(i), Integer.toString(i),
-            "zone_" + i, null, true, -1));
+        // Replace the stopped participant with a new one and inherit the old instance config.
+        _participants.set(i, createParticipant(_participantNames.get(i)));
         _participants.get(i).syncStart();
-        continue;
       }
       _gSetupTool.getClusterManagementTool()
           .setInstanceOperation(CLUSTER_NAME, _participantNames.get(i), null);
@@ -1205,17 +1210,7 @@ public class TestInstanceOperation extends ZkTestBase {
         Collections.emptySet(), Set.of(instanceToSwapInName));
   }
 
-  private MockParticipantManager createParticipant(String participantName, String logicalId, String zone,
-      InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity) {
-    InstanceConfig config = new InstanceConfig.Builder().setDomain(
-            String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID,
-                logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation)
-        .build(participantName);
-    if (capacity >= 0) {
-      config.setInstanceCapacityMap(Map.of(TEST_CAPACITY_KEY, capacity));
-    }
-    _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config);
-
+  private MockParticipantManager createParticipant(String participantName) {
     // start dummy participants
     MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName);
     StateMachineEngine stateMachine = participant.getStateMachineEngine();
@@ -1227,8 +1222,17 @@ public class TestInstanceOperation extends ZkTestBase {
 
   private void addParticipant(String participantName, String logicalId, String zone,
       InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity) {
-    MockParticipantManager participant = createParticipant(participantName, logicalId, zone,
-        instanceOperation, enabled, capacity);
+    InstanceConfig config = new InstanceConfig.Builder().setDomain(
+            String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID,
+                logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation)
+        .build(participantName);
+
+    if (capacity >= 0) {
+      config.setInstanceCapacityMap(Map.of(TEST_CAPACITY_KEY, capacity));
+    }
+    _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config);
+
+    MockParticipantManager participant = createParticipant(participantName);
 
     participant.syncStart();
     _participants.add(participant);
@@ -1236,8 +1240,8 @@ public class TestInstanceOperation extends ZkTestBase {
   }
 
   private void addParticipant(String participantName) {
-    addParticipant(participantName, Integer.toString(_participants.size()),
-        "zone_" + _participants.size(), null, true, -1);
+    addParticipant(participantName, UUID.randomUUID().toString(),
+        "zone_" + _participants.size() % ZONE_COUNT, null, true, -1);
   }
 
    private void createTestDBs(long delayTime) throws InterruptedException {