You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:35 UTC

[helix] 32/37: Separate AssignableNode properties by Immutable and Mutable (#485)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 0b87426cfd92ab319234d6b988269b49a47107ae
Author: Yi Wang <i3...@gmail.com>
AuthorDate: Sat Sep 28 00:49:13 2019 -0700

    Separate AssignableNode properties by Immutable and Mutable (#485)
    
    Separate the AssignableNode properties into immutable and mutable groups
    - It helps detect any wrong usage of these properties early
---
 .../waged/constraints/NodeCapacityConstraint.java  |   2 +-
 .../rebalancer/waged/model/AssignableNode.java     | 119 ++++++++++-----------
 .../waged/model/ClusterModelProvider.java          |   2 +-
 .../constraints/TestNodeCapacityConstraint.java    |   4 +-
 .../rebalancer/waged/model/TestAssignableNode.java |  15 ++-
 5 files changed, 66 insertions(+), 76 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java
index 5fc2faf..827d6ce 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java
@@ -30,7 +30,7 @@ class NodeCapacityConstraint extends HardConstraint {
   @Override
   boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
       ClusterContext clusterContext) {
-    Map<String, Integer> nodeCapacity = node.getCurrentCapacity();
+    Map<String, Integer> nodeCapacity = node.getRemainingCapacity();
     Map<String, Integer> replicaCapacity = replica.getCapacity();
 
     for (String key : replicaCapacity.keySet()) {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 20de6da..2a68e15 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,6 +34,10 @@ import org.apache.helix.model.InstanceConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
 /**
  * This class represents a possible allocation of the replication.
  * Note that any usage updates to the AssignableNode are not thread safe.
@@ -42,39 +45,25 @@ import org.slf4j.LoggerFactory;
 public class AssignableNode implements Comparable<AssignableNode> {
   private static final Logger LOG = LoggerFactory.getLogger(AssignableNode.class.getName());
 
-  // basic node information
+  // Immutable Instance Properties
   private final String _instanceName;
-  private Set<String> _instanceTags;
-  private String _faultZone;
-  private Map<String, List<String>> _disabledPartitionsMap;
-  private Map<String, Integer> _maxCapacity;
-  private int _maxPartition; // maximum number of the partitions that can be assigned to the node.
-
+  private final String _faultZone;
+  // maximum number of the partitions that can be assigned to the instance.
+  private final int _maxPartition;
+  private final ImmutableSet<String> _instanceTags;
+  private final ImmutableMap<String, List<String>> _disabledPartitionsMap;
+  private final ImmutableMap<String, Integer> _maxAllowedCapacity;
+
+  // Mutable (Dynamic) Instance Properties
   // A map of <resource name, <partition name, replica>> that tracks the replicas assigned to the
   // node.
   private Map<String, Map<String, AssignableReplica>> _currentAssignedReplicaMap;
   // A map of <capacity key, capacity value> that tracks the current available node capacity
-  private Map<String, Integer> _currentCapacityMap;
+  private Map<String, Integer> _remainingCapacity;
   // The maximum capacity utilization (0.0 - 1.0) across all the capacity categories.
   private float _highestCapacityUtilization;
 
   /**
-   * @param clusterConfig
-   * @param instanceConfig
-   * @param instanceName
-   */
-  AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName) {
-    _instanceName = instanceName;
-    refresh(clusterConfig, instanceConfig);
-  }
-
-  private void reset() {
-    _currentAssignedReplicaMap = new HashMap<>();
-    _currentCapacityMap = new HashMap<>();
-    _highestCapacityUtilization = 0;
-  }
-
-  /**
    * Update the node with a ClusterDataCache. This resets the current assignment and recalculates
    * currentCapacity.
    * NOTE: While this is required to be used in the constructor, this can also be used when the
@@ -82,29 +71,31 @@ public class AssignableNode implements Comparable<AssignableNode> {
    * refreshed. This is under the assumption that the capacity mappings of InstanceConfig and
    * ResourceConfig could
    * subject to change. If the assumption is no longer true, this function should become private.
-   * @param clusterConfig - the Cluster Config of the cluster where the node is located
-   * @param instanceConfig - the Instance Config of the node
    */
-  private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig) {
-    reset();
-
+  AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName) {
+    _instanceName = instanceName;
     Map<String, Integer> instanceCapacity = fetchInstanceCapacity(clusterConfig, instanceConfig);
-    _currentCapacityMap.putAll(instanceCapacity);
     _faultZone = computeFaultZone(clusterConfig, instanceConfig);
-    _instanceTags = new HashSet<>(instanceConfig.getTags());
-    _disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
-    _maxCapacity = instanceCapacity;
+    _instanceTags = ImmutableSet.copyOf(instanceConfig.getTags());
+    _disabledPartitionsMap = ImmutableMap.copyOf(instanceConfig.getDisabledPartitionsMap());
+    // make a copy of max capacity
+    _maxAllowedCapacity = ImmutableMap.copyOf(instanceCapacity);
+    _remainingCapacity = new HashMap<>(instanceCapacity);
     _maxPartition = clusterConfig.getMaxPartitionsPerInstance();
+    _currentAssignedReplicaMap = new HashMap<>();
+    _highestCapacityUtilization = 0f;
   }
 
   /**
    * This function should only be used to assign a set of new partitions that are not allocated on
-   * this node.
+   * this node, because an exception could occur in the middle of the batch assignment and the
+   * assignments that were already applied cannot be reverted.
    * Using this function avoids the overhead of updating capacity repeatedly.
    */
-  void assignNewBatch(Collection<AssignableReplica> replicas) {
+  void assignInitBatch(Collection<AssignableReplica> replicas) {
     Map<String, Integer> totalPartitionCapacity = new HashMap<>();
     for (AssignableReplica replica : replicas) {
+      // TODO: an exception could occur in the middle of the for loop, and the previously added records cannot be reverted
       addToAssignmentRecord(replica);
       // increment the capacity requirement according to partition's capacity configuration.
       for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) {
@@ -115,8 +106,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
     }
 
     // Update the global state after all single replications' calculation is done.
-    for (String key : totalPartitionCapacity.keySet()) {
-      updateCapacityAndUtilization(key, totalPartitionCapacity.get(key));
+    for (String capacityKey : totalPartitionCapacity.keySet()) {
+      updateCapacityAndUtilization(capacityKey, totalPartitionCapacity.get(capacityKey));
     }
   }
 
@@ -127,7 +118,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
   void assign(AssignableReplica assignableReplica) {
     addToAssignmentRecord(assignableReplica);
     assignableReplica.getCapacity().entrySet().stream()
-        .forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
+            .forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
   }
 
   /**
@@ -218,8 +209,16 @@ public class AssignableNode implements Comparable<AssignableNode> {
   /**
    * @return The current available capacity.
    */
-  public Map<String, Integer> getCurrentCapacity() {
-    return _currentCapacityMap;
+  public Map<String, Integer> getRemainingCapacity() {
+    return _remainingCapacity;
+  }
+
+  /**
+   * @return A map of <capacity category, capacity number> that describes the max capacity of the
+   *         node.
+   */
+  public Map<String, Integer> getMaxCapacity() {
+    return _maxAllowedCapacity;
   }
 
   /**
@@ -228,7 +227,6 @@ public class AssignableNode implements Comparable<AssignableNode> {
    * categories.
    * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6}. Then this call shall
    * return 0.9.
-   *
    * @return The highest utilization number of the node among all the capacity category.
    */
   public float getHighestCapacityUtilization() {
@@ -260,14 +258,6 @@ public class AssignableNode implements Comparable<AssignableNode> {
   }
 
   /**
-   * @return A map of <capacity category, capacity number> that describes the max capacity of the
-   *         node.
-   */
-  public Map<String, Integer> getMaxCapacity() {
-    return _maxCapacity;
-  }
-
-  /**
    * @return The max partition count that are allowed to be allocated on the node.
    */
   public int getMaxPartition() {
@@ -294,14 +284,15 @@ public class AssignableNode implements Comparable<AssignableNode> {
     if (topologyStr == null || faultZoneType == null) {
       LOG.debug("Topology configuration is not complete. Topology define: {}, Fault Zone Type: {}",
           topologyStr, faultZoneType);
-      // Use the instance name, or the deprecated ZoneId field (if exists) as the default fault zone.
+      // Use the instance name, or the deprecated ZoneId field (if exists) as the default fault
+      // zone.
       String zoneId = instanceConfig.getZoneId();
       return zoneId == null ? instanceConfig.getInstanceName() : zoneId;
     } else {
       // Get the fault zone information from the complete topology definition.
       String[] topologyDef = topologyStr.trim().split("/");
-      if (topologyDef.length == 0 ||
-          Arrays.stream(topologyDef).noneMatch(type -> type.equals(faultZoneType))) {
+      if (topologyDef.length == 0
+          || Arrays.stream(topologyDef).noneMatch(type -> type.equals(faultZoneType))) {
         throw new HelixException(
             "The configured topology definition is empty or does not contain the fault zone type.");
       }
@@ -350,22 +341,22 @@ public class AssignableNode implements Comparable<AssignableNode> {
     }
   }
 
-  private void updateCapacityAndUtilization(String capacityKey, int valueToSubtract) {
-    if (_currentCapacityMap.containsKey(capacityKey)) {
-      int newCapacity = _currentCapacityMap.get(capacityKey) - valueToSubtract;
-      _currentCapacityMap.put(capacityKey, newCapacity);
-      // For the purpose of constraint calculation, the max utilization cannot be larger than 100%.
-      float utilization = Math.min(
-          (float) (_maxCapacity.get(capacityKey) - newCapacity) / _maxCapacity.get(capacityKey), 1);
-      _highestCapacityUtilization = Math.max(_highestCapacityUtilization, utilization);
+  private void updateCapacityAndUtilization(String capacityKey, int usage) {
+    if (!_remainingCapacity.containsKey(capacityKey)) {
+      // If a capacityKey required by the replica does not exist in the instance's capacity,
+      // the instance is treated as if it has unlimited capacity for that capacityKey.
+      return;
     }
-    // else if the capacityKey does not exist in the capacity map, this method essentially becomes
-    // a NOP; in other words, this node will be treated as if it has unlimited capacity.
+    int newCapacity = _remainingCapacity.get(capacityKey) - usage;
+    _remainingCapacity.put(capacityKey, newCapacity);
+    // For the purpose of constraint calculation, the max utilization cannot be larger than 100%.
+    float utilization = Math.min((float) (_maxAllowedCapacity.get(capacityKey) - newCapacity)
+        / _maxAllowedCapacity.get(capacityKey), 1);
+    _highestCapacityUtilization = Math.max(_highestCapacityUtilization, utilization);
   }
 
   /**
    * Get and validate the instance capacity from instance config.
-   *
    * @throws HelixException if any required capacity key is not configured in the instance config.
    */
   private Map<String, Integer> fetchInstanceCapacity(ClusterConfig clusterConfig,
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 2b53422..276b998 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -80,7 +80,7 @@ public class ClusterModelProvider {
             bestPossibleAssignment, allocatedReplicas);
 
     // Update the allocated replicas to the assignable nodes.
-    assignableNodes.stream().forEach(node -> node.assignNewBatch(
+    assignableNodes.stream().forEach(node -> node.assignInitBatch(
         allocatedReplicas.getOrDefault(node.getInstanceName(), Collections.emptySet())));
 
     // Construct and initialize cluster context.
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java
index 511f881..4365a42 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java
@@ -39,7 +39,7 @@ public class TestNodeCapacityConstraint {
   @Test
   public void testConstraintValidWhenNodeHasEnoughSpace() {
     String key = "testKey";
-    when(_testNode.getCurrentCapacity()).thenReturn(ImmutableMap.of(key,  10));
+    when(_testNode.getRemainingCapacity()).thenReturn(ImmutableMap.of(key,  10));
     when(_testReplica.getCapacity()).thenReturn(ImmutableMap.of(key, 5));
     Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext));
   }
@@ -47,7 +47,7 @@ public class TestNodeCapacityConstraint {
   @Test
   public void testConstraintInValidWhenNodeHasInsufficientSpace() {
     String key = "testKey";
-    when(_testNode.getCurrentCapacity()).thenReturn(ImmutableMap.of(key,  1));
+    when(_testNode.getRemainingCapacity()).thenReturn(ImmutableMap.of(key,  1));
     when(_testReplica.getCapacity()).thenReturn(ImmutableMap.of(key, 5));
     Assert.assertFalse(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext));
   }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 6975901..b48587f 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -19,9 +19,10 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
+import static org.mockito.Mockito.when;
+
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -37,8 +38,6 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.mockito.Mockito.when;
-
 public class TestAssignableNode extends AbstractTestClusterModel {
   @BeforeClass
   public void initialize() {
@@ -65,7 +64,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
 
     AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
         testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
-    assignableNode.assignNewBatch(assignmentSet);
+    assignableNode.assignInitBatch(assignmentSet);
     Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
     Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
     Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 20.0, 0.005);
@@ -74,7 +73,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
     Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
     Assert.assertEquals(assignableNode.getDisabledPartitionsMap(), _disabledPartitionsMap);
-    Assert.assertEquals(assignableNode.getCurrentCapacity(), expectedCapacityMap);
+    Assert.assertEquals(assignableNode.getRemainingCapacity(), expectedCapacityMap);
     Assert.assertEquals(assignableNode.getAssignedReplicas(), assignmentSet);
     Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(0)),
         expectedAssignmentSet1);
@@ -114,7 +113,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
     Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
     Assert.assertEquals(assignableNode.getDisabledPartitionsMap(), _disabledPartitionsMap);
-    Assert.assertEquals(assignableNode.getCurrentCapacity(), expectedCapacityMap);
+    Assert.assertEquals(assignableNode.getRemainingCapacity(), expectedCapacityMap);
     Assert.assertEquals(assignableNode.getAssignedReplicas(), assignmentSet);
     Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(0)),
         expectedAssignmentSet1);
@@ -147,7 +146,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
     Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
     Assert.assertEquals(assignableNode.getDisabledPartitionsMap(), _disabledPartitionsMap);
-    Assert.assertEquals(assignableNode.getCurrentCapacity(), expectedCapacityMap);
+    Assert.assertEquals(assignableNode.getRemainingCapacity(), expectedCapacityMap);
     Assert.assertEquals(assignableNode.getAssignedReplicas(), assignmentSet);
     Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(0)),
         expectedAssignmentSet1);
@@ -184,7 +183,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
 
     AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
         testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
-    assignableNode.assignNewBatch(assignmentSet);
+    assignableNode.assignInitBatch(assignmentSet);
     AssignableReplica duplicateReplica = new AssignableReplica(testCache.getClusterConfig(),
         testCache.getResourceConfig(_resourceNames.get(0)), _partitionNames.get(0), "SLAVE", 2);
     assignableNode.assign(duplicateReplica);