Posted to commits@helix.apache.org by jx...@apache.org on 2021/06/04 23:25:39 UTC
[helix] branch master updated: New REST api partitionAssignment -- return potential assignment given cluster change (#1747)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new fa83198 New REST api partitionAssignment -- return potential assignment given cluster change (#1747)
fa83198 is described below
commit fa83198a0b64e6056a35b19079525df1c6924f6e
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Fri Jun 4 16:25:32 2021 -0700
New REST api partitionAssignment -- return potential assignment given cluster change (#1747)
New REST api partitionAssignment -- return potential assignment given cluster change
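
For reference, a request to the new endpoint might look like the following. The path and the
JSON field names are taken from the code in this commit; the cluster, instance, resource, and
state names are placeholders:

    POST /clusters/{clusterId}/partitionAssignment
    {
      "InstanceChange" : {
        "AddInstances"    : ["instance_to_add"],
        "RemoveInstances" : ["instance_to_remove"],
        "SwapInstances"   : {"old_instance" : "new_instance"}
      },
      "Options" : {
        "InstanceFilter" : ["instance_to_add", "new_instance"],
        "ResourceFilter" : ["resource_1"]
      }
    }

The response body is a JSON map of resource -> partition -> instance -> state, for example
(the MASTER/SLAVE states here are only illustrative; actual states follow the resource's
state model definition):

    {
      "resource_1" : {
        "resource_1_0" : {"instance_to_add" : "MASTER", "new_instance" : "SLAVE"}
      }
    }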
---
.../rebalancer/waged/ReadOnlyWagedRebalancer.java | 5 +
.../apache/helix/rest/server/ServerContext.java | 19 ++
.../resources/helix/AbstractHelixResource.java | 5 +
.../helix/ResourceAssignmentOptimizerAccessor.java | 341 +++++++++++++++++++++
.../helix/rest/server/AbstractTestClass.java | 9 +-
.../helix/rest/server/TestClusterAccessor.java | 3 -
.../TestResourceAssignmentOptimizerAccessor.java | 211 +++++++++++++
7 files changed, 589 insertions(+), 4 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
index e94148e..d1075d4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
@@ -88,5 +88,10 @@ public class ReadOnlyWagedRebalancer extends WagedRebalancer {
_bestPossibleAssignment = bestPossibleAssignment;
return true;
}
+
+ @Override
+ // The BucketDataAccessor is shared and reused, so it is not closed here.
+ public void close() {
+ }
}
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index 2549fb7..a1cfb66 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -33,6 +33,7 @@ import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory;
import org.apache.helix.task.TaskDriver;
@@ -76,6 +77,8 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
private final Map<String, HelixDataAccessor> _helixDataAccessorPool;
// 1 Cluster name will correspond to 1 task driver
private final Map<String, TaskDriver> _taskDriverPool;
+ // Create ZkBucketDataAccessor for ReadOnlyWagedRebalancer.
+ private volatile ZkBucketDataAccessor _zkBucketDataAccessor;
/**
* Multi-ZK support
@@ -277,6 +280,18 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
return _byteArrayZkBaseDataAccessor;
}
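+ // Lazily creates a shared ZkBucketDataAccessor (double-checked locking on a volatile field) for the read-only WAGED rebalancer.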
+ public ZkBucketDataAccessor getZkBucketDataAccessor() {
+ // ZkBucketDataAccessor constructor will handle realmZK case (when _zkAddr is null)
+ if (_zkBucketDataAccessor == null) {
+ synchronized (this) {
+ if (_zkBucketDataAccessor == null) {
+ _zkBucketDataAccessor = new ZkBucketDataAccessor(_zkAddr);
+ }
+ }
+ }
+ return _zkBucketDataAccessor;
+ }
+
public void close() {
if (_zkClient != null) {
_zkClient.close();
@@ -393,6 +408,10 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
_byteArrayZkBaseDataAccessor.close();
_byteArrayZkBaseDataAccessor = null;
}
+ if (_zkBucketDataAccessor != null) {
+ _zkBucketDataAccessor.close();
+ _zkBucketDataAccessor = null;
+ }
_helixDataAccessorPool.clear();
_taskDriverPool.clear();
} catch (Exception e) {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
index 5bd8f13..a74e3cb 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
@@ -25,6 +25,7 @@ import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.rest.common.ContextPropertyKeys;
import org.apache.helix.rest.server.ServerContext;
import org.apache.helix.rest.server.resources.AbstractResource;
@@ -81,6 +82,10 @@ public class AbstractHelixResource extends AbstractResource {
return getServerContext().getByteArrayZkBaseDataAccessor();
}
+ protected ZkBucketDataAccessor getZkBucketDataAccessor() {
+ return getServerContext().getZkBucketDataAccessor();
+ }
+
protected static ZNRecord toZNRecord(String data)
throws IOException {
return ZNRECORD_READER.readValue(data);
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
new file mode 100644
index 0000000..f6b9b82
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
@@ -0,0 +1,341 @@
+package org.apache.helix.rest.server.resources.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.security.InvalidParameterException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Response;
+
+import com.codahale.metrics.annotation.ResponseMetered;
+import com.codahale.metrics.annotation.Timed;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.rest.common.HttpConstants;
+import org.apache.helix.util.HelixUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Path("/clusters/{clusterId}/partitionAssignment")
+public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
+ private static Logger LOG = LoggerFactory.getLogger(
+ org.apache.helix.rest.server.resources.helix.ResourceAssignmentOptimizerAccessor.class
+ .getName());
+
+ private static class InputFields {
+ List<String> newInstances = new ArrayList<>();
+ List<String> instancesToRemove = new ArrayList<>();
+ Map<String, String> nodeSwap = new HashMap<>(); // old instance -> new instance.
+ Set<String> instanceFilter = new HashSet<>();
+ Set<String> resourceFilter = new HashSet<>();
+ }
+
+ // TODO: We could add a data cache here to reduce read latency.
+ private static class ClusterState {
+ List<InstanceConfig> instanceConfigs = new ArrayList<>();
+ ClusterConfig clusterConfig;
+ List<String> resources = new ArrayList<>();
+ List<String> instances; // cluster LiveInstance + addInstances - instancesToRemove.
+ }
+
+ // Result format: Map of resource -> partition -> instance -> state.
+ private static class AssignmentResult extends HashMap<String, Map<String, Map<String, String>>> {
+ public AssignmentResult() {
+ super();
+ }
+ }
+
+ private static class InputJsonContent {
+ @JsonProperty("InstanceChange")
+ InstanceChangeMap instanceChangeMap;
+ @JsonProperty("Options")
+ OptionsMap optionsMap;
+ }
+
+ private static class InstanceChangeMap {
+ @JsonProperty("AddInstances")
+ List<String> addInstances;
+ @JsonProperty("RemoveInstances")
+ List<String> removeInstances;
+ @JsonProperty("SwapInstances")
+ Map<String, String> swapInstances;
+ }
+
+ private static class OptionsMap {
+ @JsonProperty("InstanceFilter")
+ Set<String> instanceFilter;
+ @JsonProperty("ResourceFilter")
+ Set<String> resourceFilter;
+ }
+
+ @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
+ @Timed(name = HttpConstants.WRITE_REQUEST)
+ @POST
+ public Response computePotentialAssignment(@PathParam("clusterId") String clusterId,
+ String content) {
+
+ InputFields inputFields;
+ ClusterState clusterState;
+ AssignmentResult result;
+
+ try {
+ // 1. Try to parse the content string as a key-value map. If it cannot be parsed, return a bad request with the reason.
+ inputFields = readInput(content);
+ // 2. Read cluster status from ZK.
+ clusterState = readClusterStateAndValidateInput(clusterId, inputFields);
+ // 3. Call rebalancer tools for each resource.
+ result = computeOptimalAssignmentForResources(inputFields, clusterState, clusterId);
+ // 4. Serialize result to JSON and return.
+ return JSONRepresentation(result);
+ } catch (InvalidParameterException ex) {
+ return badRequest(ex.getMessage());
+ } catch (JsonProcessingException e) {
+ return badRequest("Invalid input: Input can not be parsed into a KV map." + e.getMessage());
+ } catch (OutOfMemoryError e) {
+ LOG.error("OutOfMemoryError while calling partitionAssignment" + Arrays
+ .toString(e.getStackTrace()));
+ return badRequest(
+ "Response size is too large to serialize. Please query by resources or instance filter");
+ } catch (Exception e) {
+ LOG.error("Failed to compute partition assignment:" + Arrays.toString(e.getStackTrace()));
+ return badRequest("Failed to compute partition assignment: " + e);
+ }
+ }
+
+ private InputFields readInput(String content)
+ throws InvalidParameterException, JsonProcessingException {
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ InputJsonContent inputJsonContent = objectMapper.readValue(content, InputJsonContent.class);
+ InputFields inputFields = new InputFields();
+
+ if (inputJsonContent.instanceChangeMap != null) {
+ Optional.ofNullable(inputJsonContent.instanceChangeMap.addInstances)
+ .ifPresent(inputFields.newInstances::addAll);
+ Optional.ofNullable(inputJsonContent.instanceChangeMap.removeInstances)
+ .ifPresent(inputFields.instancesToRemove::addAll);
+ Optional.ofNullable(inputJsonContent.instanceChangeMap.swapInstances)
+ .ifPresent(inputFields.nodeSwap::putAll);
+ }
+ if (inputJsonContent.optionsMap != null) {
+ Optional.ofNullable(inputJsonContent.optionsMap.resourceFilter)
+ .ifPresent(inputFields.resourceFilter::addAll);
+ Optional.ofNullable(inputJsonContent.optionsMap.instanceFilter)
+ .ifPresent(inputFields.instanceFilter::addAll);
+ }
+
+ return inputFields;
+ }
+
+ private ClusterState readClusterStateAndValidateInput(String clusterId, InputFields inputFields)
+ throws InvalidParameterException {
+
+ ClusterState clusterState = new ClusterState();
+ ConfigAccessor cfgAccessor = getConfigAccessor();
+ HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
+ clusterState.resources = dataAccessor.getChildNames(dataAccessor.keyBuilder().idealStates());
+ // Add existing live instances and new instances from user input to instances list.
+ clusterState.instances = dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances());
+ clusterState.instances.addAll(inputFields.newInstances);
+
+ // Check that the instances to be removed and the old instances in SwapInstances exist in the cluster.
+ if (!inputFields.nodeSwap.isEmpty() || !inputFields.instancesToRemove.isEmpty()) {
+ Set<String> liveInstanceSet = new HashSet<>(clusterState.instances);
+ for (Map.Entry<String, String> nodeSwapPair : inputFields.nodeSwap.entrySet()) {
+ if (!liveInstanceSet.contains(nodeSwapPair.getKey())) {
+ throw new InvalidParameterException("Invalid input: instance [" + nodeSwapPair.getKey()
+ + "] in SwapInstances does not exist in cluster.");
+ }
+ }
+ for (String instanceToRemove : inputFields.instancesToRemove) {
+ if (!liveInstanceSet.contains(instanceToRemove)) {
+ throw new InvalidParameterException("Invalid input: instance [" + instanceToRemove
+ + "] in RemoveInstances does not exist in cluster.");
+ }
+ }
+ if (!inputFields.instancesToRemove.isEmpty()) {
+ clusterState.instances.removeIf(inputFields.instancesToRemove::contains);
+ }
+ }
+
+ // Read instance and cluster config.
+ // This throws an exception if a newly added instance has no InstanceConfig.
+ for (String instance : clusterState.instances) {
+ InstanceConfig config = cfgAccessor.getInstanceConfig(clusterId, instance);
+ clusterState.instanceConfigs.add(config);
+ }
+ clusterState.clusterConfig = cfgAccessor.getClusterConfig(clusterId);
+ return clusterState;
+ }
+
+ private AssignmentResult computeOptimalAssignmentForResources(InputFields inputFields,
+ ClusterState clusterState, String clusterId) throws Exception {
+
+ AssignmentResult result = new AssignmentResult();
+ // Iterate through resources, read resource level info and get potential assignment.
+ HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
+ List<IdealState> wagedResourceIdealState = new ArrayList<>();
+
+ for (String resource : clusterState.resources) {
+ IdealState idealState =
+ dataAccessor.getProperty(dataAccessor.keyBuilder().idealStates(resource));
+ // Compute all Waged resources in a batch later.
+ if (idealState.getRebalancerClassName() != null && idealState.getRebalancerClassName()
+ .equals(WagedRebalancer.class.getName())) {
+ wagedResourceIdealState.add(idealState);
+ continue;
+ }
+ // For non-WAGED resources, skip resources that are not in the resource filter.
+ if (!inputFields.resourceFilter.isEmpty() && !inputFields.resourceFilter.contains(resource)) {
+ continue;
+ }
+ // Use getIdealAssignmentForFullAuto for FULL_AUTO resource.
+ Map<String, Map<String, String>> partitionAssignments;
+ if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO) {
+ String rebalanceStrategy = idealState.getRebalanceStrategy();
+ if (rebalanceStrategy == null || rebalanceStrategy
+ .equalsIgnoreCase(RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY)) {
+ rebalanceStrategy = AutoRebalanceStrategy.class.getName();
+ }
+ partitionAssignments = new TreeMap<>(HelixUtil
+ .getIdealAssignmentForFullAuto(clusterState.clusterConfig, clusterState.instanceConfigs,
+ clusterState.instances, idealState, new ArrayList<>(idealState.getPartitionSet()),
+ rebalanceStrategy));
+ instanceSwapAndFilter(inputFields, partitionAssignments, resource, result);
+ } else if (idealState.getRebalanceMode() == IdealState.RebalanceMode.SEMI_AUTO) {
+ // Use computeIdealMapping for SEMI_AUTO resource.
+ Map<String, List<String>> preferenceLists = idealState.getPreferenceLists();
+ partitionAssignments = new TreeMap<>();
+ HashSet<String> liveInstances = new HashSet<>(clusterState.instances);
+ List<String> disabledInstance =
+ clusterState.instanceConfigs.stream().filter(enabled -> !enabled.getInstanceEnabled())
+ .map(InstanceConfig::getInstanceName).collect(Collectors.toList());
+ liveInstances.removeAll(disabledInstance);
+ StateModelDefinition stateModelDef = dataAccessor
+ .getProperty(dataAccessor.keyBuilder().stateModelDef(idealState.getStateModelDefRef()));
+ for (String partitionName : preferenceLists.keySet()) {
+ if (!preferenceLists.get(partitionName).isEmpty() && preferenceLists.get(partitionName)
+ .get(0)
+ .equalsIgnoreCase(ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.name())) {
+ partitionAssignments.put(partitionName, HelixUtil
+ .computeIdealMapping(clusterState.instances, stateModelDef, liveInstances));
+ } else {
+ partitionAssignments.put(partitionName, HelixUtil
+ .computeIdealMapping(preferenceLists.get(partitionName), stateModelDef,
+ liveInstances));
+ }
+ }
+ instanceSwapAndFilter(inputFields, partitionAssignments, resource, result);
+ }
+ }
+
+ if (!wagedResourceIdealState.isEmpty()) {
+ computeWagedAssignmentResult(wagedResourceIdealState, inputFields, clusterState, clusterId,
+ result);
+ }
+
+ return result;
+ }
+
+ private void computeWagedAssignmentResult(List<IdealState> wagedResourceIdealState,
+ InputFields inputFields, ClusterState clusterState, String clusterId,
+ AssignmentResult result) {
+
+ // Use getTargetAssignmentForWagedFullAuto for Waged resources.
+ ConfigAccessor cfgAccessor = getConfigAccessor();
+ List<ResourceConfig> wagedResourceConfigs = new ArrayList<>();
+ for (IdealState idealState : wagedResourceIdealState) {
+ wagedResourceConfigs
+ .add(cfgAccessor.getResourceConfig(clusterId, idealState.getResourceName()));
+ }
+
+ Map<String, ResourceAssignment> wagedAssignmentResult;
+ wagedAssignmentResult = HelixUtil.getTargetAssignmentForWagedFullAuto(getZkBucketDataAccessor(),
+ new ZkBaseDataAccessor<>(getRealmAwareZkClient()), clusterState.clusterConfig,
+ clusterState.instanceConfigs, clusterState.instances, wagedResourceIdealState,
+ wagedResourceConfigs);
+
+ // Convert ResourceAssignment to plain map.
+ for (Map.Entry<String, ResourceAssignment> wagedAssignment : wagedAssignmentResult.entrySet()) {
+ String resource = wagedAssignment.getKey();
+ if (!inputFields.resourceFilter.isEmpty() && !inputFields.resourceFilter.contains(resource)) {
+ continue;
+ }
+ Map<String, Map<String, String>> partitionAssignments = new TreeMap<>();
+ wagedAssignment.getValue().getMappedPartitions().forEach(partition -> partitionAssignments
+ .put(partition.getPartitionName(), wagedAssignment.getValue().getReplicaMap(partition)));
+ instanceSwapAndFilter(inputFields, partitionAssignments, resource, result);
+ }
+ }
+
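+ // Apply the SwapInstances mapping and the InstanceFilter to one resource's partition assignment,
+ // then add the (possibly filtered) assignment to the result.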
+ private void instanceSwapAndFilter(InputFields inputFields,
+ Map<String, Map<String, String>> partitionAssignments, String resource,
+ AssignmentResult result) {
+
+ if (!inputFields.nodeSwap.isEmpty() || !inputFields.instanceFilter.isEmpty()) {
+ for (Iterator<Map.Entry<String, Map<String, String>>> partitionAssignmentIt =
+ partitionAssignments.entrySet().iterator(); partitionAssignmentIt.hasNext(); ) {
+ Map.Entry<String, Map<String, String>> partitionAssignment = partitionAssignmentIt.next();
+ Map<String, String> instanceStates = partitionAssignment.getValue();
+ Map<String, String> tempInstanceState = new HashMap<>();
+ // Add new pairs to tempInstanceState
+ instanceStates.entrySet().stream()
+ .filter(entry -> inputFields.nodeSwap.containsKey(entry.getKey())).forEach(
+ entry -> tempInstanceState
+ .put(inputFields.nodeSwap.get(entry.getKey()), entry.getValue()));
+ instanceStates.putAll(tempInstanceState);
+ // Only keep instances in instanceFilter and drop the swapped-out instances.
+ instanceStates.entrySet().removeIf(e ->
+ (!inputFields.instanceFilter.isEmpty() && !inputFields.instanceFilter
+ .contains(e.getKey())) || inputFields.nodeSwap.containsKey(e.getKey()));
+ if (instanceStates.isEmpty()) {
+ partitionAssignmentIt.remove();
+ }
+ }
+ }
+ result.put(resource, partitionAssignments);
+ }
+}
\ No newline at end of file
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index a64b37c..cf3a4bf 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -316,7 +316,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
}
protected void setupHelixResources() {
- _clusters = createClusters(3);
+ _clusters = createClusters(4);
_gSetupTool.addCluster(_superCluster, true);
_gSetupTool.addCluster(TASK_TEST_CLUSTER, true);
_clusters.add(_superCluster);
@@ -524,6 +524,11 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
protected void post(String uri, Map<String, String> queryParams, Entity entity,
int expectedReturnStatus) {
+ post(uri, queryParams, entity, expectedReturnStatus, false);
+ }
+
+ protected String post(String uri, Map<String, String> queryParams, Entity entity,
+ int expectedReturnStatus, boolean expectBodyReturned) {
WebTarget webTarget = target(uri);
if (queryParams != null) {
for (Map.Entry<String, String> entry : queryParams.entrySet()) {
@@ -531,7 +536,9 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
}
}
Response response = webTarget.request().post(entity);
+ String result = response.readEntity(String.class);
Assert.assertEquals(response.getStatus(), expectedReturnStatus);
+ return result;
}
protected void delete(String uri, int expectedReturnStatus) {
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
index b4ec2a0..d0593e4 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
@@ -34,7 +34,6 @@ import javax.ws.rs.core.Response;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.sun.research.ws.wadl.HTTPMethods;
import org.apache.helix.ConfigAccessor;
@@ -43,8 +42,6 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.cloud.azure.AzureConstants;
import org.apache.helix.cloud.constants.CloudProvider;
-import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
-import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.integration.manager.ClusterDistributedController;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
new file mode 100644
index 0000000..ad1fda2
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
@@ -0,0 +1,211 @@
+package org.apache.helix.rest.server;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.IdealState;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestResourceAssignmentOptimizerAccessor extends AbstractTestClass {
+
+ String cluster = "TestCluster_3";
+ String instance1 = cluster + "dummyInstance_localhost_12930";
+ String swapNewInstance = "swapNewInstance";
+ String urlBase = "clusters/TestCluster_3/partitionAssignment/";
+ String swapOldInstance, toRemoveInstance;
+ HelixDataAccessor helixDataAccessor;
+ List<String> resources;
+ List<String> liveInstances;
+
+ @BeforeClass
+ public void beforeClass() {
+ helixDataAccessor = new ZKHelixDataAccessor(cluster, _baseAccessor);
+ _gSetupTool.addInstanceToCluster(cluster, instance1);
+ resources = _gSetupTool.getClusterManagementTool().getResourcesInCluster(cluster);
+ liveInstances = helixDataAccessor.getChildNames(helixDataAccessor.keyBuilder().liveInstances());
+ Assert.assertFalse(resources.isEmpty() || liveInstances.isEmpty());
+
+ toRemoveInstance = liveInstances.get(0);
+ swapOldInstance = liveInstances.get(1);
+ }
+
+ @AfterClass
+ public void afterClass() {
+ for (String resource : resources) {
+ IdealState idealState =
+ _gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
+ idealState.setRebalanceMode(IdealState.RebalanceMode.SEMI_AUTO);
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(cluster, resource, idealState);
+ }
+ }
+
+ @Test
+ public void testComputePartitionAssignment() throws IOException {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+ // Set all resources to FULL_AUTO except one.
+ for (int i = 0; i < resources.size() - 1; ++i) {
+ String resource = resources.get(i);
+ IdealState idealState =
+ _gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
+ idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(cluster, resource, idealState);
+ }
+
+ // Test AddInstances, RemoveInstances and SwapInstances
+ String payload = "{\"InstanceChange\" : { \"AddInstances\" : [\"" + instance1
+ + "\"], \"RemoveInstances\" : [ \"" + toRemoveInstance + "\"], \"SwapInstances\" : {\""
+ + swapOldInstance + "\" : \"" + swapNewInstance + "\"} }} ";
+ String body = post(urlBase, null, Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.OK.getStatusCode(), true);
+ Map<String, Map<String, Map<String, String>>> resourceAssignments = OBJECT_MAPPER
+ .readValue(body, new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+ });
+ Set<String> hostSet = new HashSet<>();
+ resourceAssignments.forEach((k, v) -> v.forEach((kk, vv) -> hostSet.addAll(vv.keySet())));
+ Assert.assertTrue(hostSet.contains(instance1));
+ Assert.assertTrue(hostSet.contains(swapNewInstance));
+ Assert.assertFalse(hostSet.contains(liveInstances.get(0)));
+ Assert.assertFalse(hostSet.contains(liveInstances.get(1)));
+
+ // Test partitionAssignment host filter
+ String payload2 = "{\"Options\" : { \"InstanceFilter\" : [\"" + liveInstances.get(0) + "\" , \""
+ + liveInstances.get(1) + "\"] }} ";
+ String body2 = post(urlBase, null, Entity.entity(payload2, MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.OK.getStatusCode(), true);
+ Map<String, Map<String, Map<String, String>>> resourceAssignments2 = OBJECT_MAPPER
+ .readValue(body2, new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+ });
+ Set<String> hostSet2 = new HashSet<>();
+ resourceAssignments2.forEach((k, v) -> v.forEach((kk, vv) -> hostSet2.addAll(vv.keySet())));
+ Assert.assertEquals(hostSet2.size(), 2);
+ Assert.assertTrue(hostSet2.contains(liveInstances.get(0)));
+ Assert.assertTrue(hostSet2.contains(liveInstances.get(1)));
+
+ String payload3 =
+ "{\"Options\" : { \"ResourceFilter\" : [\"" + resources.get(0) + "\" , \"" + resources
+ .get(1) + "\"] }} ";
+ String body3 = post(urlBase, null, Entity.entity(payload3, MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.OK.getStatusCode(), true);
+ Map<String, Map<String, Map<String, String>>> resourceAssignments3 = OBJECT_MAPPER
+ .readValue(body3, new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+ });
+ Assert.assertEquals(resourceAssignments3.size(), 2);
+ Assert.assertTrue(resourceAssignments3.containsKey(resources.get(0)));
+ Assert.assertTrue(resourceAssignments3.containsKey(resources.get(1)));
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+
+ @Test(dependsOnMethods = "testComputePartitionAssignment")
+ public void testComputePartitionAssignmentWaged() throws IOException {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+ // Use the WAGED rebalancer for the following tests.
+ for (String resource : resources) {
+ IdealState idealState =
+ _gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
+ idealState
+ .setRebalancerClassName("org.apache.helix.controller.rebalancer.waged.WagedRebalancer");
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(cluster, resource, idealState);
+ }
+
+ // Test AddInstances, RemoveInstances and SwapInstances
+ String payload = "{\"InstanceChange\" : { \"AddInstances\" : [\"" + instance1
+ + "\"], \"RemoveInstances\" : [ \"" + toRemoveInstance + "\"], \"SwapInstances\" : {\""
+ + swapOldInstance + "\" : \"" + swapNewInstance + "\"} }} ";
+ String body = post(urlBase, null, Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.OK.getStatusCode(), true);
+ Map<String, Map<String, Map<String, String>>> resourceAssignments = OBJECT_MAPPER
+ .readValue(body, new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+ });
+ Set<String> hostSet = new HashSet<>();
+ resourceAssignments.forEach((k, v) -> v.forEach((kk, vv) -> hostSet.addAll(vv.keySet())));
+ Assert.assertTrue(hostSet.contains(instance1));
+ Assert.assertTrue(hostSet.contains(swapNewInstance));
+ Assert.assertFalse(hostSet.contains(liveInstances.get(0)));
+ Assert.assertFalse(hostSet.contains(liveInstances.get(1)));
+
+ // Test partitionAssignment host filter
+ String payload2 = "{\"Options\" : { \"InstanceFilter\" : [\"" + liveInstances.get(0) + "\" , \""
+ + liveInstances.get(1) + "\"] }} ";
+ String body2 = post(urlBase, null, Entity.entity(payload2, MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.OK.getStatusCode(), true);
+ Map<String, Map<String, Map<String, String>>> resourceAssignments2 = OBJECT_MAPPER
+ .readValue(body2, new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+ });
+ Set<String> hostSet2 = new HashSet<>();
+ resourceAssignments2.forEach((k, v) -> v.forEach((kk, vv) -> hostSet2.addAll(vv.keySet())));
+ Assert.assertEquals(hostSet2.size(), 2);
+ Assert.assertTrue(hostSet2.contains(liveInstances.get(0)));
+ Assert.assertTrue(hostSet2.contains(liveInstances.get(1)));
+
+ String payload3 =
+ "{\"Options\" : { \"ResourceFilter\" : [\"" + resources.get(0) + "\" , \"" + resources
+ .get(1) + "\"] }} ";
+ String body3 = post(urlBase, null, Entity.entity(payload3, MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.OK.getStatusCode(), true);
+ Map<String, Map<String, Map<String, String>>> resourceAssignments3 = OBJECT_MAPPER
+ .readValue(body3, new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+ });
+ Assert.assertEquals(resourceAssignments3.size(), 2);
+ Assert.assertTrue(resourceAssignments3.containsKey(resources.get(0)));
+ Assert.assertTrue(resourceAssignments3.containsKey(resources.get(1)));
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+
+ @Test(dependsOnMethods = "testComputePartitionAssignmentWaged")
+ public void testComputePartitionAssignmentNegativeInput() throws IOException {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+ // Test negative input
+ String payload4 = "{\"InstanceChange\" : { \"AddInstances\" : [\" nonExistInstanceName \"] }} ";
+ post(urlBase, null, Entity.entity(payload4, MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.BAD_REQUEST.getStatusCode(), true);
+
+ String payload5 =
+ "{\"InstanceChange\" : { \"RemoveInstances\" : [\" nonExistInstanceName \"] }} ";
+ post(urlBase, null, Entity.entity(payload5, MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.BAD_REQUEST.getStatusCode(), true);
+
+ String payload6 =
+ "{\"InstanceChange\" : { \"SwapInstances\" : {\" nonExistInstanceName \" : \" swapNewInstance \"} }} ";
+ post(urlBase, null, Entity.entity(payload6, MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.BAD_REQUEST.getStatusCode(), true);
+
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+}
\ No newline at end of file