You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/09 23:36:26 UTC
helix git commit: [HELIX-718] provide a method in AssignableInstance
to set current assignments
Repository: helix
Updated Branches:
refs/heads/master a9cdecc64 -> e44b29e03
[HELIX-718] provide a method in AssignableInstance to set current assignments
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e44b29e0
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e44b29e0
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e44b29e0
Branch: refs/heads/master
Commit: e44b29e03ef4c807e940cde717ed2f6fff58a273
Parents: a9cdecc
Author: Harry Zhang <hr...@linkedin.com>
Authored: Mon Jul 9 15:59:27 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Mon Jul 9 15:59:27 2018 -0700
----------------------------------------------------------------------
.../helix/task/assigner/AssignableInstance.java | 45 ++++++++++++++++++--
.../task/assigner/TestAssignableInstance.java | 25 +++++++++++
2 files changed, 66 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/e44b29e0/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index 5d252eb..8883987 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -186,6 +186,12 @@ public class AssignableInstance {
}
}
+ /**
+ * Update this AssignableInstance with new configs
+ * @param clusterConfig cluster config
+ * @param instanceConfig instance config
+ * @param liveInstance live instance object
+ */
public void updateConfigs(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
LiveInstance liveInstance) {
logger.info("Updating configs for AssignableInstance {}", _instanceConfig.getInstanceName());
@@ -319,13 +325,22 @@ public class AssignableInstance {
_currentAssignments.add(result.getTaskConfig().getId());
// update resource usage
+ // TODO (harry): get requested resource type from task config
String resourceType = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name();
String quotaType = result.getTaskConfig().getQuotaType();
- // Assume used capacity is updated, and if resource type / quota type is not supported
- // we have already failed the assignment
- int curUsage = _usedCapacity.get(resourceType).get(quotaType);
- _usedCapacity.get(resourceType).put(quotaType, curUsage + 1);
+ // Resource type / quota type might have already changed, i.e. we are recovering
+ // current assignments for a live instance, but currently running tasks' quota
+ // type has already been removed by user. So we do the deduction with best effort
+ if (_usedCapacity.containsKey(resourceType) && _usedCapacity.get(resourceType)
+ .containsKey(quotaType)) {
+ int curUsage = _usedCapacity.get(resourceType).get(quotaType);
+ _usedCapacity.get(resourceType).put(quotaType, curUsage + 1);
+ } else {
+ logger.warn(
+ "Task's requested resource type and quota type is no longer supported. TaskConfig: %s; UsedCapacity: %s",
+ result.getTaskConfig(), _usedCapacity);
+ }
logger.info("Assigned task {} to instance {}", result.getTaskConfig().getId(),
_instanceConfig.getInstanceName());
@@ -359,6 +374,28 @@ public class AssignableInstance {
}
/**
+ * Forces this AssignableInstance to match the given current assignment state by assigning each
+ * task to it. Tasks whose assignment throws IllegalStateException are logged and left out.
+ * @param tasks taskId -> taskConfig mapping
+ * @return taskId -> TaskAssignResult mapping, usable for releasing the tasks once they finish
+ */
+ public Map<String, TaskAssignResult> setCurrentAssignments(Map<String, TaskConfig> tasks) {
+ Map<String, TaskAssignResult> assignment = new HashMap<>();
+ for (Map.Entry<String, TaskConfig> entry : tasks.entrySet()) {
+ TaskAssignResult assignResult =
+ new TaskAssignResult(entry.getValue(), this, true, fitnessScoreFactor, null,
+ "Recovered TaskAssignResult from current state");
+ try {
+ assign(assignResult);
+ assignment.put(entry.getKey(), assignResult); // only reached when assign() did not throw
+ } catch (IllegalStateException e) { // best effort: skip this task, keep recovering the rest
+ logger.error("Failed to set current assignment for task {}.", entry.getValue().getId(), e);
+ }
+ }
+ return assignment;
+ }
+
+ /**
* Returns a set of taskIDs
*/
public Set<String> getCurrentAssignments() {
http://git-wip-us.apache.org/repos/asf/helix/blob/e44b29e0/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
index 02a0d39..9b5974a 100644
--- a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -281,6 +281,31 @@ public class TestAssignableInstance {
.assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA);
}
+ @Test
+ public void testSetCurrentAssignment() { // exercises AssignableInstance.setCurrentAssignments
+ AssignableInstance ai =
+ new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, true),
+ new InstanceConfig(testInstanceName), createLiveInstance(
+ new String[] { LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name() },
+ new String[] { "40" }));
+
+ Map<String, TaskConfig> currentAssignments = new HashMap<>();
+ currentAssignments.put("supportedTask", new TaskConfig("", null, "supportedTask", "")); // presumably uses the default quota type - confirm
+ TaskConfig unsupportedTask = new TaskConfig("", null, "unsupportedTask", "");
+ unsupportedTask.setQuotaType("UnsupportedQuotaType"); // presumably not among testQuotaTypes - confirm
+ currentAssignments.put("unsupportedTask", unsupportedTask);
+
+ Map<String, TaskAssignResult> results = ai.setCurrentAssignments(currentAssignments);
+ for (TaskAssignResult rst : results.values()) { // every returned result must be a success bound to ai
+ Assert.assertTrue(rst.isSuccessful());
+ Assert.assertEquals(rst.getAssignableInstance(), ai);
+ }
+ Assert.assertEquals(ai.getCurrentAssignments().size(), 2); // both tasks are tracked as current assignments
+ Assert.assertEquals( // while only one unit of usage is expected under the default quota type
+ (int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
+ .get(TaskConfig.DEFAULT_QUOTA_TYPE), 1);
+ }
+
private Map<String, Integer> createResourceQuotaPerTypeMap(String[] types, int[] quotas) {
Map<String, Integer> ret = new HashMap<>();
for (int i = 0; i < types.length; i++) {