You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/12/01 00:09:22 UTC
[helix] 01/03: POC
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch topStatePOC
in repository https://gitbox.apache.org/repos/asf/helix.git
commit b49d7ae6eadd403a1dc8276917ac0e3e8f8eca92
Author: Neal Sun <ne...@nesun-mn1.linkedin.biz>
AuthorDate: Mon Nov 23 18:27:26 2020 -0800
POC
---
.../ConstraintBasedAlgorithmFactory.java | 2 +-
.../ResourceTopStateUsageConstraint.java | 46 ++++++++++++++++++++++
.../rebalancer/waged/model/ClusterContext.java | 17 ++++++++
3 files changed, 64 insertions(+), 1 deletion(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
index 934bfa7..237d16c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
@@ -41,7 +41,7 @@ public class ConstraintBasedAlgorithmFactory {
put(PartitionMovementConstraint.class.getSimpleName(), 2f);
put(InstancePartitionsCountConstraint.class.getSimpleName(), 1f);
put(ResourcePartitionAntiAffinityConstraint.class.getSimpleName(), 1f);
- put(ResourceTopStateAntiAffinityConstraint.class.getSimpleName(), 3f);
+ put(ResourceTopStateUsageConstraint.class.getSimpleName(), 3f);
put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 5f);
}
};
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
new file mode 100644
index 0000000..8ba9cdc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourceTopStateUsageConstraint.java
@@ -0,0 +1,46 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+
+/**
+ * Evaluate the proposed assignment according to the top state resource usage on the instance.
+ * The higher the maximum usage value for the capacity key, the lower the score will be, implying
+ * that it is that much less desirable to assign anything on the given node.
+ * It is a greedy approach since it evaluates only on the most used capacity key.
+ */
+class ResourceTopStateUsageConstraint extends UsageSoftConstraint {
+ @Override
+ protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
+ ClusterContext clusterContext) {
+ if (!replica.isReplicaTopState()) {
+ // For non top state replica, this constraint is not applicable.
+ // So return zero on any assignable node candidate.
+ return 0;
+ }
+ float estimatedTopStateMaxUtilization = clusterContext.getEstimatedTopStateMaxUtilization();
+ float projectedHighestUtilization = node.getProjectedHighestUtilization(replica.getCapacity());
+ return computeUtilizationScore(estimatedTopStateMaxUtilization, projectedHighestUtilization);
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index 46392c9..2f6650b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -43,6 +43,8 @@ public class ClusterContext {
private final Map<String, Integer> _estimatedMaxPartitionByResource = new HashMap<>();
// This estimation helps to ensure global resource usage evenness.
private final float _estimatedMaxUtilization;
+ // This estimation helps to ensure global resource top state usage evenness.
+ private final float _estimatedTopStateMaxUtilization;
// map{zoneName : map{resourceName : set(partitionNames)}}
private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>();
@@ -63,6 +65,7 @@ public class ClusterContext {
int totalReplicas = 0;
int totalTopStateReplicas = 0;
Map<String, Integer> totalUsage = new HashMap<>();
+ Map<String, Integer> totalTopStateUsage = new HashMap<>();
Map<String, Integer> totalCapacity = new HashMap<>();
for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
@@ -77,6 +80,9 @@ public class ClusterContext {
for (AssignableReplica replica : entry.getValue()) {
if (replica.isReplicaTopState()) {
totalTopStateReplicas += 1;
+ replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalTopStateUsage
+ .compute(capacityEntry.getKey(), (k, v) -> (v == null) ? capacityEntry.getValue()
+ : (v + capacityEntry.getValue())));
}
replica.getCapacity().entrySet().stream().forEach(capacityEntry -> totalUsage
.compute(capacityEntry.getKey(),
@@ -90,15 +96,22 @@ public class ClusterContext {
if (totalCapacity.isEmpty()) {
// If no capacity is configured, we treat the cluster as fully utilized.
_estimatedMaxUtilization = 1f;
+ _estimatedTopStateMaxUtilization = 1f;
} else {
float estimatedMaxUsage = 0;
+ float estimatedTopStateMaxUsage = 0;
for (String capacityKey : totalCapacity.keySet()) {
int maxCapacity = totalCapacity.get(capacityKey);
int usage = totalUsage.getOrDefault(capacityKey, 0);
float utilization = (maxCapacity == 0) ? 1 : (float) usage / maxCapacity;
estimatedMaxUsage = Math.max(estimatedMaxUsage, utilization);
+
+ int topStateUsage = totalTopStateUsage.getOrDefault(capacityKey, 0);
+ float topStateUtilization = (maxCapacity == 0) ? 1 : (float) topStateUsage / maxCapacity;
+ estimatedTopStateMaxUsage = Math.max(estimatedTopStateMaxUsage, topStateUtilization);
}
_estimatedMaxUtilization = estimatedMaxUsage;
+ _estimatedTopStateMaxUtilization = estimatedTopStateMaxUsage;
}
_estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
_estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
@@ -135,6 +148,10 @@ public class ClusterContext {
return _estimatedMaxUtilization;
}
+ public float getEstimatedTopStateMaxUtilization() {
+ return _estimatedTopStateMaxUtilization;
+ }
+
public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, String faultZoneId) {
return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap())
.getOrDefault(resourceName, Collections.emptySet());