You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text rendering.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/11/13 19:17:58 UTC
[helix] branch wagedRebalancer updated: Add Java API for adding and validating resources for WAGED rebalancer (#570)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/wagedRebalancer by this push:
new 043f445 Add Java API for adding and validating resources for WAGED rebalancer (#570)
043f445 is described below
commit 043f445d336317fd48a02eb42d6bc2059726c79a
Author: Hunter Lee <na...@gmail.com>
AuthorDate: Wed Nov 13 11:17:49 2019 -0800
Add Java API for adding and validating resources for WAGED rebalancer (#570)
Add Java API methods for adding and validating resources for the WAGED rebalancer. This is a set of convenience APIs, exposed through HelixAdmin, that users can call to more easily add resources and validate them for use with the WAGED rebalancer.
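Changelist:
1. Add the new API methods to HelixAdmin
2. Implement these methods in ZKHelixAdmin
3. Extract the instance/partition capacity validation from AssignableNode and AssignableReplica into the new WagedValidationUtil
4. Add tests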
---
.../src/main/java/org/apache/helix/HelixAdmin.java | 49 ++++
.../rebalancer/util/WagedValidationUtil.java | 91 +++++++
.../rebalancer/waged/model/AssignableNode.java | 36 +--
.../rebalancer/waged/model/AssignableReplica.java | 24 +-
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 282 ++++++++++++++++++---
.../rebalancer/waged/model/TestAssignableNode.java | 2 +-
.../apache/helix/manager/zk/TestZkHelixAdmin.java | 109 ++++++++
.../java/org/apache/helix/mock/MockHelixAdmin.java | 22 ++
8 files changed, 538 insertions(+), 77 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 7402c19..1b0544d 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -22,6 +22,7 @@ package org.apache.helix;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
import org.apache.helix.model.ConstraintItem;
@@ -30,8 +31,10 @@ import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.MaintenanceSignal;
+import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
+
/*
* Helix cluster management
*/
@@ -576,4 +579,50 @@ public interface HelixAdmin {
* Release resources
*/
void close();
+
+ /**
+ * Adds a resource with IdealState and ResourceConfig to be rebalanced by WAGED rebalancer with validation.
+ * Validation includes the following:
+ * 1. Check ResourceConfig has the WEIGHT field
+ * 2. Check that all capacity keys from ClusterConfig are set up in the WEIGHT field
+ * 3. Check that all ResourceConfig's weightMap fields have all of the capacity keys
+ * @param clusterName
+ * @param idealState
+ * @param resourceConfig
+ * @return true if the resource has been added successfully. False otherwise
+ */
+ boolean addResourceWithWeight(String clusterName, IdealState idealState,
+ ResourceConfig resourceConfig);
+
+ /**
+ * Batch-enables Waged rebalance for the names of resources given.
+ * @param clusterName
+ * @param resourceNames
+ * @return
+ */
+ boolean enableWagedRebalance(String clusterName, List<String> resourceNames);
+
+ /**
+ * Validates the resources to see if their weight configs have been set properly.
+ * Validation includes the following:
+ * 1. Check ResourceConfig has the WEIGHT field
+ * 2. Check that all capacity keys from ClusterConfig are set up in the WEIGHT field
+ * 3. Check that all ResourceConfig's weightMap fields have all of the capacity keys
+ * @param resourceNames
+ * @return for each resource, true if the weight configs have been set properly, false otherwise
+ */
+ Map<String, Boolean> validateResourcesForWagedRebalance(String clusterName,
+ List<String> resourceNames);
+
+ /**
+ * Validates the instances to ensure their weights in InstanceConfigs have been set up properly.
+ * Validation includes the following:
+ * 1. If default instance capacity is not set, check that the InstanceConfigs have the CAPACITY field
+ * 2. Check that all capacity keys defined in ClusterConfig are present in the CAPACITY field
+ * @param clusterName
+ * @param instancesNames
+ * @return
+ */
+ Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
+ List<String> instancesNames);
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java
new file mode 100644
index 0000000..e9f86e7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedValidationUtil.java
@@ -0,0 +1,91 @@
+package org.apache.helix.controller.rebalancer.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceConfig;
+
+
+/**
+ * A util class that contains validation-related static methods for WAGED rebalancer.
+ */
+public class WagedValidationUtil {
+ /**
+ * Validates and returns instance capacities. The validation logic ensures that all required capacity keys (in ClusterConfig) are present in InstanceConfig.
+ * @param clusterConfig
+ * @param instanceConfig
+ * @return
+ */
+ public static Map<String, Integer> validateAndGetInstanceCapacity(ClusterConfig clusterConfig,
+ InstanceConfig instanceConfig) {
+ // Fetch the capacity of instance from 2 possible sources according to the following priority.
+ // 1. The instance capacity that is configured in the instance config.
+ // 2. If the default instance capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
+ Map<String, Integer> instanceCapacity =
+ new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
+ instanceCapacity.putAll(instanceConfig.getInstanceCapacityMap());
+
+ List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
+ // All the required keys must exist in the instance config.
+ if (!instanceCapacity.keySet().containsAll(requiredCapacityKeys)) {
+ throw new HelixException(String.format(
+ "The required capacity keys: %s are not fully configured in the instance: %s, capacity map: %s.",
+ requiredCapacityKeys.toString(), instanceConfig.getInstanceName(),
+ instanceCapacity.toString()));
+ }
+ return instanceCapacity;
+ }
+
+ /**
+ * Validates and returns partition capacities. The validation logic ensures that all required capacity keys (from ClusterConfig) are present in the ResourceConfig for the partition.
+ * @param partitionName
+ * @param resourceConfig
+ * @param clusterConfig
+ * @return
+ */
+ public static Map<String, Integer> validateAndGetPartitionCapacity(String partitionName,
+ ResourceConfig resourceConfig, Map<String, Map<String, Integer>> capacityMap,
+ ClusterConfig clusterConfig) {
+ // Fetch the capacity of partition from 3 possible sources according to the following priority.
+ // 1. The partition capacity that is explicitly configured in the resource config.
+ // 2. Or, the default partition capacity that is configured under partition name DEFAULT_PARTITION_KEY in the resource config.
+ // 3. If the default partition capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
+ Map<String, Integer> partitionCapacity =
+ new HashMap<>(clusterConfig.getDefaultPartitionWeightMap());
+ partitionCapacity.putAll(capacityMap.getOrDefault(partitionName,
+ capacityMap.getOrDefault(ResourceConfig.DEFAULT_PARTITION_KEY, new HashMap<>())));
+
+ List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
+ // If any required capacity key is not configured in the resource config, fail the model creating.
+ if (!partitionCapacity.keySet().containsAll(requiredCapacityKeys)) {
+ throw new HelixException(String.format(
+ "The required capacity keys: %s are not fully configured in the resource: %s, partition: %s, weight map: %s.",
+ requiredCapacityKeys.toString(), resourceConfig.getResourceName(), partitionName,
+ partitionCapacity.toString()));
+ }
+ return partitionCapacity;
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 6beda6a..09a3cba 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -31,11 +31,13 @@ import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.helix.HelixException;
+import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* This class represents a possible allocation of the replication.
* Note that any usage updates to the AssignableNode are not thread safe.
@@ -113,7 +115,7 @@ public class AssignableNode implements Comparable<AssignableNode> {
void assign(AssignableReplica assignableReplica) {
addToAssignmentRecord(assignableReplica);
assignableReplica.getCapacity().entrySet().stream()
- .forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
+ .forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
}
/**
@@ -121,7 +123,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
* If the replication is not on this node, the assignable node is not updated.
* @param replica - the replica to be released
*/
- void release(AssignableReplica replica) throws IllegalArgumentException {
+ void release(AssignableReplica replica)
+ throws IllegalArgumentException {
String resourceName = replica.getResourceName();
String partitionName = replica.getPartitionName();
@@ -320,12 +323,12 @@ public class AssignableNode implements Comparable<AssignableNode> {
private void addToAssignmentRecord(AssignableReplica replica) {
String resourceName = replica.getResourceName();
String partitionName = replica.getPartitionName();
- if (_currentAssignedReplicaMap.containsKey(resourceName)
- && _currentAssignedReplicaMap.get(resourceName).containsKey(partitionName)) {
- throw new HelixException(String.format(
- "Resource %s already has a replica with state %s from partition %s on node %s",
- replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(),
- getInstanceName()));
+ if (_currentAssignedReplicaMap.containsKey(resourceName) && _currentAssignedReplicaMap
+ .get(resourceName).containsKey(partitionName)) {
+ throw new HelixException(String
+ .format("Resource %s already has a replica with state %s from partition %s on node %s",
+ replica.getResourceName(), replica.getReplicaState(), replica.getPartitionName(),
+ getInstanceName()));
} else {
_currentAssignedReplicaMap.computeIfAbsent(resourceName, key -> new HashMap<>())
.put(partitionName, replica);
@@ -348,23 +351,10 @@ public class AssignableNode implements Comparable<AssignableNode> {
*/
private Map<String, Integer> fetchInstanceCapacity(ClusterConfig clusterConfig,
InstanceConfig instanceConfig) {
- // Fetch the capacity of instance from 2 possible sources according to the following priority.
- // 1. The instance capacity that is configured in the instance config.
- // 2. If the default instance capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
Map<String, Integer> instanceCapacity =
- new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
- instanceCapacity.putAll(instanceConfig.getInstanceCapacityMap());
-
- List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
- // All the required keys must exist in the instance config.
- if (!instanceCapacity.keySet().containsAll(requiredCapacityKeys)) {
- throw new HelixException(String.format(
- "The required capacity keys: %s are not fully configured in the instance: %s, capacity map: %s.",
- requiredCapacityKeys.toString(), _instanceName, instanceCapacity.toString()));
- }
+ WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, instanceConfig);
// Remove all the non-required capacity items from the map.
- instanceCapacity.keySet().retainAll(requiredCapacityKeys);
-
+ instanceCapacity.keySet().retainAll(clusterConfig.getInstanceCapacityKeys());
return instanceCapacity;
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
index b679a54..12b5105 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.helix.HelixException;
+import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
@@ -149,27 +150,10 @@ public class AssignableReplica implements Comparable<AssignableReplica> {
"Invalid partition capacity configuration of resource: " + resourceConfig
.getResourceName(), ex);
}
-
- // Fetch the capacity of partition from 3 possible sources according to the following priority.
- // 1. The partition capacity that is explicitly configured in the resource config.
- // 2. Or, the default partition capacity that is configured under partition name DEFAULT_PARTITION_KEY in the resource config.
- // 3. If the default partition capacity that is configured in the cluster config contains more capacity keys, fill the capacity map with those additional values.
- Map<String, Integer> partitionCapacity =
- new HashMap<>(clusterConfig.getDefaultPartitionWeightMap());
- partitionCapacity.putAll(capacityMap.getOrDefault(partitionName,
- capacityMap.getOrDefault(ResourceConfig.DEFAULT_PARTITION_KEY, new HashMap<>())));
-
- List<String> requiredCapacityKeys = clusterConfig.getInstanceCapacityKeys();
- // If any required capacity key is not configured in the resource config, fail the model creating.
- if (!partitionCapacity.keySet().containsAll(requiredCapacityKeys)) {
- throw new HelixException(String.format(
- "The required capacity keys: %s are not fully configured in the resource: %s, partition: %s, weight map: %s.",
- requiredCapacityKeys.toString(), resourceConfig.getResourceName(), partitionName,
- partitionCapacity.toString()));
- }
+ Map<String, Integer> partitionCapacity = WagedValidationUtil
+ .validateAndGetPartitionCapacity(partitionName, resourceConfig, capacityMap, clusterConfig);
// Remove the non-required capacity items.
- partitionCapacity.keySet().retainAll(requiredCapacityKeys);
-
+ partitionCapacity.keySet().retainAll(clusterConfig.getInstanceCapacityKeys());
return partitionCapacity;
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 12ab636..bbdb3f7 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -36,6 +36,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+
import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
@@ -56,6 +57,10 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.model.ClusterConfig;
@@ -75,6 +80,7 @@ import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.DefaultIdealStateCalculator;
import org.apache.helix.util.HelixUtil;
@@ -83,6 +89,7 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class ZKHelixAdmin implements HelixAdmin {
public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec";
private static final String MAINTENANCE_ZNODE_ID = "maintenance";
@@ -173,7 +180,7 @@ public class ZKHelixAdmin implements HelixAdmin {
// does not repeatedly write instance history)
logger.warn("Retrying dropping instance {} with exception {}",
instanceConfig.getInstanceName(), e.getCause().getMessage());
- retryCnt ++;
+ retryCnt++;
} else {
logger.error("Failed to drop instance {} (not retryable).",
instanceConfig.getInstanceName(), e.getCause());
@@ -396,7 +403,8 @@ public class ZKHelixAdmin implements HelixAdmin {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
- return accessor.getBaseDataAccessor().exists(keyBuilder.maintenance().getPath(), AccessOption.PERSISTENT);
+ return accessor.getBaseDataAccessor()
+ .exists(keyBuilder.maintenance().getPath(), AccessOption.PERSISTENT);
}
@Override
@@ -429,16 +437,16 @@ public class ZKHelixAdmin implements HelixAdmin {
* @param customFields
* @param triggeringEntity
*/
- private void processMaintenanceMode(String clusterName, final boolean enabled, final String reason,
- final MaintenanceSignal.AutoTriggerReason internalReason, final Map<String, String> customFields,
+ private void processMaintenanceMode(String clusterName, final boolean enabled,
+ final String reason, final MaintenanceSignal.AutoTriggerReason internalReason,
+ final Map<String, String> customFields,
final MaintenanceSignal.TriggeringEntity triggeringEntity) {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
logger.info("Cluster {} {} {} maintenance mode for reason {}.", clusterName,
triggeringEntity == MaintenanceSignal.TriggeringEntity.CONTROLLER ? "automatically"
- : "manually",
- enabled ? "enters" : "exits", reason == null ? "NULL" : reason);
+ : "manually", enabled ? "enters" : "exits", reason == null ? "NULL" : reason);
final long currentTime = System.currentTimeMillis();
if (!enabled) {
// Exit maintenance mode
@@ -452,23 +460,23 @@ public class ZKHelixAdmin implements HelixAdmin {
maintenanceSignal.setTimestamp(currentTime);
maintenanceSignal.setTriggeringEntity(triggeringEntity);
switch (triggeringEntity) {
- case CONTROLLER:
- // autoEnable
- maintenanceSignal.setAutoTriggerReason(internalReason);
- break;
- case USER:
- case UNKNOWN:
- // manuallyEnable
- if (customFields != null && !customFields.isEmpty()) {
- // Enter all custom fields provided by the user
- Map<String, String> simpleFields = maintenanceSignal.getRecord().getSimpleFields();
- for (Map.Entry<String, String> entry : customFields.entrySet()) {
- if (!simpleFields.containsKey(entry.getKey())) {
- simpleFields.put(entry.getKey(), entry.getValue());
+ case CONTROLLER:
+ // autoEnable
+ maintenanceSignal.setAutoTriggerReason(internalReason);
+ break;
+ case USER:
+ case UNKNOWN:
+ // manuallyEnable
+ if (customFields != null && !customFields.isEmpty()) {
+ // Enter all custom fields provided by the user
+ Map<String, String> simpleFields = maintenanceSignal.getRecord().getSimpleFields();
+ for (Map.Entry<String, String> entry : customFields.entrySet()) {
+ if (!simpleFields.containsKey(entry.getKey())) {
+ simpleFields.put(entry.getKey(), entry.getValue());
+ }
}
}
- }
- break;
+ break;
}
if (!accessor.createMaintenance(maintenanceSignal)) {
throw new HelixException("Failed to create maintenance signal!");
@@ -476,16 +484,17 @@ public class ZKHelixAdmin implements HelixAdmin {
}
// Record a MaintenanceSignal history
- if (!accessor.getBaseDataAccessor().update(keyBuilder.controllerLeaderHistory().getPath(),
- new DataUpdater<ZNRecord>() {
+ if (!accessor.getBaseDataAccessor()
+ .update(keyBuilder.controllerLeaderHistory().getPath(), new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord oldRecord) {
try {
if (oldRecord == null) {
oldRecord = new ZNRecord(PropertyType.HISTORY.toString());
}
- return new ControllerHistory(oldRecord).updateMaintenanceHistory(enabled, reason,
- currentTime, internalReason, customFields, triggeringEntity);
+ return new ControllerHistory(oldRecord)
+ .updateMaintenanceHistory(enabled, reason, currentTime, internalReason,
+ customFields, triggeringEntity);
} catch (IOException e) {
logger.error("Failed to update maintenance history! Exception: {}", e);
return oldRecord;
@@ -1234,7 +1243,8 @@ public class ZKHelixAdmin implements HelixAdmin {
setResourceIdealState(clusterName, resourceName, new IdealState(idealStateRecord));
}
- private static byte[] readFile(String filePath) throws IOException {
+ private static byte[] readFile(String filePath)
+ throws IOException {
File file = new File(filePath);
int size = (int) file.length();
@@ -1257,7 +1267,8 @@ public class ZKHelixAdmin implements HelixAdmin {
@Override
public void addStateModelDef(String clusterName, String stateModelDefName,
- String stateModelDefFile) throws IOException {
+ String stateModelDefFile)
+ throws IOException {
ZNRecord record =
(ZNRecord) (new ZNRecordSerializer().deserialize(readFile(stateModelDefFile)));
if (record == null || record.getId() == null || !record.getId().equals(stateModelDefName)) {
@@ -1280,9 +1291,9 @@ public class ZKHelixAdmin implements HelixAdmin {
baseAccessor.update(path, new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
- ClusterConstraints constraints = currentData == null ?
- new ClusterConstraints(constraintType) :
- new ClusterConstraints(currentData);
+ ClusterConstraints constraints =
+ currentData == null ? new ClusterConstraints(constraintType)
+ : new ClusterConstraints(currentData);
constraints.addConstraintItem(constraintId, constraintItem);
return constraints.getRecord();
@@ -1488,9 +1499,7 @@ public class ZKHelixAdmin implements HelixAdmin {
+ ", instance config does not exist");
}
- baseAccessor.update(path, new DataUpdater<ZNRecord>()
-
- {
+ baseAccessor.update(path, new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
if (currentData == null) {
@@ -1578,4 +1587,211 @@ public class ZKHelixAdmin implements HelixAdmin {
}
}
+ @Override
+ public boolean addResourceWithWeight(String clusterName, IdealState idealState,
+ ResourceConfig resourceConfig) {
+ // Null checks
+ if (clusterName == null || clusterName.isEmpty()) {
+ throw new HelixException("Cluster name is null or empty!");
+ }
+ if (idealState == null || !idealState.isValid()) {
+ throw new HelixException("IdealState is null or invalid!");
+ }
+ if (resourceConfig == null || !resourceConfig.isValid()) {
+ // TODO This might be okay because of default weight?
+ throw new HelixException("ResourceConfig is null or invalid!");
+ }
+
+ // Make sure IdealState and ResourceConfig are for the same resource
+ if (!idealState.getResourceName().equals(resourceConfig.getResourceName())) {
+ throw new HelixException("Resource names in IdealState and ResourceConfig are different!");
+ }
+
+ // Order in which a resource should be added:
+ // 1. Validate the weights in ResourceConfig against ClusterConfig
+ // Check that all capacity keys in ClusterConfig are set up in every partition in ResourceConfig field
+ if (!validateWeightForResourceConfig(_configAccessor.getClusterConfig(clusterName),
+ resourceConfig, idealState)) {
+ throw new HelixException(String
+ .format("Could not add resource %s with weight! Failed to validate the ResourceConfig!",
+ idealState.getResourceName()));
+ }
+
+ // 2. Add the resourceConfig to ZK
+ _configAccessor
+ .setResourceConfig(clusterName, resourceConfig.getResourceName(), resourceConfig);
+
+ // 3. Add the idealState to ZK
+ setResourceIdealState(clusterName, idealState.getResourceName(), idealState);
+
+ // 4. rebalance the resource
+ rebalance(clusterName, idealState.getResourceName(), Integer.parseInt(idealState.getReplicas()),
+ idealState.getResourceName(), idealState.getInstanceGroupTag());
+
+ return true;
+ }
+
+ @Override
+ public boolean enableWagedRebalance(String clusterName, List<String> resourceNames) {
+ // Null checks
+ if (clusterName == null || clusterName.isEmpty()) {
+ throw new HelixException("Cluster name is invalid!");
+ }
+ if (resourceNames == null || resourceNames.isEmpty()) {
+ throw new HelixException("Resource name list is invalid!");
+ }
+
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+ List<IdealState> idealStates = accessor.getChildValues(keyBuilder.idealStates());
+ List<String> nullIdealStates = new ArrayList<>();
+ for (int i = 0; i < idealStates.size(); i++) {
+ if (idealStates.get(i) == null) {
+ nullIdealStates.add(resourceNames.get(i));
+ } else {
+ idealStates.get(i).setRebalancerClassName(WagedRebalancer.class.getName());
+ idealStates.get(i).setRebalanceMode(RebalanceMode.FULL_AUTO);
+ }
+ }
+ if (!nullIdealStates.isEmpty()) {
+ throw new HelixException(
+ String.format("Not all IdealStates exist in the cluster: %s", nullIdealStates));
+ }
+ List<PropertyKey> idealStateKeys = new ArrayList<>();
+ idealStates.forEach(
+ idealState -> idealStateKeys.add(keyBuilder.idealStates(idealState.getResourceName())));
+ boolean[] success = accessor.setChildren(idealStateKeys, idealStates);
+ for (boolean s : success) {
+ if (!s) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public Map<String, Boolean> validateResourcesForWagedRebalance(String clusterName,
+ List<String> resourceNames) {
+ // Null checks
+ if (clusterName == null || clusterName.isEmpty()) {
+ throw new HelixException("Cluster name is invalid!");
+ }
+ if (resourceNames == null || resourceNames.isEmpty()) {
+ throw new HelixException("Resource name list is invalid!");
+ }
+
+ // Ensure that all instances are valid
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient));
+ Builder keyBuilder = accessor.keyBuilder();
+ List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
+ if (validateInstancesForWagedRebalance(clusterName, instances).containsValue(false)) {
+ throw new HelixException(String
+ .format("Instance capacities haven't been configured properly for cluster %s",
+ clusterName));
+ }
+
+ Map<String, Boolean> result = new HashMap<>();
+ ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
+ for (String resourceName : resourceNames) {
+ IdealState idealState = getResourceIdealState(clusterName, resourceName);
+ if (idealState == null || !idealState.isValid()) {
+ result.put(resourceName, false);
+ continue;
+ }
+ ResourceConfig resourceConfig = _configAccessor.getResourceConfig(clusterName, resourceName);
+ result.put(resourceName,
+ validateWeightForResourceConfig(clusterConfig, resourceConfig, idealState));
+ }
+ return result;
+ }
+
+ @Override
+ public Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
+ List<String> instanceNames) {
+ // Null checks
+ if (clusterName == null || clusterName.isEmpty()) {
+ throw new HelixException("Cluster name is invalid!");
+ }
+ if (instanceNames == null || instanceNames.isEmpty()) {
+ throw new HelixException("Instance name list is invalid!");
+ }
+
+ Map<String, Boolean> result = new HashMap<>();
+ ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
+ for (String instanceName : instanceNames) {
+ InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(clusterName, instanceName);
+ if (instanceConfig == null || !instanceConfig.isValid()) {
+ result.put(instanceName, false);
+ continue;
+ }
+ WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, instanceConfig);
+ result.put(instanceName, true);
+ }
+
+ return result;
+ }
+
+ /**
+ * Validates ResourceConfig's weight field against the given ClusterConfig.
+ * @param clusterConfig
+ * @param resourceConfig
+ * @param idealState
+ * @return true if ResourceConfig has all the required fields. False otherwise.
+ */
+ private boolean validateWeightForResourceConfig(ClusterConfig clusterConfig,
+ ResourceConfig resourceConfig, IdealState idealState) {
+ if (resourceConfig == null) {
+ if (clusterConfig.getDefaultPartitionWeightMap().isEmpty()) {
+ logger.error(
+ "ResourceConfig for {} is null, and there are no default weights set in ClusterConfig!",
+ idealState.getResourceName());
+ return false;
+ }
+ // If ResourceConfig is null AND the default partition weight map is defined, and the map has all the required keys, we consider this valid since the default weights will be used
+ // Need to check the map contains all the required keys
+ if (clusterConfig.getDefaultPartitionWeightMap().keySet()
+ .containsAll(clusterConfig.getInstanceCapacityKeys())) {
+ // Contains all the required keys, so consider it valid since it will use the default weights
+ return true;
+ }
+ logger.error(
+ "ResourceConfig for {} is null, and ClusterConfig's default partition weight map doesn't have all the required keys!",
+ idealState.getResourceName());
+ return false;
+ }
+
+ // Parse the entire capacityMap from ResourceConfig
+ Map<String, Map<String, Integer>> capacityMap;
+ try {
+ capacityMap = resourceConfig.getPartitionCapacityMap();
+ } catch (IOException ex) {
+ logger.error("Invalid partition capacity configuration of resource: {}",
+ idealState.getResourceName(), ex);
+ return false;
+ }
+
+ Set<String> capacityMapSet = new HashSet<>(capacityMap.keySet());
+ boolean hasDefaultCapacity = capacityMapSet.contains(ResourceConfig.DEFAULT_PARTITION_KEY);
+ // Remove DEFAULT key
+ capacityMapSet.remove(ResourceConfig.DEFAULT_PARTITION_KEY);
+
+ // Make sure capacityMap contains all partitions defined in IdealState
+ // Here, IdealState has not been rebalanced, so listFields might be null, in which case, we would get an emptyList from getPartitionSet()
+ // So check using numPartitions instead
+ // This check allows us to fail early on instead of having to loop through all partitions
+ if (capacityMapSet.size() != idealState.getNumPartitions() && !hasDefaultCapacity) {
+ logger.error(
+ "ResourceConfig for {} does not have all partitions defined in PartitionCapacityMap!",
+ idealState.getResourceName());
+ return false;
+ }
+
+ // Loop through all partitions and validate
+ capacityMap.keySet().forEach(partitionName -> WagedValidationUtil
+ .validateAndGetPartitionCapacity(partitionName, resourceConfig, capacityMap,
+ clusterConfig));
+ return true;
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 187c423..2b93353 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -272,7 +272,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
requiredCapacityKeys.add("AdditionalCapacityKey");
testClusterConfig.setInstanceCapacityKeys(requiredCapacityKeys);
- InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");
+ InstanceConfig testInstanceConfig = new InstanceConfig(_testInstanceId);
testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId);
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index d372d67..c391085 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -19,12 +19,16 @@ package org.apache.helix.manager.zk;
* under the License.
*/
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
@@ -38,7 +42,10 @@ import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -48,18 +55,22 @@ import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.ConstraintItemBuilder;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.tools.StateModelConfigGenerator;
import org.apache.zookeeper.data.Stat;
+import org.codehaus.jackson.map.ObjectMapper;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestZkHelixAdmin extends ZkUnitTestBase {
+  // Shared Jackson mapper; used to serialize capacity maps into ResourceConfig map fields.
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@BeforeClass
public void beforeClass() {
@@ -505,4 +516,102 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
.getListField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name()).size(),
2);
}
+
+  /**
+   * Test addResourceWithWeight() and validateResourcesForWagedRebalance().
+   *
+   * <p>First tries to add a resource with an incomplete ResourceConfig (expected to fail).
+   * Then sets a default partition capacity map in the ResourceConfig and the matching instance
+   * capacity keys in the ClusterConfig, and checks that adding and validation both succeed.
+   */
+  @Test
+  public void testAddResourceWithWeightAndValidation()
+      throws IOException {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    String mockInstance = "MockInstance";
+    String testResourcePrefix = "TestResource";
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.addCluster(clusterName, true);
+    admin.addStateModelDef(clusterName, "MasterSlave", new MasterSlaveSMD());
+
+    // Create a dummy instance with capacity for the three keys used below
+    InstanceConfig instanceConfig = new InstanceConfig(mockInstance);
+    Map<String, Integer> mockInstanceCapacity =
+        ImmutableMap.of("WCU", 100, "RCU", 100, "STORAGE", 100);
+    instanceConfig.setInstanceCapacityMap(mockInstanceCapacity);
+    admin.addInstance(clusterName, instanceConfig);
+    MockParticipantManager mockParticipantManager =
+        new MockParticipantManager(ZK_ADDR, clusterName, mockInstance);
+    mockParticipantManager.syncStart();
+    // Always stop the participant, even when an assertion fails, so that its ZK session and
+    // threads do not leak into other tests.
+    try {
+      IdealState idealState = new IdealState(testResourcePrefix);
+      idealState.setNumPartitions(3);
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+
+      ResourceConfig resourceConfig = new ResourceConfig(testResourcePrefix);
+      // validate
+      // NOTE(review): the resource has not been added to ZK yet at this point, so this false
+      // result may only reflect that the resource is missing, not that its config is incomplete.
+      Map<String, Boolean> validationResult = admin.validateResourcesForWagedRebalance(clusterName,
+          Collections.singletonList(testResourcePrefix));
+      Assert.assertEquals(validationResult.size(), 1);
+      Assert.assertFalse(validationResult.get(testResourcePrefix));
+      try {
+        admin.addResourceWithWeight(clusterName, idealState, resourceConfig);
+        Assert.fail(
+            "addResourceWithWeight should reject a ResourceConfig without partition capacities");
+      } catch (HelixException expected) {
+        // OK since resourceConfig is empty
+      }
+
+      // Set PARTITION_CAPACITY_MAP
+      Map<String, String> capacityDataMap =
+          ImmutableMap.of("WCU", "1", "RCU", "2", "STORAGE", "3");
+      resourceConfig.getRecord()
+          .setMapField(ResourceConfig.ResourceConfigProperty.PARTITION_CAPACITY_MAP.name(),
+              Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY,
+                  OBJECT_MAPPER.writeValueAsString(capacityDataMap)));
+
+      // validate
+      // NOTE(review): resourceConfig was only updated locally above, not written to ZK, so this
+      // check presumably still sees a missing/incomplete resource. Confirm that is the intent.
+      validationResult = admin.validateResourcesForWagedRebalance(clusterName,
+          Collections.singletonList(testResourcePrefix));
+      Assert.assertEquals(validationResult.size(), 1);
+      Assert.assertFalse(validationResult.get(testResourcePrefix));
+
+      // Add the capacity keys to ClusterConfig
+      HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(clusterName, _baseAccessor);
+      PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+      ClusterConfig clusterConfig = dataAccessor.getProperty(keyBuilder.clusterConfig());
+      clusterConfig.setInstanceCapacityKeys(Arrays.asList("WCU", "RCU", "STORAGE"));
+      dataAccessor.setProperty(keyBuilder.clusterConfig(), clusterConfig);
+
+      // Should succeed now
+      Assert.assertTrue(admin.addResourceWithWeight(clusterName, idealState, resourceConfig));
+      // validate
+      validationResult = admin.validateResourcesForWagedRebalance(clusterName,
+          Collections.singletonList(testResourcePrefix));
+      Assert.assertEquals(validationResult.size(), 1);
+      Assert.assertTrue(validationResult.get(testResourcePrefix));
+    } finally {
+      mockParticipantManager.syncStop();
+    }
+  }
+
+  /**
+   * Test enableWagedRebalance() by checking that the resource's IdealState, read back after the
+   * call, reports the WagedRebalancer class as its rebalancer class name.
+   */
+  @Test
+  public void testEnableWagedRebalance() {
+    String clusterName =
+        TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
+    String resourceName = "TestResource";
+    HelixAdmin helixAdmin = new ZKHelixAdmin(_gZkClient);
+    helixAdmin.addCluster(clusterName, true);
+    helixAdmin.addStateModelDef(clusterName, "MasterSlave", new MasterSlaveSMD());
+
+    // Register a FULL_AUTO MasterSlave resource with 3 partitions
+    IdealState initialIdealState = new IdealState(resourceName);
+    initialIdealState.setNumPartitions(3);
+    initialIdealState.setStateModelDefRef("MasterSlave");
+    initialIdealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+    helixAdmin.addResource(clusterName, resourceName, initialIdealState);
+
+    // Switch the resource to WAGED, then read back the persisted rebalancer class name
+    helixAdmin.enableWagedRebalance(clusterName, Collections.singletonList(resourceName));
+    String persistedRebalancerClass =
+        helixAdmin.getResourceIdealState(clusterName, resourceName).getRebalancerClassName();
+    Assert.assertEquals(persistedRebalancerClass, WagedRebalancer.class.getName());
+  }
}
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 6cb7790..4319bfb 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -38,6 +38,7 @@ import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.MaintenanceSignal;
+import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
public class MockHelixAdmin implements HelixAdmin {
@@ -427,4 +428,25 @@ public class MockHelixAdmin implements HelixAdmin {
@Override public void close() {
}
+
+ @Override
+ public boolean addResourceWithWeight(String clusterName, IdealState idealState, ResourceConfig resourceConfig) {
+ return false;
+ }
+
+ @Override
+ public boolean enableWagedRebalance(String clusterName, List<String> resourceNames) {
+ return false;
+ }
+
+ @Override
+ public Map<String, Boolean> validateResourcesForWagedRebalance(String clusterName, List<String> resourceNames) {
+ return null;
+ }
+
+  /**
+   * Stub: this mock performs no validation. It returns an empty (immutable) result map rather
+   * than {@code null}, so callers can iterate over it or check its size without a null check.
+   */
+  @Override
+  public Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
+      List<String> instancesNames) {
+    return java.util.Collections.emptyMap();
+  }
}