You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/12/01 00:09:22 UTC

[helix] 01/03: POC

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch topStatePOC
in repository https://gitbox.apache.org/repos/asf/helix.git

commit b49d7ae6eadd403a1dc8276917ac0e3e8f8eca92
Author: Neal Sun <ne...@nesun-mn1.linkedin.biz>
AuthorDate: Mon Nov 23 18:27:26 2020 -0800

    POC
---
 .../ConstraintBasedAlgorithmFactory.java           |  2 +-
 .../ResourceTopStateUsageConstraint.java           | 46 ++++++++++++++++++++++
 .../rebalancer/waged/model/ClusterContext.java     | 17 ++++++++
 3 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
index 934bfa7..237d16c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
@@ -41,7 +41,7 @@ public class ConstraintBasedAlgorithmFactory {
       put(PartitionMovementConstraint.class.getSimpleName(), 2f);
       put(InstancePartitionsCountConstraint.class.getSimpleName(), 1f);
       put(ResourcePartitionAntiAffinityConstraint.class.getSimpleName(), 1f);
-      put(ResourceTopStateAntiAffinityConstraint.class.getSimpleName(), 3f);
+      put(ResourceTopStateUsageConstraint.class.getSimpleName(), 3f);
       put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 5f);
     }
   };
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
new file mode 100644
index 0000000..8ba9cdc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
@@ -0,0 +1,46 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+
+/**
+ * Evaluate the proposed assignment according to the top state resource usage on the instance.
+ * The higher the maximum usage value for the capacity key, the lower the score will be, implying
+ * that it is that much less desirable to assign anything on the given node.
+ * It is a greedy approach since it evaluates only on the most used capacity key.
+ */
+class ResourceTopStateUsageConstraint extends UsageSoftConstraint {
+  @Override
+  protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    if (!replica.isReplicaTopState()) {
+      // For non top state replica, this constraint is not applicable.
+      // So return zero on any assignable node candidate.
+      return 0;
+    }
+    float estimatedTopStateMaxUtilization = clusterContext.getEstimatedTopStateMaxUtilization();
+    float projectedHighestUtilization = node.getProjectedHighestUtilization(replica.getCapacity());
+    return computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization);
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index 46392c9..2f6650b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -43,6 +43,8 @@ public class ClusterContext {
   private final Map<String, Integer> _estimatedMaxPartitionByResource = new HashMap<>();
   // This estimation helps to ensure global resource usage evenness.
   private final float _estimatedMaxUtilization;
+  // This estimation helps to ensure global resource top state usage evenness.
+  private final float _estimatedTopStateMaxUtilization;
 
   // map{zoneName : map{resourceName : set(partitionNames)}}
   private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>();
@@ -63,6 +65,7 @@ public class ClusterContext {
     int totalReplicas = 0;
     int totalTopStateReplicas = 0;
     Map<String, Integer> totalUsage = new HashMap<>();
+    Map<String, Integer> totalTopStateUsage = new HashMap<>();
     Map<String, Integer> totalCapacity = new HashMap<>();
 
     for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
@@ -77,6 +80,9 @@ public class ClusterContext {
       for (AssignableReplica replica : entry.getValue()) {
         if (replica.isReplicaTopState()) {
           totalTopStateReplicas += 1;
+          replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalTopStateUsage
+              .compute(capacityEntry.getKey(), (k, v) -> (v == null) ? capacityEntry.getValue()
+                  : (v + capacityEntry.getValue())));
         }
         replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalUsage
             .compute(capacityEntry.getKey(),
@@ -90,15 +96,22 @@ public class ClusterContext {
     if (totalCapacity.isEmpty()) {
       // If no capacity is configured, we treat the cluster as fully utilized.
       _estimatedMaxUtilization = 1f;
+      _estimatedTopStateMaxUtilization = 1f;
     } else {
       float estimatedMaxUsage = 0;
+      float estimatedTopStateMaxUsage = 0;
       for (String capacityKey : totalCapacity.keySet()) {
         int maxCapacity = totalCapacity.get(capacityKey);
         int usage = totalUsage.getOrDefault(capacityKey, 0);
         float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity;
         estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
+
+        int topStateUsage = totalTopStateUsage.getOrDefault(capacityKey, 0);
+        float topStateUtilization = (maxCapacity == 0) ? 1 : (float) topStateUsage / maxCapacity;
+        estimatedTopStateMaxUsage = Math.max(estimatedTopStateMaxUsage, topStateUtilization);
       }
       _estimatedMaxUtilization = estimatedMaxUsage;
+      _estimatedTopStateMaxUtilization = estimatedTopStateMaxUsage;
     }
     _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
     _estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
@@ -135,6 +148,10 @@ public class ClusterContext {
     return _estimatedMaxUtilization;
   }
 
+  public float getEstimatedTopStateMaxUtilization() {
+    return _estimatedTopStateMaxUtilization;
+  }
+
   public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, String faultZoneId) {
     return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap())
         .getOrDefault(resourceName, Collections.emptySet());