You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2021/06/12 01:26:06 UTC

[helix] branch master updated: Support currentState format for partitionAssignment (#1787)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 8911f7e  Support currentState format for partitionAssignment (#1787)
8911f7e is described below

commit 8911f7ef3ae3df4252f3fa07fcb9c0e0b6f245a4
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Fri Jun 11 18:25:59 2021 -0700

    Support currentState format for partitionAssignment (#1787)
    
    Support currentState format for partitionAssignment.
---
 .../helix/ResourceAssignmentOptimizerAccessor.java | 38 ++++++++++++++++++++--
 .../TestResourceAssignmentOptimizerAccessor.java   | 26 +++++++++++++++
 2 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
index f6b9b82..ee888e0 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
@@ -71,6 +71,7 @@ public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
     Map<String, String> nodeSwap = new HashMap<>(); // old instance -> new instance.
     Set<String> instanceFilter = new HashSet<>();
     Set<String> resourceFilter = new HashSet<>();
+    AssignmentFormat returnFormat = AssignmentFormat.IdealStateFormat;
   }
 
   // TODO: We could add a data cache here to avoid read latency.
@@ -81,7 +82,9 @@ public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
     List<String> instances;        // cluster LiveInstance + addInstances - instancesToRemove.
   }
 
-  // Result format: Map of resource -> partition -> instance -> state.
+  // Result format. User can choose from IdealState or CurrentState format,
+  // IdealState format   : Map of resource -> partition -> instance -> state.  (default)
+  // CurrentState format : Map of instance -> resource -> partition -> state.
   private static class AssignmentResult extends HashMap<String, Map<String, Map<String, String>>> {
     public AssignmentResult() {
       super();
@@ -104,11 +107,18 @@ public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
     Map<String, String> swapInstances;
   }
 
+  private enum AssignmentFormat{
+    IdealStateFormat,
+    CurrentStateFormat
+  }
+
   private static class OptionsMap {
     @JsonProperty("InstanceFilter")
     Set<String> instanceFilter;
     @JsonProperty("ResourceFilter")
     Set<String> resourceFilter;
+    @JsonProperty("ReturnFormat")
+    AssignmentFormat returnFormat;
   }
 
   @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
@@ -129,6 +139,7 @@ public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
       // 3. Call rebalancer tools for each resource.
       result = computeOptimalAssignmentForResources(inputFields, clusterState, clusterId);
       // 4. Serialize result to JSON and return.
+      // TODO: We will need to include user input to response header since user may do async call.
       return JSONRepresentation(result);
     } catch (InvalidParameterException ex) {
       return badRequest(ex.getMessage());
@@ -146,7 +157,7 @@ public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
   }
 
   private InputFields readInput(String content)
-      throws InvalidParameterException, JsonProcessingException {
+      throws JsonProcessingException, IllegalArgumentException  {
 
     ObjectMapper objectMapper = new ObjectMapper();
     InputJsonContent inputJsonContent = objectMapper.readValue(content, InputJsonContent.class);
@@ -165,6 +176,8 @@ public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
           .ifPresent(inputFields.resourceFilter::addAll);
       Optional.ofNullable(inputJsonContent.optionsMap.instanceFilter)
           .ifPresent(inputFields.instanceFilter::addAll);
+      inputFields.returnFormat = Optional.ofNullable(inputJsonContent.optionsMap.returnFormat)
+          .orElse(AssignmentFormat.IdealStateFormat);
     }
 
     return inputFields;
@@ -208,6 +221,7 @@ public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
       clusterState.instanceConfigs.add(config);
     }
     clusterState.clusterConfig = cfgAccessor.getClusterConfig(clusterId);
+
     return clusterState;
   }
 
@@ -277,7 +291,25 @@ public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
           result);
     }
 
-    return result;
+    return updateAssignmentFormat(inputFields, result);
+  }
+
+  // IdealState format   : Map of resource -> partition -> instance -> state.  (default)
+  // CurrentState format : Map of instance -> resource -> partition -> state.
+  private AssignmentResult updateAssignmentFormat(InputFields inputFields,
+      AssignmentResult idealStateFormatResult) {
+
+    if (inputFields.returnFormat.equals(AssignmentFormat.CurrentStateFormat)) {
+      AssignmentResult currentStateFormatResult = new AssignmentResult();
+      idealStateFormatResult.forEach((resourceKey, partitionMap) -> partitionMap.forEach(
+          (partitionKey, instanceMap) -> instanceMap.forEach(
+              (instanceKey, instanceState) -> currentStateFormatResult
+                  .computeIfAbsent(instanceKey, x -> new HashMap<>())
+                  .computeIfAbsent(resourceKey, y -> new HashMap<>())
+                  .put(partitionKey, instanceState))));
+      return currentStateFormatResult;
+    }
+    return idealStateFormatResult;
   }
 
   private void computeWagedAssignmentResult(List<IdealState> wagedResourceIdealState,
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
index ad1fda2..a5369a8 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
@@ -127,6 +127,32 @@ public class TestResourceAssignmentOptimizerAccessor extends AbstractTestClass {
     Assert.assertEquals(resourceAssignments3.size(), 2);
     Assert.assertTrue(resourceAssignments3.containsKey(resources.get(0)));
     Assert.assertTrue(resourceAssignments3.containsKey(resources.get(1)));
+
+    // Test Option CurrentState format
+    // Test AddInstances, RemoveInstances and SwapInstances
+    String payload4 = "{\"InstanceChange\" : { \"AddInstances\" : [\"" + instance1
+        + "\"], \"RemoveInstances\" : [ \"" + toRemoveInstance + "\"], \"SwapInstances\" : {\""
+        + swapOldInstance + "\" : \"" + swapNewInstance
+        + "\"} }, \"Options\" : { \"ReturnFormat\" : \"CurrentStateFormat\" , \"ResourceFilter\" : [\""
+        + resources.get(0) + "\" , \"" + resources.get(1) + "\"]} } ";
+    String body4 = post(urlBase, null, Entity.entity(payload4, MediaType.APPLICATION_JSON_TYPE),
+        Response.Status.OK.getStatusCode(), true);
+    Map<String, Map<String, Map<String, String>>> resourceAssignments4 = OBJECT_MAPPER
+        .readValue(body4, new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+        });
+    // Validate outer map key is instance
+    Set<String> resource4 = new HashSet<>();
+    resourceAssignments4.forEach((k, v)  -> v.forEach((kk, vv) -> resource4.add(kk)));
+    Assert.assertTrue(resource4.contains(resources.get(0)));
+    Assert.assertTrue(resource4.contains(resources.get(1)));
+
+    // First inner map key is resource
+    Assert.assertTrue(resourceAssignments4.containsKey(instance1));
+    Assert.assertTrue(resourceAssignments4.containsKey(swapNewInstance));
+    Assert.assertFalse(resourceAssignments4.containsKey(liveInstances.get(0)));
+    Assert.assertFalse(resourceAssignments4.containsKey(liveInstances.get(1)));
+
+
     System.out.println("End test :" + TestHelper.getTestMethodName());
   }