You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by qq...@apache.org on 2023/05/26 00:58:00 UTC

[helix] branch master updated: Implement the WAGED resource weights provider (#2480)

This is an automated email from the ASF dual-hosted git repository.

qqu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b9204b79 Implement the WAGED resource weights provider (#2480)
8b9204b79 is described below

commit 8b9204b79daa9d203b7e8fc8bb2825bfd036ae53
Author: Qi (Quincy) Qu <qq...@gmail.com>
AuthorDate: Thu May 25 20:57:53 2023 -0400

    Implement the WAGED resource weights provider (#2480)
    
    Implement the WAGED resource weights provider.
---
 .../rebalancer/util/WagedRebalanceUtil.java        | 23 ++++++++++
 .../rebalancer/util/WagedValidationUtil.java       |  3 +-
 .../waged/WagedResourceWeightsProvider.java        | 49 ++++++++++++++++++++++
 .../rebalancer/waged/model/AssignableReplica.java  | 28 +------------
 .../rebalancer/waged/TestWagedRebalancer.java      | 11 +++++
 5 files changed, 86 insertions(+), 28 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java
index 62a5fb515..c43fda68d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java
@@ -19,12 +19,15 @@ package org.apache.helix.controller.rebalancer.util;
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.Map;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
 import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,4 +53,24 @@ public class WagedRebalanceUtil {
         algorithm.getClass().getSimpleName(), System.currentTimeMillis() - startTime);
     return newAssignment;
   }
+
+  /**
+   * Parse the resource config for the partition weight.
+   */
+  public static Map<String, Integer> fetchCapacityUsage(String partitionName,
+      ResourceConfig resourceConfig, ClusterConfig clusterConfig) {
+    Map<String, Map<String, Integer>> capacityMap;
+    try {
+      capacityMap = resourceConfig.getPartitionCapacityMap();
+    } catch (IOException ex) {
+      throw new IllegalArgumentException(
+          "Invalid partition capacity configuration of resource: " + resourceConfig
+              .getResourceName(), ex);
+    }
+    Map<String, Integer> partitionCapacity = WagedValidationUtil
+        .validateAndGetPartitionCapacity(partitionName, resourceConfig, capacityMap, clusterConfig);
+    // Remove the non-required capacity items.
+    partitionCapacity.keySet().retainAll(clusterConfig.getInstanceCapacityKeys());
+    return partitionCapacity;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java
index ac62ed53a..ce0c12e5d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java
@@ -85,8 +85,7 @@ public class WagedValidationUtil {
     if (!partitionCapacity.keySet().containsAll(requiredCapacityKeys)) {
       throw new HelixException(String.format(
           "The required capacity keys: %s are not fully configured in the resource: %s, partition: %s, weight map: %s.",
-          requiredCapacityKeys.toString(), resourceConfig.getResourceName(), partitionName,
-          partitionCapacity.toString()));
+          requiredCapacityKeys, resourceConfig.getResourceName(), partitionName, partitionCapacity));
     }
     return partitionCapacity;
   }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedResourceWeightsProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedResourceWeightsProvider.java
new file mode 100644
index 000000000..a058da10e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedResourceWeightsProvider.java
@@ -0,0 +1,49 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.ResourceConfig;
+
+
+/**
+ * A weights data provider for waged resources.
+ */
+public class WagedResourceWeightsProvider {
+
+  private final ResourceControllerDataProvider _clusterData;
+
+  public WagedResourceWeightsProvider(ResourceControllerDataProvider clusterData) {
+    _clusterData = clusterData;
+  }
+
+  public Map<String, Integer> getPartitionWeights(String resourceName, String partition) {
+    @Nullable ResourceConfig resourceConfig = _clusterData.getResourceConfig(resourceName);
+    IdealState is = _clusterData.getIdealState(resourceName);
+    ResourceConfig mergedResourceConfig =
+        ResourceConfig.mergeIdealStateWithResourceConfig(resourceConfig, is);
+
+    return WagedRebalanceUtil.fetchCapacityUsage(partition, mergedResourceConfig, _clusterData.getClusterConfig());
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
index 4ddd1aef6..be052094d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
@@ -19,13 +19,9 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import org.apache.helix.HelixException;
-import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
+import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
@@ -68,7 +64,7 @@ public class AssignableReplica implements Comparable<AssignableReplica> {
     _replicaState = replicaState;
     _statePriority = statePriority;
     _resourceName = resourceConfig.getResourceName();
-    _capacityUsage = fetchCapacityUsage(partitionName, resourceConfig, clusterConfig);
+    _capacityUsage = WagedRebalanceUtil.fetchCapacityUsage(partitionName, resourceConfig, clusterConfig);
     _resourceInstanceGroupTag = resourceConfig.getInstanceGroupTag();
     _resourceMaxPartitionsPerInstance = resourceConfig.getMaxPartitionsPerInstance();
     _replicaKey = generateReplicaKey(_resourceName, _partitionName,_replicaState);
@@ -144,24 +140,4 @@ public class AssignableReplica implements Comparable<AssignableReplica> {
   public static String generateReplicaKey(String resourceName, String partitionName, String state) {
     return String.format("%s-%s-%s", resourceName, partitionName, state);
   }
-
-  /**
-   * Parse the resource config for the partition weight.
-   */
-  private Map<String, Integer> fetchCapacityUsage(String partitionName,
-      ResourceConfig resourceConfig, ClusterConfig clusterConfig) {
-    Map<String, Map<String, Integer>> capacityMap;
-    try {
-      capacityMap = resourceConfig.getPartitionCapacityMap();
-    } catch (IOException ex) {
-      throw new IllegalArgumentException(
-          "Invalid partition capacity configuration of resource: " + resourceConfig
-              .getResourceName(), ex);
-    }
-    Map<String, Integer> partitionCapacity = WagedValidationUtil
-        .validateAndGetPartitionCapacity(partitionName, resourceConfig, capacityMap, clusterConfig);
-    // Remove the non-required capacity items.
-    partitionCapacity.keySet().retainAll(clusterConfig.getInstanceCapacityKeys());
-    return partitionCapacity;
-  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index a34c92298..60cc4bc5f 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -854,4 +854,15 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
       }
     }
   }
+
+  @Test
+  public void testResourceWeightProvider() throws IOException {
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+    WagedResourceWeightsProvider dataProvider = new WagedResourceWeightsProvider(testCache);
+    Map<String, Integer> weights1 = Map.of("item1", 3, "item2", 6, "item3", 0);
+    Assert.assertEquals(dataProvider.getPartitionWeights("Resource1", "Partition1"), weights1);
+    Assert.assertEquals(dataProvider.getPartitionWeights("Resource1", "Partition2"), weights1);
+    Map<String, Integer> weights2 = Map.of("item1", 5, "item2", 10, "item3", 0);
+    Assert.assertEquals(dataProvider.getPartitionWeights("Resource2", "Partition2"), weights2);
+  }
 }