You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/12/01 00:13:17 UTC
[helix] 01/01: Fix the topstate capacity usage recording and add
the test method to evaluate.
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch topStatePOC
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 4916b89fe1c1d58428829f27982e86f34e902b1f
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Mon Nov 30 16:08:25 2020 -0800
Fix the topstate capacity usage recording and add the test method to evaluate.
---
.../ResourceTopStateUsageConstraint.java | 2 +-
.../rebalancer/waged/model/AssignableNode.java | 37 +++++
.../WagedRebalancer/TestWagedRebalance.java | 180 +++++++++++++++++++--
3 files changed, 209 insertions(+), 10 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
index 8ba9cdc..1209c04 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
@@ -40,7 +40,7 @@ class ResourceTopStateUsageConstraint extends UsageSoftConstraint {
return 0;
}
float estimatedTopStateMaxUtilization = clusterContext.getEstimatedTopStateMaxUtilization();
- float projectedHighestUtilization = node.getProjectedHighestUtilization(replica.getCapacity());
+ float projectedHighestUtilization = node.getProjectedHighestTopStateUtilization(replica.getCapacity());
return computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index d3d014d..f8307fe 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -62,6 +62,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
private Map<String, Map<String, AssignableReplica>> _currentAssignedReplicaMap;
// A map of <capacity key, capacity value> that tracks the current available node capacity
private Map<String, Integer> _remainingCapacity;
+ private Map<String, Integer> _remainingTopStateCapacity;
/**
* Update the node with a ClusterDataCache. This resets the current assignment and recalculates
@@ -81,6 +82,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
// make a copy of max capacity
_maxAllowedCapacity = ImmutableMap.copyOf(instanceCapacity);
_remainingCapacity = new HashMap<>(instanceCapacity);
+ _remainingTopStateCapacity = new HashMap<>(instanceCapacity);
_maxPartition = clusterConfig.getMaxPartitionsPerInstance();
_currentAssignedReplicaMap = new HashMap<>();
}
@@ -93,6 +95,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
*/
void assignInitBatch(Collection<AssignableReplica> replicas) {
Map<String, Integer> totalPartitionCapacity = new HashMap<>();
+ Map<String, Integer> totalTopStatePartitionCapacity = new HashMap<>();
for (AssignableReplica replica : replicas) {
// TODO: the exception could occur in the middle of for loop and the previous added records cannot be reverted
addToAssignmentRecord(replica);
@@ -101,12 +104,18 @@ public class AssignableNode implements Comparable<AssignableNode> {
totalPartitionCapacity.compute(capacity.getKey(),
(key, totalValue) -> (totalValue == null) ? capacity.getValue()
: totalValue + capacity.getValue());
+ if (replica.isReplicaTopState()) {
+ totalTopStatePartitionCapacity.compute(capacity.getKey(),
+ (key, totalValue) -> (totalValue == null) ? capacity.getValue()
+ : totalValue + capacity.getValue());
+ }
}
}
// Update the global state after all single replications' calculation is done.
for (String capacityKey : totalPartitionCapacity.keySet()) {
updateRemainingCapacity(capacityKey, totalPartitionCapacity.get(capacityKey));
+ // The top-state total only has an entry when at least one top-state replica in the batch
+ // uses this key. Default to 0 so a missing entry is not unboxed into a NullPointerException.
+ updateRemainingTopStateCapacity(capacityKey,
+ totalTopStatePartitionCapacity.getOrDefault(capacityKey, 0));
}
}
@@ -118,6 +127,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
addToAssignmentRecord(assignableReplica);
assignableReplica.getCapacity().entrySet().stream()
.forEach(capacity -> updateRemainingCapacity(capacity.getKey(), capacity.getValue()));
+ if (assignableReplica.isReplicaTopState()) {
+ assignableReplica.getCapacity().entrySet().stream()
+ .forEach(capacity -> updateRemainingTopStateCapacity(capacity.getKey(), capacity.getValue()));
+ }
}
/**
@@ -148,6 +161,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
AssignableReplica removedReplica = partitionMap.remove(partitionName);
removedReplica.getCapacity().entrySet().stream()
.forEach(entry -> updateRemainingCapacity(entry.getKey(), -1 * entry.getValue()));
+ if (removedReplica.isReplicaTopState()) {
+ removedReplica.getCapacity().entrySet().stream()
+ .forEach(entry -> updateRemainingTopStateCapacity(entry.getKey(), -1 * entry.getValue()));
+ }
}
/**
@@ -239,6 +256,17 @@ public class AssignableNode implements Comparable<AssignableNode> {
return highestCapacityUtilization;
}
+  /**
+   * Projects the highest top-state utilization over all capacity keys of this node, assuming
+   * the given usage is added on top of the top-state usage recorded so far.
+   *
+   * @param newUsage the proposed additional usage keyed by capacity key; a key that is absent
+   *                 contributes zero
+   * @return the largest ratio of (used top-state capacity + new usage) to the max allowed
+   *         capacity among all capacity keys, or 0 if the node has no capacity key
+   */
+  public float getProjectedHighestTopStateUtilization(Map<String, Integer> newUsage) {
+    float peakUtilization = 0;
+    for (Map.Entry<String, Integer> maxEntry : _maxAllowedCapacity.entrySet()) {
+      String key = maxEntry.getKey();
+      float maxCapacity = maxEntry.getValue();
+      // used = max - remaining; the projection adds the candidate usage on top of it.
+      float projected =
+          (maxCapacity - _remainingTopStateCapacity.get(key) + newUsage.getOrDefault(key, 0))
+              / maxCapacity;
+      peakUtilization = Math.max(peakUtilization, projected);
+    }
+    return peakUtilization;
+  }
+
public String getInstanceName() {
return _instanceName;
}
@@ -320,6 +348,15 @@ public class AssignableNode implements Comparable<AssignableNode> {
_remainingCapacity.put(capacityKey, _remainingCapacity.get(capacityKey) - usage);
}
+  /**
+   * Deducts the given usage from the remaining top-state capacity of one capacity key.
+   * Passing a negative usage gives capacity back.
+   *
+   * @param capacityKey the capacity key to update
+   * @param usage       the amount to subtract from the remaining top-state capacity
+   */
+  private void updateRemainingTopStateCapacity(String capacityKey, int usage) {
+    // A capacity key that the instance does not declare is treated as unlimited, so there is
+    // nothing to track for it.
+    if (_remainingTopStateCapacity.containsKey(capacityKey)) {
+      int remaining = _remainingTopStateCapacity.get(capacityKey);
+      _remainingTopStateCapacity.put(capacityKey, remaining - usage);
+    }
+  }
+
/**
* Get and validate the instance capacity from instance config.
* @throws HelixException if any required capacity key is not configured in the instance config.
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index a7049f9..0e3e354 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -19,6 +19,7 @@ package org.apache.helix.integration.rebalancer.WagedRebalancer;
* under the License.
*/
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
@@ -27,8 +28,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
+import org.apache.commons.math.stat.descriptive.moment.StandardDeviation;
+import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
@@ -37,15 +41,19 @@ import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
@@ -53,12 +61,17 @@ import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.apache.helix.util.HelixUtil;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.apache.helix.model.ResourceConfig.DEFAULT_PARTITION_KEY;
+
+
public class TestWagedRebalance extends ZkTestBase {
protected final int NUM_NODE = 6;
protected static final int START_PORT = 12918;
@@ -76,11 +89,8 @@ public class TestWagedRebalance extends ZkTestBase {
private Set<String> _allDBs = new HashSet<>();
private int _replica = 3;
- private static String[] _testModels = {
- BuiltInStateModelDefinitions.OnlineOffline.name(),
- BuiltInStateModelDefinitions.MasterSlave.name(),
- BuiltInStateModelDefinitions.LeaderStandby.name()
- };
+ private static String[] _testModels =
+ {BuiltInStateModelDefinitions.OnlineOffline.name(), BuiltInStateModelDefinitions.MasterSlave.name(), BuiltInStateModelDefinitions.LeaderStandby.name()};
@BeforeClass
public void beforeClass() throws Exception {
@@ -677,8 +687,7 @@ public class TestWagedRebalance extends ZkTestBase {
HelixClusterVerifier _clusterVerifier =
new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
.setDeactivatedNodeAwareness(true).setResources(_allDBs)
- .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
- .build();
+ .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
try {
Assert.assertTrue(_clusterVerifier.verify(5000));
} finally {
@@ -722,8 +731,7 @@ public class TestWagedRebalance extends ZkTestBase {
ZkHelixClusterVerifier _clusterVerifier =
new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
.setDeactivatedNodeAwareness(true).setResources(_allDBs)
- .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
- .build();
+ .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
try {
Assert.assertTrue(_clusterVerifier.verifyByPolling());
} finally {
@@ -749,4 +757,158 @@ public class TestWagedRebalance extends ZkTestBase {
}
deleteCluster(CLUSTER_NAME);
}
+
+  /**
+   * Manual evaluation entry point: reads the cluster "test" from the ZooKeeper at
+   * localhost:2181, computes the assignment of all FULL_AUTO MasterSlave resources first with
+   * their configured rebalance strategy and then with the WAGED rebalancer, and prints the
+   * weight distribution statistics of both results for comparison.
+   *
+   * Nothing is written back to ZooKeeper by this method itself; the instance and cluster
+   * configs are only modified in memory before being passed to the assignment utilities.
+   *
+   * @param args unused
+   */
+  public static void main(String[] args)
+      throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException {
+    String clusterName = "test";
+    String zkAddr = "localhost:2181";
+
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+    HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
+    try {
+      BaseDataAccessor baseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
+
+      // Read cluster parameters from ZK
+      HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
+      ClusterConfig clusterConfig =
+          dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
+      List<InstanceConfig> instanceConfigs =
+          dataAccessor.getChildValues(dataAccessor.keyBuilder().instanceConfigs(), true);
+      // Every configured instance is treated as live for the purpose of the evaluation.
+      List<String> liveInstances = instanceConfigs.stream().map(InstanceConfig::getInstanceName)
+          .collect(Collectors.toList());
+      List<IdealState> idealStates =
+          dataAccessor.getChildValues(dataAccessor.keyBuilder().idealStates(), true);
+      List<ResourceConfig> resourceConfigs =
+          dataAccessor.getChildValues(dataAccessor.keyBuilder().resourceConfigs(), true);
+
+      // Enable every instance and drop all disabled-partition records so the whole cluster
+      // takes part in the calculation.
+      for (InstanceConfig instanceConfig : instanceConfigs) {
+        instanceConfig.setInstanceEnabled(true);
+        instanceConfig.getRecord().getMapFields()
+            .getOrDefault("HELIX_DISABLED_PARTITION", Collections.emptyMap()).clear();
+        instanceConfig.getRecord().getListFields()
+            .getOrDefault("HELIX_DISABLED_PARTITION", Collections.emptyList()).clear();
+      }
+
+      clusterConfig.setInstanceCapacityKeys(Collections.singletonList("CU"));
+
+      // Double the default instance capacity of every capacity key. Work on a private copy:
+      // the map handed out by the config is not guaranteed to be mutable.
+      Map<String, Integer> defaultInstanceCapacityMap =
+          new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
+      for (String key : clusterConfig.getInstanceCapacityKeys()) {
+        defaultInstanceCapacityMap.put(key, defaultInstanceCapacityMap.getOrDefault(key, 0) * 2);
+      }
+      clusterConfig.setDefaultInstanceCapacityMap(defaultInstanceCapacityMap);
+
+      // Only FULL_AUTO MasterSlave resources are evaluated. The constant-first comparison
+      // keeps a resource without a state model reference from failing the whole run.
+      List<IdealState> filteredIdealStates = new ArrayList<>();
+      for (IdealState idealState : idealStates) {
+        if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO
+            && BuiltInStateModelDefinitions.MasterSlave.name()
+            .equals(idealState.getStateModelDefRef())) {
+          filteredIdealStates.add(idealState);
+        }
+      }
+
+      // Baseline: the assignment produced by each resource's configured rebalance strategy.
+      Map<String, ResourceAssignment> baselineResult = new HashMap<>();
+      for (IdealState idealState : filteredIdealStates) {
+        ResourceAssignment resourceAssignment =
+            new ResourceAssignment(idealState.getResourceName());
+        Map<String, Map<String, String>> partitionMap = HelixUtil
+            .getIdealAssignmentForFullAuto(clusterConfig, instanceConfigs, liveInstances,
+                idealState, new ArrayList<>(idealState.getPartitionSet()),
+                idealState.getRebalanceStrategy());
+        for (Map.Entry<String, Map<String, String>> entry : partitionMap.entrySet()) {
+          resourceAssignment.addReplicaMap(new Partition(entry.getKey()), entry.getValue());
+        }
+        baselineResult.put(idealState.getResourceName(), resourceAssignment);
+      }
+      outputAssignments(dataAccessor, clusterConfig, filteredIdealStates, baselineResult);
+
+      // Candidate: the same resources switched to the WAGED rebalancer.
+      for (IdealState idealState : filteredIdealStates) {
+        idealState.setRebalancerClassName(WagedRebalancer.class.getName());
+      }
+
+      Map<String, ResourceAssignment> utilResult = HelixUtil
+          .getTargetAssignmentForWagedFullAuto(zkAddr, clusterConfig, instanceConfigs,
+              liveInstances, filteredIdealStates, resourceConfigs);
+
+      outputAssignments(dataAccessor, clusterConfig, filteredIdealStates, utilResult);
+    } finally {
+      // The dedicated client owns its ZooKeeper connection; release it even on failure.
+      zkClient.close();
+    }
+  }
+
+  /**
+   * Prints the max, min and standard deviation of the per-instance "CU" weight for the given
+   * assignment: first counting only the top-state replicas, then counting all replicas.
+   *
+   * The weight of a partition is the cluster default partition weight map overridden by the
+   * partition's own entry in the resource config, or by the resource's default entry if the
+   * partition has none.
+   *
+   * @param dataAccessor        used to read the ResourceConfig of each resource
+   * @param clusterConfig       supplies the default partition weight map
+   * @param filteredIdealStates the resources to include in the statistics
+   * @param assignmentMap       resource name to the assignment to be evaluated
+   * @throws IOException if the partition capacity map of a resource config cannot be read
+   */
+  private static void outputAssignments(HelixDataAccessor dataAccessor, ClusterConfig clusterConfig,
+      List<IdealState> filteredIdealStates, Map<String, ResourceAssignment> assignmentMap)
+      throws IOException {
+    Map<String, Integer> defaultWeightMap = clusterConfig.getDefaultPartitionWeightMap();
+
+    // instance name -> capacity key -> accumulated weight
+    Map<String, Map<String, Integer>> partitionWeightMap = new HashMap<>();
+    Map<String, Map<String, Integer>> topStateWeightMap = new HashMap<>();
+
+    for (IdealState idealState : filteredIdealStates) {
+      String topState = BuiltInStateModelDefinitions.valueOf(idealState.getStateModelDefRef())
+          .getStateModelDefinition().getTopState();
+
+      ResourceConfig resourceConfig = dataAccessor
+          .getProperty(dataAccessor.keyBuilder().resourceConfig(idealState.getResourceName()));
+      // A resource without a ResourceConfig only carries the cluster default weights.
+      Map<String, Map<String, Integer>> fullWeightMap = resourceConfig == null
+          ? Collections.<String, Map<String, Integer>>emptyMap()
+          : resourceConfig.getPartitionCapacityMap();
+
+      for (String partition : idealState.getPartitionSet()) {
+        Map<String, String> instanceStateMap =
+            assignmentMap.get(idealState.getResourceName()).getRecord().getMapField(partition);
+        if (instanceStateMap == null) {
+          // The partition has no replica in this assignment, so it adds no weight anywhere.
+          continue;
+        }
+
+        Map<String, Integer> weightMap = new HashMap<>(defaultWeightMap);
+        Map<String, Integer> configuredWeights =
+            fullWeightMap.getOrDefault(partition, fullWeightMap.get(DEFAULT_PARTITION_KEY));
+        if (configuredWeights != null) {
+          weightMap.putAll(configuredWeights);
+        }
+
+        for (Map.Entry<String, String> replica : instanceStateMap.entrySet()) {
+          if (replica.getValue().equals(topState)) {
+            addWeights(topStateWeightMap, replica.getKey(), weightMap);
+          }
+          addWeights(partitionWeightMap, replica.getKey(), weightMap);
+        }
+      }
+    }
+
+    printWeightStats("Topstate", topStateWeightMap, "CU");
+    printWeightStats("Regular", partitionWeightMap, "CU");
+  }
+
+  /** Adds every entry of {@code weights} onto the running per-key totals of one instance. */
+  private static void addWeights(Map<String, Map<String, Integer>> totals, String instanceName,
+      Map<String, Integer> weights) {
+    Map<String, Integer> instanceTotals =
+        totals.computeIfAbsent(instanceName, name -> new HashMap<>());
+    weights.forEach((key, value) -> instanceTotals.merge(key, value, Integer::sum));
+  }
+
+  /**
+   * Prints max, min and standard deviation of one capacity key over all instances. An instance
+   * that has no value for the key counts as 0.
+   */
+  private static void printWeightStats(String label,
+      Map<String, Map<String, Integer>> weightsByInstance, String capacityKey) {
+    List<Integer> weights = weightsByInstance.values().stream()
+        .map(instanceWeights -> instanceWeights.getOrDefault(capacityKey, 0))
+        .collect(Collectors.toList());
+    if (weights.isEmpty()) {
+      // Nothing was assigned; max and min are undefined, so report that instead of failing.
+      System.out.println(label + " Weights: no replicas found");
+      return;
+    }
+
+    double[] samples = weights.stream().mapToDouble(Integer::doubleValue).toArray();
+    double std = new StandardDeviation().evaluate(samples);
+
+    System.out.println(
+        label + " Weights Max " + Collections.max(weights) + " Min " + Collections.min(weights)
+            + " STD " + std);
+  }
}