You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/09 23:36:26 UTC

helix git commit: [HELIX-718] provide a method in AssignableInstance to set current assignments

Repository: helix
Updated Branches:
  refs/heads/master a9cdecc64 -> e44b29e03


[HELIX-718] provide a method in AssignableInstance to set current assignments


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e44b29e0
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e44b29e0
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e44b29e0

Branch: refs/heads/master
Commit: e44b29e03ef4c807e940cde717ed2f6fff58a273
Parents: a9cdecc
Author: Harry Zhang <hr...@linkedin.com>
Authored: Mon Jul 9 15:59:27 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Mon Jul 9 15:59:27 2018 -0700

----------------------------------------------------------------------
 .../helix/task/assigner/AssignableInstance.java | 45 ++++++++++++++++++--
 .../task/assigner/TestAssignableInstance.java   | 25 +++++++++++
 2 files changed, 66 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/e44b29e0/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index 5d252eb..8883987 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -186,6 +186,12 @@ public class AssignableInstance {
     }
   }
 
+  /**
+   * Update this AssignableInstance with new configs
+   * @param clusterConfig cluster config
+   * @param instanceConfig instance config
+   * @param liveInstance live instance object
+   */
   public void updateConfigs(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
       LiveInstance liveInstance) {
     logger.info("Updating configs for AssignableInstance {}", _instanceConfig.getInstanceName());
@@ -319,13 +325,22 @@ public class AssignableInstance {
     _currentAssignments.add(result.getTaskConfig().getId());
 
     // update resource usage
+    // TODO (harry): get requested resource type from task config
     String resourceType = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name();
     String quotaType = result.getTaskConfig().getQuotaType();
 
-    // Assume used capacity is updated, and if resource type / quota type is not supported
-    // we have already failed the assignment
-    int curUsage = _usedCapacity.get(resourceType).get(quotaType);
-    _usedCapacity.get(resourceType).put(quotaType, curUsage + 1);
+    // The resource type / quota type may no longer exist, e.g. we are recovering current
+    // assignments for a live instance after the user removed a running task's quota type.
+    // So usage is tracked with best effort: it is counted only if the type is still known.
+    if (_usedCapacity.containsKey(resourceType) && _usedCapacity.get(resourceType)
+        .containsKey(quotaType)) {
+      int curUsage = _usedCapacity.get(resourceType).get(quotaType);
+      _usedCapacity.get(resourceType).put(quotaType, curUsage + 1);
+    } else {
+      logger.warn(
+          "Task's requested resource type and quota type is no longer supported. TaskConfig: {}; UsedCapacity: {}",
+          result.getTaskConfig(), _usedCapacity);
+    }
 
     logger.info("Assigned task {} to instance {}", result.getTaskConfig().getId(),
         _instanceConfig.getInstanceName());
@@ -359,6 +374,28 @@ public class AssignableInstance {
   }
 
   /**
+   * Forces this AssignableInstance to match the given current assignment state. Returns a
+   * TaskAssignResult for each task so the assignment can be properly released when it finishes.
+   * @param tasks taskId -> taskConfig mapping
+   * @return taskId -> TaskAssignResult mapping; tasks that failed to be assigned are left out
+   */
+  public Map<String, TaskAssignResult> setCurrentAssignments(Map<String, TaskConfig> tasks) {
+    Map<String, TaskAssignResult> assignment = new HashMap<>();
+    for (Map.Entry<String, TaskConfig> entry : tasks.entrySet()) {
+      TaskAssignResult assignResult =
+          new TaskAssignResult(entry.getValue(), this, true, fitnessScoreFactor, null,
+              "Recovered TaskAssignResult from current state");
+      try {
+        assign(assignResult);
+        assignment.put(entry.getKey(), assignResult);
+      } catch (IllegalStateException e) {
+        logger.error("Failed to set current assignment for task {}.", entry.getValue().getId(), e);
+      }
+    }
+    return assignment;
+  }
+
+  /**
    * Returns a set of taskIDs
    */
   public Set<String> getCurrentAssignments() {

http://git-wip-us.apache.org/repos/asf/helix/blob/e44b29e0/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
index 02a0d39..9b5974a 100644
--- a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -281,6 +281,31 @@ public class TestAssignableInstance {
         .assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA);
   }
 
+  @Test
+  public void testSetCurrentAssignment() {
+    AssignableInstance ai =
+        new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, true),
+            new InstanceConfig(testInstanceName), createLiveInstance(
+            new String[] { LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name() },
+            new String[] { "40" }));
+
+    Map<String, TaskConfig> currentAssignments = new HashMap<>();
+    currentAssignments.put("supportedTask", new TaskConfig("", null, "supportedTask", ""));
+    TaskConfig unsupportedTask = new TaskConfig("", null, "unsupportedTask", "");
+    unsupportedTask.setQuotaType("UnsupportedQuotaType"); // presumably not in testQuotaTypes
+    currentAssignments.put("unsupportedTask", unsupportedTask);
+
+    Map<String, TaskAssignResult> results = ai.setCurrentAssignments(currentAssignments);
+    for (TaskAssignResult rst : results.values()) {
+      Assert.assertTrue(rst.isSuccessful());
+      Assert.assertEquals(rst.getAssignableInstance(), ai);
+    }
+    Assert.assertEquals(ai.getCurrentAssignments().size(), 2); // both task ids are expected
+    Assert.assertEquals(
+        (int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
+            .get(TaskConfig.DEFAULT_QUOTA_TYPE), 1); // expects one task under the default type
+  }
+
   private Map<String, Integer> createResourceQuotaPerTypeMap(String[] types, int[] quotas) {
     Map<String, Integer> ret = new HashMap<>();
     for (int i = 0; i < types.length; i++) {