You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by qq...@apache.org on 2023/05/26 00:58:00 UTC
[helix] branch master updated: Implement the WAGED resource weights provider (#2480)
This is an automated email from the ASF dual-hosted git repository.
qqu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 8b9204b79 Implement the WAGED resource weights provider (#2480)
8b9204b79 is described below
commit 8b9204b79daa9d203b7e8fc8bb2825bfd036ae53
Author: Qi (Quincy) Qu <qq...@gmail.com>
AuthorDate: Thu May 25 20:57:53 2023 -0400
Implement the WAGED resource weights provider (#2480)
Implement the WAGED resource weights provider.
---
.../rebalancer/util/WagedRebalanceUtil.java | 23 ++++++++++
.../rebalancer/util/WagedValidationUtil.java | 3 +-
.../waged/WagedResourceWeightsProvider.java | 49 ++++++++++++++++++++++
.../rebalancer/waged/model/AssignableReplica.java | 28 +------------
.../rebalancer/waged/TestWagedRebalancer.java | 11 +++++
5 files changed, 86 insertions(+), 28 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java
index 62a5fb515..c43fda68d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java
@@ -19,12 +19,15 @@ package org.apache.helix.controller.rebalancer.util;
* under the License.
*/
+import java.io.IOException;
import java.util.Map;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,4 +53,24 @@ public class WagedRebalanceUtil {
algorithm.getClass().getSimpleName(), System.currentTimeMillis() - startTime);
return newAssignment;
}
+
+ /**
+ * Parse the resource config for the partition weight.
+ */
+ public static Map<String, Integer> fetchCapacityUsage(String partitionName,
+ ResourceConfig resourceConfig, ClusterConfig clusterConfig) {
+ Map<String, Map<String, Integer>> capacityMap;
+ try {
+ capacityMap = resourceConfig.getPartitionCapacityMap();
+ } catch (IOException ex) {
+ throw new IllegalArgumentException(
+ "Invalid partition capacity configuration of resource: " + resourceConfig
+ .getResourceName(), ex);
+ }
+ Map<String, Integer> partitionCapacity = WagedValidationUtil
+ .validateAndGetPartitionCapacity(partitionName, resourceConfig, capacityMap, clusterConfig);
+ // Remove the non-required capacity items.
+ partitionCapacity.keySet().retainAll(clusterConfig.getInstanceCapacityKeys());
+ return partitionCapacity;
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java
index ac62ed53a..ce0c12e5d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java
@@ -85,8 +85,7 @@ public class WagedValidationUtil {
if (!partitionCapacity.keySet().containsAll(requiredCapacityKeys)) {
throw new HelixException(String.format(
"The required capacity keys: %s are not fully configured in the resource: %s, partition: %s, weight map: %s.",
- requiredCapacityKeys.toString(), resourceConfig.getResourceName(), partitionName,
- partitionCapacity.toString()));
+ requiredCapacityKeys, resourceConfig.getResourceName(), partitionName, partitionCapacity));
}
return partitionCapacity;
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedResourceWeightsProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedResourceWeightsProvider.java
new file mode 100644
index 000000000..a058da10e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedResourceWeightsProvider.java
@@ -0,0 +1,49 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ResourceConfig;
+
+
+/**
+ * A weights data provider for waged resources.
+ */
+public class WagedResourceWeightsProvider {
+
+ private final ResourceControllerDataProvider _clusterData;
+
+ public WagedResourceWeightsProvider(ResourceControllerDataProvider clusterData) {
+ _clusterData = clusterData;
+ }
+
+ public Map<String, Integer> getPartitionWeights(String resourceName, String partition) {
+ @Nullable ResourceConfig resourceConfig = _clusterData.getResourceConfig(resourceName);
+ IdealState is = _clusterData.getIdealState(resourceName);
+ ResourceConfig mergedResourceConfig =
+ ResourceConfig.mergeIdealStateWithResourceConfig(resourceConfig, is);
+
+ return WagedRebalanceUtil.fetchCapacityUsage(partition, mergedResourceConfig, _clusterData.getClusterConfig());
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
index 4ddd1aef6..be052094d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
@@ -19,13 +19,9 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import org.apache.helix.HelixException;
-import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
+import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
@@ -68,7 +64,7 @@ public class AssignableReplica implements Comparable<AssignableReplica> {
_replicaState = replicaState;
_statePriority = statePriority;
_resourceName = resourceConfig.getResourceName();
- _capacityUsage = fetchCapacityUsage(partitionName, resourceConfig, clusterConfig);
+ _capacityUsage = WagedRebalanceUtil.fetchCapacityUsage(partitionName, resourceConfig, clusterConfig);
_resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag();
_resourceMaxPartitionsPerInstance = resourceConfig.getMaxPartitionsPerInstance();
_replicaKey = generateReplicaKey(_resourceName, _partitionName,_replicaState);
@@ -144,24 +140,4 @@ public class AssignableReplica implements Comparable<AssignableReplica> {
public static String generateReplicaKey(String resourceName, String partitionName, String state) {
return String.format("%s-%s-%s", resourceName, partitionName, state);
}
-
- /**
- * Parse the resource config for the partition weight.
- */
- private Map<String, Integer> fetchCapacityUsage(String partitionName,
- ResourceConfig resourceConfig, ClusterConfig clusterConfig) {
- Map<String, Map<String, Integer>> capacityMap;
- try {
- capacityMap = resourceConfig.getPartitionCapacityMap();
- } catch (IOException ex) {
- throw new IllegalArgumentException(
- "Invalid partition capacity configuration of resource: " + resourceConfig
- .getResourceName(), ex);
- }
- Map<String, Integer> partitionCapacity = WagedValidationUtil
- .validateAndGetPartitionCapacity(partitionName, resourceConfig, capacityMap, clusterConfig);
- // Remove the non-required capacity items.
- partitionCapacity.keySet().retainAll(clusterConfig.getInstanceCapacityKeys());
- return partitionCapacity;
- }
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index a34c92298..60cc4bc5f 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -854,4 +854,15 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
}
}
}
+
+ @Test
+ public void testResourceWeightProvider() throws IOException {
+ ResourceControllerDataProvider testCache = setupClusterDataCache();
+ WagedResourceWeightsProvider dataProvider = new WagedResourceWeightsProvider(testCache);
+ Map<String, Integer> weights1 = Map.of("item1", 3, "item2", 6, "item3", 0);
+ Assert.assertEquals(dataProvider.getPartitionWeights("Resource1", "Partition1"), weights1);
+ Assert.assertEquals(dataProvider.getPartitionWeights("Resource1", "Partition2"), weights1);
+ Map<String, Integer> weights2 = Map.of("item1", 5, "item2", 10, "item3", 0);
+ Assert.assertEquals(dataProvider.getPartitionWeights("Resource2", "Partition2"), weights2);
+ }
}