You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2021/06/12 01:26:06 UTC
[helix] branch master updated: Support currentState format for
partitionAssignment (#1787)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 8911f7e Support currentState format for partitionAssignment (#1787)
8911f7e is described below
commit 8911f7ef3ae3df4252f3fa07fcb9c0e0b6f245a4
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Fri Jun 11 18:25:59 2021 -0700
Support currentState format for partitionAssignment (#1787)
Support currentState format for partitionAssignment.
---
.../helix/ResourceAssignmentOptimizerAccessor.java | 38 ++++++++++++++++++++--
.../TestResourceAssignmentOptimizerAccessor.java | 26 +++++++++++++++
2 files changed, 61 insertions(+), 3 deletions(-)
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
index f6b9b82..ee888e0 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
@@ -71,6 +71,7 @@ public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
Map<String, String> nodeSwap = new HashMap<>(); // old instance -> new instance.
Set<String> instanceFilter = new HashSet<>();
Set<String> resourceFilter = new HashSet<>();
+ AssignmentFormat returnFormat = AssignmentFormat.IdealStateFormat;
}
// TODO: We could add a data cache here to avoid read latency.
@@ -81,7 +82,9 @@ public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
List<String> instances; // cluster LiveInstance + addInstances - instancesToRemove.
}
- // Result format: Map of resource -> partition -> instance -> state.
+ // Result format. User can choose from IdealState or CurrentState format,
+ // IdealState format : Map of resource -> partition -> instance -> state. (default)
+ // CurrentState format : Map of instance -> resource -> partition -> state.
private static class AssignmentResult extends HashMap<String, Map<String, Map<String, String>>> {
public AssignmentResult() {
super();
@@ -104,11 +107,18 @@ public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
Map<String, String> swapInstances;
}
+ private enum AssignmentFormat{
+ IdealStateFormat,
+ CurrentStateFormat
+ }
+
private static class OptionsMap {
@JsonProperty("InstanceFilter")
Set<String> instanceFilter;
@JsonProperty("ResourceFilter")
Set<String> resourceFilter;
+ @JsonProperty("ReturnFormat")
+ AssignmentFormat returnFormat;
}
@ResponseMetered(name = HttpConstants.WRITE_REQUEST)
@@ -129,6 +139,7 @@ public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
// 3. Call rebalancer tools for each resource.
result = computeOptimalAssignmentForResources(inputFields, clusterState, clusterId);
// 4. Serialize result to JSON and return.
+ // TODO: We will need to include the user input in the response header since the user may make async calls.
return JSONRepresentation(result);
} catch (InvalidParameterException ex) {
return badRequest(ex.getMessage());
@@ -146,7 +157,7 @@ public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
}
private InputFields readInput(String content)
- throws InvalidParameterException, JsonProcessingException {
+ throws JsonProcessingException, IllegalArgumentException {
ObjectMapper objectMapper = new ObjectMapper();
InputJsonContent inputJsonContent = objectMapper.readValue(content, InputJsonContent.class);
@@ -165,6 +176,8 @@ public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
.ifPresent(inputFields.resourceFilter::addAll);
Optional.ofNullable(inputJsonContent.optionsMap.instanceFilter)
.ifPresent(inputFields.instanceFilter::addAll);
+ inputFields.returnFormat = Optional.ofNullable(inputJsonContent.optionsMap.returnFormat)
+ .orElse(AssignmentFormat.IdealStateFormat);
}
return inputFields;
@@ -208,6 +221,7 @@ public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
clusterState.instanceConfigs.add(config);
}
clusterState.clusterConfig = cfgAccessor.getClusterConfig(clusterId);
+
return clusterState;
}
@@ -277,7 +291,25 @@ public class ResourceAssignmentOptimizerAccessor extends AbstractHelixResource {
result);
}
- return result;
+ return updateAssignmentFormat(inputFields, result);
+ }
+
+ // IdealState format : Map of resource -> partition -> instance -> state. (default)
+ // CurrentState format : Map of instance -> resource -> partition -> state.
+ private AssignmentResult updateAssignmentFormat(InputFields inputFields,
+ AssignmentResult idealStateFormatResult) {
+
+ if (inputFields.returnFormat.equals(AssignmentFormat.CurrentStateFormat)) {
+ AssignmentResult currentStateFormatResult = new AssignmentResult();
+ idealStateFormatResult.forEach((resourceKey, partitionMap) -> partitionMap.forEach(
+ (partitionKey, instanceMap) -> instanceMap.forEach(
+ (instanceKey, instanceState) -> currentStateFormatResult
+ .computeIfAbsent(instanceKey, x -> new HashMap<>())
+ .computeIfAbsent(resourceKey, y -> new HashMap<>())
+ .put(partitionKey, instanceState))));
+ return currentStateFormatResult;
+ }
+ return idealStateFormatResult;
}
private void computeWagedAssignmentResult(List<IdealState> wagedResourceIdealState,
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
index ad1fda2..a5369a8 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
@@ -127,6 +127,32 @@ public class TestResourceAssignmentOptimizerAccessor extends AbstractTestClass {
Assert.assertEquals(resourceAssignments3.size(), 2);
Assert.assertTrue(resourceAssignments3.containsKey(resources.get(0)));
Assert.assertTrue(resourceAssignments3.containsKey(resources.get(1)));
+
+ // Test Option CurrentState format
+ // Test AddInstances, RemoveInstances and SwapInstances
+ String payload4 = "{\"InstanceChange\" : { \"AddInstances\" : [\"" + instance1
+ + "\"], \"RemoveInstances\" : [ \"" + toRemoveInstance + "\"], \"SwapInstances\" : {\""
+ + swapOldInstance + "\" : \"" + swapNewInstance
+ + "\"} }, \"Options\" : { \"ReturnFormat\" : \"CurrentStateFormat\" , \"ResourceFilter\" : [\""
+ + resources.get(0) + "\" , \"" + resources.get(1) + "\"]} } ";
+ String body4 = post(urlBase, null, Entity.entity(payload4, MediaType.APPLICATION_JSON_TYPE),
+ Response.Status.OK.getStatusCode(), true);
+ Map<String, Map<String, Map<String, String>>> resourceAssignments4 = OBJECT_MAPPER
+ .readValue(body4, new TypeReference<HashMap<String, Map<String, Map<String, String>>>>() {
+ });
+ // Validate that the first inner map key is the resource
+ Set<String> resource4 = new HashSet<>();
+ resourceAssignments4.forEach((k, v) -> v.forEach((kk, vv) -> resource4.add(kk)));
+ Assert.assertTrue(resource4.contains(resources.get(0)));
+ Assert.assertTrue(resource4.contains(resources.get(1)));
+
+ // Validate that the outer map key is the instance
+ Assert.assertTrue(resourceAssignments4.containsKey(instance1));
+ Assert.assertTrue(resourceAssignments4.containsKey(swapNewInstance));
+ Assert.assertFalse(resourceAssignments4.containsKey(liveInstances.get(0)));
+ Assert.assertFalse(resourceAssignments4.containsKey(liveInstances.get(1)));
+
+
System.out.println("End test :" + TestHelper.getTestMethodName());
}