You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/06/11 00:35:21 UTC
[helix] 02/03: Modify AssignableInstance for Configurable Thread
Pool Size (#1009)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch task_pool
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 35af2639482ecb61adf61e618c8d0b788ac1b790
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Fri May 15 14:26:56 2020 -0700
Modify AssignableInstance for Configurable Thread Pool Size (#1009)
AssignableInstance used to assign a default value of 40 to its resource capacity (resource capacity has only one field: TASK_EXEC_THREAD). With the recent change related to configurable thread pool size, AssignableInstance should respect the reported thread pool size in LiveInstance's.
---
.../helix/task/assigner/AssignableInstance.java | 2 +-
.../task/assigner/TestAssignableInstance.java | 28 +++++++++++++++-------
2 files changed, 20 insertions(+), 10 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index 194db41..67199a3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -113,7 +113,7 @@ public class AssignableInstance {
if (resourceCapacity == null) {
resourceCapacity = new HashMap<>();
resourceCapacity.put(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(),
- Integer.toString(TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE));
+ Integer.toString(_liveInstance.getCurrentTaskThreadPoolSize()));
logger.debug("No resource capacity provided in LiveInstance {}, assuming default capacity: {}",
_instanceConfig.getInstanceName(), resourceCapacity);
}
diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
index 1dac153..fbbf06e 100644
--- a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -55,18 +55,20 @@ public class TestAssignableInstance extends AssignerTestBase {
@Test
public void testInitializationWithQuotaUnset() {
+ int expectedCurrentTaskThreadPoolSize = 100;
+ LiveInstance liveInstance = createLiveInstance(null, null);
+ liveInstance.setCurrentTaskThreadPoolSize(expectedCurrentTaskThreadPoolSize);
+
// Initialize AssignableInstance with neither resource capacity nor quota ratio provided
AssignableInstance ai = new AssignableInstance(createClusterConfig(null, null, false),
- new InstanceConfig(testInstanceName), createLiveInstance(null, null));
+ new InstanceConfig(testInstanceName), liveInstance);
Assert.assertEquals(ai.getUsedCapacity().size(), 1);
Assert.assertEquals(
(int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
- .get(AssignableInstance.DEFAULT_QUOTA_TYPE),
- 0);
+ .get(AssignableInstance.DEFAULT_QUOTA_TYPE), 0);
Assert.assertEquals(
(int) ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
- .get(AssignableInstance.DEFAULT_QUOTA_TYPE),
- TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+ .get(AssignableInstance.DEFAULT_QUOTA_TYPE), expectedCurrentTaskThreadPoolSize);
Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
}
@@ -91,10 +93,14 @@ public class TestAssignableInstance extends AssignerTestBase {
@Test
public void testInitializationWithOnlyQuotaType() {
+ int expectedCurrentTaskThreadPoolSize = 100;
+ LiveInstance liveInstance = createLiveInstance(null, null);
+ liveInstance.setCurrentTaskThreadPoolSize(expectedCurrentTaskThreadPoolSize);
+
// Initialize AssignableInstance with only quota type provided
AssignableInstance ai =
new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, false),
- new InstanceConfig(testInstanceName), createLiveInstance(null, null));
+ new InstanceConfig(testInstanceName), liveInstance);
Assert.assertEquals(ai.getTotalCapacity().size(), 1);
Assert.assertEquals(ai.getUsedCapacity().size(), 1);
@@ -106,7 +112,7 @@ public class TestAssignableInstance extends AssignerTestBase {
testQuotaTypes.length);
Assert.assertEquals(
ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()),
- calculateExpectedQuotaPerType(TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE, testQuotaTypes,
+ calculateExpectedQuotaPerType(expectedCurrentTaskThreadPoolSize, testQuotaTypes,
testQuotaRatio));
Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
}
@@ -171,12 +177,16 @@ public class TestAssignableInstance extends AssignerTestBase {
@Test
public void testNormalTryAssign() {
+ int testCurrentTaskThreadPoolSize = 100;
+ LiveInstance liveInstance = createLiveInstance(null, null);
+ liveInstance.setCurrentTaskThreadPoolSize(testCurrentTaskThreadPoolSize);
+
AssignableInstance ai = new AssignableInstance(createClusterConfig(null, null, true),
- new InstanceConfig(testInstanceName), createLiveInstance(null, null));
+ new InstanceConfig(testInstanceName), liveInstance);
// When nothing is configured, we should use default quota type to assign
Map<String, TaskAssignResult> results = new HashMap<>();
- for (int i = 0; i < TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE; i++) {
+ for (int i = 0; i < testCurrentTaskThreadPoolSize; i++) {
String taskId = Integer.toString(i);
TaskConfig task = new TaskConfig("", null, taskId, null);
TaskAssignResult result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);