You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/12/01 00:13:16 UTC

[helix] branch topStatePOC updated (ed4a07c -> 4916b89)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a change to branch topStatePOC
in repository https://gitbox.apache.org/repos/asf/helix.git.


 discard ed4a07c  Fix the topstate capacity usage recording and add the test method to evaluate.
     new 4916b89  Fix the topstate capacity usage recording and add the test method to evaluate.

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (ed4a07c)
            \
             N -- N -- N   refs/heads/topStatePOC (4916b89)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../integration/rebalancer/WagedRebalancer/TestWagedRebalance.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)


[helix] 01/01: Fix the topstate capacity usage recording and add the test method to evaluate.

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch topStatePOC
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 4916b89fe1c1d58428829f27982e86f34e902b1f
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Mon Nov 30 16:08:25 2020 -0800

    Fix the topstate capacity usage recording and add the test method to evaluate.
---
 .../ResourceTopStateUsageConstraint.java           |   2 +-
 .../rebalancer/waged/model/AssignableNode.java     |  37 +++++
 .../WagedRebalancer/TestWagedRebalance.java        | 180 +++++++++++++++++++--
 3 files changed, 209 insertions(+), 10 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
index 8ba9cdc..1209c04 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
@@ -40,7 +40,7 @@ class ResourceTopStateUsageConstraint extends UsageSoftConstraint {
       return 0;
     }
     float estimatedTopStateMaxUtilization = clusterContext.getEstimatedTopStateMaxUtilization();
-    float projectedHighestUtilization = node.getProjectedHighestUtilization(replica.getCapacity());
+    float projectedHighestUtilization = node.getProjectedHighestTopStateUtilization(replica.getCapacity());
     return computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization);
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index d3d014d..f8307fe 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -62,6 +62,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
   private Map<String, Map<String, AssignableReplica>> _currentAssignedReplicaMap;
   // A map of <capacity key, capacity value> that tracks the current available node capacity
   private Map<String, Integer> _remainingCapacity;
+  private Map<String, Integer> _remainingTopStateCapacity;
 
   /**
    * Update the node with a ClusterDataCache. This resets the current assignment and recalculates
@@ -81,6 +82,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
     // make a copy of max capacity
     _maxAllowedCapacity = ImmutableMap.copyOf(instanceCapacity);
     _remainingCapacity = new HashMap<>(instanceCapacity);
+    _remainingTopStateCapacity = new HashMap<>(instanceCapacity);
     _maxPartition = clusterConfig.getMaxPartitionsPerInstance();
     _currentAssignedReplicaMap = new HashMap<>();
   }
@@ -93,6 +95,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
    */
   void assignInitBatch(Collection<AssignableReplica> replicas) {
     Map<String, Integer> totalPartitionCapacity = new HashMap<>();
+    Map<String, Integer> totalTopStatePartitionCapacity = new HashMap<>();
     for (AssignableReplica replica : replicas) {
       // TODO: the exception could occur in the middle of for loop and the previous added records cannot be reverted
       addToAssignmentRecord(replica);
@@ -101,12 +104,18 @@ public class AssignableNode implements Comparable<AssignableNode> {
         totalPartitionCapacity.compute(capacity.getKey(),
             (key, totalValue) -> (totalValue == null) ? capacity.getValue()
                 : totalValue + capacity.getValue());
+        if (replica.isReplicaTopState()) {
+          totalTopStatePartitionCapacity.compute(capacity.getKey(),
+              (key, totalValue) -> (totalValue == null) ? capacity.getValue()
+                  : totalValue + capacity.getValue());
+        }
       }
     }
 
     // Update the global state after all single replications' calculation is done.
     for (String capacityKey : totalPartitionCapacity.keySet()) {
       updateRemainingCapacity(capacityKey, totalPartitionCapacity.get(capacityKey));
+      updateRemainingTopStateCapacity(capacityKey, totalTopStatePartitionCapacity.get(capacityKey));
     }
   }
 
@@ -118,6 +127,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
     addToAssignmentRecord(assignableReplica);
     assignableReplica.getCapacity().entrySet().stream()
             .forEach(capacity -> updateRemainingCapacity(capacity.getKey(), capacity.getValue()));
+    if (assignableReplica.isReplicaTopState()) {
+      assignableReplica.getCapacity().entrySet().stream()
+          .forEach(capacity -> updateRemainingTopStateCapacity(capacity.getKey(), capacity.getValue()));
+    }
   }
 
   /**
@@ -148,6 +161,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
     AssignableReplica removedReplica = partitionMap.remove(partitionName);
     removedReplica.getCapacity().entrySet().stream()
         .forEach(entry -> updateRemainingCapacity(entry.getKey(), -1 * entry.getValue()));
+    if (removedReplica.isReplicaTopState()) {
+      removedReplica.getCapacity().entrySet().stream()
+          .forEach(entry -> updateRemainingTopStateCapacity(entry.getKey(), -1 * entry.getValue()));
+    }
   }
 
   /**
@@ -239,6 +256,17 @@ public class AssignableNode implements Comparable<AssignableNode> {
     return highestCapacityUtilization;
   }
 
+  public float getProjectedHighestTopStateUtilization(Map<String, Integer> newUsage) {
+    float highestCapacityUtilization = 0;
+    for (String capacityKey : _maxAllowedCapacity.keySet()) {
+      float capacityValue = _maxAllowedCapacity.get(capacityKey);
+      float utilization = (capacityValue - _remainingTopStateCapacity.get(capacityKey) + newUsage
+          .getOrDefault(capacityKey, 0)) / capacityValue;
+      highestCapacityUtilization = Math.max(highestCapacityUtilization, utilization);
+    }
+    return highestCapacityUtilization;
+  }
+
   public String getInstanceName() {
     return _instanceName;
   }
@@ -320,6 +348,15 @@ public class AssignableNode implements Comparable<AssignableNode> {
     _remainingCapacity.put(capacityKey, _remainingCapacity.get(capacityKey) - usage);
   }
 
+  private void updateRemainingTopStateCapacity(String capacityKey, int usage) {
+    if (!_remainingTopStateCapacity.containsKey(capacityKey)) {
+      //if the capacityKey belongs to replicas does not exist in the instance's capacity,
+      // it will be treated as if it has unlimited capacity of that capacityKey
+      return;
+    }
+    _remainingTopStateCapacity.put(capacityKey, _remainingTopStateCapacity.get(capacityKey) - usage);
+  }
+
   /**
    * Get and validate the instance capacity from instance config.
    * @throws HelixException if any required capacity key is not configured in the instance config.
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index a7049f9..0e3e354 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -19,6 +19,7 @@ package org.apache.helix.integration.rebalancer.WagedRebalancer;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
@@ -27,8 +28,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.commons.math.stat.descriptive.moment.StandardDeviation;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
@@ -37,15 +41,19 @@ import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
@@ -53,12 +61,17 @@ import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
 import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.apache.helix.util.HelixUtil;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.apache.helix.model.ResourceConfig.DEFAULT_PARTITION_KEY;
+
+
 public class TestWagedRebalance extends ZkTestBase {
   protected final int NUM_NODE = 6;
   protected static final int START_PORT = 12918;
@@ -76,11 +89,8 @@ public class TestWagedRebalance extends ZkTestBase {
   private Set<String> _allDBs = new HashSet<>();
   private int _replica = 3;
 
-  private static String[] _testModels = {
-      BuiltInStateModelDefinitions.OnlineOffline.name(),
-      BuiltInStateModelDefinitions.MasterSlave.name(),
-      BuiltInStateModelDefinitions.LeaderStandby.name()
-  };
+  private static String[] _testModels =
+      {BuiltInStateModelDefinitions.OnlineOffline.name(), BuiltInStateModelDefinitions.MasterSlave.name(), BuiltInStateModelDefinitions.LeaderStandby.name()};
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -677,8 +687,7 @@ public class TestWagedRebalance extends ZkTestBase {
     HelixClusterVerifier _clusterVerifier =
         new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
             .setDeactivatedNodeAwareness(true).setResources(_allDBs)
-            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
-            .build();
+            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
     try {
       Assert.assertTrue(_clusterVerifier.verify(5000));
     } finally {
@@ -722,8 +731,7 @@ public class TestWagedRebalance extends ZkTestBase {
     ZkHelixClusterVerifier _clusterVerifier =
         new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
             .setDeactivatedNodeAwareness(true).setResources(_allDBs)
-            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
-            .build();
+            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
     try {
       Assert.assertTrue(_clusterVerifier.verifyByPolling());
     } finally {
@@ -749,4 +757,158 @@ public class TestWagedRebalance extends ZkTestBase {
     }
     deleteCluster(CLUSTER_NAME);
   }
+
+  public static void main(String[] args)
+      throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException {
+    String clusterName = "test";
+    String zkAddr = "localhost:2181";
+
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+    HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
+    BaseDataAccessor baseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
+
+    // Read cluster parameters from ZK
+    HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
+    ClusterConfig clusterConfig =
+        dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
+    List<InstanceConfig> instanceConfigs =
+        dataAccessor.getChildValues(dataAccessor.keyBuilder().instanceConfigs(), true);
+    List<String> liveInstances =
+        instanceConfigs.stream().map(InstanceConfig::getInstanceName).collect(Collectors.toList());
+    List<IdealState> idealStates =
+        dataAccessor.getChildValues(dataAccessor.keyBuilder().idealStates(), true);
+    List<ResourceConfig> resourceConfigs =
+        dataAccessor.getChildValues(dataAccessor.keyBuilder().resourceConfigs(), true);
+
+    for (InstanceConfig instanceConfig : instanceConfigs) {
+      instanceConfig.setInstanceEnabled(true);
+      instanceConfig.getRecord().getMapFields()
+          .getOrDefault("HELIX_DISABLED_PARTITION", Collections.emptyMap()).clear();
+      instanceConfig.getRecord().getListFields()
+          .getOrDefault("HELIX_DISABLED_PARTITION", Collections.emptyList()).clear();
+    }
+
+    clusterConfig.setInstanceCapacityKeys(Collections.singletonList("CU"));
+
+    Map<String, Integer> defaultInstanceCapacityMap = clusterConfig.getDefaultInstanceCapacityMap();
+    for (String key : clusterConfig.getInstanceCapacityKeys()) {
+      defaultInstanceCapacityMap
+          .put(key, clusterConfig.getDefaultInstanceCapacityMap().getOrDefault(key, 0) * 2);
+    }
+    clusterConfig.setDefaultInstanceCapacityMap(defaultInstanceCapacityMap);
+
+    List<IdealState> filteredIdealStates = new ArrayList<>();
+    for (IdealState idealState : idealStates) {
+      if (idealState.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO) && idealState
+          .getStateModelDefRef().equals("MasterSlave")) {
+        filteredIdealStates.add(idealState);
+      }
+    }
+
+    Map<String, ResourceAssignment> baselineResult = new HashMap<>();
+    for (IdealState idealState : filteredIdealStates) {
+      ResourceAssignment resourceAssignment = new ResourceAssignment(idealState.getResourceName());
+      Map<String, Map<String, String>> partitionMap = HelixUtil
+          .getIdealAssignmentForFullAuto(clusterConfig, instanceConfigs, liveInstances, idealState,
+              new ArrayList<>(idealState.getPartitionSet()), idealState.getRebalanceStrategy());
+      for (String partition : partitionMap.keySet()) {
+        resourceAssignment.addReplicaMap(new Partition(partition), partitionMap.get(partition));
+      }
+      baselineResult.put(idealState.getResourceName(), resourceAssignment);
+    }
+    outputAssignments(dataAccessor, clusterConfig, filteredIdealStates, baselineResult);
+
+    for (IdealState idealState : filteredIdealStates) {
+      idealState.setRebalancerClassName(WagedRebalancer.class.getName());
+    }
+
+    Map<String, ResourceAssignment> utilResult = HelixUtil
+        .getTargetAssignmentForWagedFullAuto(zkAddr, clusterConfig, instanceConfigs, liveInstances,
+            filteredIdealStates, resourceConfigs);
+
+    outputAssignments(dataAccessor, clusterConfig, filteredIdealStates, utilResult);
+  }
+
+  private static void outputAssignments(HelixDataAccessor dataAccessor, ClusterConfig clusterConfig,
+      List<IdealState> filteredIdealStates, Map<String, ResourceAssignment> assignmentMap)
+      throws IOException {
+    Map<String, Integer> defaultWeightMap = clusterConfig.getDefaultPartitionWeightMap();
+
+    Map<String, Integer> partitionCountMap = new HashMap<>();
+    Map<String, Integer> topStateCountMap = new HashMap<>();
+    Map<String, Map<String, Integer>> partitionWeightMap = new HashMap<>();
+    Map<String, Map<String, Integer>> topStateWeightMap = new HashMap<>();
+
+    for (IdealState idealState : filteredIdealStates) {
+      StateModelDefinition stateModelDefinition =
+          BuiltInStateModelDefinitions.valueOf(idealState.getStateModelDefRef())
+              .getStateModelDefinition();
+
+      ResourceConfig resourceConfig = dataAccessor
+          .getProperty(dataAccessor.keyBuilder().resourceConfig(idealState.getResourceName()));
+
+      Map<String, Map<String, Integer>> fullWeightMap = resourceConfig.getPartitionCapacityMap();
+
+      for (String partition : idealState.getPartitionSet()) {
+        Map<String, String> instanceStateMap =
+            assignmentMap.get(idealState.getResourceName()).getRecord().getMapField(partition);
+        Map<String, Integer> weightMap = new HashMap<>(defaultWeightMap);
+        weightMap.putAll(
+            fullWeightMap.getOrDefault(partition, fullWeightMap.get(DEFAULT_PARTITION_KEY)));
+
+        for (String instanceName : instanceStateMap.keySet()) {
+          String state = instanceStateMap.get(instanceName);
+          if (state.equals(stateModelDefinition.getTopState())) {
+            topStateCountMap.put(instanceName, 1 + topStateCountMap.getOrDefault(instanceName, 0));
+
+            Map<String, Integer> topStateWeights =
+                topStateWeightMap.computeIfAbsent(instanceName, map -> new HashMap<>());
+            for (String key : weightMap.keySet()) {
+              topStateWeights.put(key, topStateWeights.getOrDefault(key, 0) + weightMap.get(key));
+            }
+          }
+
+          partitionCountMap.put(instanceName, 1 + partitionCountMap.getOrDefault(instanceName, 0));
+
+          Map<String, Integer> weights =
+              partitionWeightMap.computeIfAbsent(instanceName, map -> new HashMap<>());
+          for (String key : weightMap.keySet()) {
+            weights.put(key, weights.getOrDefault(key, 0) + weightMap.get(key));
+          }
+        }
+      }
+    }
+    //System.out.println("Partition weights: " + partitionWeightMap);
+    //System.out.println("Topstate partition weights: " + topStateWeightMap);
+
+    List<Integer> regWeightList =
+        partitionWeightMap.values().stream().map(map -> map.get("CU")).collect(Collectors.toList());
+
+    List<Integer> weightList =
+        topStateWeightMap.values().stream().map(map -> map.get("CU")).collect(Collectors.toList());
+
+    int max = weightList.stream().max(Integer::compareTo).get();
+    int min = weightList.stream().min(Integer::compareTo).get();
+    StandardDeviation standardDeviation = new StandardDeviation();
+
+    double[] nums = new double[weightList.size()];
+    for (int i = 0; i < weightList.size(); i++) {
+      nums[i] = weightList.get(i);
+    }
+    double std = standardDeviation.evaluate(nums);
+
+    System.out.println("Topstate Weights Max " + max + " Min " + min + " STD " + std);
+
+    int regmax = regWeightList.stream().max(Integer::compareTo).get();
+    int regmin = regWeightList.stream().min(Integer::compareTo).get();
+    double[] regnums = new double[regWeightList.size()];
+    for (int i = 0; i < regWeightList.size(); i++) {
+      regnums[i] = regWeightList.get(i);
+    }
+    double regstd = standardDeviation.evaluate(regnums);
+
+    System.out.println("Regular Weights Max " + regmax + " Min " + regmin + " STD " + regstd);
+  }
 }