Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/05/05 17:58:08 UTC

[GitHub] [helix] jiajunwang commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

jiajunwang commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420288114



##########
File path: helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
##########
@@ -48,6 +49,7 @@
     TASK_EXEC_THREAD
   }
 
+  public static final int CURRENT_TASK_THREAD_POOL_SIZE_NOT_SET = -1;

Review comment:
       In addition to what Huizhi said, shall we make it 40 for backward compatibility?

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -81,6 +79,12 @@ public Thread newThread(Runnable r) {
 
   @Override
   public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+    if (_taskExecutor == null) {

Review comment:
       To be safe, add synchronization around this block. There is no guarantee we won't call createNewStateModel() in parallel in the future.
   It would also be better to put all of this logic inside initializeTaskExecutor().
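
   A rough sketch of what I mean, not the actual Helix code: LazyExecutorHolder and the IntSupplier standing in for getTaskThreadPoolSize() are hypothetical names used only for illustration. The null check and the creation both happen under one lock, inside a single initializeTaskExecutor() method.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.IntSupplier;

// Hypothetical stand-in for the factory; only the lazy-init part is shown.
final class LazyExecutorHolder {
  // Assumption: stands in for getTaskThreadPoolSize() from the PR.
  private final IntSupplier poolSizeSupplier;
  // Guarded by "this".
  private ScheduledExecutorService taskExecutor;

  LazyExecutorHolder(IntSupplier poolSizeSupplier) {
    this.poolSizeSupplier = poolSizeSupplier;
  }

  // The null check and the creation happen under the same lock, so
  // concurrent callers can never create two executors.
  synchronized ScheduledExecutorService initializeTaskExecutor() {
    if (taskExecutor == null) {
      taskExecutor = Executors.newScheduledThreadPool(poolSizeSupplier.getAsInt());
    }
    return taskExecutor;
  }

  // Stops the executor if it was ever created.
  synchronized void shutdown() {
    if (taskExecutor != null) {
      taskExecutor.shutdownNow();
    }
  }
}
```

   With this shape, createNewStateModel() would only need to call initializeTaskExecutor() and use the returned executor.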

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -81,6 +79,12 @@ public Thread newThread(Runnable r) {
 
   @Override
   public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+    if (_taskExecutor == null) {
+      int taskThreadPoolSize = getTaskThreadPoolSize();
+      updateLiveInstanceWithThreadPoolSize(taskThreadPoolSize);

Review comment:
       Curious about this design.
   createNewStateModel() is called once a partition has been assigned, right? But I think the controller needs to know the pool size before it assigns the tasks. How does this work?

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -41,23 +47,15 @@
 
   private final HelixManager _manager;
   private final Map<String, TaskFactory> _taskFactoryRegistry;
-  private final ScheduledExecutorService _taskExecutor;
+  private ScheduledExecutorService _taskExecutor;
   private final ScheduledExecutorService _timerTaskExecutor;
   private ThreadPoolExecutorMonitor _monitor;
-  public final static int TASK_THREADPOOL_SIZE = 40;
 
   public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) {
-    this(manager, taskFactoryRegistry,
-        Executors.newScheduledThreadPool(TASK_THREADPOOL_SIZE, new ThreadFactory() {
-          private AtomicInteger threadId = new AtomicInteger(0);
-
-          @Override
-          public Thread newThread(Runnable r) {
-            return new Thread(r, "TaskStateModelFactory-task_thread-" + threadId.getAndIncrement());
-          }
-        }));
+    this(manager, taskFactoryRegistry, null);

Review comment:
       As mentioned below, I don't think we want to lazily initialize the executor. Any reason for this change?
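
   For comparison, a minimal sketch of the eager alternative, assuming the pool size can be resolved before the factory is constructed. EagerTaskExecutorFactory is a hypothetical name, not the Helix class. The thread factory is the one removed in this hunk, and the field can stay final.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the factory; only the executor wiring is shown.
final class EagerTaskExecutorFactory {
  // Assigned exactly once in the constructor, so it can remain final.
  private final ScheduledExecutorService taskExecutor;

  // Assumption: the caller has already resolved the pool size from config.
  EagerTaskExecutorFactory(int taskThreadPoolSize) {
    this.taskExecutor =
        Executors.newScheduledThreadPool(taskThreadPoolSize, new ThreadFactory() {
          private final AtomicInteger threadId = new AtomicInteger(0);

          @Override
          public Thread newThread(Runnable r) {
            return new Thread(r,
                "TaskStateModelFactory-task_thread-" + threadId.getAndIncrement());
          }
        });
  }

  ScheduledExecutorService getTaskExecutor() {
    return taskExecutor;
  }

  void shutdown() {
    taskExecutor.shutdownNow();
  }
}
```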

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +106,73 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private int getTaskThreadPoolSize() {
+    ConfigAccessor configAccessor = _manager.getConfigAccessor();
+    // Check instance config first for thread pool size
+    InstanceConfig instanceConfig =
+        configAccessor.getInstanceConfig(_manager.getClusterName(), _manager.getInstanceName());
+    if (instanceConfig != null) {
+      int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+      if (verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the value
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(_manager.getClusterName());
+    if (clusterConfig != null) {
+      int targetTaskThreadPoolSize = clusterConfig.getDefaultTargetTaskThreadPoolSize();
+      if (verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    }
+
+    return TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE;
+  }
+
+  /*
+   * Checks against the default values of -1 when pool sizes are not defined; we don't want -1's
+   */
+  private static boolean verifyTargetThreadPoolSize(int targetTaskThreadPoolSize) {
+    return targetTaskThreadPoolSize > 0;
+  }
+
+  /*
+   * Update LiveInstance with the current used thread pool size
+   */
+  private void updateLiveInstanceWithThreadPoolSize(int taskThreadPoolSize) {

Review comment:
       Instead of updating after the live node is created, we should ensure the pool size is included when the node is first created. Otherwise the controller may violate the capacity limit, which would be a serious issue.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +106,73 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private int getTaskThreadPoolSize() {
+    ConfigAccessor configAccessor = _manager.getConfigAccessor();
+    // Check instance config first for thread pool size
+    InstanceConfig instanceConfig =
+        configAccessor.getInstanceConfig(_manager.getClusterName(), _manager.getInstanceName());
+    if (instanceConfig != null) {
+      int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+      if (verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the value
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(_manager.getClusterName());
+    if (clusterConfig != null) {
+      int targetTaskThreadPoolSize = clusterConfig.getDefaultTargetTaskThreadPoolSize();
+      if (verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    }
+
+    return TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE;
+  }
+
+  /*
+   * Checks against the default values of -1 when pool sizes are not defined; we don't want -1's

Review comment:
       You are also excluding 0. Let's make the comment clear and consistent with the code.
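
   Something along these lines, as a rough sketch only. PoolSizeResolver, isValidPoolSize and resolve are hypothetical names, and the default of 40 is an assumption that mirrors the old TASK_THREADPOOL_SIZE, not necessarily the value of TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE. The point is that the comment states exactly what the check does.

```java
// Hypothetical helper; the real logic reads InstanceConfig and ClusterConfig.
final class PoolSizeResolver {
  // Assumption: mirrors the previous hard-coded TASK_THREADPOOL_SIZE of 40.
  static final int DEFAULT_TASK_THREAD_POOL_SIZE = 40;

  private PoolSizeResolver() {
  }

  /*
   * A configured pool size is usable only if it is strictly positive.
   * This rejects the "not set" value of -1, any other negative value,
   * and 0 (a pool with no threads cannot run tasks).
   */
  static boolean isValidPoolSize(int size) {
    return size > 0;
  }

  /*
   * Instance-level value first, then cluster-level value, then the default.
   */
  static int resolve(int instanceSize, int clusterSize) {
    if (isValidPoolSize(instanceSize)) {
      return instanceSize;
    }
    if (isValidPoolSize(clusterSize)) {
      return clusterSize;
    }
    return DEFAULT_TASK_THREAD_POOL_SIZE;
  }
}
```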
