You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:05 UTC
[helix] 02/37: Adding the configuration items of the WAGED
rebalancer. (#348)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 39f7adf6774c78f7aab68a5af0d814278655e736
Author: jiajunwang <18...@users.noreply.github.com>
AuthorDate: Fri Jul 26 11:42:52 2019 -0700
Adding the configuration items of the WAGED rebalancer. (#348)
* Adding the configuration items of the WAGED rebalancer.
Including: Instance Capacity Keys, Rebalance Preferences, Instance Capacity Details, Partition Capacity (the weight) Details.
Also adding test to cover the new configuration items.
---
.../java/org/apache/helix/model/ClusterConfig.java | 129 +++++++++++---
.../org/apache/helix/model/InstanceConfig.java | 62 +++++--
.../org/apache/helix/model/ResourceConfig.java | 139 ++++++++++++++-
.../org/apache/helix/model/TestClusterConfig.java | 130 ++++++++++++++
.../org/apache/helix/model/TestInstanceConfig.java | 66 +++++++-
.../org/apache/helix/model/TestResourceConfig.java | 186 +++++++++++++++++++++
6 files changed, 669 insertions(+), 43 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 8905dda..ee942c7 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -19,12 +19,8 @@ package org.apache.helix.model;
* under the License.
*/
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
@@ -32,6 +28,12 @@ import org.apache.helix.api.config.HelixConfigProperty;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.api.config.StateTransitionTimeoutConfig;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
/**
* Cluster configurations
*/
@@ -80,7 +82,19 @@ public class ClusterConfig extends HelixProperty {
DISABLED_INSTANCES,
// Specifies job types and used for quota allocation
- QUOTA_TYPES
+ QUOTA_TYPES,
+
+ // The required instance capacity keys for resource partition assignment calculation.
+ INSTANCE_CAPACITY_KEYS,
+ // The preference of the rebalance result.
+ // EVENNESS - Evenness of the resource utilization, partition, and top state distribution.
+ // LESS_MOVEMENT - the tendency of keeping the current assignment instead of moving the partition for optimal assignment.
+ REBALANCE_PREFERENCE
+ }
+
+ public enum GlobalRebalancePreferenceKey {
+ EVENNESS,
+ LESS_MOVEMENT
}
private final static int DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE = 40;
@@ -94,6 +108,15 @@ public class ClusterConfig extends HelixProperty {
public final static String TASK_QUOTA_RATIO_NOT_SET = "-1";
+ // Default preference for all the aspects should be the same to ensure balanced setup.
+ public final static Map<GlobalRebalancePreferenceKey, Integer>
+ DEFAULT_GLOBAL_REBALANCE_PREFERENCE =
+ ImmutableMap.<GlobalRebalancePreferenceKey, Integer>builder()
+ .put(GlobalRebalancePreferenceKey.EVENNESS, 1)
+ .put(GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1).build();
+ private final static int MAX_REBALANCE_PREFERENCE = 10;
+ private final static int MIN_REBALANCE_PREFERENCE = 0;
+
/**
* Instantiate for a specific cluster
* @param cluster the cluster identifier
@@ -112,21 +135,21 @@ public class ClusterConfig extends HelixProperty {
/**
* Set task quota type with the ratio of this quota.
- * @param quotaType String
+ * @param quotaType String
* @param quotaRatio int
*/
public void setTaskQuotaRatio(String quotaType, int quotaRatio) {
if (_record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()) == null) {
_record.setMapField(ClusterConfigProperty.QUOTA_TYPES.name(), new HashMap<String, String>());
}
- _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name()).put(quotaType,
- Integer.toString(quotaRatio));
+ _record.getMapField(ClusterConfigProperty.QUOTA_TYPES.name())
+ .put(quotaType, Integer.toString(quotaRatio));
}
/**
* Set task quota type with the ratio of this quota. Quota ratio must be a String that is
* parse-able into an int.
- * @param quotaType String
+ * @param quotaType String
* @param quotaRatio String
*/
public void setTaskQuotaRatio(String quotaType, String quotaRatio) {
@@ -209,8 +232,8 @@ public class ClusterConfig extends HelixProperty {
* @return
*/
public Boolean isPersistIntermediateAssignment() {
- return _record.getBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(),
- false);
+ return _record
+ .getBooleanField(ClusterConfigProperty.PERSIST_INTERMEDIATE_ASSIGNMENT.toString(), false);
}
/**
@@ -232,8 +255,8 @@ public class ClusterConfig extends HelixProperty {
}
public Boolean isPipelineTriggersDisabled() {
- return _record.getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(),
- false);
+ return _record
+ .getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), false);
}
/**
@@ -402,8 +425,8 @@ public class ClusterConfig extends HelixProperty {
* @return
*/
public int getNumOfflineInstancesForAutoExit() {
- return _record.getIntField(ClusterConfigProperty.NUM_OFFLINE_INSTANCES_FOR_AUTO_EXIT.name(),
- -1);
+ return _record
+ .getIntField(ClusterConfigProperty.NUM_OFFLINE_INSTANCES_FOR_AUTO_EXIT.name(), -1);
}
/**
@@ -443,9 +466,7 @@ public class ClusterConfig extends HelixProperty {
if (obj instanceof ClusterConfig) {
ClusterConfig that = (ClusterConfig) obj;
- if (this.getId().equals(that.getId())) {
- return true;
- }
+ return this.getId().equals(that.getId());
}
return false;
}
@@ -489,8 +510,8 @@ public class ClusterConfig extends HelixProperty {
}
if (!configStrs.isEmpty()) {
- _record.setListField(ClusterConfigProperty.STATE_TRANSITION_THROTTLE_CONFIGS.name(),
- configStrs);
+ _record
+ .setListField(ClusterConfigProperty.STATE_TRANSITION_THROTTLE_CONFIGS.name(), configStrs);
}
}
@@ -578,7 +599,7 @@ public class ClusterConfig extends HelixProperty {
public int getErrorPartitionThresholdForLoadBalance() {
return _record.getIntField(
ClusterConfigProperty.ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(),
- DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE);
+ DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE);
}
/**
@@ -657,6 +678,70 @@ public class ClusterConfig extends HelixProperty {
}
/**
+ * Set the required Instance Capacity Keys.
+ * @param capacityKeys
+ */
+ public void setInstanceCapacityKeys(List<String> capacityKeys) {
+ if (capacityKeys == null || capacityKeys.isEmpty()) {
+ throw new IllegalArgumentException("The input instance capacity key list is empty.");
+ }
+ _record.setListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), capacityKeys);
+ }
+
+ /**
+ * @return The required Instance Capacity Keys. If not configured, return an empty list.
+ */
+ public List<String> getInstanceCapacityKeys() {
+ List<String> capacityKeys = _record.getListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name());
+ if (capacityKeys == null) {
+ return Collections.emptyList();
+ }
+ return capacityKeys;
+ }
+
+ /**
+ * Set the global rebalancer's assignment preference.
+ * @param preference A map of the GlobalRebalancePreferenceKey and the corresponding weight.
+ * The ratio of the configured weights will determine the rebalancer's behavior.
+ */
+ public void setGlobalRebalancePreference(Map<GlobalRebalancePreferenceKey, Integer> preference) {
+ Map<String, String> preferenceMap = new HashMap<>();
+
+ preference.entrySet().stream().forEach(entry -> {
+ if (entry.getValue() > MAX_REBALANCE_PREFERENCE
+ || entry.getValue() < MIN_REBALANCE_PREFERENCE) {
+ throw new IllegalArgumentException(String
+ .format("Invalid global rebalance preference configuration. Key %s, Value %d.",
+ entry.getKey().name(), entry.getValue()));
+ }
+ preferenceMap.put(entry.getKey().name(), Integer.toString(entry.getValue()));
+ });
+
+ _record.setMapField(ClusterConfigProperty.REBALANCE_PREFERENCE.name(), preferenceMap);
+ }
+
+ /**
+ * Get the global rebalancer's assignment preference.
+ */
+ public Map<GlobalRebalancePreferenceKey, Integer> getGlobalRebalancePreference() {
+ Map<String, String> preferenceStrMap =
+ _record.getMapField(ClusterConfigProperty.REBALANCE_PREFERENCE.name());
+ if (preferenceStrMap != null && !preferenceStrMap.isEmpty()) {
+ Map<GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+ for (GlobalRebalancePreferenceKey key : GlobalRebalancePreferenceKey.values()) {
+ if (!preferenceStrMap.containsKey(key.name())) {
+ // If any key is not configured with a value, return the default config.
+ return DEFAULT_GLOBAL_REBALANCE_PREFERENCE;
+ }
+ preference.put(key, Integer.parseInt(preferenceStrMap.get(key.name())));
+ }
+ return preference;
+ }
+ // If configuration is not complete, return the default one.
+ return DEFAULT_GLOBAL_REBALANCE_PREFERENCE;
+ }
+
+ /**
* Get IdealState rules defined in the cluster config.
* @return
*/
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index f65a1bd..88fd1dd 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -19,6 +19,14 @@ package org.apache.helix.model;
* under the License.
*/
+import com.google.common.base.Splitter;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.util.HelixUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -27,15 +35,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.util.HelixUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Splitter;
+import java.util.stream.Collectors;
/**
* Instance configurations
@@ -55,7 +55,8 @@ public class InstanceConfig extends HelixProperty {
INSTANCE_WEIGHT,
DOMAIN,
DELAY_REBALANCE_ENABLED,
- MAX_CONCURRENT_TASK
+ MAX_CONCURRENT_TASK,
+ INSTANCE_CAPACITY_MAP
}
public static final int WEIGHT_NOT_SET = -1;
@@ -505,6 +506,47 @@ public class InstanceConfig extends HelixProperty {
_record.setIntField(InstanceConfigProperty.MAX_CONCURRENT_TASK.name(), maxConcurrentTask);
}
+ /**
+ * Get the instance capacity information from the map fields
+ *
+ * @return data map if it exists, or empty map
+ */
+ public Map<String, Integer> getInstanceCapacityMap() {
+ Map<String, String> capacityData =
+ _record.getMapField(InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name());
+
+ if (capacityData != null) {
+ return capacityData.entrySet().stream().collect(
+ Collectors.toMap(entry -> entry.getKey(), entry -> Integer.parseInt(entry.getValue())));
+ }
+ return Collections.emptyMap();
+ }
+
+ /**
+ * Set the instance capacity information with an Integer mapping
+ * @param capacityDataMap - map of instance capacity data
+ * @throws IllegalArgumentException - when any of the data value is a negative number or when the map is empty
+ */
+ public void setInstanceCapacityMap(Map<String, Integer> capacityDataMap)
+ throws IllegalArgumentException {
+ if (capacityDataMap == null || capacityDataMap.size() == 0) {
+ throw new IllegalArgumentException("Capacity Data is empty");
+ }
+
+ Map<String, String> capacityData = new HashMap<>();
+
+ capacityDataMap.entrySet().stream().forEach(entry -> {
+ if (entry.getValue() < 0) {
+ throw new IllegalArgumentException(String
+ .format("Capacity Data contains a negative value: %s = %d", entry.getKey(),
+ entry.getValue()));
+ }
+ capacityData.put(entry.getKey(), Integer.toString(entry.getValue()));
+ });
+
+ _record.setMapField(InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name(), capacityData);
+ }
+
@Override
public boolean equals(Object obj) {
if (obj instanceof InstanceConfig) {
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
index 274640c..1ead08e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
@@ -19,19 +19,23 @@ package org.apache.helix.model;
* under the License.
*/
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import java.util.TreeMap;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.config.HelixConfigProperty;
import org.apache.helix.api.config.RebalanceConfig;
import org.apache.helix.api.config.StateTransitionTimeoutConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
/**
* Resource configurations
*/
@@ -53,7 +57,8 @@ public class ResourceConfig extends HelixProperty {
RESOURCE_TYPE,
GROUP_ROUTING_ENABLED,
EXTERNAL_VIEW_DISABLED,
- DELAY_REBALANCE_ENABLED
+ DELAY_REBALANCE_ENABLED,
+ PARTITION_CAPACITY_MAP
}
public enum ResourceConfigConstants {
@@ -61,6 +66,10 @@ public class ResourceConfig extends HelixProperty {
}
private static final Logger _logger = LoggerFactory.getLogger(ResourceConfig.class.getName());
+ private static final ObjectMapper _objectMapper = new ObjectMapper();
+
+ public static final String DEFAULT_PARTITION_KEY = "DEFAULT";
+
/**
* Instantiate for a specific instance
*
@@ -92,10 +101,24 @@ public class ResourceConfig extends HelixProperty {
String stateModelDefRef, String stateModelFactoryName, String numReplica,
int minActiveReplica, int maxPartitionsPerInstance, String instanceGroupTag,
Boolean helixEnabled, String resourceGroupName, String resourceType,
- Boolean groupRoutingEnabled, Boolean externalViewDisabled,
- RebalanceConfig rebalanceConfig, StateTransitionTimeoutConfig stateTransitionTimeoutConfig,
+ Boolean groupRoutingEnabled, Boolean externalViewDisabled, RebalanceConfig rebalanceConfig,
+ StateTransitionTimeoutConfig stateTransitionTimeoutConfig,
Map<String, List<String>> listFields, Map<String, Map<String, String>> mapFields,
Boolean p2pMessageEnabled) {
+ this(resourceId, monitorDisabled, numPartitions, stateModelDefRef, stateModelFactoryName,
+ numReplica, minActiveReplica, maxPartitionsPerInstance, instanceGroupTag, helixEnabled,
+ resourceGroupName, resourceType, groupRoutingEnabled, externalViewDisabled, rebalanceConfig,
+ stateTransitionTimeoutConfig, listFields, mapFields, p2pMessageEnabled, null);
+ }
+
+ private ResourceConfig(String resourceId, Boolean monitorDisabled, int numPartitions,
+ String stateModelDefRef, String stateModelFactoryName, String numReplica,
+ int minActiveReplica, int maxPartitionsPerInstance, String instanceGroupTag,
+ Boolean helixEnabled, String resourceGroupName, String resourceType,
+ Boolean groupRoutingEnabled, Boolean externalViewDisabled,
+ RebalanceConfig rebalanceConfig, StateTransitionTimeoutConfig stateTransitionTimeoutConfig,
+ Map<String, List<String>> listFields, Map<String, Map<String, String>> mapFields,
+ Boolean p2pMessageEnabled, Map<String, Map<String, Integer>> partitionCapacityMap) {
super(resourceId);
if (monitorDisabled != null) {
@@ -172,6 +195,15 @@ public class ResourceConfig extends HelixProperty {
if (mapFields != null) {
_record.setMapFields(mapFields);
}
+
+ if (partitionCapacityMap != null) {
+ try {
+ setPartitionCapacityMap(partitionCapacityMap);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(
+ "Failed to set partition capacity. Invalid capacity configuration.");
+ }
+ }
}
@@ -350,6 +382,64 @@ public class ResourceConfig extends HelixProperty {
}
/**
+ * Get the partition capacity information from a JSON among the map fields.
+ * <PartitionName or DEFAULT_PARTITION_KEY, <Capacity Key, Capacity Number>>
+ *
+ * @return data map if it exists, or empty map
+ * @throws IOException - when JSON conversion fails
+ */
+ public Map<String, Map<String, Integer>> getPartitionCapacityMap() throws IOException {
+ Map<String, String> partitionCapacityData =
+ _record.getMapField(ResourceConfigProperty.PARTITION_CAPACITY_MAP.name());
+ Map<String, Map<String, Integer>> partitionCapacityMap = new HashMap<>();
+ if (partitionCapacityData != null) {
+ for (String partition : partitionCapacityData.keySet()) {
+ Map<String, Integer> capacities = _objectMapper
+ .readValue(partitionCapacityData.get(partition),
+ new TypeReference<Map<String, Integer>>() {
+ });
+ partitionCapacityMap.put(partition, capacities);
+ }
+ }
+ return partitionCapacityMap;
+ }
+
+ /**
+ * Set the partition capacity information with a map <PartitionName or DEFAULT_PARTITION_KEY, <Capacity Key, Capacity Number>>
+ *
+ * @param partitionCapacityMap - map of partition capacity data
+ * @throws IllegalArgumentException - when any of the data value is a negative number or map is empty
+ * @throws IOException - when JSON parsing fails
+ */
+ public void setPartitionCapacityMap(Map<String, Map<String, Integer>> partitionCapacityMap)
+ throws IllegalArgumentException, IOException {
+ if (partitionCapacityMap == null || partitionCapacityMap.isEmpty()) {
+ throw new IllegalArgumentException("Capacity Map is empty");
+ }
+ if (!partitionCapacityMap.containsKey(DEFAULT_PARTITION_KEY)) {
+ throw new IllegalArgumentException(String
+ .format("The default partition capacity with the default key %s is required.",
+ DEFAULT_PARTITION_KEY));
+ }
+
+ Map<String, String> newCapacityRecord = new HashMap<>();
+ for (String partition : partitionCapacityMap.keySet()) {
+ Map<String, Integer> capacities = partitionCapacityMap.get(partition);
+ // Verify the input is valid
+ if (capacities.isEmpty()) {
+ throw new IllegalArgumentException("Capacity Data is empty");
+ }
+ if (capacities.entrySet().stream().anyMatch(entry -> entry.getValue() < 0)) {
+ throw new IllegalArgumentException(
+ String.format("Capacity Data contains a negative value:%s", capacities.toString()));
+ }
+ newCapacityRecord.put(partition, _objectMapper.writeValueAsString(capacities));
+ }
+
+ _record.setMapField(ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(), newCapacityRecord);
+ }
+
+ /**
* Put a set of simple configs.
*
* @param configsMap
@@ -476,6 +566,7 @@ public class ResourceConfig extends HelixProperty {
private StateTransitionTimeoutConfig _stateTransitionTimeoutConfig;
private Map<String, List<String>> _preferenceLists;
private Map<String, Map<String, String>> _mapFields;
+ private Map<String, Map<String, Integer>> _partitionCapacityMap;
public Builder(String resourceId) {
_resourceId = resourceId;
@@ -664,6 +755,23 @@ public class ResourceConfig extends HelixProperty {
return _preferenceLists;
}
+ public Builder setPartitionCapacity(Map<String, Integer> defaultCapacity) {
+ setPartitionCapacity(DEFAULT_PARTITION_KEY, defaultCapacity);
+ return this;
+ }
+
+ public Builder setPartitionCapacity(String partition, Map<String, Integer> capacity) {
+ if (_partitionCapacityMap == null) {
+ _partitionCapacityMap = new HashMap<>();
+ }
+ _partitionCapacityMap.put(partition, capacity);
+ return this;
+ }
+
+ public Map<String, Integer> getPartitionCapacity(String partition) {
+ return _partitionCapacityMap.get(partition);
+ }
+
public Builder setMapField(String key, Map<String, String> fields) {
if (_mapFields == null) {
_mapFields = new TreeMap<>();
@@ -708,6 +816,19 @@ public class ResourceConfig extends HelixProperty {
}
}
}
+
+ if (_partitionCapacityMap != null) {
+ if (_partitionCapacityMap.keySet().stream()
+ .noneMatch(partition -> partition.equals(DEFAULT_PARTITION_KEY))) {
+ throw new IllegalArgumentException(
+ "Partition capacity is configured without the DEFAULT capacity!");
+ }
+ if (_partitionCapacityMap.values().stream()
+ .anyMatch(capacity -> capacity.values().stream().anyMatch(value -> value < 0))) {
+ throw new IllegalArgumentException(
+ "Partition capacity is configured with negative capacity value!");
+ }
+ }
}
public ResourceConfig build() {
@@ -718,7 +839,7 @@ public class ResourceConfig extends HelixProperty {
_stateModelFactoryName, _numReplica, _minActiveReplica, _maxPartitionsPerInstance,
_instanceGroupTag, _helixEnabled, _resourceGroupName, _resourceType, _groupRoutingEnabled,
_externalViewDisabled, _rebalanceConfig, _stateTransitionTimeoutConfig, _preferenceLists,
- _mapFields, _p2pMessageEnabled);
+ _mapFields, _p2pMessageEnabled, _partitionCapacityMap);
}
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
new file mode 100644
index 0000000..209b196
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -0,0 +1,130 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableList;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.helix.model.ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS;
+import static org.apache.helix.model.ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT;
+
+public class TestClusterConfig {
+
+ @Test
+ public void testGetCapacityKeys() {
+ List<String> keys = ImmutableList.of("CPU", "MEMORY", "Random");
+
+ ClusterConfig testConfig = new ClusterConfig("testId");
+ testConfig.getRecord()
+ .setListField(ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), keys);
+
+ Assert.assertEquals(testConfig.getInstanceCapacityKeys(), keys);
+ }
+
+ @Test
+ public void testGetCapacityKeysEmpty() {
+ ClusterConfig testConfig = new ClusterConfig("testId");
+ Assert.assertEquals(testConfig.getInstanceCapacityKeys(), Collections.emptyList());
+ }
+
+ @Test
+ public void testSetCapacityKeys() {
+ List<String> keys = ImmutableList.of("CPU", "MEMORY", "Random");
+
+ ClusterConfig testConfig = new ClusterConfig("testId");
+ testConfig.setInstanceCapacityKeys(keys);
+
+ Assert.assertEquals(keys, testConfig.getRecord()
+ .getListField(ClusterConfig.ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name()));
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testSetCapacityKeysEmptyList() {
+ ClusterConfig testConfig = new ClusterConfig("testId");
+ testConfig.setInstanceCapacityKeys(Collections.emptyList());
+ }
+
+ @Test
+ public void testGetRebalancePreference() {
+ Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+ preference.put(EVENNESS, 5);
+ preference.put(LESS_MOVEMENT, 3);
+
+ Map<String, String> mapFieldData = new HashMap<>();
+ for (ClusterConfig.GlobalRebalancePreferenceKey key : preference.keySet()) {
+ mapFieldData.put(key.name(), String.valueOf(preference.get(key)));
+ }
+
+ ClusterConfig testConfig = new ClusterConfig("testId");
+ testConfig.getRecord()
+ .setMapField(ClusterConfig.ClusterConfigProperty.REBALANCE_PREFERENCE.name(), mapFieldData);
+
+ Assert.assertEquals(testConfig.getGlobalRebalancePreference(), preference);
+ }
+
+ @Test
+ public void testGetRebalancePreferenceDefault() {
+ ClusterConfig testConfig = new ClusterConfig("testId");
+ Assert.assertEquals(testConfig.getGlobalRebalancePreference(),
+ ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
+
+ Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+ preference.put(EVENNESS, 5);
+ testConfig.setGlobalRebalancePreference(preference);
+
+ Assert.assertEquals(testConfig.getGlobalRebalancePreference(),
+ ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
+ }
+
+ @Test
+ public void testSetRebalancePreference() {
+ Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+ preference.put(EVENNESS, 5);
+ preference.put(LESS_MOVEMENT, 3);
+
+ Map<String, String> mapFieldData = new HashMap<>();
+ for (ClusterConfig.GlobalRebalancePreferenceKey key : preference.keySet()) {
+ mapFieldData.put(key.name(), String.valueOf(preference.get(key)));
+ }
+
+ ClusterConfig testConfig = new ClusterConfig("testId");
+ testConfig.setGlobalRebalancePreference(preference);
+
+ Assert.assertEquals(testConfig.getRecord()
+ .getMapField(ClusterConfig.ClusterConfigProperty.REBALANCE_PREFERENCE.name()),
+ mapFieldData);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testSetRebalancePreferenceInvalidNumber() {
+ Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+ preference.put(EVENNESS, -1);
+ preference.put(LESS_MOVEMENT, 3);
+
+ ClusterConfig testConfig = new ClusterConfig("testId");
+ testConfig.setGlobalRebalancePreference(preference);
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
index 38b1c92..f0da05f 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
@@ -19,12 +19,14 @@ package org.apache.helix.model;
* under the License.
*/
-import java.util.Map;
-
+import com.google.common.collect.ImmutableMap;
import org.apache.helix.ZNRecord;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
/**
* Created with IntelliJ IDEA.
@@ -58,4 +60,64 @@ public class TestInstanceConfig {
Map<String, String> parsedDomain = instanceConfig.getDomainAsMap();
Assert.assertTrue(parsedDomain.isEmpty());
}
+
+ @Test
+ public void testGetInstanceCapacityMap() {
+ Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+ "item2", 2,
+ "item3", 3);
+
+ Map<String, String> capacityDataMapString = ImmutableMap.of("item1", "1",
+ "item2", "2",
+ "item3", "3");
+
+ ZNRecord rec = new ZNRecord("testId");
+ rec.setMapField(InstanceConfig.InstanceConfigProperty.INSTANCE_CAPACITY_MAP.name(), capacityDataMapString);
+ InstanceConfig testConfig = new InstanceConfig(rec);
+
+ Assert.assertTrue(testConfig.getInstanceCapacityMap().equals(capacityDataMap));
+ }
+
+ @Test
+ public void testGetInstanceCapacityMapEmpty() {
+ InstanceConfig testConfig = new InstanceConfig("testId");
+
+ Assert.assertTrue(testConfig.getInstanceCapacityMap().equals(Collections.emptyMap()));
+ }
+
+ @Test
+ public void testSetInstanceCapacityMap() {
+ Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+ "item2", 2,
+ "item3", 3);
+
+ Map<String, String> capacityDataMapString = ImmutableMap.of("item1", "1",
+ "item2", "2",
+ "item3", "3");
+
+ InstanceConfig testConfig = new InstanceConfig("testConfig");
+ testConfig.setInstanceCapacityMap(capacityDataMap);
+
+ Assert.assertEquals(testConfig.getRecord().getMapField(InstanceConfig.InstanceConfigProperty.
+ INSTANCE_CAPACITY_MAP.name()), capacityDataMapString);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Capacity Data is empty")
+ public void testSetInstanceCapacityMapEmpty() {
+ Map<String, Integer> capacityDataMap = new HashMap<>();
+
+ InstanceConfig testConfig = new InstanceConfig("testConfig");
+ testConfig.setInstanceCapacityMap(capacityDataMap);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "Capacity Data contains a negative value: item3 = -3")
+ public void testSetInstanceCapacityMapInvalid() {
+ Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+ "item2", 2,
+ "item3", -3);
+
+ InstanceConfig testConfig = new InstanceConfig("testConfig");
+ testConfig.setInstanceCapacityMap(capacityDataMap);
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java
new file mode 100644
index 0000000..8099486
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/model/TestResourceConfig.java
@@ -0,0 +1,186 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.ZNRecord;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestResourceConfig {
+ private static final ObjectMapper _objectMapper = new ObjectMapper();
+
+ @Test
+ public void testGetPartitionCapacityMap() throws IOException {
+ Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+ "item2", 2,
+ "item3", 3);
+
+ ZNRecord rec = new ZNRecord("testId");
+ rec.setMapField(ResourceConfig.ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(), Collections
+ .singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY,
+ _objectMapper.writeValueAsString(capacityDataMap)));
+ ResourceConfig testConfig = new ResourceConfig(rec);
+
+ Assert.assertTrue(testConfig.getPartitionCapacityMap().get(ResourceConfig.DEFAULT_PARTITION_KEY)
+ .equals(capacityDataMap));
+ }
+
+ @Test
+ public void testGetPartitionCapacityMapEmpty() throws IOException {
+ ResourceConfig testConfig = new ResourceConfig("testId");
+
+ Assert.assertTrue(testConfig.getPartitionCapacityMap().equals(Collections.emptyMap()));
+ }
+
+ @Test(expectedExceptions = IOException.class)
+ public void testGetPartitionCapacityMapInvalidJson() throws IOException {
+ ZNRecord rec = new ZNRecord("testId");
+ rec.setMapField(ResourceConfig.ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(),
+ Collections.singletonMap("test", "gibberish"));
+ ResourceConfig testConfig = new ResourceConfig(rec);
+
+ testConfig.getPartitionCapacityMap();
+ }
+
+ @Test(dependsOnMethods = "testGetPartitionCapacityMap", expectedExceptions = IOException.class)
+ public void testGetPartitionCapacityMapInvalidJsonType() throws IOException {
+ Map<String, String> capacityDataMap = ImmutableMap.of("item1", "1",
+ "item2", "2",
+ "item3", "three");
+
+ ZNRecord rec = new ZNRecord("testId");
+ rec.setMapField(ResourceConfig.ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(), Collections
+ .singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY,
+ _objectMapper.writeValueAsString(capacityDataMap)));
+ ResourceConfig testConfig = new ResourceConfig(rec);
+
+ testConfig.getPartitionCapacityMap();
+ }
+
+ @Test
+ public void testSetPartitionCapacityMap() throws IOException {
+ Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+ "item2", 2,
+ "item3", 3);
+
+ ResourceConfig testConfig = new ResourceConfig("testConfig");
+ testConfig.setPartitionCapacityMap(
+ Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap));
+
+ Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty.
+ PARTITION_CAPACITY_MAP.name()).get(ResourceConfig.DEFAULT_PARTITION_KEY),
+ _objectMapper.writeValueAsString(capacityDataMap));
+ }
+
+ @Test
+ public void testSetMultiplePartitionCapacityMap() throws IOException {
+ Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+ "item2", 2,
+ "item3", 3);
+
+ Map<String, Map<String, Integer>> totalCapacityMap =
+ ImmutableMap.of(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap,
+ "partition2", capacityDataMap,
+ "partition3", capacityDataMap);
+
+ ResourceConfig testConfig = new ResourceConfig("testConfig");
+ testConfig.setPartitionCapacityMap(totalCapacityMap);
+
+ Assert.assertNull(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty.
+ PARTITION_CAPACITY_MAP.name()).get("partition1"));
+ Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty.
+ PARTITION_CAPACITY_MAP.name()).get(ResourceConfig.DEFAULT_PARTITION_KEY),
+ _objectMapper.writeValueAsString(capacityDataMap));
+ Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty.
+ PARTITION_CAPACITY_MAP.name()).get("partition2"),
+ _objectMapper.writeValueAsString(capacityDataMap));
+ Assert.assertEquals(testConfig.getRecord().getMapField(ResourceConfig.ResourceConfigProperty.
+ PARTITION_CAPACITY_MAP.name()).get("partition3"),
+ _objectMapper.writeValueAsString(capacityDataMap));
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Capacity Data is empty")
+ public void testSetPartitionCapacityMapEmpty() throws IOException {
+ Map<String, Integer> capacityDataMap = new HashMap<>();
+
+ ResourceConfig testConfig = new ResourceConfig("testConfig");
+ testConfig.setPartitionCapacityMap(
+ Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap));
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "The default partition capacity with the default key DEFAULT is required.")
+ public void testSetPartitionCapacityMapWithoutDefault() throws IOException {
+ Map<String, Integer> capacityDataMap = new HashMap<>();
+
+ ResourceConfig testConfig = new ResourceConfig("testConfig");
+ testConfig.setPartitionCapacityMap(
+ Collections.singletonMap("Random", capacityDataMap));
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Capacity Data contains a negative value:.+")
+ public void testSetPartitionCapacityMapInvalid() throws IOException {
+ Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+ "item2", 2,
+ "item3", -3);
+
+ ResourceConfig testConfig = new ResourceConfig("testConfig");
+ testConfig.setPartitionCapacityMap(
+ Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMap));
+ }
+
+ @Test
+ public void testWithResourceBuilder() throws IOException {
+ Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+ "item2", 2,
+ "item3", 3);
+
+ ResourceConfig.Builder builder = new ResourceConfig.Builder("testConfig");
+ builder.setPartitionCapacity(capacityDataMap);
+ builder.setPartitionCapacity("partition1", capacityDataMap);
+
+ Assert.assertEquals(
+ builder.build().getPartitionCapacityMap().get(ResourceConfig.DEFAULT_PARTITION_KEY),
+ capacityDataMap);
+ Assert.assertEquals(
+ builder.build().getPartitionCapacityMap().get("partition1"),
+ capacityDataMap);
+ Assert.assertNull(
+ builder.build().getPartitionCapacityMap().get("Random"));
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "The default partition capacity with the default key DEFAULT is required.")
+ public void testWithResourceBuilderInvalidInput() {
+ Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1,
+ "item2", 2,
+ "item3", 3);
+
+ ResourceConfig.Builder builder = new ResourceConfig.Builder("testConfig");
+ builder.setPartitionCapacity("Random", capacityDataMap);
+
+ builder.build();
+ }
+}