You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/06/11 00:35:21 UTC

[helix] 02/03: Modify AssignableInstance for Configurable Thread Pool Size (#1009)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch task_pool
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 35af2639482ecb61adf61e618c8d0b788ac1b790
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Fri May 15 14:26:56 2020 -0700

    Modify AssignableInstance for Configurable Thread Pool Size (#1009)
    
    AssignableInstance used to assign a default value of 40 to its resource capacity (which has only one field: TASK_EXEC_THREAD). With the recent change that made the thread pool size configurable, AssignableInstance should instead respect the thread pool size reported by each LiveInstance.
---
 .../helix/task/assigner/AssignableInstance.java    |  2 +-
 .../task/assigner/TestAssignableInstance.java      | 28 +++++++++++++++-------
 2 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
index 194db41..67199a3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
+++ b/helix-core/src/main/java/org/apache/helix/task/assigner/AssignableInstance.java
@@ -113,7 +113,7 @@ public class AssignableInstance {
     if (resourceCapacity == null) {
       resourceCapacity = new HashMap<>();
       resourceCapacity.put(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name(),
-          Integer.toString(TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE));
+          Integer.toString(_liveInstance.getCurrentTaskThreadPoolSize()));
       logger.debug("No resource capacity provided in LiveInstance {}, assuming default capacity: {}",
           _instanceConfig.getInstanceName(), resourceCapacity);
     }
diff --git a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
index 1dac153..fbbf06e 100644
--- a/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
+++ b/helix-core/src/test/java/org/apache/helix/task/assigner/TestAssignableInstance.java
@@ -55,18 +55,20 @@ public class TestAssignableInstance extends AssignerTestBase {
 
   @Test
   public void testInitializationWithQuotaUnset() {
+    int expectedCurrentTaskThreadPoolSize = 100;
+    LiveInstance liveInstance = createLiveInstance(null, null);
+    liveInstance.setCurrentTaskThreadPoolSize(expectedCurrentTaskThreadPoolSize);
+
     // Initialize AssignableInstance with neither resource capacity nor quota ratio provided
     AssignableInstance ai = new AssignableInstance(createClusterConfig(null, null, false),
-        new InstanceConfig(testInstanceName), createLiveInstance(null, null));
+        new InstanceConfig(testInstanceName), liveInstance);
     Assert.assertEquals(ai.getUsedCapacity().size(), 1);
     Assert.assertEquals(
         (int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
-            .get(AssignableInstance.DEFAULT_QUOTA_TYPE),
-        0);
+            .get(AssignableInstance.DEFAULT_QUOTA_TYPE), 0);
     Assert.assertEquals(
         (int) ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
-            .get(AssignableInstance.DEFAULT_QUOTA_TYPE),
-        TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE);
+            .get(AssignableInstance.DEFAULT_QUOTA_TYPE), expectedCurrentTaskThreadPoolSize);
     Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
   }
 
@@ -91,10 +93,14 @@ public class TestAssignableInstance extends AssignerTestBase {
 
   @Test
   public void testInitializationWithOnlyQuotaType() {
+    int expectedCurrentTaskThreadPoolSize = 100;
+    LiveInstance liveInstance = createLiveInstance(null, null);
+    liveInstance.setCurrentTaskThreadPoolSize(expectedCurrentTaskThreadPoolSize);
+
     // Initialize AssignableInstance with only quota type provided
     AssignableInstance ai =
         new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, false),
-            new InstanceConfig(testInstanceName), createLiveInstance(null, null));
+            new InstanceConfig(testInstanceName), liveInstance);
 
     Assert.assertEquals(ai.getTotalCapacity().size(), 1);
     Assert.assertEquals(ai.getUsedCapacity().size(), 1);
@@ -106,7 +112,7 @@ public class TestAssignableInstance extends AssignerTestBase {
         testQuotaTypes.length);
     Assert.assertEquals(
         ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()),
-        calculateExpectedQuotaPerType(TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE, testQuotaTypes,
+        calculateExpectedQuotaPerType(expectedCurrentTaskThreadPoolSize, testQuotaTypes,
             testQuotaRatio));
     Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
   }
@@ -171,12 +177,16 @@ public class TestAssignableInstance extends AssignerTestBase {
 
   @Test
   public void testNormalTryAssign() {
+    int testCurrentTaskThreadPoolSize = 100;
+    LiveInstance liveInstance = createLiveInstance(null, null);
+    liveInstance.setCurrentTaskThreadPoolSize(testCurrentTaskThreadPoolSize);
+
     AssignableInstance ai = new AssignableInstance(createClusterConfig(null, null, true),
-        new InstanceConfig(testInstanceName), createLiveInstance(null, null));
+        new InstanceConfig(testInstanceName), liveInstance);
 
     // When nothing is configured, we should use default quota type to assign
     Map<String, TaskAssignResult> results = new HashMap<>();
-    for (int i = 0; i < TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE; i++) {
+    for (int i = 0; i < testCurrentTaskThreadPoolSize; i++) {
       String taskId = Integer.toString(i);
       TaskConfig task = new TaskConfig("", null, taskId, null);
       TaskAssignResult result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);