You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this version.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:31 UTC

[helix] 28/37: Adjust the expected replica count according to fault zone count. (#476)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 679547fb511754cce2863a51e10ffcc6d8ab98b0
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Tue Sep 17 13:41:56 2019 -0700

    Adjust the expected replica count according to fault zone count. (#476)
    
    The rebalancer should determine the expected replica count according to the number of fault zones, rather than only the number of nodes.
---
 .../rebalancer/waged/model/AssignableNode.java     | 56 ++++++++++------------
 .../waged/model/ClusterModelProvider.java          | 28 ++++++-----
 .../waged/model/ClusterModelTestHelper.java        |  3 +-
 .../rebalancer/waged/model/TestAssignableNode.java | 24 ++++------
 .../rebalancer/waged/model/TestClusterModel.java   |  3 +-
 .../waged/model/TestClusterModelProvider.java      | 33 ++++++++-----
 6 files changed, 76 insertions(+), 71 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 6966353..a3460fb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -64,12 +64,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
    * @param clusterConfig
    * @param instanceConfig
    * @param instanceName
-   * @param existingAssignment A collection of replicas that have been pre-allocated to the node.
    */
-  AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName,
-      Collection<AssignableReplica> existingAssignment) {
+  AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName) {
     _instanceName = instanceName;
-    refresh(clusterConfig, instanceConfig, existingAssignment);
+    refresh(clusterConfig, instanceConfig);
   }
 
   private void reset() {
@@ -88,10 +86,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
    * subject to change. If the assumption is no longer true, this function should become private.
    * @param clusterConfig - the Cluster Config of the cluster where the node is located
    * @param instanceConfig - the Instance Config of the node
-   * @param existingAssignment - all the existing replicas that are current assigned to the node
    */
-  private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
-      Collection<AssignableReplica> existingAssignment) {
+  private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig) {
     reset();
 
     Map<String, Integer> instanceCapacity = fetchInstanceCapacity(clusterConfig, instanceConfig);
@@ -101,8 +97,29 @@ public class AssignableNode implements Comparable<AssignableNode> {
     _disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
     _maxCapacity = instanceCapacity;
     _maxPartition = clusterConfig.getMaxPartitionsPerInstance();
+  }
+
+  /**
+   * This function should only be used to assign a set of new partitions that are not allocated on
+   * this node.
+   * Using this function avoids the overhead of updating capacity repeatedly.
+   */
+  void assignNewBatch(Collection<AssignableReplica> replicas) {
+    Map<String, Integer> totalPartitionCapacity = new HashMap<>();
+    for (AssignableReplica replica : replicas) {
+      addToAssignmentRecord(replica);
+      // increment the capacity requirement according to partition's capacity configuration.
+      for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) {
+        totalPartitionCapacity.compute(capacity.getKey(),
+            (key, totalValue) -> (totalValue == null) ? capacity.getValue()
+                : totalValue + capacity.getValue());
+      }
+    }
 
-    assignNewBatch(existingAssignment);
+    // Update the global state after all single replications' calculation is done.
+    for (String key : totalPartitionCapacity.keySet()) {
+      updateCapacityAndUtilization(key, totalPartitionCapacity.get(key));
+    }
   }
 
   /**
@@ -315,29 +332,6 @@ public class AssignableNode implements Comparable<AssignableNode> {
   }
 
   /**
-   * This function should only be used to assign a set of new partitions that are not allocated on
-   * this node.
-   * Using this function avoids the overhead of updating capacity repeatedly.
-   */
-  private void assignNewBatch(Collection<AssignableReplica> replicas) {
-    Map<String, Integer> totalPartitionCapacity = new HashMap<>();
-    for (AssignableReplica replica : replicas) {
-      addToAssignmentRecord(replica);
-      // increment the capacity requirement according to partition's capacity configuration.
-      for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) {
-        totalPartitionCapacity.compute(capacity.getKey(),
-            (key, totalValue) -> (totalValue == null) ? capacity.getValue()
-                : totalValue + capacity.getValue());
-      }
-    }
-
-    // Update the global state after all single replications' calculation is done.
-    for (String key : totalPartitionCapacity.keySet()) {
-      updateCapacityAndUtilization(key, totalPartitionCapacity.get(key));
-    }
-  }
-
-  /**
    * @throws HelixException if the replica has already been assigned to the node.
    */
   private void addToAssignmentRecord(AssignableReplica replica) {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 3570164..20024c7 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -62,10 +62,15 @@ public class ClusterModelProvider {
       Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
       Map<String, ResourceAssignment> baselineAssignment,
       Map<String, ResourceAssignment> bestPossibleAssignment) {
+    // Construct all the assignable nodes and initialize with the allocated replicas.
+    Set<AssignableNode> assignableNodes =
+        parseAllNodes(dataProvider.getClusterConfig(), dataProvider.getInstanceConfigMap(),
+            activeInstances);
+
     // Generate replica objects for all the resource partitions.
     // <resource, replica set>
     Map<String, Set<AssignableReplica>> replicaMap =
-        parseAllReplicas(dataProvider, resourceMap, activeInstances.size());
+        parseAllReplicas(dataProvider, resourceMap, assignableNodes);
 
     // Check if the replicas need to be reassigned.
     Map<String, Set<AssignableReplica>> allocatedReplicas =
@@ -74,10 +79,9 @@ public class ClusterModelProvider {
         findToBeAssignedReplicas(replicaMap, clusterChanges, activeInstances,
             bestPossibleAssignment, allocatedReplicas);
 
-    // Construct all the assignable nodes and initialize with the allocated replicas.
-    Set<AssignableNode> assignableNodes =
-        parseAllNodes(dataProvider.getClusterConfig(), dataProvider.getInstanceConfigMap(),
-            activeInstances, allocatedReplicas);
+    // Update the allocated replicas to the assignable nodes.
+    assignableNodes.stream().forEach(node -> node.assignNewBatch(
+        allocatedReplicas.getOrDefault(node.getInstanceName(), Collections.emptySet())));
 
     // Construct and initialize cluster context.
     ClusterContext context = new ClusterContext(
@@ -171,15 +175,13 @@ public class ClusterModelProvider {
    * @param clusterConfig     The cluster configuration.
    * @param instanceConfigMap A map of all the instance configuration.
    * @param activeInstances   All the instances that are online and enabled.
-   * @param allocatedReplicas A map of all the assigned replicas, which will not be reassigned during the rebalance.
    * @return A map of assignable node set, <InstanceName, node set>.
    */
   private static Set<AssignableNode> parseAllNodes(ClusterConfig clusterConfig,
-      Map<String, InstanceConfig> instanceConfigMap, Set<String> activeInstances,
-      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+      Map<String, InstanceConfig> instanceConfigMap, Set<String> activeInstances) {
     return activeInstances.stream().map(
         instanceName -> new AssignableNode(clusterConfig, instanceConfigMap.get(instanceName),
-            instanceName, allocatedReplicas.getOrDefault(instanceName, Collections.emptySet())))
+            instanceName))
         .collect(Collectors.toSet());
   }
 
@@ -188,11 +190,12 @@ public class ClusterModelProvider {
    *
    * @param dataProvider The cluster status cache that contains the current cluster status.
    * @param resourceMap  All the valid resources that are managed by the rebalancer.
+   * @param assignableNodes All the active assignable nodes.
    * @return A map of assignable replica set, <ResourceName, replica set>.
    */
   private static Map<String, Set<AssignableReplica>> parseAllReplicas(
       ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
-      int instanceCount) {
+      Set<AssignableNode> assignableNodes) {
     Map<String, Set<AssignableReplica>> totalReplicaMap = new HashMap<>();
     ClusterConfig clusterConfig = dataProvider.getClusterConfig();
 
@@ -211,8 +214,11 @@ public class ClusterModelProvider {
                 is.getStateModelDefRef(), resourceName));
       }
 
+      int activeFaultZoneCount =
+          assignableNodes.stream().map(node -> node.getFaultZone()).collect(Collectors.toSet())
+              .size();
       Map<String, Integer> stateCountMap =
-          def.getStateCountMap(instanceCount, is.getReplicaCount(instanceCount));
+          def.getStateCountMap(activeFaultZoneCount, is.getReplicaCount(assignableNodes.size()));
 
       for (String partition : is.getPartitionSet()) {
         for (Map.Entry<String, Integer> entry : stateCountMap.entrySet()) {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
index 76f1141..08143c6 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
@@ -43,8 +43,7 @@ public class ClusterModelTestHelper extends AbstractTestClusterModel {
     Set<AssignableNode> nodeSet = new HashSet<>();
     testCache.getInstanceConfigMap().values().stream()
             .forEach(config -> nodeSet.add(new AssignableNode(testCache.getClusterConfig(),
-                    testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName(),
-                    Collections.emptyList())));
+                    testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName())));
     return nodeSet;
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 92a6998..6975901 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -64,7 +64,8 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     expectedCapacityMap.put("item3", 30);
 
     AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet);
+        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+    assignableNode.assignNewBatch(assignmentSet);
     Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
     Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
     Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 20.0, 0.005);
@@ -167,8 +168,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     ResourceControllerDataProvider testCache = setupClusterDataCache();
 
     AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
-        Collections.emptyList());
+        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
     AssignableReplica removingReplica = new AssignableReplica(testCache.getClusterConfig(),
         testCache.getResourceConfig(_resourceNames.get(1)), _partitionNames.get(2) + "non-exist",
         "MASTER", 1);
@@ -183,7 +183,8 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
 
     AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId, assignmentSet);
+        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+    assignableNode.assignNewBatch(assignmentSet);
     AssignableReplica duplicateReplica = new AssignableReplica(testCache.getClusterConfig(),
         testCache.getResourceConfig(_resourceNames.get(0)), _partitionNames.get(0), "SLAVE", 2);
     assignableNode.assign(duplicateReplica);
@@ -206,8 +207,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
 
     new AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
-        Collections.emptyList());
+        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
   }
 
   @Test
@@ -227,8 +227,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
 
     AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
-        Collections.emptyList());
+        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
 
     Assert.assertEquals(assignableNode.getFaultZone(), "2/");
 
@@ -245,8 +244,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
 
     assignableNode = new AssignableNode(testCache.getClusterConfig(),
-        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId,
-        Collections.emptyList());
+        testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
 
     Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance/");
   }
@@ -259,8 +257,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");
 
     AssignableNode assignableNode =
-        new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId,
-            Collections.emptyList());
+        new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId);
     Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
   }
 
@@ -274,7 +271,6 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");
     testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
 
-    new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId,
-        Collections.emptyList());
+    new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId);
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
index a45b729..5112413 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
@@ -43,8 +43,7 @@ public class TestClusterModel extends AbstractTestClusterModel {
     Set<AssignableNode> nodeSet = new HashSet<>();
     testCache.getInstanceConfigMap().values().stream().forEach(config -> nodeSet.add(
         new AssignableNode(testCache.getClusterConfig(),
-            testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName(),
-            Collections.emptyList())));
+            testCache.getInstanceConfigMap().get(_testInstanceId), config.getInstanceName())));
     return nodeSet;
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
index 1ec92a9..ad608b6 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -19,6 +19,14 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 import org.apache.helix.HelixConstants;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
@@ -34,14 +42,6 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.when;
 
@@ -111,7 +111,18 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
     Assert.assertEquals(
         clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
             .collect(Collectors.toSet()), _instances);
-    // Shall have 2 resources and 12 replicas
+    // Shall have 2 resources and 4 replicas, since all nodes are in the same fault zone.
+    Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
+    Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
+        .allMatch(replicaSet -> replicaSet.size() == 4));
+
+    // Adjust instance fault zone, so they have different fault zones.
+    testCache.getInstanceConfigMap().values().stream()
+        .forEach(config -> config.setZoneId(config.getInstanceName()));
+    clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
+            .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
+        _instances, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
+    // Shall have 2 resources and 12 replicas after fault zone adjusted.
     Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
     Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
         .allMatch(replicaSet -> replicaSet.size() == 12));
@@ -197,10 +208,10 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
         _instances, Collections.singletonMap(HelixConstants.ChangeType.RESOURCE_CONFIG,
             Collections.singleton(changedResourceName)), Collections.emptyMap(),
         bestPossibleAssignment);
-    // There should be no existing assignment for all the resource except for resource2.
+    // There should be no existing assignment for all the resource except for resource2
     Assert.assertEquals(clusterModel.getContext().getAssignmentForFaultZoneMap().size(), 1);
     Map<String, Set<String>> resourceAssignmentMap =
-        clusterModel.getContext().getAssignmentForFaultZoneMap().get(_testFaultZoneId);
+        clusterModel.getContext().getAssignmentForFaultZoneMap().get(_testInstanceId);
     // Should be only resource2 in the map
     Assert.assertEquals(resourceAssignmentMap.size(), 1);
     for (String resource : _resourceNames) {