You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/09 22:56:24 UTC

helix git commit: [HELIX-718] implement AssignableInstance

Repository: helix
Updated Branches:
  refs/heads/master 84c2feabb -> 2049f93ab


[HELIX-718] implement AssignableInstance


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2049f93a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2049f93a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2049f93a

Branch: refs/heads/master
Commit: 2049f93abe8e56a754e4880a9157959ef24cd89e
Parents: 84c2fea
Author: Harry Zhang <hr...@linkedin.com>
Authored: Mon Jul 9 15:49:33 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Mon Jul 9 15:49:33 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskConfig.java  |   4 +-
 .../helix/task/assigner/AssignableInstance.java | 325 +++++++++++++++++-
 .../helix/task/assigner/TaskAssignResult.java   |   5 +-
 .../helix/task/assigner/TaskAssigner.java       |   4 +-
 .../task/assigner/TestAssignableInstance.java   | 334 +++++++++++++++++++
 5 files changed, 650 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/2049f93a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index a447929..d3a8b34 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -42,7 +42,7 @@ public class TaskConfig {
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(TaskConfig.class);
-  public static final String QUOTA_TYPE_NOT_SET = "QUOTA_TYPE_NOT_SET";
+  public static final String DEFAULT_QUOTA_TYPE = "DEFAULT";
 
   private final Map<String, String> _configMap;
 
@@ -133,7 +133,7 @@ public class TaskConfig {
    */
   public String getQuotaType() {
     return _configMap.containsKey(TaskConfigProperty.TASK_QUOTA_TYPE.name()) ?
-        _configMap.get(TaskConfigProperty.TASK_QUOTA_TYPE.name()) : QUOTA_TYPE_NOT_SET;
+        _configMap.get(TaskConfigProperty.TASK_QUOTA_TYPE.name()) : DEFAULT_QUOTA_TYPE;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/2049f93a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index fe59275..5d252eb 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -19,44 +19,277 @@ package org.apache.helix.task.assigner;
  * under the License.
  */
 
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * AssignableInstance contains instance capacity profile and methods that control capacity and help
  * with task assignment.
  */
 public class AssignableInstance {
+  private static final Logger logger = LoggerFactory.getLogger(AssignableInstance.class);
 
   /**
-   * Caches tasks currently assigned to this instance.
+   * Fitness score will be calculated from 0 to 1000
+   */
+  private static final int fitnessScoreFactor = 1000;
+
+  /**
+   * Caches IDs of tasks currently assigned to this instance.
    * Every pipeline iteration will compare Task states in this map to Task states in TaskDataCache.
    * Tasks in a terminal state (finished or failed) will be removed as soon as they reach the state.
    */
-  private Map<String, TaskAssignResult> _currentAssignments;
+  private Set<String> _currentAssignments;
   private ClusterConfig _clusterConfig;
   private InstanceConfig _instanceConfig;
   private LiveInstance _liveInstance;
 
+  /**
+   * A map recording instance's total capacity:
+   * map{resourceType : map{quotaType : quota}}
+   */
+  private Map<String, Map<String, Integer>> _totalCapacity;
+
+  /**
+   * A map recording instance's used capacity
+   * map{resourceType : map{quotaType : quota}}
+   */
+  private Map<String, Map<String, Integer>> _usedCapacity;
+
   public AssignableInstance(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
       LiveInstance liveInstance) {
+    // All three descriptors are mandatory
+    boolean missingInput = clusterConfig == null || instanceConfig == null || liveInstance == null;
+    if (missingInput) {
+      throw new IllegalArgumentException(
+          "ClusterConfig, InstanceConfig, LiveInstance cannot be null!");
+    }
+
+    // InstanceConfig and LiveInstance must carry the same instance name
+    String configName = instanceConfig.getInstanceName();
+    String liveName = liveInstance.getInstanceName();
+    if (!configName.equals(liveName)) {
+      String mismatch = String.format(
+          "Instance name from LiveInstance (%s) and InstanceConfig (%s) don't match!", liveName,
+          configName);
+      throw new IllegalArgumentException(mismatch);
+    }
     _clusterConfig = clusterConfig;
     _instanceConfig = instanceConfig;
     _liveInstance = liveInstance;
+
+    // Start from empty bookkeeping, then derive capacity from the configs stored above
+    _totalCapacity = new HashMap<>();
+    _usedCapacity = new HashMap<>();
+    _currentAssignments = new HashSet<>();
+    refreshTotalCapacity();
+  }
+
+  /**
+   * Recomputes the per-quota-type capacity of every resource type from the task quota ratios in
+   * ClusterConfig and the resource capacities in LiveInstance, and reconciles the usage records
+   * with the resulting set of resource / quota types.
+   *
+   * When task quota ratio / instance's resource capacity change, we need to update instance
+   * capacity cache. Couple of corner cases to clarify for updating capacity:
+   *    1. User shrinks capacity and used capacity exceeds total capacity - current assignment
+   *       will not be affected (used > total is ok) but no further assignment decision will
+   *       be made on this instance until spaces get freed up
+   *    2. User removed a quotaType but there are still tasks with stale quota type assigned on
+   *       this instance - current assignment will not be affected, and further assignment will
+   *       NOT be made for stale quota type
+   *    3. User removed a resourceType but there are still tasks with stale resource type assigned
+   *       on this instance - current assignment will not be affected, but no further assignment
+   *       with stale resource type request will be allowed on this instance
+   *
+   * All configuration values are parsed before any state is modified, so a malformed value leaves
+   * both the total capacity and the used capacity exactly as they were.
+   */
+  private void refreshTotalCapacity() {
+    Map<String, String> typeQuotaRatio = _clusterConfig.getTaskQuotaRatioMap();
+    Map<String, String> resourceCapacity = _liveInstance.getResourceCapacityMap();
+
+    if (resourceCapacity == null) {
+      resourceCapacity = new HashMap<>();
+      resourceCapacity.put(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(),
+          Integer.toString(TaskStateModelFactory.TASK_THREADPOOL_SIZE));
+      logger.info("No resource capacity provided in LiveInstance {}, assuming default capacity: {}",
+          _instanceConfig.getInstanceName(), resourceCapacity);
+    }
+
+    if (typeQuotaRatio == null) {
+      typeQuotaRatio = new HashMap<>();
+      typeQuotaRatio.put(TaskConfig.DEFAULT_QUOTA_TYPE, Integer.toString(1));
+      // The ratio map is read from ClusterConfig, not LiveInstance
+      logger.info(
+          "No quota type ratio provided in ClusterConfig for instance {}, assuming default ratio: {}",
+          _instanceConfig.getInstanceName(), typeQuotaRatio);
+    }
+
+    logger.info(
+        "Updating capacity for AssignableInstance {}. Resource Capacity: {}; Type Quota Ratio: {}",
+        _instanceConfig.getInstanceName(), resourceCapacity, typeQuotaRatio);
+
+    // Parse every configured value first; nothing below this block can fail on bad input, so
+    // the existing capacity and usage records stay untouched when the configuration is invalid
+    Map<String, Integer> parsedCapacity;
+    Map<String, Integer> parsedRatio;
+    try {
+      parsedCapacity = parseIntValues(resourceCapacity);
+      parsedRatio = parseIntValues(typeQuotaRatio);
+    } catch (NumberFormatException e) {
+      // TODO: properly escalate error
+      logger.error(
+          "Failed to update capacity for Assignableinstance {}, still using current capacity {}. Current usage: {}",
+          _instanceConfig.getInstanceName(), _totalCapacity, _usedCapacity, e);
+      return;
+    }
+
+    // The ratio denominator is identical for every resource type
+    int totalRatio = 0;
+    for (int ratio : parsedRatio.values()) {
+      totalRatio += ratio;
+    }
+
+    // Reconcile current and new resource types
+    Map<String, Map<String, Integer>> newTotalCapacity = new HashMap<>();
+    for (Map.Entry<String, Integer> resEntry : parsedCapacity.entrySet()) {
+      String resourceType = resEntry.getKey();
+      int capacity = resEntry.getValue();
+
+      if (!_totalCapacity.containsKey(resourceType)) {
+        logger.info("Adding InstanceResourceType {}", resourceType);
+        _usedCapacity.put(resourceType, new HashMap<String, Integer>());
+      }
+      Map<String, Integer> usedByType = _usedCapacity.get(resourceType);
+      Map<String, Integer> quotaByType = new HashMap<>();
+
+      // Setup per-type resource quota based on given total capacity
+      for (Map.Entry<String, Integer> typeQuotaEntry : parsedRatio.entrySet()) {
+        String quotaType = typeQuotaEntry.getKey();
+        int quotaRatio = typeQuotaEntry.getValue();
+        int quota = Math.round(capacity * (float) quotaRatio / (float) totalRatio);
+
+        // Honor non-zero quota ratio for non-zero capacity even if it is rounded to zero
+        if (capacity != 0 && quotaRatio != 0 && quota == 0) {
+          quota = 1;
+        }
+
+        // record total quota of the resource
+        quotaByType.put(quotaType, quota);
+
+        // Add quota for new quota type
+        if (!usedByType.containsKey(quotaType)) {
+          logger.info("Adding QuotaType {} for resource {}", quotaType, resourceType);
+          usedByType.put(quotaType, 0);
+        }
+      }
+      newTotalCapacity.put(resourceType, quotaByType);
+
+      // For removed quota type, remove record from used capacity
+      usedByType.keySet().retainAll(parsedRatio.keySet());
+    }
+
+    // Update total capacity map
+    _totalCapacity = newTotalCapacity;
+
+    // Purge used capacity for resource deleted
+    _usedCapacity.keySet().retainAll(parsedCapacity.keySet());
+
+    logger.info(
+        "Finished updating capacity for AssignableInstance {}. Current capacity {}. Current usage: {}",
+        _instanceConfig.getInstanceName(), _totalCapacity, _usedCapacity);
+  }
+
+  /**
+   * Converts every value of the given map to an int, keeping the keys.
+   * @param raw map whose values are decimal integer strings
+   * @return a new map with the same keys and the parsed values
+   * @throws NumberFormatException if any value is null or not a valid integer
+   */
+  private static Map<String, Integer> parseIntValues(Map<String, String> raw) {
+    Map<String, Integer> parsed = new HashMap<>();
+    for (Map.Entry<String, String> entry : raw.entrySet()) {
+      parsed.put(entry.getKey(), Integer.parseInt(entry.getValue()));
+    }
+    return parsed;
+  }
+
+  /**
+   * Replaces the cached configs of this instance and refreshes the capacity profile when the
+   * task quota ratios or the resource capacities changed.
+   *
+   * Any argument may be null, in which case the corresponding config is left as is. A
+   * LiveInstance or InstanceConfig whose instance name differs from the current one is rejected
+   * with an error log and ignored.
+   *
+   * @param clusterConfig new cluster config, or null to keep the current one
+   * @param instanceConfig new instance config, or null to keep the current one
+   * @param liveInstance new live instance, or null to keep the current one
+   */
+  public void updateConfigs(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
+      LiveInstance liveInstance) {
+    logger.info("Updating configs for AssignableInstance {}", _instanceConfig.getInstanceName());
+    boolean refreshCapacity = false;
+    if (clusterConfig != null) {
+      // refreshTotalCapacity() accepts a null ratio map, so either side may be null here;
+      // compare null-safely instead of dereferencing
+      if (!java.util.Objects.equals(clusterConfig.getTaskQuotaRatioMap(),
+          _clusterConfig.getTaskQuotaRatioMap())) {
+        refreshCapacity = true;
+      }
+      _clusterConfig = clusterConfig;
+      logger.info("Updated cluster config");
+    }
+
+    if (liveInstance != null) {
+      if (!_instanceConfig.getInstanceName().equals(liveInstance.getInstanceName())) {
+        logger.error(
+            "Cannot update live instance with different instance name. Current: {}; new: {}",
+            _instanceConfig.getInstanceName(), liveInstance.getInstanceName());
+      } else {
+        // Same reasoning as above: a null capacity map is a valid "use defaults" value
+        if (!java.util.Objects.equals(liveInstance.getResourceCapacityMap(),
+            _liveInstance.getResourceCapacityMap())) {
+          refreshCapacity = true;
+        }
+        _liveInstance = liveInstance;
+        logger.info("Updated live instance");
+      }
+    }
+
+    if (instanceConfig != null) {
+      if (!_instanceConfig.getInstanceName().equals(instanceConfig.getInstanceName())) {
+        logger.error(
+            "Cannot update instance config with different instance name. Current: {}; new: {}",
+            _instanceConfig.getInstanceName(), instanceConfig.getInstanceName());
+      } else {
+        _instanceConfig = instanceConfig;
+        logger.info("Updated instance config");
+      }
+    }
+
+    // Recompute only after all configs are swapped in, and only when an input actually changed
+    if (refreshCapacity) {
+      refreshTotalCapacity();
+    }
+
+    logger.info("Updated configs for AssignableInstance {}", _instanceConfig.getInstanceName());
   }
 
   /**
    * Tries to assign the given task on this instance and returns TaskAssignResult. Instance capacity
    * profile is NOT modified by tryAssign.
-   * @param task
-   * @return
+   *
+   * When calculating fitness of an assignment, this function will rate assignment from 0 to 1000,
+   * and the assignment that has a higher score will be a better fit.
+   *
+   * The checks below run in order and the first one that fails decides the failure reason:
+   *    1. TASK_ALREADY_ASSIGNED - the task ID is already in _currentAssignments
+   *    2. NO_SUCH_RESOURCE_TYPE - TASK_EXEC_THREAD has no entry in the total capacity
+   *    3. NO_SUCH_QUOTA_TYPE - the task's quota type has no entry for that resource
+   *    4. INSUFFICIENT_QUOTA - capacity of the quota type is less than or equal to its usage
+   * A failed result always carries a fitness score of 0. A successful result carries
+   * round((capacity - usage) / capacity * 1000).
+   *
+   * @param task task config
+   * @return TaskAssignResult that is either successful with a fitness score, or failed with a
+   *         failure reason and a description
+   * @throws IllegalArgumentException if task is null
    */
-  public TaskAssignResult tryAssign(TaskConfig task) {
-    // TODO: implement
-    return null;
+  public TaskAssignResult tryAssign(TaskConfig task) throws IllegalArgumentException {
+    if (task == null) {
+      throw new IllegalArgumentException("Task is null!");
+    }
+
+    if (_currentAssignments.contains(task.getId())) {
+      return new TaskAssignResult(task, this, false, 0,
+          TaskAssignResult.FailureReason.TASK_ALREADY_ASSIGNED, String
+          .format("Task %s is already assigned to this instance. Need to release it first",
+              task.getId()));
+    }
+
+    // For now we only have 1 type of resource so just hard code it here
+    String resourceType = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name();
+
+    // Fail when no such resource type
+    if (!_totalCapacity.containsKey(resourceType)) {
+      return new TaskAssignResult(task, this, false, 0,
+          TaskAssignResult.FailureReason.NO_SUCH_RESOURCE_TYPE, String
+          .format("Requested resource type %s not supported. Available resource types: %s",
+              resourceType, _totalCapacity.keySet()));
+    }
+
+    String quotaType = task.getQuotaType();
+
+    // Fail when no such quota type
+    if (!_totalCapacity.get(resourceType).containsKey(quotaType)) {
+      return new TaskAssignResult(task, this, false, 0,
+          TaskAssignResult.FailureReason.NO_SUCH_QUOTA_TYPE, String
+          .format("Requested quota type %s not defined. Available quota types: %s", quotaType,
+              _totalCapacity.get(resourceType).keySet()));
+    }
+
+    int capacity = _totalCapacity.get(resourceType).get(quotaType);
+    int usage =  _usedCapacity.get(resourceType).get(quotaType);
+
+    // Fail with insufficient quota; "<=" also covers usage exceeding a capacity that was shrunk
+    if (capacity <= usage) {
+      return new TaskAssignResult(task, this, false, 0,
+          TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA, String
+          .format("Insufficient quota %s::%s. Capacity: %s, Current Usage: %s", resourceType,
+              quotaType, capacity, usage));
+    }
+
+    // More remaining capacity leads to higher fitness score
+    int fitness = Math.round((float)(capacity - usage) / capacity * fitnessScoreFactor);
+
+    return new TaskAssignResult(task, this, true, fitness,
+        null, "");
   }
 
   /**
@@ -64,29 +297,71 @@ public class AssignableInstance {
    * 1. Deduct the amount of resource required by this task
    * 2. Add this TaskAssignResult to _currentAssignments
    * @param result
-   * @throws IllegalStateException if TaskAssignResult is not successful
+   * @throws IllegalStateException if TaskAssignResult is not successful or the task is double
+   *                              assigned, or the task is not assigned to this instance
    */
   public void assign(TaskAssignResult result) throws IllegalStateException {
-    // TODO: implement
-    return;
+    if (!result.isSuccessful()) {
+      throw new IllegalStateException("Cannot assign a failed result: " + result);
+    }
+
+    if (!result.getInstanceName().equals(getInstanceName())) {
+      throw new IllegalStateException(String.format(
+          "Cannot assign a result for a different instance. This instance: %s; Result: %s",
+          getInstanceName(), result));
+    }
+
+    String taskId = result.getTaskConfig().getId();
+    if (_currentAssignments.contains(taskId)) {
+      throw new IllegalStateException(
+          "Cannot double assign task " + taskId);
+    }
+
+    String resourceType = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name();
+    String quotaType = result.getTaskConfig().getQuotaType();
+
+    // Capacity may have been refreshed between tryAssign() and assign(), dropping the resource
+    // or quota type. Look the usage record up BEFORE mutating anything so that a stale result
+    // fails with IllegalStateException and leaves the bookkeeping untouched.
+    Map<String, Integer> usageByQuotaType = _usedCapacity.get(resourceType);
+    Integer curUsage = usageByQuotaType == null ? null : usageByQuotaType.get(quotaType);
+    if (curUsage == null) {
+      throw new IllegalStateException(String.format(
+          "Cannot assign task %s: no usage record for %s::%s on instance %s", taskId,
+          resourceType, quotaType, getInstanceName()));
+    }
+
+    // Record the task and update resource usage together
+    _currentAssignments.add(taskId);
+    usageByQuotaType.put(quotaType, curUsage + 1);
+
+    logger.info("Assigned task {} to instance {}", taskId,
+        _instanceConfig.getInstanceName());
   }
 
   /**
    * Performs the following to release resource for a task:
    * 1. Release the resource by adding back what the task required.
    * 2. Remove the TaskAssignResult from _currentAssignments
-   * @param taskID
-   * @throws IllegalArgumentException if task is not found
+   *
+   * If the task's ID is not in _currentAssignments, a warning is logged and nothing changes.
+   * Otherwise the usage of the task's quota type is decremented by one, but only when a usage
+   * record for that quota type still exists; the task's ID is removed from _currentAssignments
+   * in either case.
+   * @param taskConfig config of this task
    */
-  public void release(String taskID) throws IllegalArgumentException {
-    // TODO: implement
-    return;
+  public void release(TaskConfig taskConfig) {
+    if (!_currentAssignments.contains(taskConfig.getId())) {
+      logger.warn("Task {} is not assigned on instance {}", taskConfig.getId(),
+          _instanceConfig.getInstanceName());
+      return;
+    }
+    String quotaType = taskConfig.getQuotaType();
+    String resourceType = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name();
+
+    // We might be releasing a task whose resource requirement / quota type is out-dated,
+    // thus we need to check to avoid NPE
+    if (_usedCapacity.containsKey(resourceType) && _usedCapacity.get(resourceType)
+        .containsKey(quotaType)) {
+      int curUsage = _usedCapacity.get(resourceType).get(quotaType);
+      _usedCapacity.get(resourceType).put(quotaType, curUsage - 1);
+    }
+    _currentAssignments.remove(taskConfig.getId());
+    logger.info("Released task {} from instance {}", taskConfig.getId(),
+        _instanceConfig.getInstanceName());
   }
 
   /**
-   * Returns taskID -> TaskAssignResult mappings.
+   * Returns a set of taskIDs
+   * The returned set is the internal _currentAssignments instance itself, not a copy.
    */
-  public Map<String, TaskAssignResult> getCurrentAssignments() {
+  public Set<String> getCurrentAssignments() {
     return _currentAssignments;
   }
 
@@ -96,4 +371,20 @@ public class AssignableInstance {
   public String getInstanceName() {
     return _instanceConfig.getInstanceName();
   }
+
+  /**
+   * Returns total capacity of the AssignableInstance
+   * The returned map is the internal _totalCapacity reference itself, not a copy.
+   * @return map{resourceType : map{quotaType : quota}}
+   */
+  public Map<String, Map<String, Integer>> getTotalCapacity() {
+    return _totalCapacity;
+  }
+
+  /**
+   * Returns used capacity of the AssignableInstance
+   * The returned map is the internal _usedCapacity reference itself, not a copy.
+   * @return map{resourceType : map{quotaType : usedQuota}}
+   */
+  public Map<String, Map<String, Integer>> getUsedCapacity() {
+    return _usedCapacity;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2049f93a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
index 6d7b4e8..00d7db1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
@@ -34,7 +34,10 @@ public class TaskAssignResult implements Comparable<TaskAssignResult> {
     NO_SUCH_RESOURCE_TYPE,
 
     // Required quota type is not configured
-    NO_SUCH_QUOTA_TYPE
+    NO_SUCH_QUOTA_TYPE,
+
+    // Task cannot be assigned twice on a node without releasing it first
+    TASK_ALREADY_ASSIGNED
   }
 
   private final boolean _isAssignmentSuccessful;

http://git-wip-us.apache.org/repos/asf/helix/blob/2049f93a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java
index 66614a2..79fbd64 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java
@@ -28,8 +28,8 @@ public interface TaskAssigner {
    * Assign a collection of tasks on a collection of assignableInstances.
    * When an assignment decision is made, AssignableInstance.assign() must be called for the
    * instance to modify its internal capacity profile.
-   * @param assignableInstances
-   * @param tasks
+   * @param assignableInstances AssignableInstances that the tasks may be assigned to
+   * @param tasks String -> TaskConfig
    * @return taskID -> TaskAssignmentResult mapping per task
    */
   Map<String, TaskAssignResult> assignTasks(Iterable<AssignableInstance> assignableInstances,

http://git-wip-us.apache.org/repos/asf/helix/blob/2049f93a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
new file mode 100644
index 0000000..02a0d39
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -0,0 +1,334 @@
+package org.apache.helix.task.assigner;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestAssignableInstance {
+  private static final String testClusterName = "testCluster";
+  private static final String testInstanceName = "testInstance";
+
+  private static final String[] testResourceTypes = new String[] {"Resource1", "Resource2", "Resource3"};
+  private static final String[] testResourceCapacity = new String[] {"20", "50", "100"};
+
+  private static final String[] testQuotaTypes = new String[] {"Type1", "Type2", "Type3"};
+  private static final String[] testQuotaRatio = new String[] {"50", "30", "20"};
+  private static final String defaultQuotaRatio = "100";
+
+
+  @Test
+  public void testInvalidInitialization() {
+    // Case 1: every argument is null
+    try {
+      new AssignableInstance(null, null, null);
+      Assert.fail("Expecting IllegalArgumentException");
+    } catch (IllegalArgumentException expected) {
+      String message = expected.getMessage();
+      Assert.assertTrue(message.contains("cannot be null"));
+    }
+
+    // Case 2: InstanceConfig and LiveInstance carry different instance names
+    try {
+      ClusterConfig cluster = new ClusterConfig("testCluster");
+      InstanceConfig configured = new InstanceConfig("instance");
+      LiveInstance live = new LiveInstance("another-instance");
+      new AssignableInstance(cluster, configured, live);
+      Assert.fail("Expecting IllegalArgumentException");
+    } catch (IllegalArgumentException expected) {
+      String message = expected.getMessage();
+      Assert.assertTrue(message.contains("don't match"));
+    }
+  }
+
+  @Test
+  public void testInitializationWithQuotaUnset() {
+    // Initialize AssignableInstance with neither resource capacity nor quota ratio provided
+    AssignableInstance ai = new AssignableInstance(
+        createClusterConfig(null, null, false),
+        new InstanceConfig(testInstanceName),
+        createLiveInstance(null, null)
+    );
+    Assert.assertEquals(ai.getUsedCapacity().size(), 1);
+    Assert.assertEquals(
+        (int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
+            .get(TaskConfig.DEFAULT_QUOTA_TYPE), 0);
+    Assert.assertEquals(
+        (int) ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
+            .get(TaskConfig.DEFAULT_QUOTA_TYPE), TaskStateModelFactory.TASK_THREADPOOL_SIZE);
+    Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
+  }
+
+  @Test
+  public void testInitializationWithOnlyCapacity() {
+    // Initialize AssignableInstance with only resource capacity provided
+    AssignableInstance ai = new AssignableInstance(
+        createClusterConfig(null, null, false),
+        new InstanceConfig(testInstanceName),
+        createLiveInstance(testResourceTypes, testResourceCapacity)
+    );
+    Assert.assertEquals(ai.getTotalCapacity().size(), testResourceTypes.length);
+    Assert.assertEquals(ai.getUsedCapacity().size(), testResourceTypes.length);
+    for (int i = 0; i < testResourceTypes.length; i++) {
+      Assert.assertEquals(ai.getTotalCapacity().get(testResourceTypes[i]).size(), 1);
+      Assert.assertEquals(ai.getUsedCapacity().get(testResourceTypes[i]).size(), 1);
+      Assert.assertEquals(
+          ai.getTotalCapacity().get(testResourceTypes[i]).get(TaskConfig.DEFAULT_QUOTA_TYPE),
+          Integer.valueOf(testResourceCapacity[i])
+      );
+      Assert.assertEquals(
+          ai.getUsedCapacity().get(testResourceTypes[i]).get(TaskConfig.DEFAULT_QUOTA_TYPE),
+          Integer.valueOf(0)
+      );
+    }
+  }
+
+  @Test
+  public void testInitializationWithOnlyQuotaType() {
+    // Only quota types are configured; resource capacity is left unset
+    ClusterConfig clusterConfig = createClusterConfig(testQuotaTypes, testQuotaRatio, false);
+    InstanceConfig instanceConfig = new InstanceConfig(testInstanceName);
+    LiveInstance liveInstance = createLiveInstance(null, null);
+    AssignableInstance ai = new AssignableInstance(clusterConfig, instanceConfig, liveInstance);
+
+    String threadResource = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name();
+
+    // A single resource type split across all configured quota types
+    Assert.assertEquals(ai.getTotalCapacity().size(), 1);
+    Assert.assertEquals(ai.getUsedCapacity().size(), 1);
+    Assert.assertEquals(ai.getTotalCapacity().get(threadResource).size(), testQuotaTypes.length);
+    Assert.assertEquals(ai.getUsedCapacity().get(threadResource).size(), testQuotaTypes.length);
+    Assert.assertEquals(ai.getTotalCapacity().get(threadResource),
+        calculateExpectedQuotaPerType(TaskStateModelFactory.TASK_THREADPOOL_SIZE, testQuotaTypes,
+            testQuotaRatio));
+    Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
+  }
+
+  @Test
+  public void testInitializationWithQuotaAndCapacity() {
+    // Initialize AssignableInstance with both capacity and quota type provided
+    AssignableInstance ai = new AssignableInstance(
+        createClusterConfig(testQuotaTypes, testQuotaRatio, false),
+        new InstanceConfig(testInstanceName),
+        createLiveInstance(testResourceTypes, testResourceCapacity)
+    );
+
+    Map<String, Integer> usedResourcePerType =
+        createResourceQuotaPerTypeMap(testQuotaTypes, new int[] { 0, 0, 0 });
+    for (int i = 0; i < testResourceTypes.length; i++) {
+      Assert.assertEquals(ai.getTotalCapacity().get(testResourceTypes[i]),
+          calculateExpectedQuotaPerType(Integer.valueOf(testResourceCapacity[i]), testQuotaTypes,
+              testQuotaRatio));
+      Assert.assertEquals(ai.getUsedCapacity().get(testResourceTypes[i]), usedResourcePerType);
+    }
+  }
+
+  @Test
+  public void testAssignableInstanceUpdateConfigs() {
+    ClusterConfig initialCluster = createClusterConfig(testQuotaTypes, testQuotaRatio, false);
+    InstanceConfig instanceConfig = new InstanceConfig(testInstanceName);
+    LiveInstance initialLive = createLiveInstance(testResourceTypes, testResourceCapacity);
+    AssignableInstance ai = new AssignableInstance(initialCluster, instanceConfig, initialLive);
+
+    // New resource set drops Resource1 and introduces Resource4
+    String[] newResources = new String[] {"Resource2", "Resource3", "Resource4"};
+    String[] newResourceCapacities = new String[] {"100", "150", "50"};
+
+    // New quota types keep only Type3 from the initial set
+    String[] newTypes = new String[] {"Type3", "Type4", "Type5", "Type6"};
+    String[] newTypeRatio = new String[] {"20", "40", "25", "25"};
+
+    LiveInstance newLiveInstance = createLiveInstance(newResources, newResourceCapacities);
+    ClusterConfig newClusterConfig = createClusterConfig(newTypes, newTypeRatio, false);
+    ai.updateConfigs(newClusterConfig, null, newLiveInstance);
+
+    Assert.assertEquals(ai.getUsedCapacity().size(), newResourceCapacities.length);
+    Assert.assertEquals(ai.getTotalCapacity().size(), newResourceCapacities.length);
+
+    // Capacity must follow the new configs and usage must be zero for every new quota type
+    int index = 0;
+    while (index < newResources.length) {
+      String resourceType = newResources[index];
+      Assert.assertEquals(ai.getTotalCapacity().get(resourceType),
+          calculateExpectedQuotaPerType(Integer.valueOf(newResourceCapacities[index]), newTypes,
+              newTypeRatio));
+      Assert.assertEquals(ai.getUsedCapacity().get(resourceType),
+          createResourceQuotaPerTypeMap(newTypes, new int[] { 0, 0, 0, 0 }));
+      index++;
+    }
+  }
+
+  @Test
+  public void testNormalTryAssign() {
+    AssignableInstance ai = new AssignableInstance(
+        createClusterConfig(null, null, true),
+        new InstanceConfig(testInstanceName),
+        createLiveInstance(null, null)
+    );
+
+    // When nothing is configured, we should use default quota to assign
+    Map<String, TaskAssignResult> results = new HashMap<>();
+    for (int i = 0; i < TaskStateModelFactory.TASK_THREADPOOL_SIZE; i++) {
+      String taskId = Integer.toString(i);
+      TaskConfig task = new TaskConfig("", null, taskId, null);
+      TaskAssignResult result = ai.tryAssign(task);
+      Assert.assertTrue(result.isSuccessful());
+      ai.assign(result);
+      results.put(taskId, result);
+    }
+
+    // We are out of quota now and we should not be able to assign
+    String taskId = "TaskCannotAssign";
+    TaskConfig task = new TaskConfig("", null, taskId, null);
+    TaskAssignResult result = ai.tryAssign(task);
+    Assert.assertFalse(result.isSuccessful());
+    Assert.assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA);
+    try {
+      ai.assign(result);
+      Assert.fail("Expecting IllegalStateException");
+    } catch (IllegalStateException e) {
+      // OK
+    }
+
+    // After releasing 1 task, we should be able to schedule
+    ai.release(results.get("1").getTaskConfig());
+    result = ai.tryAssign(task);
+    Assert.assertTrue(result.isSuccessful());
+
+    // release all tasks, check remaining resources
+    for (TaskAssignResult rst : results.values()) {
+      ai.release(rst.getTaskConfig());
+    }
+
+    Assert.assertEquals(
+        (int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
+            .get(TaskConfig.DEFAULT_QUOTA_TYPE), 0);
+  }
+
+  /**
+   * Drives {@code tryAssign} through each failure reason checked by this test, in order:
+   * NO_SUCH_RESOURCE_TYPE, NO_SUCH_QUOTA_TYPE, TASK_ALREADY_ASSIGNED and INSUFFICIENT_QUOTA.
+   * Between steps the instance is reconfigured through {@code updateConfigs} so that only the
+   * next expected failure remains. Also verifies that committing the same successful result
+   * twice throws IllegalStateException.
+   */
+  @Test
+  public void testTryAssignFailure() {
+    AssignableInstance ai =
+        new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, false),
+            new InstanceConfig(testInstanceName),
+            createLiveInstance(testResourceTypes, testResourceCapacity));
+
+    // No such resource type
+    // NOTE(review): presumably testResourceTypes does not contain TASK_EXEC_THREAD - confirm
+    String taskId = "testTask";
+    TaskConfig task = new TaskConfig("", null, taskId, "");
+    TaskAssignResult result = ai.tryAssign(task);
+    Assert.assertFalse(result.isSuccessful());
+    Assert.assertEquals(result.getFailureReason(),
+        TaskAssignResult.FailureReason.NO_SUCH_RESOURCE_TYPE);
+
+    // No such quota type: the live instance now reports TASK_EXEC_THREAD capacity, but the
+    // cluster config was created without the default quota type
+    ai.updateConfigs(null, null, createLiveInstance(
+        new String[] { LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name() },
+        new String[] { "1" }));
+
+    result = ai.tryAssign(task);
+    Assert.assertFalse(result.isSuccessful());
+    Assert
+        .assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.NO_SUCH_QUOTA_TYPE);
+
+    // Add the default quota type to the cluster config and move the task onto it
+    ai.updateConfigs(createClusterConfig(testQuotaTypes, testQuotaRatio, true), null, null);
+
+    task.setQuotaType(TaskConfig.DEFAULT_QUOTA_TYPE);
+    result = ai.tryAssign(task);
+    Assert.assertTrue(result.isSuccessful());
+    ai.assign(result);
+    try {
+      // Committing the same result a second time must be rejected
+      ai.assign(result);
+      Assert.fail("Expecting IllegalStateException");
+    } catch (IllegalStateException e) {
+      // OK
+    }
+
+    // Duplicate assignment
+    result = ai.tryAssign(task);
+    Assert.assertFalse(result.isSuccessful());
+    Assert.assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.TASK_ALREADY_ASSIGNED);
+
+    // Insufficient quota: release the task, then report zero TASK_EXEC_THREAD capacity
+    ai.release(task);
+    ai.updateConfigs(null, null, createLiveInstance(
+        new String[] { LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name() },
+        new String[] { "0" }));
+
+    task.setQuotaType(TaskConfig.DEFAULT_QUOTA_TYPE);
+    result = ai.tryAssign(task);
+    Assert.assertFalse(result.isSuccessful());
+    Assert
+        .assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA);
+  }
+
+  /**
+   * Pairs each quota type with the quota found at the same index.
+   *
+   * @param types quota type names, used as map keys
+   * @param quotas quota values; must have at least as many entries as {@code types}
+   * @return a new map from quota type to its quota
+   */
+  private Map<String, Integer> createResourceQuotaPerTypeMap(String[] types, int[] quotas) {
+    Map<String, Integer> quotaByType = new HashMap<>();
+    int idx = 0;
+    for (String type : types) {
+      quotaByType.put(type, quotas[idx]);
+      idx++;
+    }
+    return quotaByType;
+  }
+
+  /**
+   * Computes the quota each type is expected to receive when {@code capacity} is split
+   * proportionally to the given ratios, rounding every share to the nearest integer.
+   *
+   * @param capacity total capacity to divide
+   * @param quotaTypes quota type names, index-aligned with {@code quotaRatios}
+   * @param quotaRatios integer ratios in string form
+   * @return a new map from quota type to its rounded share of {@code capacity}
+   */
+  private Map<String, Integer> calculateExpectedQuotaPerType(int capacity, String[] quotaTypes,
+      String[] quotaRatios) {
+    int ratioSum = 0;
+    for (String ratio : quotaRatios) {
+      ratioSum += Integer.parseInt(ratio);
+    }
+
+    Map<String, Integer> shareByType = new HashMap<>();
+    int idx = 0;
+    while (idx < quotaRatios.length) {
+      int ratio = Integer.parseInt(quotaRatios[idx]);
+      // Float arithmetic is intentional: the share is rounded, not truncated
+      float share = (float) capacity * ratio / ratioSum;
+      shareByType.put(quotaTypes[idx], Math.round(share));
+      idx++;
+    }
+    return shareByType;
+  }
+
+  /**
+   * Builds a LiveInstance for the test instance name. A resource capacity map is set only when
+   * both arrays are non-null.
+   *
+   * @param resourceTypes resource type names, index-aligned with {@code resourceCapacity}
+   * @param resourceCapacity capacity values in string form
+   * @return the new LiveInstance
+   */
+  private LiveInstance createLiveInstance(String[] resourceTypes, String[] resourceCapacity) {
+    LiveInstance liveInstance = new LiveInstance(testInstanceName);
+    if (resourceCapacity == null || resourceTypes == null) {
+      // Nothing to report: leave the instance without a capacity map
+      return liveInstance;
+    }
+
+    Map<String, String> capacityByResource = new HashMap<>();
+    int idx = 0;
+    while (idx < resourceCapacity.length) {
+      capacityByResource.put(resourceTypes[idx], resourceCapacity[idx]);
+      idx++;
+    }
+    liveInstance.setResourceCapacityMap(capacityByResource);
+    return liveInstance;
+  }
+
+  /**
+   * Builds a ClusterConfig for the test cluster name and registers task quota ratios on it.
+   *
+   * @param quotaTypes quota type names, index-aligned with {@code quotaRatio}; may be null
+   * @param quotaRatio ratios in string form; may be null
+   * @param addDefaultQuota whether to also register the default quota type with
+   *          {@code defaultQuotaRatio}
+   * @return the new ClusterConfig
+   */
+  private ClusterConfig createClusterConfig(String[] quotaTypes, String[] quotaRatio,
+      boolean addDefaultQuota) {
+    ClusterConfig config = new ClusterConfig(testClusterName);
+
+    // Custom ratios are applied only when both arrays were supplied
+    boolean hasCustomQuotas = quotaTypes != null && quotaRatio != null;
+    if (hasCustomQuotas) {
+      int idx = 0;
+      for (String quotaType : quotaTypes) {
+        config.setTaskQuotaRatio(quotaType, quotaRatio[idx]);
+        idx++;
+      }
+    }
+
+    if (addDefaultQuota) {
+      config.setTaskQuotaRatio(TaskConfig.DEFAULT_QUOTA_TYPE, defaultQuotaRatio);
+    }
+    return config;
+  }
+}