Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/04/27 21:09:12 UTC

[GitHub] [helix] NealSun96 opened a new pull request #973: (WIP) TaskStateModelFactory configurable thread pool size

NealSun96 opened a new pull request #973:
URL: https://github.com/apache/helix/pull/973


   ### Issues
   
   - [x] My PR addresses the following Helix issues and references them in the PR description:
   
   Fixes #972
   
   ### Description
   
   - [x] Here are some details about my PR, including screenshots of any UI changes:
   
   We are improving the task framework by allowing task thread pools to have configurable sizes. As a part of the participant side change, `TaskStateModelFactory` is changed to allocate thread pool sizes based on values provided in `InstanceConfig` or `ClusterConfig`.
   
   Both `InstanceConfig` and `ClusterConfig` are now expected to contain a field keyed as "TARGET_TASK_THREAD_POOL_SIZE". The value in `InstanceConfig` determines the thread pool size for that instance only, and the value in `ClusterConfig` determines the thread pool sizes for all instances of that cluster. If `InstanceConfig` cannot provide the pool size, `ClusterConfig` is used instead; if neither can provide it, `TaskStateModelFactory` uses a default value.
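
The resolution order described above can be sketched roughly as follows. This is a minimal illustration rather than the Helix implementation: the class `PoolSizeResolution`, the use of `OptionalInt` to model a missing value, and the default of 40 are all assumptions made for the example.

```java
import java.util.OptionalInt;

// Hypothetical stand-in for the lookup order; not the actual TaskStateModelFactory code.
final class PoolSizeResolution {
  // Assumed default value for illustration only.
  static final int DEFAULT_TASK_THREAD_POOL_SIZE = 40;

  // An empty OptionalInt models "config missing or field not set".
  static int resolve(OptionalInt instanceValue, OptionalInt clusterValue) {
    // 1. The instance-level value wins when it is present and positive.
    if (instanceValue.isPresent() && instanceValue.getAsInt() > 0) {
      return instanceValue.getAsInt();
    }
    // 2. Otherwise fall back to the cluster-level value.
    if (clusterValue.isPresent() && clusterValue.getAsInt() > 0) {
      return clusterValue.getAsInt();
    }
    // 3. Neither config provides a usable value.
    return DEFAULT_TASK_THREAD_POOL_SIZE;
  }

  public static void main(String[] args) {
    System.out.println(resolve(OptionalInt.of(10), OptionalInt.of(20)));
    System.out.println(resolve(OptionalInt.empty(), OptionalInt.of(20)));
    System.out.println(resolve(OptionalInt.empty(), OptionalInt.empty()));
  }
}
```

Treating non-positive values as "not provided" keeps a sentinel such as -1 from ever reaching the thread pool constructor.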
   
   ### Tests
   
   - [] The following is the result of the "mvn test" command on the appropriate module:
   
   
   ### Commits
   
   - [] My commits all reference appropriate Apache Helix GitHub issues in their subject lines. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Code Quality
   
   - [x] My diff has been formatted using helix-style.xml 
   (helix-style-intellij.xml if IntelliJ IDE is used)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] NealSun96 commented on pull request #973: (WIP) TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on pull request #973:
URL: https://github.com/apache/helix/pull/973#issuecomment-623714094


   > Just to confirm, will the logic to report the current pool size be in a different PR?
   
   Yes, it will be a different PR.




[GitHub] [helix] NealSun96 commented on a change in pull request #973: (WIP) TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r416964720



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +107,40 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private static int _getTaskThreadPoolSize(HelixManager manager) {
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig =
+          configAccessor.getInstanceConfig(manager.getClusterName(), manager.getInstanceName());
+      int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+      if (_verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    } catch (HelixException e) {
+      // Pass if InstanceConfig doesn't exist.
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the value
+    try {
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(manager.getClusterName());
+      int targetTaskThreadPoolSize = clusterConfig.getTargetTaskThreadPoolSize();
+      if (_verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    } catch (HelixException e) {
+      // Pass if ClusterConfig doesn't exist.
+    }
+
+    return DEFAULT_TASK_THREAD_POOL_SIZE;
+  }
+
+  private static boolean _verifyTargetThreadPoolSize(int targetTaskThreadPoolSize) {

Review comment:
       This checks against the "not set" value: when the target thread pool size is not specified in the configs, `getTargetTaskThreadPoolSize()` returns -1. The check makes sure that value is filtered out.
   
   I don't see any other good way to use `getIntField()` while handling the "not defined" case differently, since `getIntField()` always asks for a default value; it would be much cleaner if `getIntField()` threw an exception when the value is not defined. This is the best solution I came up with while using `getIntField()`.
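
One way to picture the trade-off is a thin wrapper that turns the -1 default into an explicit "absent" result. The sketch below is only an illustration: `SentinelLookup` is a hypothetical miniature of a record with integer fields, and `findPositiveIntField` is an invented helper, not part of the Helix record API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical miniature of a record with int fields; not the Helix record class.
final class SentinelLookup {
  static final int NOT_SET = -1;
  private final Map<String, String> simpleFields = new HashMap<>();

  void setIntField(String key, int value) {
    simpleFields.put(key, Integer.toString(value));
  }

  // Same shape as the getter discussed above: the caller must always supply a default.
  int getIntField(String key, int defaultValue) {
    String raw = simpleFields.get(key);
    if (raw == null) {
      return defaultValue;
    }
    try {
      return Integer.parseInt(raw);
    } catch (NumberFormatException e) {
      return defaultValue;
    }
  }

  // Invented helper: maps the sentinel (and any non-positive value) to "absent".
  OptionalInt findPositiveIntField(String key) {
    int value = getIntField(key, NOT_SET);
    return value > 0 ? OptionalInt.of(value) : OptionalInt.empty();
  }

  public static void main(String[] args) {
    SentinelLookup record = new SentinelLookup();
    System.out.println(record.findPositiveIntField("TARGET_TASK_THREAD_POOL_SIZE").isPresent());
    record.setIntField("TARGET_TASK_THREAD_POOL_SIZE", 20);
    System.out.println(record.findPositiveIntField("TARGET_TASK_THREAD_POOL_SIZE").getAsInt());
  }
}
```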






[GitHub] [helix] narendly commented on a change in pull request #973: (WIP) TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r419686031



##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
##########
@@ -137,6 +140,7 @@
   private final static int MAX_REBALANCE_PREFERENCE = 10;
   private final static int MIN_REBALANCE_PREFERENCE = 0;
   public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
+  public static final int TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;

Review comment:
       +1 to what @jiajunwang  said.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +107,36 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private static int getTaskThreadPoolSize(HelixManager manager) {
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+    // Check instance config first for thread pool size
+    InstanceConfig instanceConfig =
+        configAccessor.getInstanceConfig(manager.getClusterName(), manager.getInstanceName());
+    if (instanceConfig != null) {

Review comment:
       I am okay with returning the default size because creating a thread pool with a default size won't be the end of the world and won't affect the user too much. And the user can always try to set it again or restart if they want to change the thread pool size. 
   
   But we should make it clear to the user. I don't see any log or notification here, so that could be a problem.
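
A minimal sketch of what such a notification could look like, assuming the fallback is decided in one place. The class `LoggedFallback`, the method `chooseSize`, and the use of `java.util.logging` (chosen only to keep the example self-contained) are assumptions for illustration, not the code in this PR.

```java
import java.util.logging.Logger;

// Hypothetical helper showing a logged fallback; not the actual TaskStateModelFactory code.
final class LoggedFallback {
  private static final Logger LOG = Logger.getLogger(LoggedFallback.class.getName());
  // The default of 40 is the value mentioned elsewhere in this review thread.
  static final int DEFAULT_TASK_THREAD_POOL_SIZE = 40;

  // configuredSize <= 0 models "no usable value found in InstanceConfig or ClusterConfig".
  static int chooseSize(int configuredSize, String instanceName) {
    if (configuredSize > 0) {
      LOG.info("Instance " + instanceName + " uses configured task thread pool size "
          + configuredSize);
      return configuredSize;
    }
    // Make the fallback visible so the user knows their setting was not applied.
    LOG.warning("No valid task thread pool size found for instance " + instanceName
        + "; falling back to default " + DEFAULT_TASK_THREAD_POOL_SIZE);
    return DEFAULT_TASK_THREAD_POOL_SIZE;
  }

  public static void main(String[] args) {
    System.out.println(chooseSize(-1, "localhost_12918"));
    System.out.println(chooseSize(20, "localhost_12918"));
  }
}
```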






[GitHub] [helix] NealSun96 commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420975485



##########
File path: helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
##########
@@ -190,6 +192,24 @@ public void setWebserviceUrl(String url) {
     _record.setSimpleField(LiveInstanceProperty.ZKPROPERTYTRANSFERURL.toString(), url);
   }
 
+  /**
+   * Get the current task thread pool size of the instance
+   * @return the current task thread pool size
+   */
+  public int getCurrentTaskThreadPoolSize() {
+    return _record.getIntField(LiveInstanceProperty.CURRENT_TASK_THREAD_POOL_SIZE.name(),
+        CURRENT_TASK_THREAD_POOL_SIZE_NOT_SET);

Review comment:
       Discussed offline: this is meant to be a bit signaling an error, because the field should always be set. But it's true that old participants don't have this field. I'm using the default of 40 instead, as there are no better options in that case. Thank you.
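
A rough sketch of the reader side under that decision, assuming the live-instance fields are available as a plain string map. `CurrentPoolSizeReader` and its method are hypothetical names used for illustration, not the `LiveInstance` API.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical reader; models an old participant as a map without the field.
final class CurrentPoolSizeReader {
  static final String FIELD = "CURRENT_TASK_THREAD_POOL_SIZE";
  static final int DEFAULT_TASK_THREAD_POOL_SIZE = 40;

  static int currentPoolSize(Map<String, String> liveInstanceFields) {
    String raw = liveInstanceFields.get(FIELD);
    if (raw == null) {
      // Old participants never wrote the field, so assume the historical default.
      return DEFAULT_TASK_THREAD_POOL_SIZE;
    }
    try {
      return Integer.parseInt(raw);
    } catch (NumberFormatException e) {
      // An unparsable value is treated like a missing one in this sketch.
      return DEFAULT_TASK_THREAD_POOL_SIZE;
    }
  }

  public static void main(String[] args) {
    System.out.println(currentPoolSize(Collections.emptyMap()));
    System.out.println(currentPoolSize(Collections.singletonMap(FIELD, "20")));
  }
}
```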






[GitHub] [helix] NealSun96 edited a comment on pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 edited a comment on pull request #973:
URL: https://github.com/apache/helix/pull/973#issuecomment-623714094


   > Just to confirm, will the logic to report the current pool size be in a different PR?
   
   Current pool size reporting is added to this PR. 




[GitHub] [helix] NealSun96 commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420328436



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -81,6 +79,12 @@ public Thread newThread(Runnable r) {
 
   @Override
   public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+    if (_taskExecutor == null) {
+      int taskThreadPoolSize = getTaskThreadPoolSize();
+      updateLiveInstanceWithThreadPoolSize(taskThreadPoolSize);

Review comment:
       Reading your later comment, I think I know what you mean by "controller needs to know the pool size". Let me think about this. 






[GitHub] [helix] narendly commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420428755



##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
##########
@@ -137,6 +141,7 @@
   private final static int MAX_REBALANCE_PREFERENCE = 10;
   private final static int MIN_REBALANCE_PREFERENCE = 0;
   public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
+  public static final int DEFAULT_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;

Review comment:
       I find myself repeating this again, why does this field need to be public?

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
##########
@@ -709,6 +714,32 @@ public void setInstanceCapacityKeys(List<String> capacityKeys) {
     _record.setListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), capacityKeys);
   }
 
+  /**
+   * Get the default target size of task thread pools. This values applies to participants and is
+   * overwritten by participants' own values if they specified individual pool sizes in
+   * InstanceConfigs
+   * @return the target size of task thread pool
+   */
+  public int getDefaultTargetTaskThreadPoolSize() {
+    return _record.getIntField(
+        ClusterConfig.ClusterConfigProperty.DEFAULT_TARGET_TASK_THREAD_POOL_SIZE.name(),

Review comment:
       We're confusing terminology here. Let's please try to be exact:
   
   What you want here is the global target task thread pool size, not default.
   
   The "default" value refers to 40. It's okay to use the NOT_SET bit and resort to 40 at initialization time.

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
##########
@@ -709,6 +714,32 @@ public void setInstanceCapacityKeys(List<String> capacityKeys) {
     _record.setListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), capacityKeys);
   }
 
+  /**
+   * Get the default target size of task thread pools. This values applies to participants and is
+   * overwritten by participants' own values if they specified individual pool sizes in
+   * InstanceConfigs
+   * @return the target size of task thread pool
+   */
+  public int getDefaultTargetTaskThreadPoolSize() {
+    return _record.getIntField(
+        ClusterConfig.ClusterConfigProperty.DEFAULT_TARGET_TASK_THREAD_POOL_SIZE.name(),
+        DEFAULT_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET);
+  }
+
+  /**
+   * Set the default target size of task thread pools for this cluster.
+   * @param defaultTargetTaskThreadPoolSize - the new target task thread pool size
+   * @throws IllegalArgumentException - when the provided new thread pool size is not greater than 0
+   */
+  public void setDefaultTargetTaskThreadPoolSize(int defaultTargetTaskThreadPoolSize)

Review comment:
       Fix the name.

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
##########
@@ -109,7 +109,11 @@
     // https://github.com/apache/helix/wiki/Weight-aware-Globally-Evenly-distributed-Rebalancer#rebalance-coordinator
     //
     // Default to be true.
-    GLOBAL_REBALANCE_ASYNC_MODE
+    GLOBAL_REBALANCE_ASYNC_MODE,
+
+    // The target size of task thread pools for each participant. This is the "default" value

Review comment:
       "global" value to the cluster

##########
File path: helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
##########
@@ -56,11 +56,13 @@
     DOMAIN,
     DELAY_REBALANCE_ENABLED,
     MAX_CONCURRENT_TASK,
-    INSTANCE_CAPACITY_MAP
+    INSTANCE_CAPACITY_MAP,
+    TARGET_TASK_THREAD_POOL_SIZE
   }
 
   public static final int WEIGHT_NOT_SET = -1;
   public static final int MAX_CONCURRENT_TASK_NOT_SET = -1;
+  public static final int TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;

Review comment:
       private?

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
##########
@@ -709,6 +714,32 @@ public void setInstanceCapacityKeys(List<String> capacityKeys) {
     _record.setListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), capacityKeys);
   }
 
+  /**
+   * Get the default target size of task thread pools. This values applies to participants and is
+   * overwritten by participants' own values if they specified individual pool sizes in
+   * InstanceConfigs
+   * @return the target size of task thread pool
+   */
+  public int getDefaultTargetTaskThreadPoolSize() {
+    return _record.getIntField(
+        ClusterConfig.ClusterConfigProperty.DEFAULT_TARGET_TASK_THREAD_POOL_SIZE.name(),
+        DEFAULT_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET);
+  }
+
+  /**
+   * Set the default target size of task thread pools for this cluster.
+   * @param defaultTargetTaskThreadPoolSize - the new target task thread pool size
+   * @throws IllegalArgumentException - when the provided new thread pool size is not greater than 0
+   */
+  public void setDefaultTargetTaskThreadPoolSize(int defaultTargetTaskThreadPoolSize)
+      throws IllegalArgumentException {
+    if (defaultTargetTaskThreadPoolSize <= 0) {
+      throw new IllegalArgumentException("targetTaskThreadPoolSize must be greater than 0!");

Review comment:
       Fix the message.

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
##########
@@ -709,6 +714,32 @@ public void setInstanceCapacityKeys(List<String> capacityKeys) {
     _record.setListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), capacityKeys);
   }
 
+  /**
+   * Get the default target size of task thread pools. This values applies to participants and is
+   * overwritten by participants' own values if they specified individual pool sizes in
+   * InstanceConfigs
+   * @return the target size of task thread pool
+   */
+  public int getDefaultTargetTaskThreadPoolSize() {
+    return _record.getIntField(
+        ClusterConfig.ClusterConfigProperty.DEFAULT_TARGET_TASK_THREAD_POOL_SIZE.name(),
+        DEFAULT_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET);
+  }
+
+  /**
+   * Set the default target size of task thread pools for this cluster.

Review comment:
       Not default, but global value. The default value for the global config should be 40.

##########
File path: helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
##########
@@ -190,6 +192,24 @@ public void setWebserviceUrl(String url) {
     _record.setSimpleField(LiveInstanceProperty.ZKPROPERTYTRANSFERURL.toString(), url);
   }
 
+  /**
+   * Get the current task thread pool size of the instance
+   * @return the current task thread pool size
+   */
+  public int getCurrentTaskThreadPoolSize() {
+    return _record.getIntField(LiveInstanceProperty.CURRENT_TASK_THREAD_POOL_SIZE.name(),
+        CURRENT_TASK_THREAD_POOL_SIZE_NOT_SET);

Review comment:
       Please try to understand the proposed design well. This suggests to me that you should perhaps spend more time trying to understand what this field represents.
   
   For example, what does it mean for an instance to have a task thread pool of size -1? (it is meaningless, we shouldn't create a task pool with size -1).

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
##########
@@ -109,7 +109,11 @@
     // https://github.com/apache/helix/wiki/Weight-aware-Globally-Evenly-distributed-Rebalancer#rebalance-coordinator
     //
     // Default to be true.
-    GLOBAL_REBALANCE_ASYNC_MODE
+    GLOBAL_REBALANCE_ASYNC_MODE,
+
+    // The target size of task thread pools for each participant. This is the "default" value
+    //that's used when participants don't specify their individual pool sizes.

Review comment:
       Nit: fix the comment (space)

##########
File path: helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
##########
@@ -48,6 +49,7 @@
     TASK_EXEC_THREAD
   }
 
+  public static final int CURRENT_TASK_THREAD_POOL_SIZE_NOT_SET = -1;

Review comment:
       This field is not necessary.






[GitHub] [helix] NealSun96 commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420356162



##########
File path: helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
##########
@@ -48,6 +49,7 @@
     TASK_EXEC_THREAD
   }
 
+  public static final int CURRENT_TASK_THREAD_POOL_SIZE_NOT_SET = -1;

Review comment:
       @jiajunwang This is an interesting point, but wouldn't it be dangerous to use a default value like 40? If the actual thread pool is 20 and the write to LiveInstance fails, then the controller will be looking at an exaggerated number. 






[GitHub] [helix] NealSun96 commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420321054



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +106,73 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private int getTaskThreadPoolSize() {
+    ConfigAccessor configAccessor = _manager.getConfigAccessor();
+    // Check instance config first for thread pool size
+    InstanceConfig instanceConfig =
+        configAccessor.getInstanceConfig(_manager.getClusterName(), _manager.getInstanceName());
+    if (instanceConfig != null) {
+      int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+      if (verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the value
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(_manager.getClusterName());
+    if (clusterConfig != null) {
+      int targetTaskThreadPoolSize = clusterConfig.getDefaultTargetTaskThreadPoolSize();
+      if (verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    }
+
+    return TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE;
+  }
+
+  /*
+   * Checks against the default values of -1 when pool sizes are not defined; we don't want -1's
+   */
+  private static boolean verifyTargetThreadPoolSize(int targetTaskThreadPoolSize) {
+    return targetTaskThreadPoolSize > 0;
+  }
+
+  /*
+   * Update LiveInstance with the current used thread pool size
+   */
+  private void updateLiveInstanceWithThreadPoolSize(int taskThreadPoolSize) {

Review comment:
       This is a good point. Let me think about this. 






[GitHub] [helix] NealSun96 commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420971827



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -81,6 +79,12 @@ public Thread newThread(Runnable r) {
 
   @Override
   public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+    if (_taskExecutor == null) {
+      int taskThreadPoolSize = getTaskThreadPoolSize();
+      updateLiveInstanceWithThreadPoolSize(taskThreadPoolSize);

Review comment:
       Resolved offline, thank you @jiajunwang!






[GitHub] [helix] NealSun96 commented on a change in pull request #973: (WIP) TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r416963072



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +107,40 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private static int _getTaskThreadPoolSize(HelixManager manager) {
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig =
+          configAccessor.getInstanceConfig(manager.getClusterName(), manager.getInstanceName());
+      int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+      if (_verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    } catch (HelixException e) {
+      // Pass if InstanceConfig doesn't exist.
+    }

Review comment:
       Thank you for correcting!
   






[GitHub] [helix] NealSun96 commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420246943



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +107,36 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private static int getTaskThreadPoolSize(HelixManager manager) {
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+    // Check instance config first for thread pool size
+    InstanceConfig instanceConfig =
+        configAccessor.getInstanceConfig(manager.getClusterName(), manager.getInstanceName());
+    if (instanceConfig != null) {

Review comment:
       Additionally, we are also updating `LiveInstance`, which I'm giving the same treatment. That is, if `InstanceConfig`, `ClusterConfig`, or `LiveInstance` doesn't exist, the `TaskStateModelFactory` will **not** handle the exception and will let the process fail, because they should exist. I believe that if they don't exist, something else on the participant side will fail as well.
   
   I'm open to your suggestions; what do you think? @jiajunwang @narendly 






[GitHub] [helix] NealSun96 commented on a change in pull request #973: (WIP) TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r416951514



##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
##########
@@ -137,6 +140,7 @@
   private final static int MAX_REBALANCE_PREFERENCE = 10;
   private final static int MIN_REBALANCE_PREFERENCE = 0;
   public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
+  public static final int TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;

Review comment:
       Since `InstanceConfig` and `ClusterConfig` are both using the same enum and constant, how about defining them only in `InstanceConfig` and making them public, so that `ClusterConfig` can reuse the values?
   
   All the config field names we have so far are enums in one of the several `ConfigProperty`s, and I think it's better to maintain that convention; a separate file like TaskConstants.java holding an `InstanceConfig` field name might come as a surprise to others. 






[GitHub] [helix] narendly commented on a change in pull request #973: (WIP) TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r416953743



##########
File path: helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
##########
@@ -506,6 +508,16 @@ public void setMaxConcurrentTask(int maxConcurrentTask) {
     _record.setIntField(InstanceConfigProperty.MAX_CONCURRENT_TASK.name(), maxConcurrentTask);
   }
 
+  /**
+   * Get the target size of task thread pool
+   * @return the target size of task thread pool
+   */
+  public int getTargetTaskThreadPoolSize() {

Review comment:
       Could we add setters as well with some sort of validation logic?

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +107,40 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private static int _getTaskThreadPoolSize(HelixManager manager) {

Review comment:
       Why are we including an underscore for a method name?

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -44,11 +49,11 @@
   private final ScheduledExecutorService _taskExecutor;
   private final ScheduledExecutorService _timerTaskExecutor;
   private ThreadPoolExecutorMonitor _monitor;
-  public final static int TASK_THREADPOOL_SIZE = 40;
+  public final static int DEFAULT_TASK_THREAD_POOL_SIZE = 40;

Review comment:
       Could we potentially move this constant to TaskConstants? Do you think that would be a good idea?

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +107,40 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private static int _getTaskThreadPoolSize(HelixManager manager) {
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig =
+          configAccessor.getInstanceConfig(manager.getClusterName(), manager.getInstanceName());
+      int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+      if (_verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    } catch (HelixException e) {
+      // Pass if InstanceConfig doesn't exist.
+    }

Review comment:
       Try-catch is not necessary here. There are other ways to check whether InstanceConfig exists.
   E.g., perform one read: if the config doesn't exist, the read returns null; if it exists, you will already have read it.
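
One way to read the suggestion above is a single read followed by a null check. The following is a minimal sketch under that assumption; `NullCheckSketch` and its `Map`-backed lookup are hypothetical stand-ins for illustration and are not the Helix `ConfigAccessor` API.

```java
import java.util.Map;

// Hypothetical stand-in for a config store; NOT the Helix ConfigAccessor API.
final class NullCheckSketch {
  // Default taken from DEFAULT_TASK_THREAD_POOL_SIZE in the diff above.
  static final int DEFAULT_POOL_SIZE = 40;

  // Returns the configured size, or the default when the entry is missing or not positive.
  static int poolSizeOrDefault(Map<String, Integer> configs, String instanceName) {
    Integer size = configs.get(instanceName); // one read; null means "no config"
    if (size != null && size > 0) {
      return size;
    }
    return DEFAULT_POOL_SIZE;
  }
}
```

A missing entry is treated here as an ordinary outcome of the read rather than as an exceptional condition, which is the point of the comment.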

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +107,40 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private static int _getTaskThreadPoolSize(HelixManager manager) {
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig =
+          configAccessor.getInstanceConfig(manager.getClusterName(), manager.getInstanceName());
+      int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+      if (_verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    } catch (HelixException e) {
+      // Pass if InstanceConfig doesn't exist.
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the value
+    try {
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(manager.getClusterName());
+      int targetTaskThreadPoolSize = clusterConfig.getTargetTaskThreadPoolSize();
+      if (_verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    } catch (HelixException e) {
+      // Pass if ClusterConfig doesn't exist.
+    }

Review comment:
       Again, this might be an incorrect use of a try-catch clause

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +107,40 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private static int _getTaskThreadPoolSize(HelixManager manager) {
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig =
+          configAccessor.getInstanceConfig(manager.getClusterName(), manager.getInstanceName());
+      int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+      if (_verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    } catch (HelixException e) {
+      // Pass if InstanceConfig doesn't exist.
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the value
+    try {
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(manager.getClusterName());
+      int targetTaskThreadPoolSize = clusterConfig.getTargetTaskThreadPoolSize();
+      if (_verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    } catch (HelixException e) {
+      // Pass if ClusterConfig doesn't exist.
+    }
+
+    return DEFAULT_TASK_THREAD_POOL_SIZE;
+  }
+
+  private static boolean _verifyTargetThreadPoolSize(int targetTaskThreadPoolSize) {

Review comment:
       You could do this in the setter?






[GitHub] [helix] NealSun96 commented on a change in pull request #973: (WIP) TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r416961202



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -44,11 +49,11 @@
   private final ScheduledExecutorService _taskExecutor;
   private final ScheduledExecutorService _timerTaskExecutor;
   private ThreadPoolExecutorMonitor _monitor;
-  public final static int TASK_THREADPOOL_SIZE = 40;
+  public final static int DEFAULT_TASK_THREAD_POOL_SIZE = 40;

Review comment:
       Since it's only used in this class, would it be better to have it here? And make it private? 






[GitHub] [helix] NealSun96 commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420337963



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -81,6 +79,12 @@ public Thread newThread(Runnable r) {
 
   @Override
   public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+    if (_taskExecutor == null) {

Review comment:
       By separating the code into multiple functions based on its purpose, I made it easier to understand. If I combine them, I either name the function `initializeTaskExecutorAndTaskMonitorAndUpdateLiveInstance()` or I force developers to read the code with no clue from the function titles. I'm sure it was clear for you to take a glance and understand what this block is doing because the function names explain them. 
   
   This is a nit, so I don't have that strong of an opinion, but I believe it is correct to break down code into logical groups. To answer your question, "do you have a scenario that you need to call these methods separately?", the answer is yes. I could be utilizing `initializeTaskMonitor()` separately in the other constructor; I'm not doing it because I think that constructor should be gone (see conversation above). 






[GitHub] [helix] narendly commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420466018



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -41,23 +47,15 @@
 
   private final HelixManager _manager;
   private final Map<String, TaskFactory> _taskFactoryRegistry;
-  private final ScheduledExecutorService _taskExecutor;
+  private ScheduledExecutorService _taskExecutor;

Review comment:
       Update:
   
   Neal and I had an offline discussion. To simplify the design, we will create a temporary ZK connection to read the task pool size configs and close it right away. This would greatly simplify the code, and it shouldn't be much of a burden since the connection is closed right away.
   
   This is to populate the current task pool size at the time of LiveInstance (ephemeral node) initialization.
   
   @NealSun96 Please remember to put a TODO here to potentially revisit the logic - ideally, we shouldn't have to create multiple ZK client connections when there already is a manager there.
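
The "open, read once, close right away" idea could be sketched roughly as follows. This is only an illustration: `TempConnectionSketch`, `SizeReader`, and `readOnce` are hypothetical names standing in for a temporary ZK-backed accessor, and nothing here uses the real Helix or ZooKeeper APIs.

```java
import java.util.function.Supplier;

// Hypothetical short-lived reader; stands in for a temporary ZK-backed accessor.
final class TempConnectionSketch {
  interface SizeReader extends AutoCloseable {
    int readTargetPoolSize();

    @Override
    void close(); // narrowed to throw no checked exception, to keep the sketch short
  }

  // Opens a reader, reads once, and closes it even if the read throws.
  static int readOnce(Supplier<SizeReader> opener) {
    try (SizeReader reader = opener.get()) {
      return reader.readTargetPoolSize();
    }
  }
}
```

Using try-with-resources keeps the connection's lifetime limited to the single read, which matches the intent of closing it immediately.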






[GitHub] [helix] NealSun96 commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420248398



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -41,23 +47,15 @@
 
   private final HelixManager _manager;
   private final Map<String, TaskFactory> _taskFactoryRegistry;
-  private final ScheduledExecutorService _taskExecutor;
+  private ScheduledExecutorService _taskExecutor;
   private final ScheduledExecutorService _timerTaskExecutor;
   private ThreadPoolExecutorMonitor _monitor;
-  public final static int TASK_THREADPOOL_SIZE = 40;
 
   public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) {
-    this(manager, taskFactoryRegistry,
-        Executors.newScheduledThreadPool(TASK_THREADPOOL_SIZE, new ThreadFactory() {
-          private AtomicInteger threadId = new AtomicInteger(0);
-
-          @Override
-          public Thread newThread(Runnable r) {
-            return new Thread(r, "TaskStateModelFactory-task_thread-" + threadId.getAndIncrement());
-          }
-        }));
+    this(manager, taskFactoryRegistry, null);
   }
 
+  // This constructor is only for internal usage. Do not use!

Review comment:
       We talked about this in an earlier meeting: we don't want users to use this constructor, so I added this comment. 
   However, I realized that with this constructor we actually cannot set `CURRENT_TASK_THREAD_POOL_SIZE`, because we can't get the pool size. I'm now more inclined to disable this constructor altogether. @jiajunwang @narendly 






[GitHub] [helix] narendly commented on a change in pull request #973: (WIP) TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r416940613



##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
##########
@@ -137,6 +140,7 @@
   private final static int MAX_REBALANCE_PREFERENCE = 10;
   private final static int MIN_REBALANCE_PREFERENCE = 0;
   public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
+  public static final int TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;

Review comment:
       Why does this need to be a public field? 
   
   Also, it seems that this constant and some of the other enums are duplicated in different places. Is it possible to put this in one place? (Hint: TaskConstants.java)








[GitHub] [helix] jiajunwang commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420325192



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -81,6 +79,12 @@ public Thread newThread(Runnable r) {
 
   @Override
   public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+    if (_taskExecutor == null) {
+      int taskThreadPoolSize = getTaskThreadPoolSize();
+      updateLiveInstanceWithThreadPoolSize(taskThreadPoolSize);

Review comment:
       1. Why do we have to rely on the HelixManager to query for the config? You can use a data accessor whenever it is required. But if we don't even have the cluster/ZK information, then it is not possible. This is one of the cons (or limitations) of relying on ZK to pass the configuration.
   2. The controller needs to know the real capacity; otherwise, how does it ensure that the assigned tasks fit the instance's capacity? I think this is the purpose of this design, no?
   3. Alternatively, you can report the accepted capacity without really applying it to any executor. I mean, when the LiveInstance is created, put the capacity there. At that moment, no real executor exists yet. Then, when we create the executor, we apply exactly the same number there.
   To be clear, this is not a preferred design. In this way, the CURRENT_THREAD_POOL_SIZE becomes meaningless. But that is the only thing I can think of now.
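
The alternative in point 3 (report the number first, create the executor later with the same number) might look roughly like the sketch below. `ReportedCapacitySketch` is a hypothetical holder made up for illustration; only `Executors.newScheduledThreadPool` is a real JDK call.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical holder: the size is fixed once, reported, and later reused for the executor.
final class ReportedCapacitySketch {
  private final int reportedPoolSize;

  ReportedCapacitySketch(int reportedPoolSize) {
    this.reportedPoolSize = reportedPoolSize;
  }

  // Value that would be published when the live-instance entry is created.
  int reportedPoolSize() {
    return reportedPoolSize;
  }

  // Created later; uses exactly the number that was reported.
  ScheduledExecutorService createExecutor() {
    return Executors.newScheduledThreadPool(reportedPoolSize);
  }
}
```

Because the reported value and the executor size come from the same field, they cannot drift apart, although, as noted above, the reported value then no longer describes a live executor at the time it is published.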






[GitHub] [helix] NealSun96 commented on a change in pull request #973: (WIP) TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r416964720



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +107,40 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private static int _getTaskThreadPoolSize(HelixManager manager) {
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig =
+          configAccessor.getInstanceConfig(manager.getClusterName(), manager.getInstanceName());
+      int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+      if (_verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    } catch (HelixException e) {
+      // Pass if InstanceConfig doesn't exist.
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the value
+    try {
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(manager.getClusterName());
+      int targetTaskThreadPoolSize = clusterConfig.getTargetTaskThreadPoolSize();
+      if (_verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    } catch (HelixException e) {
+      // Pass if ClusterConfig doesn't exist.
+    }
+
+    return DEFAULT_TASK_THREAD_POOL_SIZE;
+  }
+
+  private static boolean _verifyTargetThreadPoolSize(int targetTaskThreadPoolSize) {

Review comment:
       This is to check against the default value: when the field is not specified, a -1 is returned from `getTargetTaskThreadPoolSize()`. This check makes sure the default value is filtered out. 
   
   I don't see any other good way to do `getIntField()` while handling the "not defined" case differently, since `getIntField()` always asks for a default value. 
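
To illustrate the sentinel approach described above, here is a small sketch. `SimpleRecord` is a hypothetical stand-in with a `getIntField(key, defaultValue)` shape similar to the one mentioned; it is not Helix's record class, and `isPoolSizeSet` is an invented helper.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical record with string-valued simple fields; a stand-in, not a Helix class.
final class SimpleRecord {
  static final int NOT_SET = -1; // sentinel meaning "field absent"
  private final Map<String, String> fields = new HashMap<>();

  void setIntField(String key, int value) {
    fields.put(key, Integer.toString(value));
  }

  // Always requires a default, like the getter discussed above.
  int getIntField(String key, int defaultValue) {
    String raw = fields.get(key);
    return raw == null ? defaultValue : Integer.parseInt(raw);
  }

  // True only when the stored value differs from the sentinel.
  boolean isPoolSizeSet(String key) {
    return getIntField(key, NOT_SET) != NOT_SET;
  }
}
```

The sentinel works only as long as -1 is never a legitimate pool size, which is why the check and the choice of default have to agree.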






[GitHub] [helix] NealSun96 commented on a change in pull request #973: (WIP) TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r416988465



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +107,36 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private static int getTaskThreadPoolSize(HelixManager manager) {
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+    // Check instance config first for thread pool size
+    InstanceConfig instanceConfig =
+        configAccessor.getInstanceConfig(manager.getClusterName(), manager.getInstanceName());
+    if (instanceConfig != null) {

Review comment:
       For `InstanceConfig` it's actually valid to get null. When an instance is set up for the first time it's not going to have `InstanceConfig` when this code is executed, so an exception is not very appropriate because that situation is actually correct. 
   
   I do think the `ClusterConfig` should have been there, but I still think an exception may be too strong. 






[GitHub] [helix] narendly commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r422451164



##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
##########
@@ -109,7 +109,11 @@
     // https://github.com/apache/helix/wiki/Weight-aware-Globally-Evenly-distributed-Rebalancer#rebalance-coordinator
     //
     // Default to be true.
-    GLOBAL_REBALANCE_ASYNC_MODE
+    GLOBAL_REBALANCE_ASYNC_MODE,
+
+    // The target size of task thread pools for each participant. This is the global value
+    // that's used when participants don't specify their individual pool sizes.

Review comment:
       Let's clarify which config takes precedence and also what it would mean to not have this field defined.
   
   For example, consider the following:
   1. What if an instance has a target value set in InstanceConfig and a global thread pool size config is set as well?
   2. What if nothing is set at all? Which value will be used?
   etc.
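
One possible answer to these questions, based on the PR description (instance value first, then the cluster-wide value, then a default), is sketched below. `PoolSizePrecedence` and `resolve` are hypothetical names; the -1 sentinel and the default of 40 are assumptions taken from values that appear elsewhere in this PR.

```java
// Hypothetical helper illustrating one possible precedence; the names are not Helix APIs.
final class PoolSizePrecedence {
  static final int NOT_SET = -1;      // assumed sentinel for "field not defined"
  static final int DEFAULT_SIZE = 40; // assumed default, from the removed TASK_THREADPOOL_SIZE

  // The instance value wins, then the cluster-wide value, then the default.
  static int resolve(int instanceSize, int clusterSize) {
    if (instanceSize != NOT_SET) {
      return instanceSize;
    }
    if (clusterSize != NOT_SET) {
      return clusterSize;
    }
    return DEFAULT_SIZE;
  }
}
```

Under these assumptions, case 1 resolves to the instance value and case 2 resolves to the default.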

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
##########
@@ -709,6 +714,33 @@ public void setInstanceCapacityKeys(List<String> capacityKeys) {
     _record.setListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), capacityKeys);
   }
 
+  /**
+   * Get the global target size of task thread pools. This values applies to participants and is
+   * overwritten by participants' own values if they specified individual pool sizes in
+   * InstanceConfigs

Review comment:
       What if this value is not set?

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
##########
@@ -709,6 +714,33 @@ public void setInstanceCapacityKeys(List<String> capacityKeys) {
     _record.setListField(ClusterConfigProperty.INSTANCE_CAPACITY_KEYS.name(), capacityKeys);
   }
 
+  /**
+   * Get the global target size of task thread pools. This values applies to participants and is
+   * overwritten by participants' own values if they specified individual pool sizes in
+   * InstanceConfigs
+   * @return the global target size of task thread pool
+   */
+  public int getGlobalTargetTaskThreadPoolSize() {
+    return _record
+        .getIntField(ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(),
+            GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET);
+  }
+
+  /**
+   * Set the global target size of task thread pools for this cluster.
+   * @param globalTargetTaskThreadPoolSize - the new global target task thread pool size
+   * @throws IllegalArgumentException - when the provided new thread pool size is not greater than 0
+   */
+  public void setGlobalTargetTaskThreadPoolSize(int globalTargetTaskThreadPoolSize)
+      throws IllegalArgumentException {
+    if (globalTargetTaskThreadPoolSize <= 0) {
+      throw new IllegalArgumentException("globalTargetTaskThreadPoolSize must be greater than 0!");
+    }

Review comment:
       Technically, I think the value 0 is also valid. It just wouldn't process any tasks. What do you think?
   
   I think that we might be able to use this config as a workaround to "disable" task framework globally, or only have certain participants process tasks - say, global is 0, but certain participants will have their individual configs set.
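
If 0 were accepted, the setter validation would only need to reject negative values. The sketch below shows that variant; `PoolSizeSetterSketch` is a hypothetical class, not the `ClusterConfig` setter from the diff, and whether a size of 0 really stops task processing depends on how the value is consumed, which this sketch does not address.

```java
// Hypothetical setter with relaxed validation; not the ClusterConfig code from the diff.
final class PoolSizeSetterSketch {
  private int targetPoolSize = -1; // -1 means "not set"

  // Accepts 0 and rejects only negative values.
  void setTargetPoolSize(int size) {
    if (size < 0) {
      throw new IllegalArgumentException("target pool size must not be negative: " + size);
    }
    targetPoolSize = size;
  }

  int getTargetPoolSize() {
    return targetPoolSize;
  }
}
```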

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskConstants.java
##########
@@ -49,4 +49,11 @@
   public static final String PREV_RA_NODE = "PreviousResourceAssignment";
 
   public static final boolean DEFAULT_TASK_ENABLE_COMPRESSION = false;
+
+  /**
+   * The default task thread pool size that will be used to create thread pools if target thread
+   * pool sizes are not defined in InstanceConfig or ClusterConfig; also used as the current thread
+   * pool size default value if the current thread pool size is not defined in LiveInstance

Review comment:
       Good description! :)

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +96,53 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  private ScheduledExecutorService createTaskExecutor(int taskThreadPoolSize) {
+    return Executors.newScheduledThreadPool(taskThreadPoolSize, new ThreadFactory() {
+      private AtomicInteger threadId = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "TaskStateModelFactory-task_thread-" + threadId.getAndIncrement());
+      }
+    });
+  }
+
+  private ScheduledExecutorService createTimerTaskExecutor() {
+    return Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "TaskStateModelFactory-timeTask_thread");
+      }
+    });
+  }
+
+  private void initializeTaskMonitor() {
+    if (_taskExecutor instanceof ThreadPoolExecutor) {
+      try {
+        _monitor = new ThreadPoolExecutorMonitor(TaskConstants.STATE_MODEL_NAME,
+            (ThreadPoolExecutor) _taskExecutor);
+      } catch (JMException e) {
+        LOG.warn("Error in creating ThreadPoolExecutorMonitor for TaskStateModelFactory.");
+      }
+    }
+  }
+
+  /*
+   * Create a config accessor to get the thread pool size
+   */
+  protected ConfigAccessor createConfigAccessor() {
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+      String clusterName = _manager.getClusterName();
+      String shardingKey = clusterName.charAt(0) == '/' ? clusterName : "/" + clusterName;

Review comment:
       Cluster name doesn't have a "/". So this ternary check is not necessary?
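
For reference, the behavior of that ternary can be sketched in isolation as below. `ShardingKeySketch.toShardingKey` is a hypothetical helper; it uses `startsWith` instead of `charAt(0)`, which also avoids an exception on an empty string. If cluster names never start with "/", it reduces to `"/" + clusterName`.

```java
// Hypothetical helper mirroring the ternary in the diff above.
final class ShardingKeySketch {
  // Prefixes the cluster name with "/" unless it already starts with one.
  static String toShardingKey(String clusterName) {
    return clusterName.startsWith("/") ? clusterName : "/" + clusterName;
  }
}
```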

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -44,39 +48,29 @@
   private final ScheduledExecutorService _taskExecutor;
   private final ScheduledExecutorService _timerTaskExecutor;
   private ThreadPoolExecutorMonitor _monitor;
-  public final static int TASK_THREADPOOL_SIZE = 40;
 
   public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) {
-    this(manager, taskFactoryRegistry,
-        Executors.newScheduledThreadPool(TASK_THREADPOOL_SIZE, new ThreadFactory() {
-          private AtomicInteger threadId = new AtomicInteger(0);
-
-          @Override
-          public Thread newThread(Runnable r) {
-            return new Thread(r, "TaskStateModelFactory-task_thread-" + threadId.getAndIncrement());
-          }
-        }));
+    _manager = manager;
+    _taskFactoryRegistry = taskFactoryRegistry;
+    // TODO: revisit the logic here; we are creating a connection although we already have a manager

Review comment:
       It would be a good idea to give more context:
   
   This is only the case because we don't enforce the order in which the manager should be connected. Some users register a taskFactoryRegistry before connecting the manager; others do so after. Either works. This is only a problem in the former case.
   So the real TODO is to think about what the right order should be and to decide whether we should enforce it. Enforcing it would be backward incompatible, but arguably cleaner, because we would no longer have to create an extra ZK connection.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -44,39 +48,29 @@
   private final ScheduledExecutorService _taskExecutor;
   private final ScheduledExecutorService _timerTaskExecutor;
   private ThreadPoolExecutorMonitor _monitor;
-  public final static int TASK_THREADPOOL_SIZE = 40;
 
   public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) {
-    this(manager, taskFactoryRegistry,
-        Executors.newScheduledThreadPool(TASK_THREADPOOL_SIZE, new ThreadFactory() {
-          private AtomicInteger threadId = new AtomicInteger(0);
-
-          @Override
-          public Thread newThread(Runnable r) {
-            return new Thread(r, "TaskStateModelFactory-task_thread-" + threadId.getAndIncrement());
-          }
-        }));
+    _manager = manager;
+    _taskFactoryRegistry = taskFactoryRegistry;
+    // TODO: revisit the logic here; we are creating a connection although we already have a manager
+    ConfigAccessor configAccessor = createConfigAccessor();
+    int threadPoolSize = TaskUtil.getTargetThreadPoolSize(configAccessor, _manager.getClusterName(),
+        _manager.getInstanceName());
+    configAccessor.close();
+    _taskExecutor = createTaskExecutor(threadPoolSize);
+    _timerTaskExecutor = createTimerTaskExecutor();
+    initializeTaskMonitor();
   }
 
+  // FIXME: DO NOT USE! This size of provided thread pool will not be reflected to controller
+  // properly, the controller may over schedule tasks to this participant.

Review comment:
       Let's also add that we want to avoid using this because Task Framework needs to have full control of its thread pool, unlike the state transition thread pool.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
##########
@@ -1045,4 +1048,54 @@ private static void setNextJobPurgeTime(String workflow, long currentTime, long
       rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
     }
   }
+
+  /**
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   * @param configAccessor - accessor used for the configs
+   * @param clusterName - the cluster name for InstanceConfig and ClusterConfig
+   * @param instanceName - the instance name for InstanceConfig
+   * @return target thread pool size
+   */
+  public static int getTargetThreadPoolSize(ConfigAccessor configAccessor, String clusterName,
+      String instanceName) {
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName);
+      if (instanceConfig != null) {
+        int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+        if (targetTaskThreadPoolSize > 0) {

Review comment:
       Should we allow 0?

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
##########
@@ -1045,4 +1048,54 @@ private static void setNextJobPurgeTime(String workflow, long currentTime, long
       rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
     }
   }
+
+  /**
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   * @param configAccessor - accessor used for the configs
+   * @param clusterName - the cluster name for InstanceConfig and ClusterConfig
+   * @param instanceName - the instance name for InstanceConfig
+   * @return target thread pool size
+   */
+  public static int getTargetThreadPoolSize(ConfigAccessor configAccessor, String clusterName,
+      String instanceName) {
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName);
+      if (instanceConfig != null) {
+        int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+        if (targetTaskThreadPoolSize > 0) {
+          return targetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn(
+            "Got null as InstanceConfig for instance {} in cluster {}. Continuing with ClusterConfig. ",
+            instanceName, clusterName);
+      }
+    } catch (HelixException e) {
+      LOG.warn(

Review comment:
       Nit: the trailing "Exception: " is not necessary since you're passing the exception as the last parameter. I think Log4j formats it already.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
##########
@@ -1045,4 +1048,54 @@ private static void setNextJobPurgeTime(String workflow, long currentTime, long
       rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
     }
   }
+
+  /**
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   * @param configAccessor - accessor used for the configs
+   * @param clusterName - the cluster name for InstanceConfig and ClusterConfig
+   * @param instanceName - the instance name for InstanceConfig
+   * @return target thread pool size
+   */
+  public static int getTargetThreadPoolSize(ConfigAccessor configAccessor, String clusterName,
+      String instanceName) {
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName);
+      if (instanceConfig != null) {
+        int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+        if (targetTaskThreadPoolSize > 0) {
+          return targetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn(
+            "Got null as InstanceConfig for instance {} in cluster {}. Continuing with ClusterConfig. ",
+            instanceName, clusterName);
+      }
+    } catch (HelixException e) {
+      LOG.warn(
+          "Encountered an exception while fetching InstanceConfig for instance {} in cluster {}. Continuing with ClusterConfig. Exception: ",
+          instanceName, clusterName, e);
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the value
+    try {
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+      if (clusterConfig != null) {
+        int globalTargetTaskThreadPoolSize = clusterConfig.getGlobalTargetTaskThreadPoolSize();
+        if (globalTargetTaskThreadPoolSize > 0) {

Review comment:
       0?

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
##########
@@ -1045,4 +1048,54 @@ private static void setNextJobPurgeTime(String workflow, long currentTime, long
       rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
     }
   }
+
+  /**
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   * @param configAccessor - accessor used for the configs
+   * @param clusterName - the cluster name for InstanceConfig and ClusterConfig
+   * @param instanceName - the instance name for InstanceConfig
+   * @return target thread pool size
+   */
+  public static int getTargetThreadPoolSize(ConfigAccessor configAccessor, String clusterName,
+      String instanceName) {
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName);
+      if (instanceConfig != null) {
+        int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+        if (targetTaskThreadPoolSize > 0) {
+          return targetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn(
+            "Got null as InstanceConfig for instance {} in cluster {}. Continuing with ClusterConfig. ",
+            instanceName, clusterName);
+      }
+    } catch (HelixException e) {
+      LOG.warn(
+          "Encountered an exception while fetching InstanceConfig for instance {} in cluster {}. Continuing with ClusterConfig. Exception: ",
+          instanceName, clusterName, e);
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the value
+    try {
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+      if (clusterConfig != null) {
+        int globalTargetTaskThreadPoolSize = clusterConfig.getGlobalTargetTaskThreadPoolSize();
+        if (globalTargetTaskThreadPoolSize > 0) {
+          return globalTargetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn("Got null as ClusterConfig for cluster {}. Returning default value. ",

Review comment:
       It would be nice to include the default size constant here.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
##########
@@ -1045,4 +1048,54 @@ private static void setNextJobPurgeTime(String workflow, long currentTime, long
       rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
     }
   }
+
+  /**
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   * @param configAccessor - accessor used for the configs
+   * @param clusterName - the cluster name for InstanceConfig and ClusterConfig
+   * @param instanceName - the instance name for InstanceConfig
+   * @return target thread pool size
+   */
+  public static int getTargetThreadPoolSize(ConfigAccessor configAccessor, String clusterName,
+      String instanceName) {
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName);
+      if (instanceConfig != null) {
+        int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+        if (targetTaskThreadPoolSize > 0) {
+          return targetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn(
+            "Got null as InstanceConfig for instance {} in cluster {}. Continuing with ClusterConfig. ",
+            instanceName, clusterName);
+      }
+    } catch (HelixException e) {
+      LOG.warn(
+          "Encountered an exception while fetching InstanceConfig for instance {} in cluster {}. Continuing with ClusterConfig. Exception: ",
+          instanceName, clusterName, e);
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the value
+    try {
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+      if (clusterConfig != null) {
+        int globalTargetTaskThreadPoolSize = clusterConfig.getGlobalTargetTaskThreadPoolSize();
+        if (globalTargetTaskThreadPoolSize > 0) {
+          return globalTargetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn("Got null as ClusterConfig for cluster {}. Returning default value. ",
+            clusterName);
+      }
+    } catch (HelixException e) {
+      LOG.warn(
+          "Encountered an exception while fetching ClusterConfig in cluster {}. Returning default value. Exception: ",

Review comment:
       Say what the default value is, and no need for "Exception: ".

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
##########
@@ -1045,4 +1048,54 @@ private static void setNextJobPurgeTime(String workflow, long currentTime, long
       rebalanceScheduler.scheduleRebalance(manager, workflow, nextPurgeTime);
     }
   }
+
+  /**
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   * @param configAccessor - accessor used for the configs
+   * @param clusterName - the cluster name for InstanceConfig and ClusterConfig
+   * @param instanceName - the instance name for InstanceConfig
+   * @return target thread pool size
+   */
+  public static int getTargetThreadPoolSize(ConfigAccessor configAccessor, String clusterName,
+      String instanceName) {
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName);
+      if (instanceConfig != null) {
+        int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+        if (targetTaskThreadPoolSize > 0) {
+          return targetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn(
+            "Got null as InstanceConfig for instance {} in cluster {}. Continuing with ClusterConfig. ",
+            instanceName, clusterName);
+      }
+    } catch (HelixException e) {
+      LOG.warn(
+          "Encountered an exception while fetching InstanceConfig for instance {} in cluster {}. Continuing with ClusterConfig. Exception: ",
+          instanceName, clusterName, e);
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the value
+    try {
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+      if (clusterConfig != null) {
+        int globalTargetTaskThreadPoolSize = clusterConfig.getGlobalTargetTaskThreadPoolSize();
+        if (globalTargetTaskThreadPoolSize > 0) {
+          return globalTargetTaskThreadPoolSize;
+        }
+      } else {
+        LOG.warn("Got null as ClusterConfig for cluster {}. Returning default value. ",

Review comment:
       TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +96,53 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  private ScheduledExecutorService createTaskExecutor(int taskThreadPoolSize) {
+    return Executors.newScheduledThreadPool(taskThreadPoolSize, new ThreadFactory() {
+      private AtomicInteger threadId = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "TaskStateModelFactory-task_thread-" + threadId.getAndIncrement());
+      }
+    });
+  }
+
+  private ScheduledExecutorService createTimerTaskExecutor() {
+    return Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "TaskStateModelFactory-timeTask_thread");
+      }
+    });
+  }

Review comment:
       Consider adding a TODO here - I'm not sure why this needs to be a single thread executor. We could certainly use more threads for timer tasks, but let's tackle this at a later point of this project.
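   
   For whenever that TODO is picked up, here is a rough sketch of a timer executor with a configurable thread count. `TimerExecutors` and the `threadCount` parameter are hypothetical and not part of this PR; the thread-name prefix is copied from the diff, with a numeric suffix added as an assumption.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only; the class and parameter names are hypothetical.
final class TimerExecutors {
  private TimerExecutors() {
  }

  /** Creates a scheduled executor with the given number of named timer threads. */
  static ScheduledExecutorService create(int threadCount) {
    if (threadCount < 1) {
      throw new IllegalArgumentException("threadCount must be at least 1");
    }
    final AtomicInteger threadId = new AtomicInteger(0);
    // Each thread gets a distinct suffix so thread dumps stay readable.
    ThreadFactory factory = r -> new Thread(r,
        "TaskStateModelFactory-timeTask_thread-" + threadId.getAndIncrement());
    return Executors.newScheduledThreadPool(threadCount, factory);
  }
}
```

   Calling `create(1)` behaves much like the current single-thread executor, so the size could start at 1 and be raised later.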

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +96,53 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  private ScheduledExecutorService createTaskExecutor(int taskThreadPoolSize) {
+    return Executors.newScheduledThreadPool(taskThreadPoolSize, new ThreadFactory() {
+      private AtomicInteger threadId = new AtomicInteger(0);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "TaskStateModelFactory-task_thread-" + threadId.getAndIncrement());
+      }
+    });
+  }
+
+  private ScheduledExecutorService createTimerTaskExecutor() {
+    return Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "TaskStateModelFactory-timeTask_thread");
+      }
+    });
+  }
+
+  private void initializeTaskMonitor() {
+    if (_taskExecutor instanceof ThreadPoolExecutor) {
+      try {
+        _monitor = new ThreadPoolExecutorMonitor(TaskConstants.STATE_MODEL_NAME,
+            (ThreadPoolExecutor) _taskExecutor);
+      } catch (JMException e) {
+        LOG.warn("Error in creating ThreadPoolExecutorMonitor for TaskStateModelFactory.");

Review comment:
       Nit: let's pass e as the second parameter to LOG.warn.

##########
File path: helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
##########
@@ -145,6 +147,43 @@ public void testSessionExpiryCreateLiveInstance() throws Exception {
     deleteCluster(clusterName);
   }
 
+  @Test(dependsOnMethods = "testSessionExpiryCreateLiveInstance")
+  public void testCurrentTaskThreadPoolSizeCreation() throws Exception {
+    final int testThreadPoolSize = TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;

Review comment:
       Nit: it would help readability if you could explain why you're adding 1 here. I believe you're just trying to use a value that's not 40?

##########
File path: helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
##########
@@ -149,6 +150,7 @@ public void beforeClass() throws Exception {
     System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
         "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + msdsNamespace);
 
+    HttpRoutingDataReader.reset();

Review comment:
       How is this test relevant to this PR?

##########
File path: helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
##########
@@ -145,6 +147,43 @@ public void testSessionExpiryCreateLiveInstance() throws Exception {
     deleteCluster(clusterName);
   }
 
+  @Test(dependsOnMethods = "testSessionExpiryCreateLiveInstance")
+  public void testCurrentTaskThreadPoolSizeCreation() throws Exception {
+    final int testThreadPoolSize = TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+    final String className = TestHelper.getTestClassName();
+    final String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+
+    final ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(ZK_ADDR));
+    final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR,
+        12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        5, // number of nodes
+        3, // replicas
+        "MasterSlave",
+        true); // do rebalance
+
+    final String instanceName = "localhost_12918";
+    final MockParticipantManager manager =
+        new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+
+    InstanceConfig instanceConfig = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
+    instanceConfig.setTargetTaskThreadPoolSize(testThreadPoolSize);
+    accessor.setProperty(keyBuilder.instanceConfig(instanceName), instanceConfig);
+
+    manager.syncStart();

Review comment:
       Are you stopping this manager and cleaning up this cluster after the test?

##########
File path: helix-core/src/test/java/org/apache/helix/manager/zk/TestParticipantManager.java
##########
@@ -145,6 +147,43 @@ public void testSessionExpiryCreateLiveInstance() throws Exception {
     deleteCluster(clusterName);
   }
 
+  @Test(dependsOnMethods = "testSessionExpiryCreateLiveInstance")
+  public void testCurrentTaskThreadPoolSizeCreation() throws Exception {
+    final int testThreadPoolSize = TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+    final String className = TestHelper.getTestClassName();
+    final String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+
+    final ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(ZK_ADDR));

Review comment:
       If possible, could we start phasing out deprecated constructors (I believe this is deprecated)?

##########
File path: helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
##########
@@ -123,4 +123,28 @@ public void testSetInstanceCapacityMapInvalid() {
     InstanceConfig testConfig = new InstanceConfig("testConfig");
     testConfig.setInstanceCapacityMap(capacityDataMap);
   }
+
+  @Test
+  public void testGetTargetTaskThreadPoolSize() {
+    InstanceConfig testConfig = new InstanceConfig("testConfig");
+    testConfig.getRecord().setIntField(
+        InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(), 100);
+
+    Assert.assertEquals(testConfig.getTargetTaskThreadPoolSize(), 100);
+  }
+
+  @Test
+  public void testSetTargetTaskThreadPoolSize() {
+    InstanceConfig testConfig = new InstanceConfig("testConfig");
+    testConfig.setTargetTaskThreadPoolSize(100);
+
+    Assert.assertEquals(testConfig.getRecord().getIntField(
+        InstanceConfig.InstanceConfigProperty.TARGET_TASK_THREAD_POOL_SIZE.name(), -1), 100);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testSetTargetTaskThreadPoolSizeIllegalArgument() {
+    InstanceConfig testConfig = new InstanceConfig("testConfig");
+    testConfig.setTargetTaskThreadPoolSize(0);

Review comment:
       We should consider allowing the value 0. Negative values don't make any sense so those should still not be allowed.
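   
   A rough sketch of what allowing 0 could look like across the setter and the fallback chain. This is only an illustration: `PoolSizeResolver`, `validate`, and `resolve` are hypothetical names, and it assumes -1 is the "not configured" sentinel.

```java
// Illustrative only: the class and method names are hypothetical, not Helix APIs.
final class PoolSizeResolver {
  // Assumed sentinel for "not configured".
  static final int UNSET = -1;

  private PoolSizeResolver() {
  }

  /** Setter-side check: 0 is accepted, negative values are rejected. */
  static int validate(int poolSize) {
    if (poolSize < 0) {
      throw new IllegalArgumentException("Thread pool size must not be negative: " + poolSize);
    }
    return poolSize;
  }

  /** Instance value first, then cluster value, then the default. */
  static int resolve(int instanceValue, int clusterValue, int defaultValue) {
    if (instanceValue >= 0) {
      return instanceValue;
    }
    if (clusterValue >= 0) {
      return clusterValue;
    }
    return defaultValue;
  }
}
```

   With `>= 0` in place of `> 0`, an instance configured with 0 keeps 0 instead of silently falling through to the cluster value.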

##########
File path: helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
##########
@@ -69,6 +69,30 @@ public void testSetCapacityKeysEmptyList() {
     testConfig.setInstanceCapacityKeys(Collections.emptyList());
   }
 
+  @Test
+  public void testGetGlobalTargetTaskThreadPoolSize() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.getRecord().setIntField(
+        ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(), 100);
+
+    Assert.assertEquals(testConfig.getGlobalTargetTaskThreadPoolSize(), 100);
+  }
+
+  @Test
+  public void testSetGlobalTargetTaskThreadPoolSize() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setGlobalTargetTaskThreadPoolSize(100);
+
+    Assert.assertEquals(testConfig.getRecord().getIntField(
+        ClusterConfig.ClusterConfigProperty.GLOBAL_TARGET_TASK_THREAD_POOL_SIZE.name(), -1), 100);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testSetGlobalTargetTaskThreadPoolSizeIllegalArgument() {
+    ClusterConfig testConfig = new ClusterConfig("testId");
+    testConfig.setGlobalTargetTaskThreadPoolSize(0);

Review comment:
       We should maybe consider allowing the value 0. Negative values don't make any sense so those should still not be allowed.

##########
File path: helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
##########
@@ -0,0 +1,124 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
+import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestTaskStateModelFactory extends TaskTestBase {
+  // This value has to be different from the default value to verify correctness
+  private static final int TEST_TARGET_TASK_THREAD_POOL_SIZE =
+      TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+
+  @Test
+  public void testConfigAccessorCreationMultiZk() throws Exception {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    InstanceConfig instanceConfig =
+        InstanceConfig.toInstanceConfig(anyParticipantManager.getInstanceName());
+    instanceConfig.setTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    anyParticipantManager.getConfigAccessor()
+        .setInstanceConfig(anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName(), instanceConfig);
+
+    // Start a msds server
+    // TODO: TestMultiZkHelixJavaApis already defined MSDS_SERVER_ENDPOINT, which goes into
+    // HttpRoutingDataReader and is recorded as final. As a result this test case has to use the
+    // same endpoint. There's no workaround at this moment.
+    final String msdsHostName = "localhost";
+    final int msdsPort = 11117;
+    final String msdsNamespace = "multiZkTest";
+    Map<String, Collection<String>> routingData = new HashMap<>();
+    routingData
+        .put(ZK_ADDR, Collections.singletonList("/" + anyParticipantManager.getClusterName()));
+    MockMetadataStoreDirectoryServer msds =
+        new MockMetadataStoreDirectoryServer(msdsHostName, msdsPort, msdsNamespace, routingData);
+    msds.startServer();
+
+    // Save previously-set system configs
+    String prevMultiZkEnabled = System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    String prevMsdsServerEndpoint =
+        System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
+    // Turn on multiZk mode in System config
+    System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true");
+    // MSDS endpoint: http://localhost:11117/admin/v2/namespaces/testTaskStateModelFactory
+    System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
+        "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + msdsNamespace);
+
+    HttpRoutingDataReader.reset();
+    TaskStateModelFactory taskStateModelFactory =
+        new TaskStateModelFactory(anyParticipantManager, Collections.emptyMap());
+    ConfigAccessor configAccessor = taskStateModelFactory.createConfigAccessor();
+    Assert.assertEquals(TaskUtil
+        .getTargetThreadPoolSize(configAccessor, anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName()), TEST_TARGET_TASK_THREAD_POOL_SIZE);
+
+    // Restore system properties
+    if (prevMultiZkEnabled == null) {
+      System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    } else {
+      System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, prevMultiZkEnabled);
+    }
+    if (prevMsdsServerEndpoint == null) {
+      System.clearProperty(SystemPropertyKeys.MSDS_SERVER_ENDPOINT_KEY);
+    } else {
+      System.setProperty(SystemPropertyKeys.MSDS_SERVER_ENDPOINT_KEY, prevMsdsServerEndpoint);
+    }
+    msds.stopServer();
+  }
+
+  @Test(dependsOnMethods = "testConfigAccessorCreationMultiZk")
+  public void testConfigAccessorCreationSingleZk() {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    // Save previously-set system configs
+    String prevMultiZkEnabled = System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
+    // Turn on multiZk mode in System config

Review comment:
       Did you mean to say turn "off"? :)

##########
File path: helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
##########
@@ -0,0 +1,124 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
+import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestTaskStateModelFactory extends TaskTestBase {
+  // This value has to be different from the default value to verify correctness
+  private static final int TEST_TARGET_TASK_THREAD_POOL_SIZE =
+      TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
+
+  @Test
+  public void testConfigAccessorCreationMultiZk() throws Exception {
+    MockParticipantManager anyParticipantManager = _participants[0];
+
+    InstanceConfig instanceConfig =
+        InstanceConfig.toInstanceConfig(anyParticipantManager.getInstanceName());
+    instanceConfig.setTargetTaskThreadPoolSize(TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    anyParticipantManager.getConfigAccessor()
+        .setInstanceConfig(anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName(), instanceConfig);
+
+    // Start a msds server
+    // TODO: TestMultiZkHelixJavaApis already defined MSDS_SERVER_ENDPOINT, which goes into
+    // HttpRoutingDataReader and is recorded as final. As a result this test case has to use the
+    // same endpoint. There's no workaround at this moment.
+    final String msdsHostName = "localhost";
+    final int msdsPort = 11117;
+    final String msdsNamespace = "multiZkTest";

Review comment:
       This is okay. I'm not sure if we want to make it a TODO, but "multiZkTest" is a generic enough name, so I'm not too bothered by this at the moment. The reason this is tricky is that in production, in order to set up a different MSDS, you would have to change the JVM config and restart the JVM, thereby updating the `final` field.
   
   I think the right TODO here is this: now that we have a few tests that need to use the MSDS, it would be a good idea to move this MSDS creation to the abstract test base so that it is transparent to all tests, not just a single test that exercises multi-zk functionality. What do you think? That way, we could make values like msdsHostName, port, and namespace accessible to child classes, so you don't have to redefine them (just like we do with ZK_ADDR and such).
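   
   If the MSDS setup does move into the test base, the save/restore of system properties in this test could move with it. Here is a rough sketch, assuming a small helper is acceptable in test code; `SystemPropertyOverride` is a hypothetical name, not an existing Helix class.

```java
// Hypothetical test helper for illustration; not part of Helix.
final class SystemPropertyOverride implements AutoCloseable {
  private final String key;
  private final String previous;

  /** Remembers the current value of the property, then overrides it. */
  SystemPropertyOverride(String key, String value) {
    this.key = key;
    this.previous = System.getProperty(key);
    System.setProperty(key, value);
  }

  /** Restores the remembered value, or clears the property if it was unset before. */
  @Override
  public void close() {
    if (previous == null) {
      System.clearProperty(key);
    } else {
      System.setProperty(key, previous);
    }
  }
}
```

   Used in a try-with-resources block, the restore also runs when an assertion fails midway, which the manual restore at the end of the test method does not guarantee.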

##########
File path: helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
##########
@@ -0,0 +1,124 @@
+package org.apache.helix.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
+import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
+import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestTaskStateModelFactory extends TaskTestBase {
+  // This value has to be different from the default value to verify correctness

Review comment:
       Great!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] NealSun96 commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420993736



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -41,23 +47,15 @@
 
   private final HelixManager _manager;
   private final Map<String, TaskFactory> _taskFactoryRegistry;
-  private final ScheduledExecutorService _taskExecutor;
+  private ScheduledExecutorService _taskExecutor;

Review comment:
       Fyi @jiajunwang 






[GitHub] [helix] NealSun96 commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420308001



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -81,6 +79,12 @@ public Thread newThread(Runnable r) {
 
   @Override
   public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+    if (_taskExecutor == null) {

Review comment:
       Agreed on the sync part. 
   
   Why do we need all logic inside `initializeTaskExecutor()`? 






[GitHub] [helix] NealSun96 commented on a change in pull request #973: (WIP) TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r419711715



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +107,40 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private static int _getTaskThreadPoolSize(HelixManager manager) {
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+    // Check instance config first for thread pool size
+    try {
+      InstanceConfig instanceConfig =
+          configAccessor.getInstanceConfig(manager.getClusterName(), manager.getInstanceName());
+      int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+      if (_verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    } catch (HelixException e) {
+      // Pass if InstanceConfig doesn't exist.
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the value
+    try {
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(manager.getClusterName());
+      int targetTaskThreadPoolSize = clusterConfig.getTargetTaskThreadPoolSize();
+      if (_verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    } catch (HelixException e) {
+      // Pass if ClusterConfig doesn't exist.
+    }
+
+    return DEFAULT_TASK_THREAD_POOL_SIZE;
+  }
+
+  private static boolean _verifyTargetThreadPoolSize(int targetTaskThreadPoolSize) {

Review comment:
       @narendly Reminder on this conversation. 
   
   I want to know if there's a good way to use `getIntField()` while handling the "not defined" case differently. If not, I will proceed with the current design. 
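   
   One possible way to separate "not defined" from a real value, sketched under assumptions: `SimpleFields` is a hypothetical stand-in backed by a plain map, not the Helix record class, and `findIntField` is an invented name. The sentinel-style `getIntField(key, default)` is shown next to it for contrast.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical stand-in for a record's simple fields; not the Helix record class.
final class SimpleFields {
  private final Map<String, String> fields = new HashMap<>();

  void set(String key, String value) {
    fields.put(key, value);
  }

  // Sentinel style: a missing or unparsable field silently becomes the default.
  int getIntField(String key, int defaultValue) {
    String raw = fields.get(key);
    if (raw == null) {
      return defaultValue;
    }
    try {
      return Integer.parseInt(raw);
    } catch (NumberFormatException e) {
      return defaultValue;
    }
  }

  // Alternative: an empty OptionalInt means "not defined", with no magic number involved.
  OptionalInt findIntField(String key) {
    String raw = fields.get(key);
    if (raw == null) {
      return OptionalInt.empty();
    }
    try {
      return OptionalInt.of(Integer.parseInt(raw));
    } catch (NumberFormatException e) {
      return OptionalInt.empty();
    }
  }
}
```

   With the second form, the caller decides what "not defined" means at the call site instead of comparing against -1.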






[GitHub] [helix] NealSun96 commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420966673



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -81,6 +79,12 @@ public Thread newThread(Runnable r) {
 
   @Override
   public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+    if (_taskExecutor == null) {

Review comment:
       With the recent changes, `initializeTaskMonitor()` is called by both constructors so it makes sense for it to be factored out. I'm not entirely convinced that it's correct to put all logic in one function - I always thought it's a good practice to break functions down when necessary into logical chunks to avoid big messy functions. 
   Since this section of code is outdated, we can discuss in the new conversation if necessary. 






[GitHub] [helix] jiajunwang commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420364253



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -81,6 +79,12 @@ public Thread newThread(Runnable r) {
 
   @Override
   public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+    if (_taskExecutor == null) {

Review comment:
       It depends; to me the whole init logic is one logical group.
   1. Each method is small, and jumping between these methods to read the code is not comfortable.
   2. Do you have a scenario where you need to call these methods separately? If not, how do you support the statement that they are different logical groups?
   3. The method name does not need to describe all the logic inside. I don't quite get this point.






[GitHub] [helix] jiajunwang commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420288114



##########
File path: helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
##########
@@ -48,6 +49,7 @@
     TASK_EXEC_THREAD
   }
 
+  public static final int CURRENT_TASK_THREAD_POOL_SIZE_NOT_SET = -1;

Review comment:
       In addition to what Huizhi said, shall we make it 40 for backward compatibility?

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -81,6 +79,12 @@ public Thread newThread(Runnable r) {
 
   @Override
   public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+    if (_taskExecutor == null) {

Review comment:
       To be safe, add synchronization around this block. There is no guarantee that we won't call createNewStateModel in parallel in the future.
   Also, it would be better to put all of the logic inside initializeTaskExecutor().
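   
   A minimal sketch of what a guarded lazy initialization could look like, assuming a plain `synchronized` method is acceptable here. `LazyTaskExecutorHolder` and its members are hypothetical names that only mirror the shape of the block under review. They are not the PR's code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical holder class, used only to illustrate a thread-safe null check.
final class LazyTaskExecutorHolder {
  private final int poolSize;
  private ScheduledExecutorService taskExecutor; // guarded by "this"
  private int initCount = 0;                     // counts how many times the pool was created

  LazyTaskExecutorHolder(int poolSize) {
    this.poolSize = poolSize;
  }

  // The check and the assignment happen under one lock, so parallel callers
  // cannot both observe null and create two pools.
  synchronized ScheduledExecutorService getOrCreate() {
    if (taskExecutor == null) {
      taskExecutor = Executors.newScheduledThreadPool(poolSize);
      initCount++;
    }
    return taskExecutor;
  }

  synchronized int initCount() {
    return initCount;
  }

  synchronized void shutdown() {
    if (taskExecutor != null) {
      taskExecutor.shutdownNow();
    }
  }
}
```

   Keeping the whole check-then-create sequence inside one method also matches the suggestion to put the logic in a single initializer.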

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -81,6 +79,12 @@ public Thread newThread(Runnable r) {
 
   @Override
   public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+    if (_taskExecutor == null) {
+      int taskThreadPoolSize = getTaskThreadPoolSize();
+      updateLiveInstanceWithThreadPoolSize(taskThreadPoolSize);

Review comment:
       I'm curious about this design.
   createNewStateModel() is called when a partition is assigned, right? But I think the controller needs to know the pool size before it assigns tasks. How does this work?

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -41,23 +47,15 @@
 
   private final HelixManager _manager;
   private final Map<String, TaskFactory> _taskFactoryRegistry;
-  private final ScheduledExecutorService _taskExecutor;
+  private ScheduledExecutorService _taskExecutor;
   private final ScheduledExecutorService _timerTaskExecutor;
   private ThreadPoolExecutorMonitor _monitor;
-  public final static int TASK_THREADPOOL_SIZE = 40;
 
   public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) {
-    this(manager, taskFactoryRegistry,
-        Executors.newScheduledThreadPool(TASK_THREADPOOL_SIZE, new ThreadFactory() {
-          private AtomicInteger threadId = new AtomicInteger(0);
-
-          @Override
-          public Thread newThread(Runnable r) {
-            return new Thread(r, "TaskStateModelFactory-task_thread-" + threadId.getAndIncrement());
-          }
-        }));
+    this(manager, taskFactoryRegistry, null);

Review comment:
       As mentioned below, I don't think we want to lazily initialize the executor. Is there a reason for this change?

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +106,73 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private int getTaskThreadPoolSize() {
+    ConfigAccessor configAccessor = _manager.getConfigAccessor();
+    // Check instance config first for thread pool size
+    InstanceConfig instanceConfig =
+        configAccessor.getInstanceConfig(_manager.getClusterName(), _manager.getInstanceName());
+    if (instanceConfig != null) {
+      int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+      if (verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the value
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(_manager.getClusterName());
+    if (clusterConfig != null) {
+      int targetTaskThreadPoolSize = clusterConfig.getDefaultTargetTaskThreadPoolSize();
+      if (verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    }
+
+    return TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE;
+  }
+
+  /*
+   * Checks against the default values of -1 when pool sizes are not defined; we don't want -1's
+   */
+  private static boolean verifyTargetThreadPoolSize(int targetTaskThreadPoolSize) {
+    return targetTaskThreadPoolSize > 0;
+  }
+
+  /*
+   * Update LiveInstance with the current used thread pool size
+   */
+  private void updateLiveInstanceWithThreadPoolSize(int taskThreadPoolSize) {

Review comment:
       Instead of updating after the live node is created, we should ensure the pool size is included when the node is first created. Otherwise we may have a serious issue where the controller violates the capacity limit.

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +106,73 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private int getTaskThreadPoolSize() {
+    ConfigAccessor configAccessor = _manager.getConfigAccessor();
+    // Check instance config first for thread pool size
+    InstanceConfig instanceConfig =
+        configAccessor.getInstanceConfig(_manager.getClusterName(), _manager.getInstanceName());
+    if (instanceConfig != null) {
+      int targetTaskThreadPoolSize = instanceConfig.getTargetTaskThreadPoolSize();
+      if (verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    }
+
+    // Fallback to cluster config since instance config doesn't provide the value
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(_manager.getClusterName());
+    if (clusterConfig != null) {
+      int targetTaskThreadPoolSize = clusterConfig.getDefaultTargetTaskThreadPoolSize();
+      if (verifyTargetThreadPoolSize(targetTaskThreadPoolSize)) {
+        return targetTaskThreadPoolSize;
+      }
+    }
+
+    return TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE;
+  }
+
+  /*
+   * Checks against the default values of -1 when pool sizes are not defined; we don't want -1's

Review comment:
       You are also excluding 0. Let's make the comment clear and consistent with the code.
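   
   To make the point concrete, here is a small sketch of the fallback order with a validity check whose comment matches its behavior. `PoolSizeResolver` is a hypothetical helper in which plain ints stand in for the `InstanceConfig` and `ClusterConfig` values. The -1 sentinel and the default of 40 are the values discussed in this review.

```java
// Hypothetical helper: plain ints stand in for the two config values.
final class PoolSizeResolver {
  static final int NOT_SET = -1;           // sentinel returned when a config has no value
  static final int DEFAULT_POOL_SIZE = 40; // default discussed in this review

  // Rejects the -1 sentinel and also 0 and every other non-positive size.
  static boolean isValidPoolSize(int size) {
    return size > 0;
  }

  // The instance value wins, then the cluster value, then the default.
  static int resolve(int instanceSize, int clusterSize) {
    if (isValidPoolSize(instanceSize)) {
      return instanceSize;
    }
    if (isValidPoolSize(clusterSize)) {
      return clusterSize;
    }
    return DEFAULT_POOL_SIZE;
  }
}
```

   For example, `resolve(0, PoolSizeResolver.NOT_SET)` falls through both checks and returns the default, which is the case the original comment did not mention.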






[GitHub] [helix] NealSun96 commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420289599



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
##########
@@ -0,0 +1,95 @@
+package org.apache.helix.task;
+

Review comment:
       Not sure what happened there... Added it. Thanks for catching it!






[GitHub] [helix] NealSun96 commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420290124



##########
File path: helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
##########
@@ -48,6 +49,7 @@
     TASK_EXEC_THREAD
   }
 
+  public static final int CURRENT_TASK_THREAD_POOL_SIZE_NOT_SET = -1;

Review comment:
       These values all have different meanings (see one discussion above). Also, since they are all used only in one place (LiveInstance, ClusterConfig, InstanceConfig), they should stay in the scope.






[GitHub] [helix] pkuwm commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
pkuwm commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420278332



##########
File path: helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
##########
@@ -0,0 +1,95 @@
+package org.apache.helix.task;
+

Review comment:
       Apache license?

##########
File path: helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
##########
@@ -48,6 +49,7 @@
     TASK_EXEC_THREAD
   }
 
+  public static final int CURRENT_TASK_THREAD_POOL_SIZE_NOT_SET = -1;

Review comment:
       I see 2 not set constants. Did you consider using one common constant and putting it in the constants class?






[GitHub] [helix] NealSun96 commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420337963



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -81,6 +79,12 @@ public Thread newThread(Runnable r) {
 
   @Override
   public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+    if (_taskExecutor == null) {

Review comment:
       By separating the code into multiple functions based on its purpose, I made it easier to understand. If I combine them, I either name the function `initializeTaskExecutorAndTaskMonitorAndUpdateLiveInstance()` or I force developers to read the code with no clue from the function titles. I'm sure it was clear for you to take a glance and understand what this block is doing because the function names explain them. 
   
   This is a nit, so I don't have that strong of an opinion, but I believe it is correct to break down code into logical groups. To answer your question, "do you have a scenario that you need to call these methods separately?", the answer is yes. I could be utilizing `initializeTaskMonitor()` separately in the other constructor; I'm not doing it because I think that constructor should be gone (see conversation above). 






[GitHub] [helix] NealSun96 commented on a change in pull request #973: (WIP) TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r417595459



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -44,11 +49,11 @@
   private final ScheduledExecutorService _taskExecutor;
   private final ScheduledExecutorService _timerTaskExecutor;
   private ThreadPoolExecutorMonitor _monitor;
-  public final static int TASK_THREADPOOL_SIZE = 40;
+  public final static int DEFAULT_TASK_THREAD_POOL_SIZE = 40;

Review comment:
       I realized that this is actually incorrect. I'll move it to TaskConstants. 






[GitHub] [helix] NealSun96 commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420963940



##########
File path: helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
##########
@@ -48,6 +49,7 @@
     TASK_EXEC_THREAD
   }
 
+  public static final int CURRENT_TASK_THREAD_POOL_SIZE_NOT_SET = -1;

Review comment:
       If there is ever a case where CURRENT_TASK_THREAD_POOL_SIZE is not set, I don't think the controller should proceed. But @narendly raised a good point offline: it could be an old participant that doesn't have this field. I'm using the default of 40 instead, as there are no better options in that case. Thank you all for the input. 






[GitHub] [helix] jiajunwang commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420357720



##########
File path: helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
##########
@@ -48,6 +49,7 @@
     TASK_EXEC_THREAD
   }
 
+  public static final int CURRENT_TASK_THREAD_POOL_SIZE_NOT_SET = -1;

Review comment:
       What I meant here is actually that, since you have a default value anyway, there should not be a case where we need CURRENT_TASK_THREAD_POOL_SIZE_NOT_SET defined. We just always return 40 if no additional configuration is given.






[GitHub] [helix] jiajunwang commented on a change in pull request #973: (WIP) TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r416975135



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +107,36 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private static int getTaskThreadPoolSize(HelixManager manager) {
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+    // Check instance config first for thread pool size
+    InstanceConfig instanceConfig =
+        configAccessor.getInstanceConfig(manager.getClusterName(), manager.getInstanceName());
+    if (instanceConfig != null) {

Review comment:
       Nit: shall we throw an exception if the instance config or cluster config is not found?
   A missing config most likely means the read did not succeed, in which case we don't know the real configured numbers. So it is better to stop initialization than to guess that the size should be 40.
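   
   A sketch of the fail-fast variant, under assumptions: `StrictPoolSizeReader` is a hypothetical helper, a `null` map models a config that could not be read, and a missing key models a config that was read but has no pool size set. The choice of `IllegalStateException` is also an assumption made for this sketch.

```java
import java.util.Map;

// Hypothetical helper: maps stand in for the instance and cluster configs.
final class StrictPoolSizeReader {
  static final String KEY = "TARGET_TASK_THREAD_POOL_SIZE";
  static final int DEFAULT_POOL_SIZE = 40;

  static int read(Map<String, Integer> instanceConfig, Map<String, Integer> clusterConfig) {
    // A config that could not be read is an error: stop instead of guessing 40.
    if (instanceConfig == null || clusterConfig == null) {
      throw new IllegalStateException("Config could not be read; refusing to guess a pool size");
    }
    // A config that was read but has no usable value falls through to the next level.
    Integer size = instanceConfig.get(KEY);
    if (size == null || size <= 0) {
      size = clusterConfig.get(KEY);
    }
    return (size != null && size > 0) ? size : DEFAULT_POOL_SIZE;
  }
}
```

   The default is then used only when both configs were read successfully and neither defines a usable size.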

##########
File path: helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
##########
@@ -137,6 +140,7 @@
   private final static int MAX_REBALANCE_PREFERENCE = 10;
   private final static int MIN_REBALANCE_PREFERENCE = 0;
   public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
+  public static final int TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;

Review comment:
       IMO, the cluster level default value is different from the node level pool size.
   Better to use a different name for the cluster level default number.
   DEFAULT_TARGET_TASK_THREAD_POOL_SIZE
   DEFAULT_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET






[GitHub] [helix] narendly commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
narendly commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420434219



##########
File path: helix-core/src/main/java/org/apache/helix/model/LiveInstance.java
##########
@@ -190,6 +192,24 @@ public void setWebserviceUrl(String url) {
     _record.setSimpleField(LiveInstanceProperty.ZKPROPERTYTRANSFERURL.toString(), url);
   }
 
+  /**
+   * Get the current task thread pool size of the instance
+   * @return the current task thread pool size
+   */
+  public int getCurrentTaskThreadPoolSize() {
+    return _record.getIntField(LiveInstanceProperty.CURRENT_TASK_THREAD_POOL_SIZE.name(),
+        CURRENT_TASK_THREAD_POOL_SIZE_NOT_SET);

Review comment:
       Please try to understand the proposed design thoroughly.
   
   For example, what does it mean for an instance to have a task thread pool of size -1? (it is meaningless, we shouldn't create a task pool with size -1).






[GitHub] [helix] NealSun96 commented on a change in pull request #973: (WIP) TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r416960606



##########
File path: helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
##########
@@ -506,6 +508,16 @@ public void setMaxConcurrentTask(int maxConcurrentTask) {
     _record.setIntField(InstanceConfigProperty.MAX_CONCURRENT_TASK.name(), maxConcurrentTask);
   }
 
+  /**
+   * Get the target size of task thread pool
+   * @return the target size of task thread pool
+   */
+  public int getTargetTaskThreadPoolSize() {

Review comment:
       I agree. 






[GitHub] [helix] NealSun96 commented on a change in pull request #973: (WIP) TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r419710771



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -102,4 +107,36 @@ public boolean isShutdown() {
   public boolean isTerminated() {
     return _taskExecutor.isTerminated();
   }
+
+  /*
+   * Get target thread pool size from InstanceConfig first; if that fails, get it from
+   * ClusterConfig; if that fails, fall back to the default value.
+   */
+  private static int getTaskThreadPoolSize(HelixManager manager) {
+    ConfigAccessor configAccessor = manager.getConfigAccessor();
+    // Check instance config first for thread pool size
+    InstanceConfig instanceConfig =
+        configAccessor.getInstanceConfig(manager.getClusterName(), manager.getInstanceName());
+    if (instanceConfig != null) {

Review comment:
       So the context has changed, and what I said earlier is no longer valid. Please take a look if you want to further this discussion:
   
   1. Thread pool creation is now after HelixManager connection, which means `InstanceConfig` and `ClusterConfig` are expected to exist. It should be treated as an error if they don't exist. 
   2. I verified with test cases and believe that an earlier comment made by @narendly is incorrect: if `InstanceConfig` or `ClusterConfig` doesn't exist, `getInstanceConfig()`/`getClusterConfig()` will throw a HelixException because `isInstanceSetup()`/`isClusterSetup()` will fail. 
   
   Here are the implications of these two points:
   
   With the current design as shown here, if `InstanceConfig`/`ClusterConfig` doesn't exist, the attempt to create a thread pool will result in a HelixException. I believe it's fine to **not** catch the exception and let it propagate up, because a missing `InstanceConfig`/`ClusterConfig` is an erroneous state. This aligns with what @jiajunwang is saying, and is the current design as we see it. 
   
   Alternatively, we can catch the HelixException, log it, and use the default value (this is @narendly's idea). 
   
   I'm leaning towards the current design (let HelixException propagate upwards if configs don't exist). Does anyone have a different opinion? 
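
The two options can be contrasted in a small sketch. It does not use the Helix API: `ConfigMissingException` is a stand-in for `HelixException`, the `IntSupplier` stands in for the config lookup, and `PoolSizeLookup` and `DEFAULT_SIZE` are hypothetical names chosen for illustration.

```java
import java.util.function.IntSupplier;

// Stand-in for HelixException (assumption); the real class lives in org.apache.helix.
class ConfigMissingException extends RuntimeException {
  ConfigMissingException(String message) {
    super(message);
  }
}

final class PoolSizeLookup {
  static final int DEFAULT_SIZE = 40; // assumption: mirrors the old TASK_THREADPOOL_SIZE

  private PoolSizeLookup() {
  }

  // Option 1 (current design): do not catch, so a missing config surfaces as an error.
  static int propagate(IntSupplier configLookup) {
    return configLookup.getAsInt();
  }

  // Option 2 (alternative): catch the exception, log it, and use the default size.
  static int fallBackToDefault(IntSupplier configLookup) {
    try {
      return configLookup.getAsInt();
    } catch (ConfigMissingException e) {
      System.err.println("Config lookup failed, using default size: " + e.getMessage());
      return DEFAULT_SIZE;
    }
  }
}
```

Option 1 makes a missing config visible to the caller right away, while option 2 keeps the participant running but can hide a broken setup behind the default.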






[GitHub] [helix] NealSun96 edited a comment on pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 edited a comment on pull request #973:
URL: https://github.com/apache/helix/pull/973#issuecomment-623714094


   > Just to confirm, will the logic to report the current pool size be in a different PR?
   
   Taking back my previous comment as I thought about it a bit more. Will edit later. 




[GitHub] [helix] jiajunwang commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
jiajunwang commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420309052



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -81,6 +79,12 @@ public Thread newThread(Runnable r) {
 
   @Override
   public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+    if (_taskExecutor == null) {

Review comment:
       Do you have a scenario in which you need to call these methods separately? Putting each line of code in a separate method is not efficient. Please try to balance the code.






[GitHub] [helix] NealSun96 commented on a change in pull request #973: TaskStateModelFactory configurable thread pool size

Posted by GitBox <gi...@apache.org>.
NealSun96 commented on a change in pull request #973:
URL: https://github.com/apache/helix/pull/973#discussion_r420313966



##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -81,6 +79,12 @@ public Thread newThread(Runnable r) {
 
   @Override
   public TaskStateModel createNewStateModel(String resourceName, String partitionKey) {
+    if (_taskExecutor == null) {
+      int taskThreadPoolSize = getTaskThreadPoolSize();
+      updateLiveInstanceWithThreadPoolSize(taskThreadPoolSize);

Review comment:
       Answering the "why lazy initialization" question here as well:
   
   When `TaskStateModelFactory` is initialized, it is usually during factory registration. However, factory registration can occur before HelixManager connection, or after. When it occurs before HelixManager connection, `TaskStateModelFactory` actually cannot access the configs, therefore it cannot create the thread pool before manager connection. 
   
   We have 2 ways to make it work: 1. enforce that `TaskStateModelFactory` initialization happens after HelixManager connection, or 2. lazily initialize the thread pool. I chose 2 because it is backward compatible: clients that register `TaskStateModelFactory` before HelixManager connection (such code exists) don't need to change it. 
   
   In terms of `createNewStateModel()`, it is called during message handling when the state model of the message doesn't exist for the resource name and partition key. From the code I read, I don't think "the controller needs to know the pool size before assign the tasks", because as we can see from the original code of this class, the task thread pool is never exposed anywhere other than `createNewStateModel()`. In fact, it's not used anywhere other than `createNewStateModel()` either, therefore making it safe to initialize in this function. 
   
   Feel free to correct me. @jiajunwang 
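
The lazy-initialization option can be sketched as follows. This is not the code in the PR: `LazyTaskExecutorHolder` is a hypothetical stand-in for the factory, the `IntSupplier` stands in for reading the configs once the manager is connected, and the `synchronized` keyword is an addition for the sketch, since the diff does not show how concurrent calls are handled.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.IntSupplier;

// Hypothetical stand-in for the factory; only the lazy-creation pattern is shown.
final class LazyTaskExecutorHolder {
  private final IntSupplier poolSizeSupplier;    // consulted only when the pool is first needed
  private ScheduledExecutorService taskExecutor; // stays null until first use

  LazyTaskExecutorHolder(IntSupplier poolSizeSupplier) {
    this.poolSizeSupplier = poolSizeSupplier;
  }

  // Creates the pool on the first call; later calls reuse the same instance.
  // synchronized keeps two threads from both seeing null and creating two pools.
  synchronized ScheduledExecutorService getOrCreate() {
    if (taskExecutor == null) {
      taskExecutor = Executors.newScheduledThreadPool(poolSizeSupplier.getAsInt());
    }
    return taskExecutor;
  }

  synchronized boolean isInitialized() {
    return taskExecutor != null;
  }
}
```

Because the size is read only inside `getOrCreate()`, constructing the holder before the manager connects never touches the configs.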

##########
File path: helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
##########
@@ -41,23 +47,15 @@
 
   private final HelixManager _manager;
   private final Map<String, TaskFactory> _taskFactoryRegistry;
-  private final ScheduledExecutorService _taskExecutor;
+  private ScheduledExecutorService _taskExecutor;
   private final ScheduledExecutorService _timerTaskExecutor;
   private ThreadPoolExecutorMonitor _monitor;
-  public final static int TASK_THREADPOOL_SIZE = 40;
 
   public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) {
-    this(manager, taskFactoryRegistry,
-        Executors.newScheduledThreadPool(TASK_THREADPOOL_SIZE, new ThreadFactory() {
-          private AtomicInteger threadId = new AtomicInteger(0);
-
-          @Override
-          public Thread newThread(Runnable r) {
-            return new Thread(r, "TaskStateModelFactory-task_thread-" + threadId.getAndIncrement());
-          }
-        }));
+    this(manager, taskFactoryRegistry, null);

Review comment:
       Combined with the previous comment. 



