You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/06/04 23:25:39 UTC

[helix] branch master updated: New REST api partitionAssignment -- return potential assignment given cluster change (#1747)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new fa83198  New REST api partitionAssignment -- return potential assignment given cluster change (#1747)
fa83198 is described below

commit fa83198a0b64e6056a35b19079525df1c6924f6e
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Fri Jun 4 16:25:32 2021 -0700

    New REST api partitionAssignment -- return potential assignment given cluster change (#1747)
    
    New REST api partitionAssignment -- return potential assignment given cluster change
---
 .../rebalancer/waged/ReadOnlyWagedRebalancer.java  |   5 +
 .../apache/helix/rest/server/ServerContext.java    |  19 ++
 .../resources/helix/AbstractHelixResource.java     |   5 +
 .../helix/ResourceAssignmentOptimizerAccessor.java | 341 +++++++++++++++++++++
 .../helix/rest/server/AbstractTestClass.java       |   9 +-
 .../helix/rest/server/TestClusterAccessor.java     |   3 -
 .../TestResourceAssignmentOptimizerAccessor.java   | 211 +++++++++++++
 7 files changed, 589 insertions(+), 4 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
index e94148e..d1075d4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
@@ -88,5 +88,10 @@ public class ReadOnlyWagedRebalancer extends WagedRebalancer {
       _bestPossibleAssignment = bestPossibleAssignment;
       return true;
     }
+
+    @Override
+    // BucketDataAccessor will be reused, won't be closed here.
+    public void close() {
+    }
   }
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index 2549fb7..a1cfb66 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -33,6 +33,7 @@ import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.rest.metadatastore.ZkMetadataStoreDirectory;
 import org.apache.helix.task.TaskDriver;
@@ -76,6 +77,8 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
   private final Map<String, HelixDataAccessor> _helixDataAccessorPool;
   // 1 Cluster name will correspond to 1 task driver
   private final Map<String, TaskDriver> _taskDriverPool;
+  // Create ZkBucketDataAccessor for ReadOnlyWagedRebalancer.
+  private volatile ZkBucketDataAccessor _zkBucketDataAccessor;
 
   /**
    * Multi-ZK support
@@ -277,6 +280,18 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
     return _byteArrayZkBaseDataAccessor;
   }
 
+  public ZkBucketDataAccessor getZkBucketDataAccessor() {
+    // ZkBucketDataAccessor constructor will handle realmZK case (when _zkAddr is null)
+    if (_zkBucketDataAccessor == null) {
+      synchronized (this) {
+        if (_zkBucketDataAccessor == null) {
+          _zkBucketDataAccessor = new ZkBucketDataAccessor(_zkAddr);
+        }
+      }
+    }
+    return _zkBucketDataAccessor;
+  }
+
   public void close() {
     if (_zkClient != null) {
       _zkClient.close();
@@ -393,6 +408,10 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
           _byteArrayZkBaseDataAccessor.close();
           _byteArrayZkBaseDataAccessor = null;
         }
+        if (_zkBucketDataAccessor != null) {
+          _zkBucketDataAccessor.close();
+          _zkBucketDataAccessor = null;
+        }
         _helixDataAccessorPool.clear();
         _taskDriverPool.clear();
       } catch (Exception e) {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
index 5bd8f13..a74e3cb 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/AbstractHelixResource.java
@@ -25,6 +25,7 @@ import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.rest.common.ContextPropertyKeys;
 import org.apache.helix.rest.server.ServerContext;
 import org.apache.helix.rest.server.resources.AbstractResource;
@@ -81,6 +82,10 @@ public class AbstractHelixResource extends AbstractResource {
     return getServerContext().getByteArrayZkBaseDataAccessor();
   }
 
+  protected ZkBucketDataAccessor getZkBucketDataAccessor() {
+    return getServerContext().getZkBucketDataAccessor();
+  }
+
   protected static ZNRecord toZNRecord(String data)
       throws IOException {
     return ZNRECORD_READER.readValue(data);
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
new file mode 100644
index 0000000..f6b9b82
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
@@ -0,0 +1,341 @@
+package org.apache.helix.rest.server.resources.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.security.InvalidParameterException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Response;
+
+import com.codahale.metrics.annotation.ResponseMetered;
+import com.codahale.metrics.annotation.Timed;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.rest.common.HttpConstants;
+import org.apache.helix.util.HelixUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Path("/clusters/{clusterId}/partitionAssignment")
+public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
+  private static Logger LOG = LoggerFactory.getLogger(
+      org.apache.helix.rest.server.resources.helix.ResourceAssignmentOptimizerAccessor.class
+          .getName());
+
+  private static class InputFields {
+    List<String> newInstances = new ArrayList<>();
+    List<String> instancesToRemove = new ArrayList<>();
+    Map<String, String> nodeSwap = new HashMap<>(); // old instance -> new instance.
+    Set<String> instanceFilter = new HashSet<>();
+    Set<String> resourceFilter = new HashSet<>();
+  }
+
+  // TODO: We could add a data cache here to avoid read latency.
+  private static class ClusterState {
+    List<InstanceConfig> instanceConfigs = new ArrayList<>();
+    ClusterConfig clusterConfig;
+    List<String> resources = new ArrayList<>();
+    List<String> instances;        // cluster LiveInstance + addInstances - instancesToRemove.
+  }
+
+  // Result format: Map of resource -> partition -> instance -> state.
+  private static class AssignmentResult extends HashMap<String, Map<String, Map<String, String>>> {
+    public AssignmentResult() {
+      super();
+    }
+  }
+
+  private static class InputJsonContent {
+    @JsonProperty("InstanceChange")
+    InstanceChangeMap instanceChangeMap;
+    @JsonProperty("Options")
+    OptionsMap optionsMap;
+  }
+
+  private static class InstanceChangeMap {
+    @JsonProperty("AddInstances")
+    List<String> addInstances;
+    @JsonProperty("RemoveInstances")
+    List<String> removeInstances;
+    @JsonProperty("SwapInstances")
+    Map<String, String> swapInstances;
+  }
+
+  private static class OptionsMap {
+    @JsonProperty("InstanceFilter")
+    Set<String> instanceFilter;
+    @JsonProperty("ResourceFilter")
+    Set<String> resourceFilter;
+  }
+
+  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
+  @Timed(name = HttpConstants.WRITE_REQUEST)
+  @POST
+  public Response computePotentialAssignment(@PathParam("clusterId") String clusterId,
+      String content) {
+
+    InputFields inputFields;
+    ClusterState clusterState;
+    AssignmentResult result;
+
+    try {
+      // 1.  Try to parse the content string. If parseable, use it as a KV map. Otherwise, return a REASON String
+      inputFields = readInput(content);
+      // 2. Read cluster status from ZK.
+      clusterState = readClusterStateAndValidateInput(clusterId, inputFields);
+      // 3. Call rebalancer tools for each resource.
+      result = computeOptimalAssignmentForResources(inputFields, clusterState, clusterId);
+      // 4. Serialize result to JSON and return.
+      return JSONRepresentation(result);
+    } catch (InvalidParameterException ex) {
+      return badRequest(ex.getMessage());
+    } catch (JsonProcessingException e) {
+      return badRequest("Invalid input: Input can not be parsed into a KV map." + e.getMessage());
+    } catch (OutOfMemoryError e) {
+      LOG.error("OutOfMemoryError while calling partitionAssignment" + Arrays
+          .toString(e.getStackTrace()));
+      return badRequest(
+          "Response size is too large to serialize. Please query by resources or instance filter");
+    } catch (Exception e) {
+      LOG.error("Failed to compute partition assignment:" + Arrays.toString(e.getStackTrace()));
+      return badRequest("Failed to compute partition assignment: " + e);
+    }
+  }
+
+  private InputFields readInput(String content)
+      throws InvalidParameterException, JsonProcessingException {
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    InputJsonContent inputJsonContent = objectMapper.readValue(content, InputJsonContent.class);
+    InputFields inputFields = new InputFields();
+
+    if (inputJsonContent.instanceChangeMap != null) {
+      Optional.ofNullable(inputJsonContent.instanceChangeMap.addInstances)
+          .ifPresent(inputFields.newInstances::addAll);
+      Optional.ofNullable(inputJsonContent.instanceChangeMap.removeInstances)
+          .ifPresent(inputFields.instancesToRemove::addAll);
+      Optional.ofNullable(inputJsonContent.instanceChangeMap.swapInstances)
+          .ifPresent(inputFields.nodeSwap::putAll);
+    }
+    if (inputJsonContent.optionsMap != null) {
+      Optional.ofNullable(inputJsonContent.optionsMap.resourceFilter)
+          .ifPresent(inputFields.resourceFilter::addAll);
+      Optional.ofNullable(inputJsonContent.optionsMap.instanceFilter)
+          .ifPresent(inputFields.instanceFilter::addAll);
+    }
+
+    return inputFields;
+  }
+
+  private ClusterState readClusterStateAndValidateInput(String clusterId, InputFields inputFields)
+      throws InvalidParameterException {
+
+    ClusterState clusterState = new ClusterState();
+    ConfigAccessor cfgAccessor = getConfigAccessor();
+    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
+    clusterState.resources = dataAccessor.getChildNames(dataAccessor.keyBuilder().idealStates());
+    // Add existing live instances and new instances from user input to instances list.
+    clusterState.instances = dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances());
+    clusterState.instances.addAll(inputFields.newInstances);
+
+    // Check if to be removed instances and old instances in swap node exist in live instance.
+    if (!inputFields.nodeSwap.isEmpty() || !inputFields.instancesToRemove.isEmpty()) {
+      Set<String> liveInstanceSet = new HashSet<>(clusterState.instances);
+      for (Map.Entry<String, String> nodeSwapPair : inputFields.nodeSwap.entrySet()) {
+        if (!liveInstanceSet.contains(nodeSwapPair.getKey())) {
+          throw new InvalidParameterException("Invalid input: instance [" + nodeSwapPair.getKey()
+              + "] in SwapInstances does not exist in cluster.");
+        }
+      }
+      for (String instanceToRemove : inputFields.instancesToRemove) {
+        if (!liveInstanceSet.contains(instanceToRemove)) {
+          throw new InvalidParameterException("Invalid input: instance [" + instanceToRemove
+              + "] in RemoveInstances does not exist in cluster.");
+        }
+      }
+      if (!inputFields.instancesToRemove.isEmpty()) {
+        clusterState.instances.removeIf(inputFields.instancesToRemove::contains);
+      }
+    }
+
+    // Read instance and cluster config.
+    // It will throw exception if there is no instanceConfig for newly added instance.
+    for (String instance : clusterState.instances) {
+      InstanceConfig config = cfgAccessor.getInstanceConfig(clusterId, instance);
+      clusterState.instanceConfigs.add(config);
+    }
+    clusterState.clusterConfig = cfgAccessor.getClusterConfig(clusterId);
+    return clusterState;
+  }
+
+  private AssignmentResult computeOptimalAssignmentForResources(InputFields inputFields,
+      ClusterState clusterState, String clusterId) throws Exception {
+
+    AssignmentResult result = new AssignmentResult();
+    // Iterate through resources, read resource level info and get potential assignment.
+    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
+    List<IdealState> wagedResourceIdealState = new ArrayList<>();
+
+    for (String resource : clusterState.resources) {
+      IdealState idealState =
+          dataAccessor.getProperty(dataAccessor.keyBuilder().idealStates(resource));
+      // Compute all Waged resources in a batch later.
+      if (idealState.getRebalancerClassName() != null && idealState.getRebalancerClassName()
+          .equals(WagedRebalancer.class.getName())) {
+        wagedResourceIdealState.add(idealState);
+        continue;
+      }
+      // For non Waged resources, we don't compute resources not in white list.
+      if (!inputFields.resourceFilter.isEmpty() && !inputFields.resourceFilter.contains(resource)) {
+        continue;
+      }
+      // Use getIdealAssignmentForFullAuto for FULL_AUTO resource.
+      Map<String, Map<String, String>> partitionAssignments;
+      if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO) {
+        String rebalanceStrategy = idealState.getRebalanceStrategy();
+        if (rebalanceStrategy == null || rebalanceStrategy
+            .equalsIgnoreCase(RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY)) {
+          rebalanceStrategy = AutoRebalanceStrategy.class.getName();
+        }
+        partitionAssignments = new TreeMap<>(HelixUtil
+            .getIdealAssignmentForFullAuto(clusterState.clusterConfig, clusterState.instanceConfigs,
+                clusterState.instances, idealState, new ArrayList<>(idealState.getPartitionSet()),
+                rebalanceStrategy));
+        instanceSwapAndFilter(inputFields, partitionAssignments, resource, result);
+      } else if (idealState.getRebalanceMode() == IdealState.RebalanceMode.SEMI_AUTO) {
+        // Use computeIdealMapping for SEMI_AUTO resource.
+        Map<String, List<String>> preferenceLists = idealState.getPreferenceLists();
+        partitionAssignments = new TreeMap<>();
+        HashSet<String> liveInstances = new HashSet<>(clusterState.instances);
+        List<String> disabledInstance =
+            clusterState.instanceConfigs.stream().filter(enabled -> !enabled.getInstanceEnabled())
+                .map(InstanceConfig::getInstanceName).collect(Collectors.toList());
+        liveInstances.removeAll(disabledInstance);
+        StateModelDefinition stateModelDef = dataAccessor
+            .getProperty(dataAccessor.keyBuilder().stateModelDef(idealState.getStateModelDefRef()));
+        for (String partitionName : preferenceLists.keySet()) {
+          if (!preferenceLists.get(partitionName).isEmpty() && preferenceLists.get(partitionName)
+              .get(0)
+              .equalsIgnoreCase(ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.name())) {
+            partitionAssignments.put(partitionName, HelixUtil
+                .computeIdealMapping(clusterState.instances, stateModelDef, liveInstances));
+          } else {
+            partitionAssignments.put(partitionName, HelixUtil
+                .computeIdealMapping(preferenceLists.get(partitionName), stateModelDef,
+                    liveInstances));
+          }
+        }
+        instanceSwapAndFilter(inputFields, partitionAssignments, resource, result);
+      }
+    }
+
+    if (!wagedResourceIdealState.isEmpty()) {
+      computeWagedAssignmentResult(wagedResourceIdealState, inputFields, clusterState, clusterId,
+          result);
+    }
+
+    return result;
+  }
+
+  private void computeWagedAssignmentResult(List<IdealState> wagedResourceIdealState,
+      InputFields inputFields, ClusterState clusterState, String clusterId,
+      AssignmentResult result) {
+
+    // Use getTargetAssignmentForWagedFullAuto for Waged resources.
+    ConfigAccessor cfgAccessor = getConfigAccessor();
+    List<ResourceConfig> wagedResourceConfigs = new ArrayList<>();
+    for (IdealState idealState : wagedResourceIdealState) {
+      wagedResourceConfigs
+          .add(cfgAccessor.getResourceConfig(clusterId, idealState.getResourceName()));
+    }
+
+    Map<String, ResourceAssignment> wagedAssignmentResult;
+    wagedAssignmentResult = HelixUtil.getTargetAssignmentForWagedFullAuto(getZkBucketDataAccessor(),
+        new ZkBaseDataAccessor<>(getRealmAwareZkClient()), clusterState.clusterConfig,
+        clusterState.instanceConfigs, clusterState.instances, wagedResourceIdealState,
+        wagedResourceConfigs);
+
+    // Convert ResourceAssignment to plain map.
+    for (Map.Entry<String, ResourceAssignment> wagedAssignment : wagedAssignmentResult.entrySet()) {
+      String resource = wagedAssignment.getKey();
+      if (!inputFields.resourceFilter.isEmpty() && !inputFields.resourceFilter.contains(resource)) {
+        continue;
+      }
+      Map<String, Map<String, String>> partitionAssignments = new TreeMap<>();
+      wagedAssignment.getValue().getMappedPartitions().forEach(partition -> partitionAssignments
+          .put(partition.getPartitionName(), wagedAssignment.getValue().getReplicaMap(partition)));
+      instanceSwapAndFilter(inputFields, partitionAssignments, resource, result);
+    }
+  }
+
+  private void instanceSwapAndFilter(InputFields inputFields,
+      Map<String, Map<String, String>> partitionAssignments, String resource,
+      AssignmentResult result) {
+
+    if (!inputFields.nodeSwap.isEmpty() || !inputFields.instanceFilter.isEmpty()) {
+      for (Iterator<Map.Entry<String, Map<String, String>>> partitionAssignmentIt =
+          partitionAssignments.entrySet().iterator(); partitionAssignmentIt.hasNext(); ) {
+        Map.Entry<String, Map<String, String>> partitionAssignment = partitionAssignmentIt.next();
+        Map<String, String> instanceStates = partitionAssignment.getValue();
+        Map<String, String> tempInstanceState = new HashMap<>();
+        // Add new pairs to tempInstanceState
+        instanceStates.entrySet().stream()
+            .filter(entry -> inputFields.nodeSwap.containsKey(entry.getKey())).forEach(
+            entry -> tempInstanceState
+                .put(inputFields.nodeSwap.get(entry.getKey()), entry.getValue()));
+        instanceStates.putAll(tempInstanceState);
+        // Only keep instance in instanceFilter
+        instanceStates.entrySet().removeIf(e ->
+            (!inputFields.instanceFilter.isEmpty() && !inputFields.instanceFilter
+                .contains(e.getKey())) || inputFields.nodeSwap.containsKey(e.getKey()));
+        if (instanceStates.isEmpty()) {
+          partitionAssignmentIt.remove();
+        }
+      }
+    }
+    result.put(resource, partitionAssignments);
+  }
+}
\ No newline at end of file
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index a64b37c..cf3a4bf 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -316,7 +316,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
   }
 
   protected void setupHelixResources() {
-    _clusters = createClusters(3);
+    _clusters = createClusters(4);
     _gSetupTool.addCluster(_superCluster, true);
     _gSetupTool.addCluster(TASK_TEST_CLUSTER, true);
     _clusters.add(_superCluster);
@@ -524,6 +524,11 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
 
   protected void post(String uri, Map<String, String> queryParams, Entity entity,
       int expectedReturnStatus) {
+    post(uri, queryParams, entity,expectedReturnStatus, false);
+  }
+
+  protected String post(String uri, Map<String, String> queryParams, Entity entity,
+      int expectedReturnStatus, boolean  expectBodyReturned) {
     WebTarget webTarget = target(uri);
     if (queryParams != null) {
       for (Map.Entry<String, String> entry : queryParams.entrySet()) {
@@ -531,7 +536,9 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
       }
     }
     Response response = webTarget.request().post(entity);
+    String result = response.readEntity(String.class);
     Assert.assertEquals(response.getStatus(), expectedReturnStatus);
+    return result;
   }
 
   protected void delete(String uri, int expectedReturnStatus) {
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
index b4ec2a0..d0593e4 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
@@ -34,7 +34,6 @@ import javax.ws.rs.core.Response;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import com.sun.research.ws.wadl.HTTPMethods;
 import org.apache.helix.ConfigAccessor;
@@ -43,8 +42,6 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.cloud.azure.AzureConstants;
 import org.apache.helix.cloud.constants.CloudProvider;
-import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
-import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.integration.manager.ClusterDistributedController;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
new file mode 100644
index 0000000..ad1fda2
--- /dev/null
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
@@ -0,0 +1,211 @@
+package org.apache.helix.rest.server;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.IdealState;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestResourceAssignmentOptimizerAccessor extends AbstractTestClass {
+
+  String cluster = "TestCluster_3";
+  String instance1 = cluster + "dummyInstance_localhost_12930";
+  String swapNewInstance = "swapNewInstance";
+  String urlBase = "clusters/TestCluster_3/partitionAssignment/";
+  String swapOldInstance, toRemoveInstance;
+  HelixDataAccessor helixDataAccessor;
+  List<String> resources;
+  List<String> liveInstances;
+
+  @BeforeClass
+  public void beforeClass() {
+    helixDataAccessor = new ZKHelixDataAccessor(cluster, _baseAccessor);
+    _gSetupTool.addInstanceToCluster(cluster, instance1);
+    resources = _gSetupTool.getClusterManagementTool().getResourcesInCluster(cluster);
+    liveInstances =  helixDataAccessor.getChildNames(helixDataAccessor.keyBuilder().liveInstances());
+    Assert.assertFalse(resources.isEmpty() || liveInstances.isEmpty());
+
+    toRemoveInstance = liveInstances.get(0);
+    swapOldInstance = liveInstances.get(1);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    for (String resource : resources) {
+      IdealState idealState =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
+      idealState.setRebalanceMode(IdealState.RebalanceMode.SEMI_AUTO);
+      _gSetupTool.getClusterManagementTool().setResourceIdealState(cluster, resource, idealState);
+    }
+  }
+
+  @Test
+  public void testComputePartitionAssignment() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+    // set all resource to FULL_AUTO except one
+    for (int i = 0; i < resources.size() - 1; ++i) {
+      String resource = resources.get(i);
+      IdealState idealState =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
+      idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+      _gSetupTool.getClusterManagementTool().setResourceIdealState(cluster, resource, idealState);
+    }
+
+    // Test AddInstances, RemoveInstances and SwapInstances
+    String payload = "{\"InstanceChange\" : { \"AddInstances\" : [\"" + instance1
+        + "\"], \"RemoveInstances\" : [ \"" + toRemoveInstance + "\"], \"SwapInstances\" : {\""
+        + swapOldInstance + "\" : \"" + swapNewInstance + "\"} }}  ";
+    String body = post(urlBase, null, Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode(), true);
+    Map<String, Map<String, Map<String, String>>> resourceAssignments = OBJECT_MAPPER
+        .readValue(body, new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+        });
+    Set<String> hostSet = new HashSet<>();
+    resourceAssignments.forEach((k, v) -> v.forEach((kk, vv) -> hostSet.addAll(vv.keySet())));
+    Assert.assertTrue(hostSet.contains(instance1));
+    Assert.assertTrue(hostSet.contains(swapNewInstance));
+    Assert.assertFalse(hostSet.contains(liveInstances.get(0)));
+    Assert.assertFalse(hostSet.contains(liveInstances.get(1)));
+
+    // Test partitionAssignment host filter
+    String payload2 = "{\"Options\" : { \"InstanceFilter\" : [\"" + liveInstances.get(0) + "\" , \""
+        + liveInstances.get(1) + "\"] }}  ";
+    String body2 = post(urlBase, null, Entity.entity(payload2, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode(), true);
+    Map<String, Map<String, Map<String, String>>> resourceAssignments2 = OBJECT_MAPPER
+        .readValue(body2, new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+        });
+    Set<String> hostSet2 = new HashSet<>();
+    resourceAssignments2.forEach((k, v) -> v.forEach((kk, vv) -> hostSet2.addAll(vv.keySet())));
+    Assert.assertEquals(hostSet2.size(), 2);
+    Assert.assertTrue(hostSet2.contains(liveInstances.get(0)));
+    Assert.assertTrue(hostSet2.contains(liveInstances.get(1)));
+
+    String payload3 =
+        "{\"Options\" : { \"ResourceFilter\" : [\"" + resources.get(0) + "\" , \"" + resources
+            .get(1) + "\"] }}  ";
+    String body3 = post(urlBase, null, Entity.entity(payload3, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode(), true);
+    Map<String, Map<String, Map<String, String>>> resourceAssignments3 = OBJECT_MAPPER
+        .readValue(body3, new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+        });
+    Assert.assertEquals(resourceAssignments3.size(), 2);
+    Assert.assertTrue(resourceAssignments3.containsKey(resources.get(0)));
+    Assert.assertTrue(resourceAssignments3.containsKey(resources.get(1)));
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  @Test(dependsOnMethods = "testComputePartitionAssignment")
+  public void testComputePartitionAssignmentWaged() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+    // Use Waged for following tests
+    for (String resource : resources) {
+      IdealState idealState =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
+      idealState
+          .setRebalancerClassName("org.apache.helix.controller.rebalancer.waged.WagedRebalancer");
+      _gSetupTool.getClusterManagementTool().setResourceIdealState(cluster, resource, idealState);
+    }
+
+    // Test AddInstances, RemoveInstances and SwapInstances
+    String payload = "{\"InstanceChange\" : { \"AddInstances\" : [\"" + instance1
+        + "\"], \"RemoveInstances\" : [ \"" + toRemoveInstance + "\"], \"SwapInstances\" : {\""
+        + swapOldInstance + "\" : \"" + swapNewInstance + "\"} }}  ";
+    String body = post(urlBase, null, Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode(), true);
+    Map<String, Map<String, Map<String, String>>> resourceAssignments = OBJECT_MAPPER
+        .readValue(body, new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+        });
+    Set<String> hostSet = new HashSet<>();
+    resourceAssignments.forEach((k, v) -> v.forEach((kk, vv) -> hostSet.addAll(vv.keySet())));
+    Assert.assertTrue(hostSet.contains(instance1));
+    Assert.assertTrue(hostSet.contains(swapNewInstance));
+    Assert.assertFalse(hostSet.contains(liveInstances.get(0)));
+    Assert.assertFalse(hostSet.contains(liveInstances.get(1)));
+
+    // Test partitionAssignment host filter
+    String payload2 = "{\"Options\" : { \"InstanceFilter\" : [\"" + liveInstances.get(0) + "\" , \""
+        + liveInstances.get(1) + "\"] }}  ";
+    String body2 = post(urlBase, null, Entity.entity(payload2, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode(), true);
+    Map<String, Map<String, Map<String, String>>> resourceAssignments2 = OBJECT_MAPPER
+        .readValue(body2, new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+        });
+    Set<String> hostSet2 = new HashSet<>();
+    resourceAssignments2.forEach((k, v) -> v.forEach((kk, vv) -> hostSet2.addAll(vv.keySet())));
+    Assert.assertEquals(hostSet2.size(), 2);
+    Assert.assertTrue(hostSet2.contains(liveInstances.get(0)));
+    Assert.assertTrue(hostSet2.contains(liveInstances.get(1)));
+
+    String payload3 =
+        "{\"Options\" : { \"ResourceFilter\" : [\"" + resources.get(0) + "\" , \"" + resources
+            .get(1) + "\"] }}  ";
+    String body3 = post(urlBase, null, Entity.entity(payload3, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode(), true);
+    Map<String, Map<String, Map<String, String>>> resourceAssignments3 = OBJECT_MAPPER
+        .readValue(body3, new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+        });
+    Assert.assertEquals(resourceAssignments3.size(), 2);
+    Assert.assertTrue(resourceAssignments3.containsKey(resources.get(0)));
+    Assert.assertTrue(resourceAssignments3.containsKey(resources.get(1)));
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+
+  @Test(dependsOnMethods = "testComputePartitionAssignmentWaged")
+  public void testComputePartitionAssignmentNegativeInput() throws IOException {
+    System.out.println("Start test :" + TestHelper.getTestMethodName());
+
+    // Test negative input
+    String payload4 = "{\"InstanceChange\" : { \"AddInstances\" : [\" nonExistInstanceName \"] }} ";
+    post(urlBase, null, Entity.entity(payload4, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.BAD_REQUEST.getStatusCode(), true);
+
+    String payload5 =
+        "{\"InstanceChange\" : { \"RemoveInstances\" : [\" nonExistInstanceName \"] }} ";
+    post(urlBase, null, Entity.entity(payload5, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.BAD_REQUEST.getStatusCode(), true);
+
+    String payload6 =
+        "{\"InstanceChange\" : { \"SwapInstances\" : {\" nonExistInstanceName \" : \" swapNewInstance \"} }} ";
+    post(urlBase, null, Entity.entity(payload6, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.BAD_REQUEST.getStatusCode(), true);
+
+    System.out.println("End test :" + TestHelper.getTestMethodName());
+  }
+}
\ No newline at end of file