You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:10 UTC

[helix] 07/37: Add cluster level default instance config. (#413)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 0d677647b75f2617bf61d7dc7627b855e1c11171
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Mon Aug 19 10:49:12 2019 -0700

    Add cluster level default instance config. (#413)
    
    This config is applied to an instance when its Instance Config has no capacity configuration, or an empty one.
    Also add unit tests.
---
 .../rebalancer/waged/model/AssignableNode.java     |  8 +++-
 .../java/org/apache/helix/model/ClusterConfig.java | 51 ++++++++++++++++++++
 .../org/apache/helix/model/InstanceConfig.java     | 13 ++++--
 .../rebalancer/waged/model/TestAssignableNode.java | 13 ++++++
 .../org/apache/helix/model/TestClusterConfig.java  | 54 ++++++++++++++++++++++
 5 files changed, 134 insertions(+), 5 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 5fc04d7..e2fd676 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -88,11 +88,15 @@ public class AssignableNode {
       Collection<AssignableReplica> existingAssignment) {
     reset();
 
-    _currentCapacity.putAll(instanceConfig.getInstanceCapacityMap());
+    Map<String, Integer> instanceCapacity = instanceConfig.getInstanceCapacityMap();
+    if (instanceCapacity.isEmpty()) {
+      instanceCapacity = clusterConfig.getDefaultInstanceCapacityMap();
+    }
+    _currentCapacity.putAll(instanceCapacity);
     _faultZone = computeFaultZone(clusterConfig, instanceConfig);
     _instanceTags = new HashSet<>(instanceConfig.getTags());
     _disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
-    _maxCapacity = instanceConfig.getInstanceCapacityMap();
+    _maxCapacity = instanceCapacity;
     _maxPartition = clusterConfig.getMaxPartitionsPerInstance();
 
     assignNewBatch(existingAssignment);
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index ee942c7..a8c1da9 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -29,10 +29,12 @@ import org.apache.helix.api.config.StateTransitionThrottleConfig;
 import org.apache.helix.api.config.StateTransitionTimeoutConfig;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Cluster configurations
@@ -86,6 +88,8 @@ public class ClusterConfig extends HelixProperty {
 
     // The required instance capacity keys for resource partition assignment calculation.
     INSTANCE_CAPACITY_KEYS,
+    // The default instance capacity if no capacity is configured in the Instance Config node.
+    DEFAULT_INSTANCE_CAPACITY_MAP,
     // The preference of the rebalance result.
     // EVENNESS - Evenness of the resource utilization, partition, and top state distribution.
     // LESS_MOVEMENT - the tendency of keeping the current assignment instead of moving the partition for optimal assignment.
@@ -700,6 +704,53 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
+   * Read the cluster-wide default instance capacity from the record's map fields.
+   * @return map from capacity key to capacity value; an empty map when the field is not set
+   */
+  public Map<String, Integer> getDefaultInstanceCapacityMap() {
+    Map<String, String> rawCapacity =
+        _record.getMapField(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name());
+    if (rawCapacity == null) {
+      return Collections.emptyMap();
+    }
+    // Values are persisted as strings; convert each one back to an Integer.
+    return rawCapacity.entrySet().stream()
+        .collect(Collectors.toMap(Map.Entry::getKey, e -> Integer.parseInt(e.getValue())));
+  }
+
+  /**
+   * Set the default instance capacity information with an Integer mapping.
+   * This information is required by the global rebalancer. It is applied to any instance whose
+   * Instance Config has no (or an empty) capacity map. If the capacity is configured in neither
+   * the Instance Config nor the Cluster Config, the cluster topology is considered invalid, so
+   * the rebalancer may stop working.
+   * @param capacityDataMap - map of instance capacity data
+   * @throws IllegalArgumentException - when the map is null or empty, or when any data value is
+   *         null or negative; the record is left unchanged in that case
+   * @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer#rebalance-algorithm-adapter">
+   *   Rebalance Algorithm</a>
+   */
+  public void setDefaultInstanceCapacityMap(Map<String, Integer> capacityDataMap)
+      throws IllegalArgumentException {
+    if (capacityDataMap == null || capacityDataMap.isEmpty()) {
+      throw new IllegalArgumentException("Default Instance Capacity Data is empty");
+    }
+    // Validate into a fresh map before touching the record, so a bad entry changes nothing.
+    Map<String, String> capacityData = new HashMap<>();
+    for (Map.Entry<String, Integer> entry : capacityDataMap.entrySet()) {
+      Integer value = entry.getValue();
+      if (value == null || value < 0) {
+        throw new IllegalArgumentException(String.format(
+            "Default Instance Capacity Data contains a %s value: %s = %s",
+            value == null ? "null" : "negative", entry.getKey(), value));
+      }
+      capacityData.put(entry.getKey(), Integer.toString(value));
+    }
+
+    _record.setMapField(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name(), capacityData);
+  }
+
+  /**
    * Set the global rebalancer's assignment preference.
    * @param preference A map of the GlobalRebalancePreferenceKey and the corresponding weight.
    *                   The ratio of the configured weights will determine the rebalancer's behavior.
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 88fd1dd..ac1814d 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -507,8 +507,7 @@ public class InstanceConfig extends HelixProperty {
   }
 
   /**
-   * Get the instance capacity information from the map fields
-   *
+   * Get the instance capacity information from the map fields.
    * @return data map if it exists, or empty map
    */
   public Map<String, Integer> getInstanceCapacityMap() {
@@ -523,9 +522,17 @@ public class InstanceConfig extends HelixProperty {
   }
 
   /**
-   * Set the instance capacity information with an Integer mapping
+   * Set the instance capacity information with an Integer mapping.
    * @param capacityDataMap - map of instance capacity data
    * @throws IllegalArgumentException - when any of the data value is a negative number or when the map is empty
+   *
+   * This information is required by the global rebalancer.
+   * @see <a href="Rebalance Algorithm">
+   *   https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer#rebalance-algorithm-adapter
+   *   </a>
+   * If the instance capacity is not configured in neither Instance Config nor Cluster Config, the
+   * cluster topology is considered invalid. So the rebalancer may stop working.
+   * Note that when a rebalancer requires this capacity information, it will ignore INSTANCE_WEIGHT.
    */
   public void setInstanceCapacityMap(Map<String, Integer> capacityDataMap)
       throws IllegalArgumentException {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index d7fcce9..f55d0fc 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -200,4 +200,17 @@ public class TestAssignableNode extends AbstractTestClusterModel {
 
     Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance/");
   }
+
+  @Test
+  public void testDefaultInstanceCapacity() {
+    // The instance config sets no capacity, so the cluster-level default should be used.
+    ClusterConfig clusterConfig = new ClusterConfig("testClusterConfigId");
+    clusterConfig.setDefaultInstanceCapacityMap(_capacityDataMap);
+    InstanceConfig emptyInstanceConfig = new InstanceConfig("testInstanceConfigId");
+
+    AssignableNode node =
+        new AssignableNode(clusterConfig, emptyInstanceConfig, _testInstanceId,
+            Collections.emptyList());
+    Assert.assertEquals(node.getMaxCapacity(), _capacityDataMap);
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
index 209b196..5cf9bff 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -20,6 +20,8 @@ package org.apache.helix.model;
  */
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.ZNRecord;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -127,4 +129,56 @@ public class TestClusterConfig {
     ClusterConfig testConfig = new ClusterConfig("testId");
     testConfig.setGlobalRebalancePreference(preference);
   }
+
+  @Test
+  public void testGetInstanceCapacityMap() {
+    // Values are stored as strings in the record and must be parsed back into integers.
+    Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", 3);
+    Map<String, String> capacityDataMapString =
+        ImmutableMap.of("item1", "1", "item2", "2", "item3", "3");
+
+    ZNRecord rec = new ZNRecord("testId");
+    rec.setMapField(ClusterConfig.ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name(),
+        capacityDataMapString);
+    ClusterConfig testConfig = new ClusterConfig(rec);
+    // assertEquals reports both maps on failure, unlike assertTrue(a.equals(b)).
+    Assert.assertEquals(testConfig.getDefaultInstanceCapacityMap(), capacityDataMap);
+  }
+
+  @Test
+  public void testGetInstanceCapacityMapEmpty() {
+    // A config built from only an id is expected to have no default capacity field set.
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    Assert.assertEquals(testConfig.getDefaultInstanceCapacityMap(), Collections.emptyMap());
+  }
+
+  @Test
+  public void testSetInstanceCapacityMap() {
+    ClusterConfig testConfig = new ClusterConfig("testConfig");
+    testConfig.setDefaultInstanceCapacityMap(
+        ImmutableMap.of("item1", 1, "item2", 2, "item3", 3));
+
+    // The setter persists every value as its decimal string form.
+    String capacityKey = ClusterConfig.ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name();
+    Map<String, String> expectedStored =
+        ImmutableMap.of("item1", "1", "item2", "2", "item3", "3");
+    Map<String, String> actualStored = testConfig.getRecord().getMapField(capacityKey);
+    Assert.assertEquals(actualStored, expectedStored);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default Instance Capacity Data is empty")
+  public void testSetInstanceCapacityMapEmpty() {
+    Map<String, Integer> capacityDataMap = new HashMap<>();
+
+    ClusterConfig testConfig = new ClusterConfig("testConfig");
+    testConfig.setDefaultInstanceCapacityMap(capacityDataMap);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default Instance Capacity Data contains a negative value: item3 = -3")
+  public void testSetInstanceCapacityMapInvalid() {
+    Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", -3);
+
+    ClusterConfig testConfig = new ClusterConfig("testConfig");
+    testConfig.setDefaultInstanceCapacityMap(capacityDataMap);
+  }
 }