You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this version.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/12/01 00:09:21 UTC

[helix] branch topStatePOC updated (d5941ca -> ed4a07c)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a change to branch topStatePOC
in repository https://gitbox.apache.org/repos/asf/helix.git.


    from d5941ca  Fix targeted job quota calculation for given up tasks (#1548)
     new b49d7ae  POC
     new c67896f  bug fix
     new ed4a07c  Fix the topstate capacity usage recording and add the test method to evaluate.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../ConstraintBasedAlgorithmFactory.java           |   4 +-
 ...t.java => ResourceTopStateUsageConstraint.java} |  18 ++-
 .../rebalancer/waged/model/AssignableNode.java     |  37 +++++
 .../rebalancer/waged/model/ClusterContext.java     |  17 ++
 .../WagedRebalancer/TestWagedRebalance.java        | 180 +++++++++++++++++++--
 5 files changed, 238 insertions(+), 18 deletions(-)
 copy helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/{MaxCapacityUsageInstanceConstraint.java => ResourceTopStateUsageConstraint.java} (72%)


[helix] 01/03: POC

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch topStatePOC
in repository https://gitbox.apache.org/repos/asf/helix.git

commit b49d7ae6eadd403a1dc8276917ac0e3e8f8eca92
Author: Neal Sun <ne...@nesun-mn1.linkedin.biz>
AuthorDate: Mon Nov 23 18:27:26 2020 -0800

    POC
---
 .../ConstraintBasedAlgorithmFactory.java           |  2 +-
 .../ResourceTopStateUsageConstraint.java           | 46 ++++++++++++++++++++++
 .../rebalancer/waged/model/ClusterContext.java     | 17 ++++++++
 3 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
index 934bfa7..237d16c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
@@ -41,7 +41,7 @@ public class ConstraintBasedAlgorithmFactory {
       put(PartitionMovementConstraint.class.getSimpleName(), 2f);
       put(InstancePartitionsCountConstraint.class.getSimpleName(), 1f);
       put(ResourcePartitionAntiAffinityConstraint.class.getSimpleName(), 1f);
-      put(ResourceTopStateAntiAffinityConstraint.class.getSimpleName(), 3f);
+      put(ResourceTopStateUsageConstraint.class.getSimpleName(), 3f);
       put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 5f);
     }
   };
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
new file mode 100644
index 0000000..8ba9cdc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
@@ -0,0 +1,46 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+
+/**
+ * Evaluate the proposed assignment according to the top state resource usage on the instance.
+ * The higher the maximum usage value for the capacity key, the lower the score will be, implying
+ * that it is that much less desirable to assign anything on the given node.
+ * It is a greedy approach since it evaluates only on the most used capacity key.
+ */
+class ResourceTopStateUsageConstraint extends UsageSoftConstraint {
+  @Override
+  protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    if (!replica.isReplicaTopState()) {
+      // For non top state replica, this constraint is not applicable.
+      // So return zero on any assignable node candidate.
+      return 0;
+    }
+    float estimatedTopStateMaxUtilization = clusterContext.getEstimatedTopStateMaxUtilization();
+    float projectedHighestUtilization = node.getProjectedHighestUtilization(replica.getCapacity());
+    return computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization);
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index 46392c9..2f6650b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -43,6 +43,8 @@ public class ClusterContext {
   private final Map<String, Integer> _estimatedMaxPartitionByResource = new HashMap<>();
   // This estimation helps to ensure global resource usage evenness.
   private final float _estimatedMaxUtilization;
+  // This estimation helps to ensure global resource top state usage evenness.
+  private final float _estimatedTopStateMaxUtilization;
 
   // map{zoneName : map{resourceName : set(partitionNames)}}
   private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>();
@@ -63,6 +65,7 @@ public class ClusterContext {
     int totalReplicas = 0;
     int totalTopStateReplicas = 0;
     Map<String, Integer> totalUsage = new HashMap<>();
+    Map<String, Integer> totalTopStateUsage = new HashMap<>();
     Map<String, Integer> totalCapacity = new HashMap<>();
 
     for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
@@ -77,6 +80,9 @@ public class ClusterContext {
       for (AssignableReplica replica : entry.getValue()) {
         if (replica.isReplicaTopState()) {
           totalTopStateReplicas += 1;
+          replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalTopStateUsage
+              .compute(capacityEntry.getKey(), (k, v) -> (v == null) ? capacityEntry.getValue()
+                  : (v + capacityEntry.getValue())));
         }
         replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalUsage
             .compute(capacityEntry.getKey(),
@@ -90,15 +96,22 @@ public class ClusterContext {
     if (totalCapacity.isEmpty()) {
       // If no capacity is configured, we treat the cluster as fully utilized.
       _estimatedMaxUtilization = 1f;
+      _estimatedTopStateMaxUtilization = 1f;
     } else {
       float estimatedMaxUsage = 0;
+      float estimatedTopStateMaxUsage = 0;
       for (String capacityKey : totalCapacity.keySet()) {
         int maxCapacity = totalCapacity.get(capacityKey);
         int usage = totalUsage.getOrDefault(capacityKey, 0);
         float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity;
         estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
+
+        int topStateUsage = totalTopStateUsage.getOrDefault(capacityKey, 0);
+        float topStateUtilization = (maxCapacity == 0) ? 1 : (float) topStateUsage / maxCapacity;
+        estimatedTopStateMaxUsage = Math.max(estimatedTopStateMaxUsage, topStateUtilization);
       }
       _estimatedMaxUtilization = estimatedMaxUsage;
+      _estimatedTopStateMaxUtilization = estimatedTopStateMaxUsage;
     }
     _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
     _estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
@@ -135,6 +148,10 @@ public class ClusterContext {
     return _estimatedMaxUtilization;
   }
 
+  public float getEstimatedTopStateMaxUtilization() {
+    return _estimatedTopStateMaxUtilization;
+  }
+
   public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, String faultZoneId) {
     return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap())
         .getOrDefault(resourceName, Collections.emptySet());


[helix] 03/03: Fix the topstate capacity usage recording and add the test method to evaluate.

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch topStatePOC
in repository https://gitbox.apache.org/repos/asf/helix.git

commit ed4a07caf7c7b6847a389b2b9b8a1a99f6dcc66e
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Mon Nov 30 16:08:25 2020 -0800

    Fix the topstate capacity usage recording and add the test method to evaluate.
---
 .../ResourceTopStateUsageConstraint.java           |   2 +-
 .../rebalancer/waged/model/AssignableNode.java     |  37 +++++
 .../WagedRebalancer/TestWagedRebalance.java        | 180 +++++++++++++++++++--
 3 files changed, 209 insertions(+), 10 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
index 8ba9cdc..1209c04 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
@@ -40,7 +40,7 @@ class ResourceTopStateUsageConstraint extends UsageSoftConstraint {
       return 0;
     }
     float estimatedTopStateMaxUtilization = clusterContext.getEstimatedTopStateMaxUtilization();
-    float projectedHighestUtilization = node.getProjectedHighestUtilization(replica.getCapacity());
+    float projectedHighestUtilization = node.getProjectedHighestTopStateUtilization(replica.getCapacity());
     return computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization);
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index d3d014d..f8307fe 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -62,6 +62,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
   private Map<String, Map<String, AssignableReplica>> _currentAssignedReplicaMap;
   // A map of <capacity key, capacity value> that tracks the current available node capacity
   private Map<String, Integer> _remainingCapacity;
+  private Map<String, Integer> _remainingTopStateCapacity;
 
   /**
    * Update the node with a ClusterDataCache. This resets the current assignment and recalculates
@@ -81,6 +82,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
     // make a copy of max capacity
     _maxAllowedCapacity = ImmutableMap.copyOf(instanceCapacity);
     _remainingCapacity = new HashMap<>(instanceCapacity);
+    _remainingTopStateCapacity = new HashMap<>(instanceCapacity);
     _maxPartition = clusterConfig.getMaxPartitionsPerInstance();
     _currentAssignedReplicaMap = new HashMap<>();
   }
@@ -93,6 +95,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
    */
   void assignInitBatch(Collection<AssignableReplica> replicas) {
     Map<String, Integer> totalPartitionCapacity = new HashMap<>();
+    Map<String, Integer> totalTopStatePartitionCapacity = new HashMap<>();
     for (AssignableReplica replica : replicas) {
       // TODO: the exception could occur in the middle of for loop and the previous added records cannot be reverted
       addToAssignmentRecord(replica);
@@ -101,12 +104,18 @@ public class AssignableNode implements Comparable<AssignableNode> {
         totalPartitionCapacity.compute(capacity.getKey(),
             (key, totalValue) -> (totalValue == null) ? capacity.getValue()
                 : totalValue + capacity.getValue());
+        if (replica.isReplicaTopState()) {
+          totalTopStatePartitionCapacity.compute(capacity.getKey(),
+              (key, totalValue) -> (totalValue == null) ? capacity.getValue()
+                  : totalValue + capacity.getValue());
+        }
       }
     }
 
     // Update the global state after all single replications' calculation is done.
     for (String capacityKey : totalPartitionCapacity.keySet()) {
       updateRemainingCapacity(capacityKey, totalPartitionCapacity.get(capacityKey));
+      updateRemainingTopStateCapacity(capacityKey, totalTopStatePartitionCapacity.get(capacityKey));
     }
   }
 
@@ -118,6 +127,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
     addToAssignmentRecord(assignableReplica);
     assignableReplica.getCapacity().entrySet().stream()
             .forEach(capacity -> updateRemainingCapacity(capacity.getKey(), capacity.getValue()));
+    if (assignableReplica.isReplicaTopState()) {
+      assignableReplica.getCapacity().entrySet().stream()
+          .forEach(capacity -> updateRemainingTopStateCapacity(capacity.getKey(), capacity.getValue()));
+    }
   }
 
   /**
@@ -148,6 +161,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
     AssignableReplica removedReplica = partitionMap.remove(partitionName);
     removedReplica.getCapacity().entrySet().stream()
         .forEach(entry -> updateRemainingCapacity(entry.getKey(), -1 * entry.getValue()));
+    if (removedReplica.isReplicaTopState()) {
+      removedReplica.getCapacity().entrySet().stream()
+          .forEach(entry -> updateRemainingTopStateCapacity(entry.getKey(), -1 * entry.getValue()));
+    }
   }
 
   /**
@@ -239,6 +256,17 @@ public class AssignableNode implements Comparable<AssignableNode> {
     return highestCapacityUtilization;
   }
 
+  public float getProjectedHighestTopStateUtilization(Map<String, Integer> newUsage) {
+    float highestCapacityUtilization = 0;
+    for (String capacityKey : _maxAllowedCapacity.keySet()) {
+      float capacityValue = _maxAllowedCapacity.get(capacityKey);
+      float utilization = (capacityValue - _remainingTopStateCapacity.get(capacityKey) + newUsage
+          .getOrDefault(capacityKey, 0)) / capacityValue;
+      highestCapacityUtilization = Math.max(highestCapacityUtilization, utilization);
+    }
+    return highestCapacityUtilization;
+  }
+
   public String getInstanceName() {
     return _instanceName;
   }
@@ -320,6 +348,15 @@ public class AssignableNode implements Comparable<AssignableNode> {
     _remainingCapacity.put(capacityKey, _remainingCapacity.get(capacityKey) - usage);
   }
 
+  private void updateRemainingTopStateCapacity(String capacityKey, int usage) {
+    if (!_remainingTopStateCapacity.containsKey(capacityKey)) {
+      //if the capacityKey belongs to replicas does not exist in the instance's capacity,
+      // it will be treated as if it has unlimited capacity of that capacityKey
+      return;
+    }
+    _remainingTopStateCapacity.put(capacityKey, _remainingTopStateCapacity.get(capacityKey) - usage);
+  }
+
   /**
    * Get and validate the instance capacity from instance config.
    * @throws HelixException if any required capacity key is not configured in the instance config.
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index a7049f9..09a0baa 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -19,6 +19,7 @@ package org.apache.helix.integration.rebalancer.WagedRebalancer;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
@@ -27,8 +28,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.commons.math.stat.descriptive.moment.StandardDeviation;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
@@ -37,15 +41,19 @@ import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
@@ -53,12 +61,17 @@ import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
 import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.apache.helix.util.HelixUtil;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.apache.helix.model.ResourceConfig.DEFAULT_PARTITION_KEY;
+
+
 public class TestWagedRebalance extends ZkTestBase {
   protected final int NUM_NODE = 6;
   protected static final int START_PORT = 12918;
@@ -76,11 +89,8 @@ public class TestWagedRebalance extends ZkTestBase {
   private Set<String> _allDBs = new HashSet<>();
   private int _replica = 3;
 
-  private static String[] _testModels = {
-      BuiltInStateModelDefinitions.OnlineOffline.name(),
-      BuiltInStateModelDefinitions.MasterSlave.name(),
-      BuiltInStateModelDefinitions.LeaderStandby.name()
-  };
+  private static String[] _testModels =
+      {BuiltInStateModelDefinitions.OnlineOffline.name(), BuiltInStateModelDefinitions.MasterSlave.name(), BuiltInStateModelDefinitions.LeaderStandby.name()};
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -677,8 +687,7 @@ public class TestWagedRebalance extends ZkTestBase {
     HelixClusterVerifier _clusterVerifier =
         new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
             .setDeactivatedNodeAwareness(true).setResources(_allDBs)
-            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
-            .build();
+            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
     try {
       Assert.assertTrue(_clusterVerifier.verify(5000));
     } finally {
@@ -722,8 +731,7 @@ public class TestWagedRebalance extends ZkTestBase {
     ZkHelixClusterVerifier _clusterVerifier =
         new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
             .setDeactivatedNodeAwareness(true).setResources(_allDBs)
-            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
-            .build();
+            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
     try {
       Assert.assertTrue(_clusterVerifier.verifyByPolling());
     } finally {
@@ -749,4 +757,158 @@ public class TestWagedRebalance extends ZkTestBase {
     }
     deleteCluster(CLUSTER_NAME);
   }
+
+  public static void main(String[] args)
+      throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException {
+    String clusterName = "ESPRESSO_MT1";
+    String zkAddr = "zk-ltx1-espresso.stg.linkedin.com:12913";
+
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+    HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
+    BaseDataAccessor baseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
+
+    // Read cluster parameters from ZK
+    HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
+    ClusterConfig clusterConfig =
+        dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
+    List<InstanceConfig> instanceConfigs =
+        dataAccessor.getChildValues(dataAccessor.keyBuilder().instanceConfigs(), true);
+    List<String> liveInstances =
+        instanceConfigs.stream().map(InstanceConfig::getInstanceName).collect(Collectors.toList());
+    List<IdealState> idealStates =
+        dataAccessor.getChildValues(dataAccessor.keyBuilder().idealStates(), true);
+    List<ResourceConfig> resourceConfigs =
+        dataAccessor.getChildValues(dataAccessor.keyBuilder().resourceConfigs(), true);
+
+    for (InstanceConfig instanceConfig : instanceConfigs) {
+      instanceConfig.setInstanceEnabled(true);
+      instanceConfig.getRecord().getMapFields()
+          .getOrDefault("HELIX_DISABLED_PARTITION", Collections.emptyMap()).clear();
+      instanceConfig.getRecord().getListFields()
+          .getOrDefault("HELIX_DISABLED_PARTITION", Collections.emptyList()).clear();
+    }
+
+    clusterConfig.setInstanceCapacityKeys(Collections.singletonList("CU"));
+
+    Map<String, Integer> defaultInstanceCapacityMap = clusterConfig.getDefaultInstanceCapacityMap();
+    for (String key : clusterConfig.getInstanceCapacityKeys()) {
+      defaultInstanceCapacityMap
+          .put(key, clusterConfig.getDefaultInstanceCapacityMap().getOrDefault(key, 0) * 2);
+    }
+    clusterConfig.setDefaultInstanceCapacityMap(defaultInstanceCapacityMap);
+
+    List<IdealState> filteredIdealStates = new ArrayList<>();
+    for (IdealState idealState : idealStates) {
+      if (idealState.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO) && idealState
+          .getStateModelDefRef().equals("MasterSlave")) {
+        filteredIdealStates.add(idealState);
+      }
+    }
+
+    Map<String, ResourceAssignment> baselineResult = new HashMap<>();
+    for (IdealState idealState : filteredIdealStates) {
+      ResourceAssignment resourceAssignment = new ResourceAssignment(idealState.getResourceName());
+      Map<String, Map<String, String>> partitionMap = HelixUtil
+          .getIdealAssignmentForFullAuto(clusterConfig, instanceConfigs, liveInstances, idealState,
+              new ArrayList<>(idealState.getPartitionSet()), idealState.getRebalanceStrategy());
+      for (String partition : partitionMap.keySet()) {
+        resourceAssignment.addReplicaMap(new Partition(partition), partitionMap.get(partition));
+      }
+      baselineResult.put(idealState.getResourceName(), resourceAssignment);
+    }
+    outputAssignments(dataAccessor, clusterConfig, filteredIdealStates, baselineResult);
+
+    for (IdealState idealState : filteredIdealStates) {
+      idealState.setRebalancerClassName(WagedRebalancer.class.getName());
+    }
+
+    Map<String, ResourceAssignment> utilResult = HelixUtil
+        .getTargetAssignmentForWagedFullAuto(zkAddr, clusterConfig, instanceConfigs, liveInstances,
+            filteredIdealStates, resourceConfigs);
+
+    outputAssignments(dataAccessor, clusterConfig, filteredIdealStates, utilResult);
+  }
+
+  private static void outputAssignments(HelixDataAccessor dataAccessor, ClusterConfig clusterConfig,
+      List<IdealState> filteredIdealStates, Map<String, ResourceAssignment> assignmentMap)
+      throws IOException {
+    Map<String, Integer> defaultWeightMap = clusterConfig.getDefaultPartitionWeightMap();
+
+    Map<String, Integer> partitionCountMap = new HashMap<>();
+    Map<String, Integer> topStateCountMap = new HashMap<>();
+    Map<String, Map<String, Integer>> partitionWeightMap = new HashMap<>();
+    Map<String, Map<String, Integer>> topStateWeightMap = new HashMap<>();
+
+    for (IdealState idealState : filteredIdealStates) {
+      StateModelDefinition stateModelDefinition =
+          BuiltInStateModelDefinitions.valueOf(idealState.getStateModelDefRef())
+              .getStateModelDefinition();
+
+      ResourceConfig resourceConfig = dataAccessor
+          .getProperty(dataAccessor.keyBuilder().resourceConfig(idealState.getResourceName()));
+
+      Map<String, Map<String, Integer>> fullWeightMap = resourceConfig.getPartitionCapacityMap();
+
+      for (String partition : idealState.getPartitionSet()) {
+        Map<String, String> instanceStateMap =
+            assignmentMap.get(idealState.getResourceName()).getRecord().getMapField(partition);
+        Map<String, Integer> weightMap = new HashMap<>(defaultWeightMap);
+        weightMap.putAll(
+            fullWeightMap.getOrDefault(partition, fullWeightMap.get(DEFAULT_PARTITION_KEY)));
+
+        for (String instanceName : instanceStateMap.keySet()) {
+          String state = instanceStateMap.get(instanceName);
+          if (state.equals(stateModelDefinition.getTopState())) {
+            topStateCountMap.put(instanceName, 1 + topStateCountMap.getOrDefault(instanceName, 0));
+
+            Map<String, Integer> topStateWeights =
+                topStateWeightMap.computeIfAbsent(instanceName, map -> new HashMap<>());
+            for (String key : weightMap.keySet()) {
+              topStateWeights.put(key, topStateWeights.getOrDefault(key, 0) + weightMap.get(key));
+            }
+          }
+
+          partitionCountMap.put(instanceName, 1 + partitionCountMap.getOrDefault(instanceName, 0));
+
+          Map<String, Integer> weights =
+              partitionWeightMap.computeIfAbsent(instanceName, map -> new HashMap<>());
+          for (String key : weightMap.keySet()) {
+            weights.put(key, weights.getOrDefault(key, 0) + weightMap.get(key));
+          }
+        }
+      }
+    }
+    //System.out.println("Partition weights: " + partitionWeightMap);
+    //System.out.println("Topstate partition weights: " + topStateWeightMap);
+
+    List<Integer> regWeightList =
+        partitionWeightMap.values().stream().map(map -> map.get("CU")).collect(Collectors.toList());
+
+    List<Integer> weightList =
+        topStateWeightMap.values().stream().map(map -> map.get("CU")).collect(Collectors.toList());
+
+    int max = weightList.stream().max(Integer::compareTo).get();
+    int min = weightList.stream().min(Integer::compareTo).get();
+    StandardDeviation standardDeviation = new StandardDeviation();
+
+    double[] nums = new double[weightList.size()];
+    for (int i = 0; i < weightList.size(); i++) {
+      nums[i] = weightList.get(i);
+    }
+    double std = standardDeviation.evaluate(nums);
+
+    System.out.println("Topstate Weights Max " + max + " Min " + min + " STD " + std);
+
+    int regmax = regWeightList.stream().max(Integer::compareTo).get();
+    int regmin = regWeightList.stream().min(Integer::compareTo).get();
+    double[] regnums = new double[regWeightList.size()];
+    for (int i = 0; i < regWeightList.size(); i++) {
+      regnums[i] = regWeightList.get(i);
+    }
+    double regstd = standardDeviation.evaluate(regnums);
+
+    System.out.println("Regular Weights Max " + regmax + " Min " + regmin + " STD " + regstd);
+  }
 }


[helix] 02/03: bug fix

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch topStatePOC
in repository https://gitbox.apache.org/repos/asf/helix.git

commit c67896fb89c58d8ad2de599edc69da4e1f341ce1
Author: Neal Sun <ne...@nesun-mn1.linkedin.biz>
AuthorDate: Wed Nov 25 10:15:35 2020 -0800

    bug fix
---
 .../rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
index 237d16c..b0da403 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
@@ -69,7 +69,7 @@ public class ConstraintBasedAlgorithmFactory {
     List<SoftConstraint> softConstraints = ImmutableList
         .of(new PartitionMovementConstraint(), new InstancePartitionsCountConstraint(),
             new ResourcePartitionAntiAffinityConstraint(),
-            new ResourceTopStateAntiAffinityConstraint(), new MaxCapacityUsageInstanceConstraint());
+            new ResourceTopStateUsageConstraint(), new MaxCapacityUsageInstanceConstraint());
     Map<SoftConstraint, Float> softConstraintsWithWeight = Maps.toMap(softConstraints, key -> {
       String name = key.getClass().getSimpleName();
       float weight = MODEL.get(name);