You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/12/01 00:09:24 UTC

[helix] 03/03: Fix the topstate capacity usage recording and add the test method to evaluate.

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch topStatePOC
in repository https://gitbox.apache.org/repos/asf/helix.git

commit ed4a07caf7c7b6847a389b2b9b8a1a99f6dcc66e
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Mon Nov 30 16:08:25 2020 -0800

    Fix the topstate capacity usage recording and add the test method to evaluate.
---
 .../ResourceTopStateUsageConstraint.java           |   2 +-
 .../rebalancer/waged/model/AssignableNode.java     |  37 +++++
 .../WagedRebalancer/TestWagedRebalance.java        | 180 +++++++++++++++++++--
 3 files changed, 209 insertions(+), 10 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
index 8ba9cdc..1209c04 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
@@ -40,7 +40,7 @@ class ResourceTopStateUsageConstraint extends UsageSoftConstraint {
       return 0;
     }
     float estimatedTopStateMaxUtilization = clusterContext.getEstimatedTopStateMaxUtilization();
-    float projectedHighestUtilization = node.getProjectedHighestUtilization(replica.getCapacity());
+    float projectedHighestUtilization = node.getProjectedHighestTopStateUtilization(replica.getCapacity());
     return computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization);
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index d3d014d..f8307fe 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -62,6 +62,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
   private Map<String, Map<String, AssignableReplica>> _currentAssignedReplicaMap;
   // A map of <capacity key, capacity value> that tracks the current available node capacity
   private Map<String, Integer> _remainingCapacity;
+  private Map<String, Integer> _remainingTopStateCapacity;
 
   /**
    * Update the node with a ClusterDataCache. This resets the current assignment and recalculates
@@ -81,6 +82,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
     // make a copy of max capacity
     _maxAllowedCapacity = ImmutableMap.copyOf(instanceCapacity);
     _remainingCapacity = new HashMap<>(instanceCapacity);
+    _remainingTopStateCapacity = new HashMap<>(instanceCapacity);
     _maxPartition = clusterConfig.getMaxPartitionsPerInstance();
     _currentAssignedReplicaMap = new HashMap<>();
   }
@@ -93,6 +95,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
    */
   void assignInitBatch(Collection<AssignableReplica> replicas) {
     Map<String, Integer> totalPartitionCapacity = new HashMap<>();
+    Map<String, Integer> totalTopStatePartitionCapacity = new HashMap<>();
     for (AssignableReplica replica : replicas) {
       // TODO: the exception could occur in the middle of for loop and the previous added records cannot be reverted
       addToAssignmentRecord(replica);
@@ -101,12 +104,18 @@ public class AssignableNode implements Comparable<AssignableNode> {
         totalPartitionCapacity.compute(capacity.getKey(),
             (key, totalValue) -> (totalValue == null) ? capacity.getValue()
                 : totalValue + capacity.getValue());
+        if (replica.isReplicaTopState()) {
+          totalTopStatePartitionCapacity.compute(capacity.getKey(),
+              (key, totalValue) -> (totalValue == null) ? capacity.getValue()
+                  : totalValue + capacity.getValue());
+        }
       }
     }
 
     // Update the global state after all single replications' calculation is done.
     for (String capacityKey : totalPartitionCapacity.keySet()) {
       updateRemainingCapacity(capacityKey, totalPartitionCapacity.get(capacityKey));
+      updateRemainingTopStateCapacity(capacityKey, totalTopStatePartitionCapacity.get(capacityKey));
     }
   }
 
@@ -118,6 +127,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
     addToAssignmentRecord(assignableReplica);
     assignableReplica.getCapacity().entrySet().stream()
             .forEach(capacity -> updateRemainingCapacity(capacity.getKey(), capacity.getValue()));
+    if (assignableReplica.isReplicaTopState()) {
+      assignableReplica.getCapacity().entrySet().stream()
+          .forEach(capacity -> updateRemainingTopStateCapacity(capacity.getKey(), capacity.getValue()));
+    }
   }
 
   /**
@@ -148,6 +161,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
     AssignableReplica removedReplica = partitionMap.remove(partitionName);
     removedReplica.getCapacity().entrySet().stream()
         .forEach(entry -> updateRemainingCapacity(entry.getKey(), -1 * entry.getValue()));
+    if (removedReplica.isReplicaTopState()) {
+      removedReplica.getCapacity().entrySet().stream()
+          .forEach(entry -> updateRemainingTopStateCapacity(entry.getKey(), -1 * entry.getValue()));
+    }
   }
 
   /**
@@ -239,6 +256,17 @@ public class AssignableNode implements Comparable<AssignableNode> {
     return highestCapacityUtilization;
   }
 
+  public float getProjectedHighestTopStateUtilization(Map<String, Integer> newUsage) {
+    float highestCapacityUtilization = 0;
+    for (String capacityKey : _maxAllowedCapacity.keySet()) {
+      float capacityValue = _maxAllowedCapacity.get(capacityKey);
+      float utilization = (capacityValue - _remainingTopStateCapacity.get(capacityKey) + newUsage
+          .getOrDefault(capacityKey, 0)) / capacityValue;
+      highestCapacityUtilization = Math.max(highestCapacityUtilization, utilization);
+    }
+    return highestCapacityUtilization;
+  }
+
   public String getInstanceName() {
     return _instanceName;
   }
@@ -320,6 +348,15 @@ public class AssignableNode implements Comparable<AssignableNode> {
     _remainingCapacity.put(capacityKey, _remainingCapacity.get(capacityKey) - usage);
   }
 
+  private void updateRemainingTopStateCapacity(String capacityKey, int usage) {
+    if (!_remainingTopStateCapacity.containsKey(capacityKey)) {
+      // If the capacityKey used by a replica does not exist in the instance's capacity,
+      // the instance is treated as if it has unlimited capacity for that capacityKey.
+      return;
+    }
+    _remainingTopStateCapacity.put(capacityKey, _remainingTopStateCapacity.get(capacityKey) - usage);
+  }
+
   /**
    * Get and validate the instance capacity from instance config.
    * @throws HelixException if any required capacity key is not configured in the instance config.
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index a7049f9..09a0baa 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -19,6 +19,7 @@ package org.apache.helix.integration.rebalancer.WagedRebalancer;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
@@ -27,8 +28,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.commons.math.stat.descriptive.moment.StandardDeviation;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
@@ -37,15 +41,19 @@ import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
@@ -53,12 +61,17 @@ import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
 import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.apache.helix.util.HelixUtil;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.apache.helix.model.ResourceConfig.DEFAULT_PARTITION_KEY;
+
+
 public class TestWagedRebalance extends ZkTestBase {
   protected final int NUM_NODE = 6;
   protected static final int START_PORT = 12918;
@@ -76,11 +89,8 @@ public class TestWagedRebalance extends ZkTestBase {
   private Set<String> _allDBs = new HashSet<>();
   private int _replica = 3;
 
-  private static String[] _testModels = {
-      BuiltInStateModelDefinitions.OnlineOffline.name(),
-      BuiltInStateModelDefinitions.MasterSlave.name(),
-      BuiltInStateModelDefinitions.LeaderStandby.name()
-  };
+  private static String[] _testModels =
+      {BuiltInStateModelDefinitions.OnlineOffline.name(), BuiltInStateModelDefinitions.MasterSlave.name(), BuiltInStateModelDefinitions.LeaderStandby.name()};
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -677,8 +687,7 @@ public class TestWagedRebalance extends ZkTestBase {
     HelixClusterVerifier _clusterVerifier =
         new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
             .setDeactivatedNodeAwareness(true).setResources(_allDBs)
-            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
-            .build();
+            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
     try {
       Assert.assertTrue(_clusterVerifier.verify(5000));
     } finally {
@@ -722,8 +731,7 @@ public class TestWagedRebalance extends ZkTestBase {
     ZkHelixClusterVerifier _clusterVerifier =
         new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
             .setDeactivatedNodeAwareness(true).setResources(_allDBs)
-            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
-            .build();
+            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
     try {
       Assert.assertTrue(_clusterVerifier.verifyByPolling());
     } finally {
@@ -749,4 +757,158 @@ public class TestWagedRebalance extends ZkTestBase {
     }
     deleteCluster(CLUSTER_NAME);
   }
+
+  public static void main(String[] args)
+      throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException {
+    String clusterName = "ESPRESSO_MT1";
+    String zkAddr = "zk-ltx1-espresso.stg.linkedin.com:12913";
+
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+    HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
+    BaseDataAccessor baseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
+
+    // Read cluster parameters from ZK
+    HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
+    ClusterConfig clusterConfig =
+        dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
+    List<InstanceConfig> instanceConfigs =
+        dataAccessor.getChildValues(dataAccessor.keyBuilder().instanceConfigs(), true);
+    List<String> liveInstances =
+        instanceConfigs.stream().map(InstanceConfig::getInstanceName).collect(Collectors.toList());
+    List<IdealState> idealStates =
+        dataAccessor.getChildValues(dataAccessor.keyBuilder().idealStates(), true);
+    List<ResourceConfig> resourceConfigs =
+        dataAccessor.getChildValues(dataAccessor.keyBuilder().resourceConfigs(), true);
+
+    for (InstanceConfig instanceConfig : instanceConfigs) {
+      instanceConfig.setInstanceEnabled(true);
+      instanceConfig.getRecord().getMapFields()
+          .getOrDefault("HELIX_DISABLED_PARTITION", Collections.emptyMap()).clear();
+      instanceConfig.getRecord().getListFields()
+          .getOrDefault("HELIX_DISABLED_PARTITION", Collections.emptyList()).clear();
+    }
+
+    clusterConfig.setInstanceCapacityKeys(Collections.singletonList("CU"));
+
+    Map<String, Integer> defaultInstanceCapacityMap = clusterConfig.getDefaultInstanceCapacityMap();
+    for (String key : clusterConfig.getInstanceCapacityKeys()) {
+      defaultInstanceCapacityMap
+          .put(key, clusterConfig.getDefaultInstanceCapacityMap().getOrDefault(key, 0) * 2);
+    }
+    clusterConfig.setDefaultInstanceCapacityMap(defaultInstanceCapacityMap);
+
+    List<IdealState> filteredIdealStates = new ArrayList<>();
+    for (IdealState idealState : idealStates) {
+      if (idealState.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO) && idealState
+          .getStateModelDefRef().equals("MasterSlave")) {
+        filteredIdealStates.add(idealState);
+      }
+    }
+
+    Map<String, ResourceAssignment> baselineResult = new HashMap<>();
+    for (IdealState idealState : filteredIdealStates) {
+      ResourceAssignment resourceAssignment = new ResourceAssignment(idealState.getResourceName());
+      Map<String, Map<String, String>> partitionMap = HelixUtil
+          .getIdealAssignmentForFullAuto(clusterConfig, instanceConfigs, liveInstances, idealState,
+              new ArrayList<>(idealState.getPartitionSet()), idealState.getRebalanceStrategy());
+      for (String partition : partitionMap.keySet()) {
+        resourceAssignment.addReplicaMap(new Partition(partition), partitionMap.get(partition));
+      }
+      baselineResult.put(idealState.getResourceName(), resourceAssignment);
+    }
+    outputAssignments(dataAccessor, clusterConfig, filteredIdealStates, baselineResult);
+
+    for (IdealState idealState : filteredIdealStates) {
+      idealState.setRebalancerClassName(WagedRebalancer.class.getName());
+    }
+
+    Map<String, ResourceAssignment> utilResult = HelixUtil
+        .getTargetAssignmentForWagedFullAuto(zkAddr, clusterConfig, instanceConfigs, liveInstances,
+            filteredIdealStates, resourceConfigs);
+
+    outputAssignments(dataAccessor, clusterConfig, filteredIdealStates, utilResult);
+  }
+
+  private static void outputAssignments(HelixDataAccessor dataAccessor, ClusterConfig clusterConfig,
+      List<IdealState> filteredIdealStates, Map<String, ResourceAssignment> assignmentMap)
+      throws IOException {
+    Map<String, Integer> defaultWeightMap = clusterConfig.getDefaultPartitionWeightMap();
+
+    Map<String, Integer> partitionCountMap = new HashMap<>();
+    Map<String, Integer> topStateCountMap = new HashMap<>();
+    Map<String, Map<String, Integer>> partitionWeightMap = new HashMap<>();
+    Map<String, Map<String, Integer>> topStateWeightMap = new HashMap<>();
+
+    for (IdealState idealState : filteredIdealStates) {
+      StateModelDefinition stateModelDefinition =
+          BuiltInStateModelDefinitions.valueOf(idealState.getStateModelDefRef())
+              .getStateModelDefinition();
+
+      ResourceConfig resourceConfig = dataAccessor
+          .getProperty(dataAccessor.keyBuilder().resourceConfig(idealState.getResourceName()));
+
+      Map<String, Map<String, Integer>> fullWeightMap = resourceConfig.getPartitionCapacityMap();
+
+      for (String partition : idealState.getPartitionSet()) {
+        Map<String, String> instanceStateMap =
+            assignmentMap.get(idealState.getResourceName()).getRecord().getMapField(partition);
+        Map<String, Integer> weightMap = new HashMap<>(defaultWeightMap);
+        weightMap.putAll(
+            fullWeightMap.getOrDefault(partition, fullWeightMap.get(DEFAULT_PARTITION_KEY)));
+
+        for (String instanceName : instanceStateMap.keySet()) {
+          String state = instanceStateMap.get(instanceName);
+          if (state.equals(stateModelDefinition.getTopState())) {
+            topStateCountMap.put(instanceName, 1 + topStateCountMap.getOrDefault(instanceName, 0));
+
+            Map<String, Integer> topStateWeights =
+                topStateWeightMap.computeIfAbsent(instanceName, map -> new HashMap<>());
+            for (String key : weightMap.keySet()) {
+              topStateWeights.put(key, topStateWeights.getOrDefault(key, 0) + weightMap.get(key));
+            }
+          }
+
+          partitionCountMap.put(instanceName, 1 + partitionCountMap.getOrDefault(instanceName, 0));
+
+          Map<String, Integer> weights =
+              partitionWeightMap.computeIfAbsent(instanceName, map -> new HashMap<>());
+          for (String key : weightMap.keySet()) {
+            weights.put(key, weights.getOrDefault(key, 0) + weightMap.get(key));
+          }
+        }
+      }
+    }
+    //System.out.println("Partition weights: " + partitionWeightMap);
+    //System.out.println("Topstate partition weights: " + topStateWeightMap);
+
+    List<Integer> regWeightList =
+        partitionWeightMap.values().stream().map(map -> map.get("CU")).collect(Collectors.toList());
+
+    List<Integer> weightList =
+        topStateWeightMap.values().stream().map(map -> map.get("CU")).collect(Collectors.toList());
+
+    int max = weightList.stream().max(Integer::compareTo).get();
+    int min = weightList.stream().min(Integer::compareTo).get();
+    StandardDeviation standardDeviation = new StandardDeviation();
+
+    double[] nums = new double[weightList.size()];
+    for (int i = 0; i < weightList.size(); i++) {
+      nums[i] = weightList.get(i);
+    }
+    double std = standardDeviation.evaluate(nums);
+
+    System.out.println("Topstate Weights Max " + max + " Min " + min + " STD " + std);
+
+    int regmax = regWeightList.stream().max(Integer::compareTo).get();
+    int regmin = regWeightList.stream().min(Integer::compareTo).get();
+    double[] regnums = new double[regWeightList.size()];
+    for (int i = 0; i < regWeightList.size(); i++) {
+      regnums[i] = regWeightList.get(i);
+    }
+    double regstd = standardDeviation.evaluate(regnums);
+
+    System.out.println("Regular Weights Max " + regmax + " Min " + regmin + " STD " + regstd);
+  }
 }