Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2021/06/02 07:09:16 UTC

[GitHub] [helix] junkaixue commented on a change in pull request #1747: New REST api partitionAssignment -- return potential assignment given cluster change

junkaixue commented on a change in pull request #1747:
URL: https://github.com/apache/helix/pull/1747#discussion_r643707579



##########
File path: helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
##########
@@ -0,0 +1,354 @@
+package org.apache.helix.rest.server.resources.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.security.InvalidParameterException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Response;
+
+import com.codahale.metrics.annotation.ResponseMetered;
+import com.codahale.metrics.annotation.Timed;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.rest.common.HttpConstants;
+import org.apache.helix.util.HelixUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Path("/clusters/{clusterId}/partitionAssignment")
+public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
+  private static Logger LOG = LoggerFactory.getLogger(
+      org.apache.helix.rest.server.resources.helix.ResourceAssignmentOptimizerAccessor.class
+          .getName());
+
+  private static class InputFields {
+    List<String> newInstances = new ArrayList<>();
+    List<String> instancesToRemove = new ArrayList<>();
+    Map<String, String> nodeSwap = new HashMap<>(); // old instance -> new instance.
+    Set<String> instanceFilter = new HashSet<>();
+    Set<String> resourceFilter = new HashSet<>();

Review comment:
       Are these cached? If so, it would be better to cache them in a dedicated object instead of static fields, because the cached data is not synchronized between requests. Sharing a simple data structure like this could be problematic.
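
       In case it helps, a minimal sketch of that suggestion, assuming the request-scoped data is held in an object created fresh inside computePotentialAssignment() rather than cached in shared fields (the class name RequestContext is illustrative, not from the PR):

       import java.util.ArrayList;
       import java.util.HashMap;
       import java.util.HashSet;
       import java.util.List;
       import java.util.Map;
       import java.util.Set;

       // Hypothetical per-request holder (illustrative name): constructed once per call to
       // computePotentialAssignment(), so no state is shared between concurrent requests.
       final class RequestContext {
         final List<String> newInstances = new ArrayList<>();
         final List<String> instancesToRemove = new ArrayList<>();
         final Map<String, String> nodeSwap = new HashMap<>(); // old instance -> new instance.
         final Set<String> instanceFilter = new HashSet<>();
         final Set<String> resourceFilter = new HashSet<>();
       }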

##########
File path: helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
##########
@@ -278,6 +281,18 @@ public HelixDataAccessor getDataAccessor(String clusterName) {
     return _byteArrayZkBaseDataAccessor;
   }
 
+  public ZkBucketDataAccessor getZkBucketDataAccessor() {
+    if (_zkBucketDataAccessor == null) {
+      synchronized (this) {
+        if (_zkBucketDataAccessor == null) {
+          _zkBucketDataAccessor = new ZkBucketDataAccessor(_zkAddr);

Review comment:
       +1. I was about to make the same comment: with a realm-aware client, creating a dedicated client from a raw ZK address is not a good idea.
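
       For illustration, a rough sketch of that direction, assuming the accessor can be built from an already-managed realm-aware client instead of a raw ZK address. Both getRealmAwareZkClient() and the ZkBucketDataAccessor(RealmAwareZkClient) constructor below are assumptions for the sketch, not APIs confirmed by this PR:

       // Sketch only: reuse the server's realm-aware client rather than opening a new
       // connection from a dedicated ZK address. The helper and constructor overload
       // used here are hypothetical.
       public ZkBucketDataAccessor getZkBucketDataAccessor() {
         if (_zkBucketDataAccessor == null) {
           synchronized (this) {
             if (_zkBucketDataAccessor == null) {
               _zkBucketDataAccessor = new ZkBucketDataAccessor(getRealmAwareZkClient());
             }
           }
         }
         return _zkBucketDataAccessor;
       }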

##########
File path: helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
##########
@@ -0,0 +1,354 @@
+package org.apache.helix.rest.server.resources.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.security.InvalidParameterException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Response;
+
+import com.codahale.metrics.annotation.ResponseMetered;
+import com.codahale.metrics.annotation.Timed;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.rest.common.HttpConstants;
+import org.apache.helix.util.HelixUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Path("/clusters/{clusterId}/partitionAssignment")
+public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
+  private static Logger LOG = LoggerFactory.getLogger(
+      org.apache.helix.rest.server.resources.helix.ResourceAssignmentOptimizerAccessor.class
+          .getName());
+
+  private static class InputFields {
+    List<String> newInstances = new ArrayList<>();
+    List<String> instancesToRemove = new ArrayList<>();
+    Map<String, String> nodeSwap = new HashMap<>(); // old instance -> new instance.
+    Set<String> instanceFilter = new HashSet<>();
+    Set<String> resourceFilter = new HashSet<>();
+  }
+
+  private static class ClusterState {
+    List<InstanceConfig> instanceConfigs = new ArrayList<>();
+    ClusterConfig clusterConfig;
+    List<String> resources = new ArrayList<>();
+    List<String> instances;        // cluster LiveInstance + addInstances - instancesToRemove.
+  }
+
+  // Result format: Map of resource -> partition -> instance -> state.
+  private static class AssignmentResult extends HashMap<String, Map<String, Map<String, String>>> {
+    public AssignmentResult() {
+      super();
+    }
+  }
+
+  public static final String INSTANCE_CHANGE = "InstanceChange";
+  public static final String INSTANCE_CHANGE_ADD_INSTANCES = "AddInstances";
+  public static final String INSTANCE_CHANGE_REMOVE_INSTANCES = "RemoveInstances";
+  public static final String INSTANCE_CHANGE_SWAP_INSTANCES = "SwapInstances";
+  public static final String OPTIONS = "Options";
+  public static final String OPTIONS_INSTANCE_FLT = "InstanceFilter";
+  public static final String OPTIONS_RESOURCE_FLT = "ResourceFilter";
+
+  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
+  @Timed(name = HttpConstants.WRITE_REQUEST)
+  @POST
+  public Response computePotentialAssignment(@PathParam("clusterId") String clusterId,
+      String content) {
+
+    InputFields inputFields;
+    ClusterState clusterState;
+    AssignmentResult result;
+
+    try {
+      // 1.  Try to parse the content string. If parseable, use it as a KV map. Otherwise, return a REASON String
+      inputFields = readInput(content);
+      // 2. Read cluster status from ZK.
+      clusterState = readClusterStateAndValidateInput(clusterId, inputFields);
+      // 3. Call rebalancer tools for each resource.
+      result = computeOptimalAssignmentForResources(inputFields, clusterState, clusterId);
+      // 4. Serialize result to JSON and return.
+      return JSONRepresentation(result);
+    } catch (InvalidParameterException ex) {
+      return badRequest(ex.getMessage());
+    } catch (JsonProcessingException e) {
+      return badRequest("Invalid input: Input can not be parsed into a KV map." + e.getMessage());
+    } catch (OutOfMemoryError e) {
+      LOG.error("OutOfMemoryError while calling partitionAssignment" + Arrays
+          .toString(e.getStackTrace()));
+      return badRequest(
+          "Response size is too large to serialize. Please query by resources or instance filter");
+    } catch (Exception e) {
+      LOG.error("Failed to compute partition assignment:" + Arrays.toString(e.getStackTrace()));
+      return badRequest("Failed to compute partition assignment: " + e);
+    }
+  }
+
+  private InputFields readInput(String content)
+      throws InvalidParameterException, JsonProcessingException {
+
+    Map<String, Map<String, Object>> customFieldsMap;
+    InputFields inputMap = new InputFields();
+    customFieldsMap =
+        OBJECT_MAPPER.readValue(content, new TypeReference<HashMap<String, Map<String, Object>>>() {
+        });
+
+    // Content is given as a KV mapping.
+    for (Map.Entry<String, Map<String, Object>> entry : customFieldsMap.entrySet()) {
+      String key = entry.getKey();
+      switch (key) {
+        case INSTANCE_CHANGE:
+          for (Map.Entry<String, Object> instanceChange : entry.getValue().entrySet()) {
+            String instanceChangeKey = instanceChange.getKey();
+            if (instanceChangeKey.equals(INSTANCE_CHANGE_ADD_INSTANCES) && (instanceChange

Review comment:
       Can we use a nested switch-case here? That would be easier to read.
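
       Something along these lines, as a sketch that slots into readInput() and reuses the constants and fields already defined in this class (the error messages are illustrative):

       case INSTANCE_CHANGE:
         for (Map.Entry<String, Object> instanceChange : entry.getValue().entrySet()) {
           // Nested switch on the inner key keeps each command in its own case.
           switch (instanceChange.getKey()) {
             case INSTANCE_CHANGE_ADD_INSTANCES:
               if (instanceChange.getValue() instanceof List) {
                 inputMap.newInstances.addAll((List<String>) instanceChange.getValue());
                 break;
               }
               throw new InvalidParameterException(
                   "Invalid format for " + INSTANCE_CHANGE_ADD_INSTANCES);
             case INSTANCE_CHANGE_REMOVE_INSTANCES:
               if (instanceChange.getValue() instanceof List) {
                 inputMap.instancesToRemove.addAll((List<String>) instanceChange.getValue());
                 break;
               }
               throw new InvalidParameterException(
                   "Invalid format for " + INSTANCE_CHANGE_REMOVE_INSTANCES);
             case INSTANCE_CHANGE_SWAP_INSTANCES:
               if (instanceChange.getValue() instanceof Map) {
                 inputMap.nodeSwap.putAll((Map<String, String>) instanceChange.getValue());
                 break;
               }
               throw new InvalidParameterException(
                   "Invalid format for " + INSTANCE_CHANGE_SWAP_INSTANCES);
             default:
               throw new InvalidParameterException(
                   "Unsupported command for InstanceChange : " + instanceChange.getKey());
           }
         }
         break;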

##########
File path: helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
##########
@@ -0,0 +1,354 @@
+package org.apache.helix.rest.server.resources.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.security.InvalidParameterException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Response;
+
+import com.codahale.metrics.annotation.ResponseMetered;
+import com.codahale.metrics.annotation.Timed;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.rest.common.HttpConstants;
+import org.apache.helix.util.HelixUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Path("/clusters/{clusterId}/partitionAssignment")
+public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
+  private static Logger LOG = LoggerFactory.getLogger(
+      org.apache.helix.rest.server.resources.helix.ResourceAssignmentOptimizerAccessor.class
+          .getName());
+
+  private static class InputFields {
+    List<String> newInstances = new ArrayList<>();
+    List<String> instancesToRemove = new ArrayList<>();
+    Map<String, String> nodeSwap = new HashMap<>(); // old instance -> new instance.
+    Set<String> instanceFilter = new HashSet<>();
+    Set<String> resourceFilter = new HashSet<>();
+  }
+
+  private static class ClusterState {
+    List<InstanceConfig> instanceConfigs = new ArrayList<>();
+    ClusterConfig clusterConfig;
+    List<String> resources = new ArrayList<>();
+    List<String> instances;        // cluster LiveInstance + addInstances - instancesToRemove.
+  }
+
+  // Result format: Map of resource -> partition -> instance -> state.
+  private static class AssignmentResult extends HashMap<String, Map<String, Map<String, String>>> {
+    public AssignmentResult() {
+      super();
+    }
+  }
+
+  public static final String INSTANCE_CHANGE = "InstanceChange";
+  public static final String INSTANCE_CHANGE_ADD_INSTANCES = "AddInstances";
+  public static final String INSTANCE_CHANGE_REMOVE_INSTANCES = "RemoveInstances";
+  public static final String INSTANCE_CHANGE_SWAP_INSTANCES = "SwapInstances";
+  public static final String OPTIONS = "Options";
+  public static final String OPTIONS_INSTANCE_FLT = "InstanceFilter";
+  public static final String OPTIONS_RESOURCE_FLT = "ResourceFilter";
+
+  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
+  @Timed(name = HttpConstants.WRITE_REQUEST)
+  @POST
+  public Response computePotentialAssignment(@PathParam("clusterId") String clusterId,
+      String content) {
+
+    InputFields inputFields;
+    ClusterState clusterState;
+    AssignmentResult result;
+
+    try {
+      // 1.  Try to parse the content string. If parseable, use it as a KV map. Otherwise, return a REASON String
+      inputFields = readInput(content);
+      // 2. Read cluster status from ZK.
+      clusterState = readClusterStateAndValidateInput(clusterId, inputFields);
+      // 3. Call rebalancer tools for each resource.
+      result = computeOptimalAssignmentForResources(inputFields, clusterState, clusterId);
+      // 4. Serialize result to JSON and return.
+      return JSONRepresentation(result);
+    } catch (InvalidParameterException ex) {
+      return badRequest(ex.getMessage());
+    } catch (JsonProcessingException e) {
+      return badRequest("Invalid input: Input can not be parsed into a KV map." + e.getMessage());
+    } catch (OutOfMemoryError e) {
+      LOG.error("OutOfMemoryError while calling partitionAssignment" + Arrays
+          .toString(e.getStackTrace()));
+      return badRequest(
+          "Response size is too large to serialize. Please query by resources or instance filter");
+    } catch (Exception e) {
+      LOG.error("Failed to compute partition assignment:" + Arrays.toString(e.getStackTrace()));
+      return badRequest("Failed to compute partition assignment: " + e);
+    }
+  }
+
+  private InputFields readInput(String content)
+      throws InvalidParameterException, JsonProcessingException {
+
+    Map<String, Map<String, Object>> customFieldsMap;
+    InputFields inputMap = new InputFields();
+    customFieldsMap =
+        OBJECT_MAPPER.readValue(content, new TypeReference<HashMap<String, Map<String, Object>>>() {
+        });
+
+    // Content is given as a KV mapping.
+    for (Map.Entry<String, Map<String, Object>> entry : customFieldsMap.entrySet()) {
+      String key = entry.getKey();
+      switch (key) {
+        case INSTANCE_CHANGE:
+          for (Map.Entry<String, Object> instanceChange : entry.getValue().entrySet()) {
+            String instanceChangeKey = instanceChange.getKey();
+            if (instanceChangeKey.equals(INSTANCE_CHANGE_ADD_INSTANCES) && (instanceChange
+                .getValue() instanceof List)) {
+              inputMap.newInstances.addAll((List<String>) instanceChange.getValue());
+            } else if (instanceChangeKey.equals(INSTANCE_CHANGE_REMOVE_INSTANCES) && (instanceChange
+                .getValue() instanceof List)) {
+              inputMap.instancesToRemove.addAll((List<String>) instanceChange.getValue());
+            } else if (instanceChangeKey.equals(INSTANCE_CHANGE_SWAP_INSTANCES) && (instanceChange
+                .getValue() instanceof Map)) {
+              for (Map.Entry<String, String> swapPair : ((Map<String, String>) instanceChange
+                  .getValue()).entrySet()) {
+                inputMap.nodeSwap.put(swapPair.getKey(), swapPair.getValue());
+              }
+            } else {
+              throw new InvalidParameterException(
+                  "Unsupported command or invalid format for InstanceChange : "
+                      + instanceChangeKey);
+            }
+          }
+          break;
+        case OPTIONS:
+          for (Map.Entry<String, Object> option : entry.getValue().entrySet()) {
+            String optionKey = option.getKey();
+            if (optionKey.equals(OPTIONS_RESOURCE_FLT) && (option.getValue() instanceof List)) {
+              inputMap.resourceFilter.addAll((List<String>) option.getValue());
+            } else if (optionKey.equals(OPTIONS_INSTANCE_FLT) && (option.getValue() instanceof List)) {
+              inputMap.instanceFilter.addAll((List<String>) option.getValue());
+            } else {
+              throw new InvalidParameterException(
+                  "Unsupported command or invalid format for Options : " + option);
+            }
+          }
+          break;
+        default:
+          throw new InvalidParameterException(
+              "Unsupported command for partitionAssignment : " + key);
+      }
+    }
+    return inputMap;
+  }
+
+  private ClusterState readClusterStateAndValidateInput(String clusterId, InputFields inputFields)
+      throws InvalidParameterException {
+
+    ClusterState clusterState = new ClusterState();
+    ConfigAccessor cfgAccessor = getConfigAccessor();
+    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
+    clusterState.resources = dataAccessor.getChildNames(dataAccessor.keyBuilder().idealStates());
+    // Add existing live instances and new instances from user input to instances list.
+    clusterState.instances = dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances());
+    clusterState.instances.addAll(inputFields.newInstances);
+
+    // Check if to be removed instances and old instances in swap node exist in live instance.
+    if (!inputFields.nodeSwap.isEmpty() || !inputFields.instancesToRemove.isEmpty()) {
+      Set<String> liveInstanceSet = new HashSet<>(clusterState.instances);
+      for (Map.Entry<String, String> nodeSwapPair : inputFields.nodeSwap.entrySet()) {
+        if (!liveInstanceSet.contains(nodeSwapPair.getKey())) {
+          throw new InvalidParameterException("Invalid input: instance [" + nodeSwapPair.getKey()
+              + "] in SwapInstances does not exist in cluster.");
+        }
+      }
+      for (String instanceToRemove : inputFields.instancesToRemove) {
+        if (!liveInstanceSet.contains(instanceToRemove)) {
+          throw new InvalidParameterException("Invalid input: instance [" + instanceToRemove
+              + "] in RemoveInstances does not exist in cluster.");
+        }
+      }
+      if (!inputFields.instancesToRemove.isEmpty()) {
+        clusterState.instances.removeIf(inputFields.instancesToRemove::contains);
+      }
+    }
+
+    // Read instance and cluster config.
+    // It will throw exception if there is no instanceConfig for newly added instance.
+    for (String instance : clusterState.instances) {
+      InstanceConfig config = cfgAccessor.getInstanceConfig(clusterId, instance);
+      clusterState.instanceConfigs.add(config);
+    }
+    clusterState.clusterConfig = cfgAccessor.getClusterConfig(clusterId);
+    return clusterState;
+  }
+
+  private AssignmentResult computeOptimalAssignmentForResources(InputFields inputFields,

Review comment:
       Shall we move this logic into HelixUtil or RebalanceUtil? It could be leveraged through the Java API as well.
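
       For reference, a rough sketch of the shape such a utility could take, assuming a hypothetical static entry point in HelixUtil (the method name and signature are illustrative, not an existing API); the REST accessor would then be reduced to input parsing plus delegation:

       // Hypothetical utility method (name and signature are illustrative): the REST
       // accessor parses the request and delegates here, so Java callers can reuse the
       // same computation without going through the REST layer.
       public static Map<String, Map<String, Map<String, String>>> getPotentialAssignment(
           ClusterConfig clusterConfig, List<InstanceConfig> instanceConfigs,
           List<String> instances, List<String> resources, Set<String> resourceFilter) {
         Map<String, Map<String, Map<String, String>>> result = new HashMap<>();
         // ... run the same per-resource rebalancer calls that
         // computeOptimalAssignmentForResources() performs today ...
         return result;
       }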




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org