You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/09 22:56:24 UTC
helix git commit: [HELIX-718] implement AssignableInstance
Repository: helix
Updated Branches:
refs/heads/master 84c2feabb -> 2049f93ab
[HELIX-718] implement AssignableInstance
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2049f93a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2049f93a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2049f93a
Branch: refs/heads/master
Commit: 2049f93abe8e56a754e4880a9157959ef24cd89e
Parents: 84c2fea
Author: Harry Zhang <hr...@linkedin.com>
Authored: Mon Jul 9 15:49:33 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Mon Jul 9 15:49:33 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/TaskConfig.java | 4 +-
.../helix/task/assigner/AssignableInstance.java | 325 +++++++++++++++++-
.../helix/task/assigner/TaskAssignResult.java | 5 +-
.../helix/task/assigner/TaskAssigner.java | 4 +-
.../task/assigner/TestAssignableInstance.java | 334 +++++++++++++++++++
5 files changed, 650 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/2049f93a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index a447929..d3a8b34 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -42,7 +42,7 @@ public class TaskConfig {
}
private static final Logger LOG = LoggerFactory.getLogger(TaskConfig.class);
- public static final String QUOTA_TYPE_NOT_SET = "QUOTA_TYPE_NOT_SET";
+ public static final String DEFAULT_QUOTA_TYPE = "DEFAULT";
private final Map<String, String> _configMap;
@@ -133,7 +133,7 @@ public class TaskConfig {
*/
public String getQuotaType() {
return _configMap.containsKey(TaskConfigProperty.TASK_QUOTA_TYPE.name()) ?
- _configMap.get(TaskConfigProperty.TASK_QUOTA_TYPE.name()) : QUOTA_TYPE_NOT_SET;
+ _configMap.get(TaskConfigProperty.TASK_QUOTA_TYPE.name()) : DEFAULT_QUOTA_TYPE;
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/2049f93a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index fe59275..5d252eb 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -19,44 +19,277 @@ package org.apache.helix.task.assigner;
* under the License.
*/
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* AssignableInstance contains instance capacity profile and methods that control capacity and help
* with task assignment.
*/
public class AssignableInstance {
+ private static final Logger logger = LoggerFactory.getLogger(AssignableInstance.class);
/**
- * Caches tasks currently assigned to this instance.
+ * Fitness score will be calculated from 0 to 1000
+ */
+ private static final int fitnessScoreFactor = 1000;
+
+ /**
+ * Caches IDs of tasks currently assigned to this instance.
* Every pipeline iteration will compare Task states in this map to Task states in TaskDataCache.
* Tasks in a terminal state (finished or failed) will be removed as soon as they reach the state.
*/
- private Map<String, TaskAssignResult> _currentAssignments;
+ private Set<String> _currentAssignments;
private ClusterConfig _clusterConfig;
private InstanceConfig _instanceConfig;
private LiveInstance _liveInstance;
+ /**
+ * A map recording instance's total capacity:
+ * map{resourceType : map{quotaType : quota}}
+ */
+ private Map<String, Map<String, Integer>> _totalCapacity;
+
+ /**
+ * A map recording instance's used capacity
+ * map{resourceType : map{quotaType : quota}}
+ */
+ private Map<String, Map<String, Integer>> _usedCapacity;
+
public AssignableInstance(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
LiveInstance liveInstance) {
+ if (clusterConfig == null || instanceConfig == null || liveInstance == null) {
+ throw new IllegalArgumentException(
+ "ClusterConfig, InstanceConfig, LiveInstance cannot be null!");
+ }
+
+ if (!instanceConfig.getInstanceName().equals(liveInstance.getInstanceName())) {
+ throw new IllegalArgumentException(String
+ .format("Instance name from LiveInstance (%s) and InstanceConfig (%s) don't match!",
+ liveInstance.getInstanceName(), instanceConfig.getInstanceName()));
+ }
_clusterConfig = clusterConfig;
_instanceConfig = instanceConfig;
_liveInstance = liveInstance;
+
+ _currentAssignments = new HashSet<>();
+ _totalCapacity = new HashMap<>();
+ _usedCapacity = new HashMap<>();
+ refreshTotalCapacity();
+ }
+
+ /**
+ * When task quota ratio / instance's resource capacity change, we need to update instance
+ * capacity cache. Couple of corner cases to clarify for updating capacity:
+ * 1. User shrinks capacity and used capacity exceeds total capacity - current assignment
+ * will not be affected (used > total is ok) but no further assignment decision will
+ * be made on this instance until spaces get freed up
+ * 2. User removed a quotaType but there are still tasks with stale quota type assigned on
+ * this instance - current assignment will not be affected, and further assignment will
+ * NOT be made for stale quota type
+ * 3. User removed a resourceType but there are still tasks with stale resource type assigned
+ * on this instance - current assignment will not be affected, but no further assignment
+ * with stale resource type request will be allowed on this instance
+ */
+ private void refreshTotalCapacity() {
+ // Create a temp total capacity record in case we fail to parse configurations, we
+ // still retain existing source of truth
+ Map<String, Map<String, Integer>> tempTotalCapacity = new HashMap<>();
+ Map<String, String> typeQuotaRatio = _clusterConfig.getTaskQuotaRatioMap();
+ Map<String, String> resourceCapacity = _liveInstance.getResourceCapacityMap();
+
+ if (resourceCapacity == null) {
+ resourceCapacity = new HashMap<>();
+ resourceCapacity.put(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(),
+ Integer.toString(TaskStateModelFactory.TASK_THREADPOOL_SIZE));
+ logger.info("No resource capacity provided in LiveInstance {}, assuming default capacity: {}",
+ _instanceConfig.getInstanceName(), resourceCapacity);
+ }
+
+ if (typeQuotaRatio == null) {
+ typeQuotaRatio = new HashMap<>();
+ typeQuotaRatio.put(TaskConfig.DEFAULT_QUOTA_TYPE, Integer.toString(1));
+ logger.info("No quota type ratio provided in LiveInstance {}, assuming default ratio: {}",
+ _instanceConfig.getInstanceName(), typeQuotaRatio);
+ }
+
+ logger.info(
+ "Updating capacity for AssignableInstance {}. Resource Capacity: {}; Type Quota Ratio: {}",
+ _instanceConfig.getInstanceName(), resourceCapacity, typeQuotaRatio);
+
+ // Reconcile current and new resource types
+ try {
+ for (final Map.Entry<String, String> resEntry : resourceCapacity.entrySet()) {
+ String resourceType = resEntry.getKey();
+ int capacity = Integer.valueOf(resEntry.getValue());
+
+ if (!_totalCapacity.containsKey(resourceType)) {
+ logger.info("Adding InstanceResourceType {}", resourceType);
+ _usedCapacity.put(resourceType, new HashMap<String, Integer>());
+ }
+ tempTotalCapacity.put(resourceType, new HashMap<String, Integer>());
+
+ int totalRatio = 0;
+ for (String val : typeQuotaRatio.values()) {
+ totalRatio += Integer.valueOf(val);
+ }
+
+ // Setup per-type resource quota based on given total capacity
+ for (Map.Entry<String, String> typeQuotaEntry : typeQuotaRatio.entrySet()) {
+ // Calculate total quota for a given type
+ String quotaType = typeQuotaEntry.getKey();
+ int quotaRatio = Integer.valueOf(typeQuotaEntry.getValue());
+ int quota = Math.round(capacity * (float)quotaRatio / (float)totalRatio);
+
+ // Honor non-zero quota ratio for non-zero capacity even if it is rounded to zero
+ if (capacity != 0 && quotaRatio != 0 && quota == 0) {
+ quota = 1;
+ }
+
+ // record total quota of the resource
+ tempTotalCapacity.get(resourceType).put(quotaType, quota);
+
+ // Add quota for new quota type
+ if (!_usedCapacity.get(resourceType).containsKey(quotaType)) {
+ logger.info("Adding QuotaType {} for resource {}", quotaType, resourceType);
+ _usedCapacity.get(resourceType).put(quotaType, 0);
+ }
+ }
+
+ // For removed quota type, remove record from used capacity
+ _usedCapacity.get(resourceType).keySet().retainAll(typeQuotaRatio.keySet());
+ }
+
+ // Update total capacity map
+ _totalCapacity = tempTotalCapacity;
+
+ // Purge used capacity for resource deleted
+ _usedCapacity.keySet().retainAll(resourceCapacity.keySet());
+
+ logger.info(
+ "Finished updating capacity for AssignableInstance {}. Current capacity {}. Current usage: {}",
+ _instanceConfig.getInstanceName(), _totalCapacity, _usedCapacity);
+ } catch (Exception e) {
+ // TODO: properly escalate error
+ logger.error(
+ "Failed to update capacity for Assignableinstance {}, still using current capacity {}. Current usage: {}",
+ _instanceConfig.getInstanceName(), _totalCapacity, _usedCapacity, e);
+ }
+ }
+
+ public void updateConfigs(ClusterConfig clusterConfig, InstanceConfig instanceConfig,
+ LiveInstance liveInstance) {
+ logger.info("Updating configs for AssignableInstance {}", _instanceConfig.getInstanceName());
+ boolean refreshCapacity = false;
+ if (clusterConfig != null) {
+ if (!clusterConfig.getTaskQuotaRatioMap().equals(_clusterConfig.getTaskQuotaRatioMap())) {
+ refreshCapacity = true;
+ }
+ _clusterConfig = clusterConfig;
+ logger.info("Updated cluster config");
+ }
+
+ if (liveInstance != null) {
+ if (!_instanceConfig.getInstanceName().equals(liveInstance.getInstanceName())) {
+ logger.error(
+ "Cannot update live instance with different instance name. Current: {}; new: {}",
+ _instanceConfig.getInstanceName(), liveInstance.getInstanceName());
+ } else {
+ if (!liveInstance.getResourceCapacityMap().equals(_liveInstance.getResourceCapacityMap())) {
+ refreshCapacity = true;
+ }
+ _liveInstance = liveInstance;
+ logger.info("Updated live instance");
+ }
+ }
+
+ if (instanceConfig != null) {
+ if (!_instanceConfig.getInstanceName().equals(instanceConfig.getInstanceName())) {
+ logger.error(
+ "Cannot update instance config with different instance name. Current: {}; new: {}",
+ _instanceConfig.getInstanceName(), instanceConfig.getInstanceName());
+ } else {
+ _instanceConfig = instanceConfig;
+ logger.info("Updated instance config");
+ }
+ }
+
+ if (refreshCapacity) {
+ refreshTotalCapacity();
+ }
+
+ logger.info("Updated configs for AssignableInstance {}", _instanceConfig.getInstanceName());
}
/**
* Tries to assign the given task on this instance and returns TaskAssignResult. Instance capacity
* profile is NOT modified by tryAssign.
- * @param task
- * @return
+ *
+ * When calculating fitness of an assignment, this function will rate assignment from 0 to 1000,
+ * and the assignment that has a higher score will be a better fit.
+ *
+ * @param task task config
+ * @return TaskAssignResult
+ * @throws IllegalArgumentException if task is null
*/
- public TaskAssignResult tryAssign(TaskConfig task) {
- // TODO: implement
- return null;
+ public TaskAssignResult tryAssign(TaskConfig task) throws IllegalArgumentException {
+ if (task == null) {
+ throw new IllegalArgumentException("Task is null!");
+ }
+
+ if (_currentAssignments.contains(task.getId())) {
+ return new TaskAssignResult(task, this, false, 0,
+ TaskAssignResult.FailureReason.TASK_ALREADY_ASSIGNED, String
+ .format("Task %s is already assigned to this instance. Need to release it first",
+ task.getId()));
+ }
+
+ // For now we only have 1 type of resource so just hard code it here
+ String resourceType = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name();
+
+ // Fail when no such resource type
+ if (!_totalCapacity.containsKey(resourceType)) {
+ return new TaskAssignResult(task, this, false, 0,
+ TaskAssignResult.FailureReason.NO_SUCH_RESOURCE_TYPE, String
+ .format("Requested resource type %s not supported. Available resource types: %s",
+ resourceType, _totalCapacity.keySet()));
+ }
+
+ String quotaType = task.getQuotaType();
+
+ // Fail when no such quota type
+ if (!_totalCapacity.get(resourceType).containsKey(quotaType)) {
+ return new TaskAssignResult(task, this, false, 0,
+ TaskAssignResult.FailureReason.NO_SUCH_QUOTA_TYPE, String
+ .format("Requested quota type %s not defined. Available quota types: %s", quotaType,
+ _totalCapacity.get(resourceType).keySet()));
+ }
+
+ int capacity = _totalCapacity.get(resourceType).get(quotaType);
+ int usage = _usedCapacity.get(resourceType).get(quotaType);
+
+ // Fail with insufficient quota
+ if (capacity <= usage) {
+ return new TaskAssignResult(task, this, false, 0,
+ TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA, String
+ .format("Insufficient quota %s::%s. Capacity: %s, Current Usage: %s", resourceType,
+ quotaType, capacity, usage));
+ }
+
+ // More remaining capacity leads to higher fitness score
+ int fitness = Math.round((float)(capacity - usage) / capacity * fitnessScoreFactor);
+
+ return new TaskAssignResult(task, this, true, fitness,
+ null, "");
}
/**
@@ -64,29 +297,71 @@ public class AssignableInstance {
* 1. Deduct the amount of resource required by this task
* 2. Add this TaskAssignResult to _currentAssignments
* @param result
- * @throws IllegalStateException if TaskAssignResult is not successful
+ * @throws IllegalStateException if TaskAssignResult is not successful or the task is double
+ * assigned, or the task is not assigned to this instance
*/
public void assign(TaskAssignResult result) throws IllegalStateException {
- // TODO: implement
- return;
+ if (!result.isSuccessful()) {
+ throw new IllegalStateException("Cannot assign a failed result: " + result);
+ }
+
+ if (!result.getInstanceName().equals(getInstanceName())) {
+ throw new IllegalStateException(String.format(
+ "Cannot assign a result for a different instance. This instance: %s; Result: %s",
+ getInstanceName(), result));
+ }
+
+ if (_currentAssignments.contains(result.getTaskConfig().getId())) {
+ throw new IllegalStateException(
+ "Cannot double assign task " + result.getTaskConfig().getId());
+ }
+
+ _currentAssignments.add(result.getTaskConfig().getId());
+
+ // update resource usage
+ String resourceType = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name();
+ String quotaType = result.getTaskConfig().getQuotaType();
+
+ // Assume used capacity is updated, and if resource type / quota type is not supported
+ // we have already failed the assignment
+ int curUsage = _usedCapacity.get(resourceType).get(quotaType);
+ _usedCapacity.get(resourceType).put(quotaType, curUsage + 1);
+
+ logger.info("Assigned task {} to instance {}", result.getTaskConfig().getId(),
+ _instanceConfig.getInstanceName());
}
/**
* Performs the following to release resource for a task:
* 1. Release the resource by adding back what the task required.
* 2. Remove the TaskAssignResult from _currentAssignments
- * @param taskID
- * @throws IllegalArgumentException if task is not found
+ * @param taskConfig config of this task
*/
- public void release(String taskID) throws IllegalArgumentException {
- // TODO: implement
- return;
+ public void release(TaskConfig taskConfig) {
+ if (!_currentAssignments.contains(taskConfig.getId())) {
+ logger.warn("Task {} is not assigned on instance {}", taskConfig.getId(),
+ _instanceConfig.getInstanceName());
+ return;
+ }
+ String quotaType = taskConfig.getQuotaType();
+ String resourceType = LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name();
+
+ // We might be releasing a task whose resource requirement / quota type is out-dated,
+ // thus we need to check to avoid NPE
+ if (_usedCapacity.containsKey(resourceType) && _usedCapacity.get(resourceType)
+ .containsKey(quotaType)) {
+ int curUsage = _usedCapacity.get(resourceType).get(quotaType);
+ _usedCapacity.get(resourceType).put(quotaType, curUsage - 1);
+ }
+ _currentAssignments.remove(taskConfig.getId());
+ logger.info("Released task {} from instance {}", taskConfig.getId(),
+ _instanceConfig.getInstanceName());
}
/**
- * Returns taskID -> TaskAssignResult mappings.
+ * Returns a set of taskIDs
*/
- public Map<String, TaskAssignResult> getCurrentAssignments() {
+ public Set<String> getCurrentAssignments() {
return _currentAssignments;
}
@@ -96,4 +371,20 @@ public class AssignableInstance {
public String getInstanceName() {
return _instanceConfig.getInstanceName();
}
+
+ /**
+ * Returns total capacity of the AssignableInstance
+ * @return map{resourceType : map{quotaType : quota}}
+ */
+ public Map<String, Map<String, Integer>> getTotalCapacity() {
+ return _totalCapacity;
+ }
+
+ /**
+ * Returns used capacity of the AssignableInstance
+ * @return map{resourceType : map{quotaType : usedQuota}}
+ */
+ public Map<String, Map<String, Integer>> getUsedCapacity() {
+ return _usedCapacity;
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/2049f93a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
index 6d7b4e8..00d7db1 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssignResult.java
@@ -34,7 +34,10 @@ public class TaskAssignResult implements Comparable<TaskAssignResult> {
NO_SUCH_RESOURCE_TYPE,
// Required quota type is not configured
- NO_SUCH_QUOTA_TYPE
+ NO_SUCH_QUOTA_TYPE,
+
+ // Task cannot be assigned twice on a node without releasing it first
+ TASK_ALREADY_ASSIGNED
}
private final boolean _isAssignmentSuccessful;
http://git-wip-us.apache.org/repos/asf/helix/blob/2049f93a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java
index 66614a2..79fbd64 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/TaskAssigner.java
@@ -28,8 +28,8 @@ public interface TaskAssigner {
* Assign a collection of tasks on a collection of assignableInstances.
* When an assignment decision is made, AssignableInstance.assign() must be called for the
* instance to modify its internal capacity profile.
- * @param assignableInstances
- * @param tasks
+ * @param assignableInstances String -> AssignableInstanceMapping
+ * @param tasks String -> TaskConfig
* @return taskID -> TaskAssignmentResult mapping per task
*/
Map<String, TaskAssignResult> assignTasks(Iterable<AssignableInstance> assignableInstances,
http://git-wip-us.apache.org/repos/asf/helix/blob/2049f93a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
new file mode 100644
index 0000000..02a0d39
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -0,0 +1,334 @@
+package org.apache.helix.task.assigner;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestAssignableInstance {
+ private static final String testClusterName = "testCluster";
+ private static final String testInstanceName = "testInstance";
+
+ private static final String[] testResourceTypes = new String[] {"Resource1", "Resource2", "Resource3"};
+ private static final String[] testResourceCapacity = new String[] {"20", "50", "100"};
+
+ private static final String[] testQuotaTypes = new String[] {"Type1", "Type2", "Type3"};
+ private static final String[] testQuotaRatio = new String[] {"50", "30", "20"};
+ private static final String defaultQuotaRatio = "100";
+
+
+ @Test
+ public void testInvalidInitialization() {
+ try {
+ AssignableInstance ai = new AssignableInstance(null, null, null);
+ Assert.fail("Expecting IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ Assert.assertTrue(e.getMessage().contains("cannot be null"));
+ }
+
+ try {
+ ClusterConfig clusterConfig = new ClusterConfig("testCluster");
+ InstanceConfig instanceConfig = new InstanceConfig("instance");
+ LiveInstance liveInstance = new LiveInstance("another-instance");
+ AssignableInstance ai = new AssignableInstance(clusterConfig, instanceConfig, liveInstance);
+ Assert.fail("Expecting IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ Assert.assertTrue(e.getMessage().contains("don't match"));
+ }
+ }
+
+ @Test
+ public void testInitializationWithQuotaUnset() {
+ // Initialize AssignableInstance with neither resource capacity nor quota ratio provided
+ AssignableInstance ai = new AssignableInstance(
+ createClusterConfig(null, null, false),
+ new InstanceConfig(testInstanceName),
+ createLiveInstance(null, null)
+ );
+ Assert.assertEquals(ai.getUsedCapacity().size(), 1);
+ Assert.assertEquals(
+ (int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
+ .get(TaskConfig.DEFAULT_QUOTA_TYPE), 0);
+ Assert.assertEquals(
+ (int) ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
+ .get(TaskConfig.DEFAULT_QUOTA_TYPE), TaskStateModelFactory.TASK_THREADPOOL_SIZE);
+ Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
+ }
+
+ @Test
+ public void testInitializationWithOnlyCapacity() {
+ // Initialize AssignableInstance with only resource capacity provided
+ AssignableInstance ai = new AssignableInstance(
+ createClusterConfig(null, null, false),
+ new InstanceConfig(testInstanceName),
+ createLiveInstance(testResourceTypes, testResourceCapacity)
+ );
+ Assert.assertEquals(ai.getTotalCapacity().size(), testResourceTypes.length);
+ Assert.assertEquals(ai.getUsedCapacity().size(), testResourceTypes.length);
+ for (int i = 0; i < testResourceTypes.length; i++) {
+ Assert.assertEquals(ai.getTotalCapacity().get(testResourceTypes[i]).size(), 1);
+ Assert.assertEquals(ai.getUsedCapacity().get(testResourceTypes[i]).size(), 1);
+ Assert.assertEquals(
+ ai.getTotalCapacity().get(testResourceTypes[i]).get(TaskConfig.DEFAULT_QUOTA_TYPE),
+ Integer.valueOf(testResourceCapacity[i])
+ );
+ Assert.assertEquals(
+ ai.getUsedCapacity().get(testResourceTypes[i]).get(TaskConfig.DEFAULT_QUOTA_TYPE),
+ Integer.valueOf(0)
+ );
+ }
+ }
+
+ @Test
+ public void testInitializationWithOnlyQuotaType() {
+ // Initialize AssignableInstance with only quota type provided
+ AssignableInstance ai = new AssignableInstance(
+ createClusterConfig(testQuotaTypes, testQuotaRatio, false),
+ new InstanceConfig(testInstanceName),
+ createLiveInstance(null, null)
+ );
+
+ Assert.assertEquals(ai.getTotalCapacity().size(), 1);
+ Assert.assertEquals(ai.getUsedCapacity().size(), 1);
+ Assert.assertEquals(
+ ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()).size(),
+ testQuotaTypes.length
+ );
+ Assert.assertEquals(
+ ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()).size(),
+ testQuotaTypes.length
+ );
+ Assert.assertEquals(
+ ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()),
+ calculateExpectedQuotaPerType(TaskStateModelFactory.TASK_THREADPOOL_SIZE, testQuotaTypes,
+ testQuotaRatio));
+ Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
+ }
+
+ @Test
+ public void testInitializationWithQuotaAndCapacity() {
+ // Initialize AssignableInstance with both capacity and quota type provided
+ AssignableInstance ai = new AssignableInstance(
+ createClusterConfig(testQuotaTypes, testQuotaRatio, false),
+ new InstanceConfig(testInstanceName),
+ createLiveInstance(testResourceTypes, testResourceCapacity)
+ );
+
+ Map<String, Integer> usedResourcePerType =
+ createResourceQuotaPerTypeMap(testQuotaTypes, new int[] { 0, 0, 0 });
+ for (int i = 0; i < testResourceTypes.length; i++) {
+ Assert.assertEquals(ai.getTotalCapacity().get(testResourceTypes[i]),
+ calculateExpectedQuotaPerType(Integer.valueOf(testResourceCapacity[i]), testQuotaTypes,
+ testQuotaRatio));
+ Assert.assertEquals(ai.getUsedCapacity().get(testResourceTypes[i]), usedResourcePerType);
+ }
+ }
+
+ @Test
+ public void testAssignableInstanceUpdateConfigs() {
+ AssignableInstance ai = new AssignableInstance(
+ createClusterConfig(testQuotaTypes, testQuotaRatio, false),
+ new InstanceConfig(testInstanceName),
+ createLiveInstance(testResourceTypes, testResourceCapacity)
+ );
+
+ String[] newResources = new String[] {"Resource2", "Resource3", "Resource4"};
+ String[] newResourceCapacities = new String[] {"100", "150", "50"};
+
+ String[] newTypes = new String[] {"Type3", "Type4", "Type5", "Type6"};
+ String[] newTypeRatio = new String[] {"20", "40", "25", "25"};
+
+ LiveInstance newLiveInstance = createLiveInstance(newResources, newResourceCapacities);
+ ClusterConfig newClusterConfig = createClusterConfig(newTypes, newTypeRatio, false);
+ ai.updateConfigs(newClusterConfig, null, newLiveInstance);
+
+ Assert.assertEquals(ai.getUsedCapacity().size(), newResourceCapacities.length);
+ Assert.assertEquals(ai.getTotalCapacity().size(), newResourceCapacities.length);
+
+ for (int i = 0; i < newResources.length; i++) {
+ Assert.assertEquals(ai.getTotalCapacity().get(newResources[i]),
+ calculateExpectedQuotaPerType(Integer.valueOf(newResourceCapacities[i]), newTypes,
+ newTypeRatio));
+ Assert.assertEquals(ai.getUsedCapacity().get(newResources[i]),
+ createResourceQuotaPerTypeMap(newTypes, new int[] { 0, 0, 0, 0 }));
+ }
+ }
+
+ @Test
+ public void testNormalTryAssign() {
+ AssignableInstance ai = new AssignableInstance(
+ createClusterConfig(null, null, true),
+ new InstanceConfig(testInstanceName),
+ createLiveInstance(null, null)
+ );
+
+ // When nothing is configured, we should use default quota to assign
+ Map<String, TaskAssignResult> results = new HashMap<>();
+ for (int i = 0; i < TaskStateModelFactory.TASK_THREADPOOL_SIZE; i++) {
+ String taskId = Integer.toString(i);
+ TaskConfig task = new TaskConfig("", null, taskId, null);
+ TaskAssignResult result = ai.tryAssign(task);
+ Assert.assertTrue(result.isSuccessful());
+ ai.assign(result);
+ results.put(taskId, result);
+ }
+
+ // We are out of quota now and we should not be able to assign
+ String taskId = "TaskCannotAssign";
+ TaskConfig task = new TaskConfig("", null, taskId, null);
+ TaskAssignResult result = ai.tryAssign(task);
+ Assert.assertFalse(result.isSuccessful());
+ Assert.assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA);
+ try {
+ ai.assign(result);
+ Assert.fail("Expecting IllegalStateException");
+ } catch (IllegalStateException e) {
+ // OK
+ }
+
+ // After releasing 1 task, we should be able to schedule
+ ai.release(results.get("1").getTaskConfig());
+ result = ai.tryAssign(task);
+ Assert.assertTrue(result.isSuccessful());
+
+ // release all tasks, check remaining resources
+ for (TaskAssignResult rst : results.values()) {
+ ai.release(rst.getTaskConfig());
+ }
+
+ Assert.assertEquals(
+ (int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
+ .get(TaskConfig.DEFAULT_QUOTA_TYPE), 0);
+ }
+
+ @Test
+ public void testTryAssignFailure() {
+ AssignableInstance ai =
+ new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, false),
+ new InstanceConfig(testInstanceName),
+ createLiveInstance(testResourceTypes, testResourceCapacity));
+
+ // No such resource type
+ String taskId = "testTask";
+ TaskConfig task = new TaskConfig("", null, taskId, "");
+ TaskAssignResult result = ai.tryAssign(task);
+ Assert.assertFalse(result.isSuccessful());
+ Assert.assertEquals(result.getFailureReason(),
+ TaskAssignResult.FailureReason.NO_SUCH_RESOURCE_TYPE);
+
+ // No such quota type
+ ai.updateConfigs(null, null, createLiveInstance(
+ new String[] { LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name() },
+ new String[] { "1" }));
+
+ result = ai.tryAssign(task);
+ Assert.assertFalse(result.isSuccessful());
+ Assert
+ .assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.NO_SUCH_QUOTA_TYPE);
+
+ ai.updateConfigs(createClusterConfig(testQuotaTypes, testQuotaRatio, true), null, null);
+
+ task.setQuotaType(TaskConfig.DEFAULT_QUOTA_TYPE);
+ result = ai.tryAssign(task);
+ Assert.assertTrue(result.isSuccessful());
+ ai.assign(result);
+ try {
+ ai.assign(result);
+ Assert.fail("Expecting IllegalArgumentException");
+ } catch (IllegalStateException e) {
+ // OK
+ }
+
+ // Duplicate assignment
+ result = ai.tryAssign(task);
+ Assert.assertFalse(result.isSuccessful());
+ Assert.assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.TASK_ALREADY_ASSIGNED);
+
+ // Insufficient quota
+ ai.release(task);
+ ai.updateConfigs(null, null, createLiveInstance(
+ new String[] { LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name() },
+ new String[] { "0" }));
+
+ task.setQuotaType(TaskConfig.DEFAULT_QUOTA_TYPE);
+ result = ai.tryAssign(task);
+ Assert.assertFalse(result.isSuccessful());
+ Assert
+ .assertEquals(result.getFailureReason(), TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA);
+ }
+
+ private Map<String, Integer> createResourceQuotaPerTypeMap(String[] types, int[] quotas) {
+ Map<String, Integer> ret = new HashMap<>();
+ for (int i = 0; i < types.length; i++) {
+ ret.put(types[i], quotas[i]);
+ }
+ return ret;
+ }
+
+ private Map<String, Integer> calculateExpectedQuotaPerType(int capacity, String[] quotaTypes,
+ String[] quotaRatios) {
+ Integer totalQuota = 0;
+ Map<String, Integer> expectedQuotaPerType = new HashMap<>();
+
+ for (String ratio : quotaRatios) {
+ totalQuota += Integer.valueOf(ratio);
+ }
+
+ for (int i = 0; i < quotaRatios.length; i++) {
+ expectedQuotaPerType.put(quotaTypes[i],
+ Math.round((float)capacity * Integer.valueOf(quotaRatios[i])
+ / totalQuota));
+ }
+ return expectedQuotaPerType;
+ }
+
+ private LiveInstance createLiveInstance(String[] resourceTypes, String[] resourceCapacity) {
+ LiveInstance li = new LiveInstance(testInstanceName);
+ if (resourceCapacity != null && resourceTypes != null) {
+ Map<String, String> resMap = new HashMap<>();
+ for (int i = 0; i < resourceCapacity.length; i++) {
+ resMap.put(resourceTypes[i], resourceCapacity[i]);
+ }
+ li.setResourceCapacityMap(resMap);
+ }
+ return li;
+ }
+
+ private ClusterConfig createClusterConfig(String[] quotaTypes, String[] quotaRatio,
+ boolean addDefaultQuota) {
+ ClusterConfig clusterConfig = new ClusterConfig(testClusterName);
+ if (quotaTypes != null && quotaRatio != null) {
+ for (int i = 0; i < quotaTypes.length; i++) {
+ clusterConfig.setTaskQuotaRatio(quotaTypes[i], quotaRatio[i]);
+ }
+ }
+ if (addDefaultQuota) {
+ clusterConfig.setTaskQuotaRatio(TaskConfig.DEFAULT_QUOTA_TYPE, defaultQuotaRatio);
+ }
+ return clusterConfig;
+ }
+}