You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:10 UTC
[helix] 07/37: Add cluster level default instance config. (#413)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 0d677647b75f2617bf61d7dc7627b855e1c11171
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Mon Aug 19 10:49:12 2019 -0700
Add cluster level default instance config. (#413)
This config will be applied to the instance when there is no (or empty) capacity configuration in the Instance Config.
Also add unit tests.
---
.../rebalancer/waged/model/AssignableNode.java | 8 +++-
.../java/org/apache/helix/model/ClusterConfig.java | 51 ++++++++++++++++++++
.../org/apache/helix/model/InstanceConfig.java | 13 ++++--
.../rebalancer/waged/model/TestAssignableNode.java | 13 ++++++
.../org/apache/helix/model/TestClusterConfig.java | 54 ++++++++++++++++++++++
5 files changed, 134 insertions(+), 5 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 5fc04d7..e2fd676 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -88,11 +88,15 @@ public class AssignableNode {
Collection<AssignableReplica> existingAssignment) {
reset();
- _currentCapacity.putAll(instanceConfig.getInstanceCapacityMap());
+ Map<String, Integer> instanceCapacity = instanceConfig.getInstanceCapacityMap();
+ if (instanceCapacity.isEmpty()) {
+ instanceCapacity = clusterConfig.getDefaultInstanceCapacityMap();
+ }
+ _currentCapacity.putAll(instanceCapacity);
_faultZone = computeFaultZone(clusterConfig, instanceConfig);
_instanceTags = new HashSet<>(instanceConfig.getTags());
_disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
- _maxCapacity = instanceConfig.getInstanceCapacityMap();
+ _maxCapacity = instanceCapacity;
_maxPartition = clusterConfig.getMaxPartitionsPerInstance();
assignNewBatch(existingAssignment);
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index ee942c7..a8c1da9 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -29,10 +29,12 @@ import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.api.config.StateTransitionTimeoutConfig;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* Cluster configurations
@@ -86,6 +88,8 @@ public class ClusterConfig extends HelixProperty {
// The required instance capacity keys for resource partition assignment calculation.
INSTANCE_CAPACITY_KEYS,
+ // The default instance capacity if no capacity is configured in the Instance Config node.
+ DEFAULT_INSTANCE_CAPACITY_MAP,
// The preference of the rebalance result.
// EVENNESS - Evenness of the resource utilization, partition, and top state distribution.
// LESS_MOVEMENT - the tendency of keeping the current assignment instead of moving the partition for optimal assignment.
@@ -700,6 +704,53 @@ public class ClusterConfig extends HelixProperty {
}
/**
+ * Get the default instance capacity information from the map fields.
+ * @return data map if it exists, or empty map
+ */
+ public Map<String, Integer> getDefaultInstanceCapacityMap() {
+ Map<String, String> capacityData =
+ _record.getMapField(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name());
+
+ if (capacityData != null) {
+ return capacityData.entrySet().stream().collect(
+ Collectors.toMap(entry -> entry.getKey(), entry -> Integer.parseInt(entry.getValue())));
+ }
+ return Collections.emptyMap();
+ }
+
+ /**
+ * Set the default instance capacity information with an Integer mapping.
+ * @param capacityDataMap - map of instance capacity data
+ * @throws IllegalArgumentException - when any of the data value is a negative number or when the map is empty
+ *
+ * This information is required by the global rebalancer.
+ * @see <a href="Rebalance Algorithm">
+ * https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer#rebalance-algorithm-adapter
+ * </a>
+ * If the instance capacity is not configured in neither Instance Config nor Cluster Config, the
+ * cluster topology is considered invalid. So the rebalancer may stop working.
+ */
+ public void setDefaultInstanceCapacityMap(Map<String, Integer> capacityDataMap)
+ throws IllegalArgumentException {
+ if (capacityDataMap == null || capacityDataMap.size() == 0) {
+ throw new IllegalArgumentException("Default Instance Capacity Data is empty");
+ }
+
+ Map<String, String> capacityData = new HashMap<>();
+
+ capacityDataMap.entrySet().stream().forEach(entry -> {
+ if (entry.getValue() < 0) {
+ throw new IllegalArgumentException(String
+ .format("Default Instance Capacity Data contains a negative value: %s = %d",
+ entry.getKey(), entry.getValue()));
+ }
+ capacityData.put(entry.getKey(), Integer.toString(entry.getValue()));
+ });
+
+ _record.setMapField(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name(), capacityData);
+ }
+
+ /**
* Set the global rebalancer's assignment preference.
* @param preference A map of the GlobalRebalancePreferenceKey and the corresponding weight.
* The ratio of the configured weights will determine the rebalancer's behavior.
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 88fd1dd..ac1814d 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -507,8 +507,7 @@ public class InstanceConfig extends HelixProperty {
}
/**
- * Get the instance capacity information from the map fields
- *
+ * Get the instance capacity information from the map fields.
* @return data map if it exists, or empty map
*/
public Map<String, Integer> getInstanceCapacityMap() {
@@ -523,9 +522,17 @@ public class InstanceConfig extends HelixProperty {
}
/**
- * Set the instance capacity information with an Integer mapping
+ * Set the instance capacity information with an Integer mapping.
* @param capacityDataMap - map of instance capacity data
* @throws IllegalArgumentException - when any of the data value is a negative number or when the map is empty
+ *
+ * This information is required by the global rebalancer.
+ * @see <a href="Rebalance Algorithm">
+ * https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer#rebalance-algorithm-adapter
+ * </a>
+ * If the instance capacity is not configured in neither Instance Config nor Cluster Config, the
+ * cluster topology is considered invalid. So the rebalancer may stop working.
+ * Note that when a rebalancer requires this capacity information, it will ignore INSTANCE_WEIGHT.
*/
public void setInstanceCapacityMap(Map<String, Integer> capacityDataMap)
throws IllegalArgumentException {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index d7fcce9..f55d0fc 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -200,4 +200,17 @@ public class TestAssignableNode extends AbstractTestClusterModel {
Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance/");
}
+
+ @Test
+ public void testDefaultInstanceCapacity() {
+ ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
+ testClusterConfig.setDefaultInstanceCapacityMap(_capacityDataMap);
+
+ InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");
+
+ AssignableNode assignableNode =
+ new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId,
+ Collections.emptyList());
+ Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
index 209b196..5cf9bff 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -20,6 +20,8 @@ package org.apache.helix.model;
*/
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.ZNRecord;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -127,4 +129,56 @@ public class TestClusterConfig {
ClusterConfig testConfig = new ClusterConfig("testId");
testConfig.setGlobalRebalancePreference(preference);
}
+
+ @Test
+ public void testGetInstanceCapacityMap() {
+ Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", 3);
+
+ Map<String, String> capacityDataMapString =
+ ImmutableMap.of("item1", "1", "item2", "2", "item3", "3");
+
+ ZNRecord rec = new ZNRecord("testId");
+ rec.setMapField(ClusterConfig.ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name(),
+ capacityDataMapString);
+ ClusterConfig testConfig = new ClusterConfig(rec);
+
+ Assert.assertTrue(testConfig.getDefaultInstanceCapacityMap().equals(capacityDataMap));
+ }
+
+ @Test
+ public void testGetInstanceCapacityMapEmpty() {
+ ClusterConfig testConfig = new ClusterConfig("testId");
+
+ Assert.assertTrue(testConfig.getDefaultInstanceCapacityMap().equals(Collections.emptyMap()));
+ }
+
+ @Test
+ public void testSetInstanceCapacityMap() {
+ Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", 3);
+
+ Map<String, String> capacityDataMapString =
+ ImmutableMap.of("item1", "1", "item2", "2", "item3", "3");
+
+ ClusterConfig testConfig = new ClusterConfig("testConfig");
+ testConfig.setDefaultInstanceCapacityMap(capacityDataMap);
+
+ Assert.assertEquals(testConfig.getRecord().getMapField(ClusterConfig.ClusterConfigProperty.
+ DEFAULT_INSTANCE_CAPACITY_MAP.name()), capacityDataMapString);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default Instance Capacity Data is empty")
+ public void testSetInstanceCapacityMapEmpty() {
+ Map<String, Integer> capacityDataMap = new HashMap<>();
+
+ ClusterConfig testConfig = new ClusterConfig("testConfig");
+ testConfig.setDefaultInstanceCapacityMap(capacityDataMap);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default Instance Capacity Data contains a negative value: item3 = -3")
+ public void testSetInstanceCapacityMapInvalid() {
+ Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", -3);
+
+ ClusterConfig testConfig = new ClusterConfig("testConfig");
+ testConfig.setDefaultInstanceCapacityMap(capacityDataMap);
+ }
}