You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/25 00:47:09 UTC
helix git commit: [HELIX-744] Allow undefined workflow/job types to
be assigned as DEFAULT type
Repository: helix
Updated Branches:
refs/heads/master c012c7b9d -> 675904095
[HELIX-744] Allow undefined workflow/job types to be assigned as DEFAULT type
Previously, we ignored undefined types, that is workflow/job types that are not defined in ClusterConfig. This is not backward-compatible because some users of Task Framework are setting types without any quota-related config set in ClusterConfig. The default behavior was changed so that each AssignableInstance will just treat these workflows/jobs as DEFAULT type, which will make quota-based scheduling backward-compatible.
Changelist:
1. AssignableInstance treats undefined types as DEFAULT
2. Appropriate log messages and logic change was applied to restoreTaskAssignResult logic
3. A test case was added to TestQuotaBasedScheduling
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/67590409
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/67590409
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/67590409
Branch: refs/heads/master
Commit: 6759040956907b197b6d65cb8b4759b7e8981883
Parents: c012c7b
Author: Hunter Lee <na...@gmail.com>
Authored: Tue Jul 24 14:06:20 2018 -0700
Committer: Hunter Lee <na...@gmail.com>
Committed: Tue Jul 24 17:45:50 2018 -0700
----------------------------------------------------------------------
.../helix/task/assigner/AssignableInstance.java | 67 ++++++----
.../task/TestQuotaBasedScheduling.java | 123 ++++++++++---------
.../task/assigner/TestAssignableInstance.java | 26 ++--
3 files changed, 115 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/67590409/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index b13bb61..a1f2fd4 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -212,7 +212,8 @@ public class AssignableInstance {
"Cannot update live instance with different instance name. Current: {}; new: {}",
_instanceConfig.getInstanceName(), liveInstance.getInstanceName());
} else {
- if (liveInstance.getResourceCapacityMap() != null && !liveInstance.getResourceCapacityMap().equals(_liveInstance.getResourceCapacityMap())) {
+ if (liveInstance.getResourceCapacityMap() != null && !liveInstance.getResourceCapacityMap()
+ .equals(_liveInstance.getResourceCapacityMap())) {
refreshCapacity = true;
}
_liveInstance = liveInstance;
@@ -281,20 +282,17 @@ public class AssignableInstance {
resourceType, _totalCapacity.keySet()));
}
- // Fail when no such quota type. However, if quotaType is null, treat it as DEFAULT
+ // If quotaType is null, treat it as DEFAULT
if (quotaType == null || quotaType.equals("")) {
quotaType = DEFAULT_QUOTA_TYPE;
}
if (!_totalCapacity.get(resourceType).containsKey(quotaType)) {
logger.warn(
- "AssignableInstance does not support the given quotaType: {}. Task: {}, quotaType: {}, Instance name: {}",
+ "AssignableInstance does not support the given quotaType: {}. Task: {}, quotaType: {}, Instance name: {}. Task will be assigned as DEFAULT type.",
quotaType, task.getId(), quotaType, getInstanceName());
+ quotaType = DEFAULT_QUOTA_TYPE;
- return new TaskAssignResult(task, quotaType, this, false, 0,
- TaskAssignResult.FailureReason.NO_SUCH_QUOTA_TYPE,
- String.format("Requested quota type %s not defined. Available quota types: %s", quotaType,
- _totalCapacity.get(resourceType).keySet()));
}
int capacity = _totalCapacity.get(resourceType).get(quotaType);
@@ -353,16 +351,25 @@ public class AssignableInstance {
// Resource type / quota type might have already changed, i.e. we are recovering
// current assignments for a live instance, but currently running tasks's quota
// type has already been removed by user. So we do the deduction with best effort
- if (_usedCapacity.containsKey(resourceType)
- && _usedCapacity.get(resourceType).containsKey(quotaType)) {
- int curUsage = _usedCapacity.get(resourceType).get(quotaType);
- _usedCapacity.get(resourceType).put(quotaType, curUsage + 1);
+ // Note that if the quota type is not found within the resource, task must have been scheduled
+ // to DEFAULT type
+ // because we schedule undefined types to DEFAULT, so we increment usage to DEFAULT
+ if (_usedCapacity.containsKey(resourceType)) {
+ // Check that this resourceType contains the given quotaType
+ if (_usedCapacity.get(resourceType).containsKey(quotaType)) {
+ int curUsage = _usedCapacity.get(resourceType).get(quotaType);
+ _usedCapacity.get(resourceType).put(quotaType, curUsage + 1);
+ } else {
+ // quotaType is not found, treat it as DEFAULT
+ int curUsage = _usedCapacity.get(resourceType).get(AssignableInstance.DEFAULT_QUOTA_TYPE);
+ _usedCapacity.get(resourceType).put(AssignableInstance.DEFAULT_QUOTA_TYPE, curUsage + 1);
+ }
} else {
+ // resourceType is not found. Leave a warning log and will not touch quota
logger.warn(
- "Task's requested resource type and quota type is no longer supported. TaskConfig: %s; UsedCapacity: %s",
- result.getTaskConfig(), _usedCapacity);
+ "Task's requested resource type is not supported. TaskConfig: %s; UsedCapacity: %s; ResourceType: %s",
+ result.getTaskConfig(), _usedCapacity, resourceType);
}
-
logger.info("Assigned task {} to instance {}", result.getTaskConfig().getId(),
_instanceConfig.getInstanceName());
}
@@ -376,14 +383,15 @@ public class AssignableInstance {
* @param taskConfig config of this task
* @param quotaType quota type this task belongs to
*/
- public synchronized boolean release(TaskConfig taskConfig, String quotaType) {
+ public synchronized void release(TaskConfig taskConfig, String quotaType) {
if (!_currentAssignments.contains(taskConfig.getId())) {
logger.warn("Task {} is not assigned on instance {}", taskConfig.getId(),
_instanceConfig.getInstanceName());
- return false;
+ return;
}
if (quotaType == null) {
- logger.warn("Task {}'s quotaType is null. Trying to release as DEFAULT type.", taskConfig.getId());
+ logger.warn("Task {}'s quotaType is null. Trying to release as DEFAULT type.",
+ taskConfig.getId());
quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
}
@@ -391,16 +399,23 @@ public class AssignableInstance {
// We might be releasing a task whose resource requirement / quota type is out-dated,
// thus we need to check to avoid NPE
- if (_usedCapacity.containsKey(resourceType)
- && _usedCapacity.get(resourceType).containsKey(quotaType)) {
- int curUsage = _usedCapacity.get(resourceType).get(quotaType);
- _usedCapacity.get(resourceType).put(quotaType, curUsage - 1);
- _currentAssignments.remove(taskConfig.getId());
- logger.info("Released task {} from instance {}", taskConfig.getId(),
- _instanceConfig.getInstanceName());
- return true;
+ if (_usedCapacity.containsKey(resourceType)) {
+ if (_usedCapacity.get(resourceType).containsKey(quotaType)) {
+ int curUsage = _usedCapacity.get(resourceType).get(quotaType);
+ _usedCapacity.get(resourceType).put(quotaType, curUsage - 1);
+ } else {
+ // This task must have run as DEFAULT type because it was not found in the quota config
+ // So make adjustments for DEFAULT
+ int curUsage = _usedCapacity.get(resourceType).get(AssignableInstance.DEFAULT_QUOTA_TYPE);
+ _usedCapacity.get(resourceType).put(AssignableInstance.DEFAULT_QUOTA_TYPE, curUsage - 1);
+ }
}
- return false;
+
+ // If the resource type is not found, we just remove from currentAssignments since no adjustment
+ // can be made
+ _currentAssignments.remove(taskConfig.getId());
+ logger.info("Released task {} from instance {}", taskConfig.getId(),
+ _instanceConfig.getInstanceName());
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/67590409/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
index 9b2ec9e..db471ff 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
@@ -21,9 +21,11 @@ package org.apache.helix.integration.task;
import com.google.common.collect.Maps;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
@@ -44,6 +46,7 @@ import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.assigner.AssignableInstance;
import org.apache.helix.tools.ClusterSetup;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
@@ -55,6 +58,8 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
private static final String JOB_COMMAND = "DummyCommand";
private Map<String, String> _jobCommandMap;
private Map<String, Integer> _quotaTypeExecutionCount = new ConcurrentHashMap<>();
+ private Set<String> _availableQuotaTypes =
+ Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private boolean _finishTask = false;
@BeforeClass
@@ -129,6 +134,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
@BeforeMethod
public void beforeMethod() {
_quotaTypeExecutionCount.clear();
+ _availableQuotaTypes.clear();
_finishTask = false;
}
@@ -165,6 +171,44 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
}
/**
+ * Tests whether jobs with undefined types (not found in ClusterConfig) run as DEFAULT.
+ * @throws InterruptedException
+ */
+ @Test(dependsOnMethods = "testSchedulingWithoutQuota")
+ public void testSchedulingUndefinedTypes() throws InterruptedException {
+ ClusterConfig clusterConfig = _manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME);
+ clusterConfig.resetTaskQuotaRatioMap();
+ clusterConfig.setTaskQuotaRatio(DEFAULT_QUOTA_TYPE, 1);
+ clusterConfig.setTaskQuotaRatio("A", 1);
+ clusterConfig.setTaskQuotaRatio("B", 1);
+ _manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
+ _availableQuotaTypes = clusterConfig.getTaskQuotaRatioMap().keySet();
+
+ String workflowName = TestHelper.getTestMethodName();
+ Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
+ WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(workflowName);
+ configBuilder.setAllowOverlapJobAssignment(true);
+ workflowBuilder.setWorkflowConfig(configBuilder.build());
+
+ for (int i = 0; i < 10; i++) {
+ List<TaskConfig> taskConfigs = new ArrayList<>();
+ taskConfigs.add(new TaskConfig("ShortTask", new HashMap<String, String>()));
+ JobConfig.Builder jobConfigBulider =
+ new JobConfig.Builder().setCommand(JOB_COMMAND).addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(_jobCommandMap).setJobType("UNDEFINED");
+ workflowBuilder.addJob("JOB" + i, jobConfigBulider);
+ }
+
+ _driver.start(workflowBuilder.build());
+ _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+ // Check run counts for each quota type
+ Assert.assertEquals((int) _quotaTypeExecutionCount.get("DEFAULT"), 10);
+ Assert.assertFalse(_quotaTypeExecutionCount.containsKey("A"));
+ Assert.assertFalse(_quotaTypeExecutionCount.containsKey("B"));
+ }
+
+ /**
* Tests whether jobs with quotas can run successfully.
* @throws InterruptedException
*/
@@ -176,6 +220,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
clusterConfig.setTaskQuotaRatio("A", 1);
clusterConfig.setTaskQuotaRatio("B", 1);
_manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
+ _availableQuotaTypes = clusterConfig.getTaskQuotaRatioMap().keySet();
String workflowName = TestHelper.getTestMethodName();
Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
@@ -229,6 +274,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
clusterConfig.setTaskQuotaRatio("B", 10);
clusterConfig.setTaskQuotaRatio("C", 9);
_manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
+ _availableQuotaTypes = clusterConfig.getTaskQuotaRatioMap().keySet();
String workflowName = TestHelper.getTestMethodName();
Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
@@ -300,6 +346,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
clusterConfig.setTaskQuotaRatio("A", 10);
clusterConfig.setTaskQuotaRatio("B", 10);
_manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
+ _availableQuotaTypes = clusterConfig.getTaskQuotaRatioMap().keySet();
String workflowName = TestHelper.getTestMethodName();
Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
@@ -363,58 +410,6 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
}
/**
- * Tests that jobs belonging to a quota type that is not defined in ClusterConfig do not get
- * scheduled. That is, the job with an invalid quota type should never complete (because its tasks
- * may be assigned but never actually scheduled).
- * @throws InterruptedException
- */
- @Test(dependsOnMethods = "testSchedulingWithoutQuota")
- public void testNotSchedulingInvalidQuotaType() throws InterruptedException {
- ClusterConfig clusterConfig = _manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME);
- clusterConfig.resetTaskQuotaRatioMap();
- clusterConfig.setTaskQuotaRatio(DEFAULT_QUOTA_TYPE, 1);
- clusterConfig.setTaskQuotaRatio("A", 19);
- _manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
-
- String workflowName = TestHelper.getTestMethodName();
- Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
- WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(workflowName);
- configBuilder.setAllowOverlapJobAssignment(true);
- workflowBuilder.setWorkflowConfig(configBuilder.build());
-
- // Create two jobs, JOB_A belonging to quotaType A and JOB_B to quotaType B (not defined)
-
- // JOB_A
- List<TaskConfig> taskConfigsA = new ArrayList<>();
- for (int i = 0; i < 1; i++) {
- Map<String, String> taskConfigMap = Maps.newHashMap();
- taskConfigsA.add(new TaskConfig("ShortTask", taskConfigMap));
- }
- JobConfig.Builder jobBuilderA = new JobConfig.Builder().setCommand(JOB_COMMAND)
- .setJobCommandConfigMap(_jobCommandMap).addTaskConfigs(taskConfigsA).setJobType("A");
- workflowBuilder.addJob("JOB_A", jobBuilderA);
-
- // JOB_B
- List<TaskConfig> taskConfigsB = new ArrayList<>();
- for (int i = 0; i < 1; i++) {
- Map<String, String> taskConfigMap = Maps.newHashMap();
- taskConfigsB.add(new TaskConfig("ShortTask", taskConfigMap));
- }
- JobConfig.Builder jobBuilderB = new JobConfig.Builder().setCommand(JOB_COMMAND)
- .setJobCommandConfigMap(_jobCommandMap).addTaskConfigs(taskConfigsB).setJobType("B");
- workflowBuilder.addJob("JOB_B", jobBuilderB);
-
- _driver.start(workflowBuilder.build());
- // Wait until JOB_A is correctly scheduled and complete
- _driver.pollForJobState(workflowName, workflowName + "_JOB_A", TaskState.COMPLETED);
-
- // Check that JOB_B is still in progress and does not finish due to tasks not being scheduled
- TaskState jobState =
- _driver.getWorkflowContext(workflowName).getJobState(workflowName + "_JOB_B");
- Assert.assertEquals(jobState, TaskState.IN_PROGRESS);
- }
-
- /**
* Tests that by repeatedly scheduling workflows and jobs that there is no thread leak when there
* are a multidude of successful and failed tests. The number of total tasks run must be well
* above the number of total thread capacity.
@@ -428,6 +423,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
clusterConfig.setTaskQuotaRatio(DEFAULT_QUOTA_TYPE, 1);
clusterConfig.setTaskQuotaRatio("A", 1);
_manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
+ _availableQuotaTypes = clusterConfig.getTaskQuotaRatioMap().keySet();
List<String> workflowNames = new ArrayList<>();
@@ -440,7 +436,6 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
boolean shouldOverlapJobAssign = i % 3 == 1; // Alternate between true and false
String quotaType = (i % 2 == 1) ? null : "A"; // Alternate between null (DEFAULT) and A
String taskType = (i % 3 == 1) ? "FailTask" : "ShortTask"; // Some tasks will fail
- // String taskType = "ShortTask";
String workflowName = TestHelper.getTestMethodName() + "_" + i;
workflowNames.add(workflowName); // For polling the state for these workflows
@@ -458,7 +453,6 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
for (int i = 0; i < numWorkflows; i++) {
String workflowName = workflowNames.get(i);
TaskState state = (i % 3 == 1) ? TaskState.FAILED : TaskState.COMPLETED;
- // TaskState state = TaskState.COMPLETED;
Assert.assertEquals(_driver.getWorkflowContext(_manager, workflowName).getWorkflowState(),
state);
}
@@ -481,6 +475,7 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
clusterConfig.setTaskQuotaRatio("A", 1);
clusterConfig.setTaskQuotaRatio("B", 1);
_manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
+ _availableQuotaTypes = clusterConfig.getTaskQuotaRatioMap().keySet();
String queueName = TestHelper.getTestMethodName();
@@ -566,9 +561,8 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
Map<String, String> taskConfigMap = new HashMap<>();
taskConfigs.add(new TaskConfig(taskType, taskConfigMap));
}
- JobConfig.Builder jobBuilder =
- new JobConfig.Builder().setCommand(JOB_COMMAND).setJobCommandConfigMap(_jobCommandMap)
- .addTaskConfigs(taskConfigs).setJobType(quotaType);
+ JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand(JOB_COMMAND)
+ .setJobCommandConfigMap(_jobCommandMap).addTaskConfigs(taskConfigs).setJobType(quotaType);
workflowBuilder.addJob(jobName, jobBuilder);
}
return workflowBuilder.build();
@@ -579,12 +573,15 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
*/
private class ShortTask extends MockTask {
private final String _instanceName;
- private final String _quotaType;
+ private String _quotaType;
ShortTask(TaskCallbackContext context, String instanceName) {
super(context);
_instanceName = instanceName;
_quotaType = context.getJobConfig().getJobType();
+ if (_quotaType != null && !_availableQuotaTypes.contains(_quotaType)) {
+ _quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
+ }
// Initialize the count for this quotaType if not already done
if (_quotaType != null && !_quotaTypeExecutionCount.containsKey(_quotaType)) {
_quotaTypeExecutionCount.put(_quotaType, 0);
@@ -606,12 +603,15 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
*/
private class LongTask extends MockTask {
private final String _instanceName;
- private final String _quotaType;
+ private String _quotaType;
LongTask(TaskCallbackContext context, String instanceName) {
super(context);
_instanceName = instanceName;
_quotaType = context.getJobConfig().getJobType();
+ if (_quotaType != null && !_availableQuotaTypes.contains(_quotaType)) {
+ _quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
+ }
// Initialize the count for this quotaType if not already done
if (_quotaType != null && !_quotaTypeExecutionCount.containsKey(_quotaType)) {
_quotaTypeExecutionCount.put(_quotaType, 0);
@@ -641,12 +641,15 @@ public class TestQuotaBasedScheduling extends TaskTestBase {
*/
private class FailTask extends MockTask {
private final String _instanceName;
- private final String _quotaType;
+ private String _quotaType;
FailTask(TaskCallbackContext context, String instanceName) {
super(context);
_instanceName = instanceName;
_quotaType = context.getJobConfig().getJobType();
+ if (_quotaType != null && !_availableQuotaTypes.contains(_quotaType)) {
+ _quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
+ }
// Initialize the count for this quotaType if not already done
if (_quotaType != null && !_quotaTypeExecutionCount.containsKey(_quotaType)) {
_quotaTypeExecutionCount.put(_quotaType, 0);
http://git-wip-us.apache.org/repos/asf/helix/blob/67590409/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
index 0f12aea..5144bb6 100644
--- a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -80,9 +80,8 @@ public class TestAssignableInstance extends AssignerTestBase {
for (int i = 0; i < testResourceTypes.length; i++) {
Assert.assertEquals(ai.getTotalCapacity().get(testResourceTypes[i]).size(), 1);
Assert.assertEquals(ai.getUsedCapacity().get(testResourceTypes[i]).size(), 1);
- Assert.assertEquals(
- ai.getTotalCapacity().get(testResourceTypes[i]).get(AssignableInstance.DEFAULT_QUOTA_TYPE),
- Integer.valueOf(testResourceCapacity[i]));
+ Assert.assertEquals(ai.getTotalCapacity().get(testResourceTypes[i])
+ .get(AssignableInstance.DEFAULT_QUOTA_TYPE), Integer.valueOf(testResourceCapacity[i]));
Assert.assertEquals(
ai.getUsedCapacity().get(testResourceTypes[i]).get(AssignableInstance.DEFAULT_QUOTA_TYPE),
Integer.valueOf(0));
@@ -230,18 +229,12 @@ public class TestAssignableInstance extends AssignerTestBase {
Assert.assertEquals(result.getFailureReason(),
TaskAssignResult.FailureReason.NO_SUCH_RESOURCE_TYPE);
- // No such quota type
ai.updateConfigs(null, null, createLiveInstance(new String[] {
LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()
}, new String[] {
"1"
}));
- result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
- Assert.assertFalse(result.isSuccessful());
- Assert.assertEquals(result.getFailureReason(),
- TaskAssignResult.FailureReason.NO_SUCH_QUOTA_TYPE);
-
ai.updateConfigs(createClusterConfig(testQuotaTypes, testQuotaRatio, true), null, null);
result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
@@ -280,9 +273,11 @@ public class TestAssignableInstance extends AssignerTestBase {
public void testRestoreTaskAssignResult() {
AssignableInstance ai =
new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, true),
- new InstanceConfig(testInstanceName), createLiveInstance(
- new String[] { LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()},
- new String[] { "40" }));
+ new InstanceConfig(testInstanceName), createLiveInstance(new String[] {
+ LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()
+ }, new String[] {
+ "40"
+ }));
Map<String, TaskConfig> currentAssignments = new HashMap<>();
TaskConfig supportedTask = new TaskConfig("", null, "supportedTask", "");
@@ -298,8 +293,7 @@ public class TestAssignableInstance extends AssignerTestBase {
String quotaType = (taskID.equals("supportedTask")) ? AssignableInstance.DEFAULT_QUOTA_TYPE
: "UnsupportedQuotaType";
// Restore TaskAssignResult
- TaskAssignResult taskAssignResult =
- ai.restoreTaskAssignResult(taskID, taskConfig, quotaType);
+ TaskAssignResult taskAssignResult = ai.restoreTaskAssignResult(taskID, taskConfig, quotaType);
if (taskAssignResult.isSuccessful()) {
results.put(taskID, taskAssignResult);
}
@@ -310,10 +304,12 @@ public class TestAssignableInstance extends AssignerTestBase {
Assert.assertEquals(rst.getAssignableInstance(), ai);
}
Assert.assertEquals(ai.getCurrentAssignments().size(), 2);
+ // The expected value for the following should be 2, not 1 because the unsupported task should
+ // also have been assigned as a DEFAULT task
Assert.assertEquals(
(int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
.get(AssignableInstance.DEFAULT_QUOTA_TYPE),
- 1);
+ 2);
}
private Map<String, Integer> createResourceQuotaPerTypeMap(String[] types, int[] quotas) {