Posted to commits@samza.apache.org by "dxichen (via GitHub)" <gi...@apache.org> on 2023/06/01 02:12:56 UTC

[GitHub] [samza] dxichen commented on a diff in pull request #1667: SAMZA-2781: Use framework thread to execute hand-offs and sub-DAG execution

dxichen commented on code in PR #1667:
URL: https://github.com/apache/samza/pull/1667#discussion_r1212476028


##########
samza-api/src/main/java/org/apache/samza/context/TaskContext.java:
##########
@@ -108,4 +109,12 @@ public interface TaskContext {
    */
   @InterfaceStability.Evolving
   void setStartingOffset(SystemStreamPartition systemStreamPartition, String offset);
+
+  /**
+   * Gets the operator {@link ExecutorService} for this container.

Review Comment:
   This should be the ExecutorService for this `task`, not the container.



##########
samza-api/src/main/java/org/apache/samza/task/TaskExecutorFactory.java:
##########
@@ -19,17 +19,39 @@
 package org.apache.samza.task;
 
 import java.util.concurrent.ExecutorService;
+import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
 
 
 /**
  * Factory for creating the executor used when running tasks in multi-thread mode.
  */
+@InterfaceStability.Unstable
 public interface TaskExecutorFactory {
 
   /**
    * @param config contains configs for the executor
    * @return task executor
    */
   ExecutorService getTaskExecutor(Config config);
+
+  /**
+   * Operator thread pool is asynchronous execution facility used to execute hand-off between operators and sub-DAG in case
+   * of synchronous operators in the application DAG. In case of asynchronous operators, typically the operator invocation
+   * happens on one thread while completion of the callback happens on another thread. When the CompletionStage completes normally,
+   * the subsequent DAG or hand-off code is executed on the operator thread pool.
+   * <b>Note:</b>It is upto the implementors of the factory to share the executor across tasks vs provide isolated executors

Review Comment:
   s/upto/up to
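
For readers following the Javadoc in this hunk, here is a minimal sketch of the hand-off it describes: the upstream stage completes on one thread, and the downstream step runs on the operator executor. It uses only `java.util.concurrent`; the class name `HandOffSketch`, the helper `handOff`, and the thread name are hypothetical and do not come from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HandOffSketch {

  // Hypothetical helper: schedules the downstream step on the operator executor
  // once the upstream stage completes normally.
  public static CompletableFuture<String> handOff(
      CompletableFuture<Integer> upstream, ExecutorService operatorExecutor) {
    return upstream.thenApplyAsync(
        value -> Thread.currentThread().getName() + ":" + (value + 1),
        operatorExecutor);
  }

  public static void main(String[] args) {
    // Assumption: one named thread stands in for the operator thread pool.
    ExecutorService operatorExecutor =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "operator-thread"));
    try {
      CompletableFuture<Integer> upstream = new CompletableFuture<>();
      CompletableFuture<String> downstream = handOff(upstream, operatorExecutor);
      upstream.complete(41); // completion happens on the main thread
      System.out.println(downstream.join()); // prints "operator-thread:42"
    } finally {
      operatorExecutor.shutdown();
    }
  }
}
```

Because `thenApplyAsync` is given an explicit executor, the downstream function never runs on the thread that completed the upstream stage.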



##########
samza-core/src/main/java/org/apache/samza/task/DefaultTaskExecutorFactory.java:
##########
@@ -37,4 +45,36 @@ public ExecutorService getTaskExecutor(Config config) {
     return Executors.newFixedThreadPool(threadPoolSize,
         new ThreadFactoryBuilder().setNameFormat("Samza Container Thread-%d").build());
   }
+
+  /**
+   * {@inheritDoc}
+   *
+   * The choice of thread pool is determined based on the following logic
+   *    If job.operator.thread.pool.enabled,
+   *     a. Use {@link #getTaskExecutor(Config)} if job.container.thread.pool.size &gt; 1
+   *     b. Use default single threaded pool otherwise
+   * <b>Note:</b> The default single threaded pool used is a substitute for the scenario where container thread pool is null and
+   * the messages are dispatched on runloop thread. We can't have the stages schedule on the run loop thread and hence
+   * the fallback to use a single threaded executor across all tasks.
+   */
+  @Override
+  public ExecutorService getOperatorExecutor(TaskName taskName, Config config) {
+    ExecutorService taskExecutor = TASK_EXECUTORS.computeIfAbsent(taskName, key -> {
+      final int threadPoolSize = new JobConfig(config).getThreadPoolSize();
+      ExecutorService operatorThreadPool;
+
+      if (threadPoolSize > 1) {
+        LOG.info("Using container thread pool as operator thread pool for task {}", key.getTaskName());
+        operatorThreadPool = getTaskExecutor(config);

Review Comment:
   Since this implementation already uses the taskExecutor, what is the concern with using the taskExecutor directly in the API, in place of the operatorExecutor, for other implementations?
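
As context for the question above, here is a rough sketch of the selection logic the Javadoc in this hunk describes, reduced to plain `java.util.concurrent` types. It assumes a `null` container pool stands for the case where the container thread pool size is not greater than 1. The class `OperatorExecutorSketch`, the method `operatorExecutorFor`, and the string task names are hypothetical, not the PR's code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OperatorExecutorSketch {

  // Assumption: one single-threaded fallback shared across all tasks, used when
  // there is no container thread pool to reuse.
  private static final ExecutorService FALLBACK = Executors.newSingleThreadExecutor();

  // Caches the chosen executor per task, mirroring the computeIfAbsent in the diff.
  private static final Map<String, ExecutorService> BY_TASK = new ConcurrentHashMap<>();

  public static ExecutorService operatorExecutorFor(String taskName, ExecutorService containerPool) {
    return BY_TASK.computeIfAbsent(
        taskName, key -> containerPool != null ? containerPool : FALLBACK);
  }

  public static void main(String[] args) {
    ExecutorService containerPool = Executors.newFixedThreadPool(2);
    // With a container pool available, the operator executor is that same pool.
    System.out.println(operatorExecutorFor("task-0", containerPool) == containerPool);
    // Without one, every task gets the shared single-threaded fallback.
    System.out.println(operatorExecutorFor("task-1", null) == operatorExecutorFor("task-2", null));
    containerPool.shutdown();
    FALLBACK.shutdown();
  }
}
```

In this sketch the two executors differ only in the fallback case. Per the Javadoc note, that is the case where messages are dispatched on the run loop thread and the stages cannot be scheduled there.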



##########
samza-core/src/main/java/org/apache/samza/task/DefaultTaskExecutorFactory.java:
##########
@@ -37,4 +45,36 @@ public ExecutorService getTaskExecutor(Config config) {
     return Executors.newFixedThreadPool(threadPoolSize,
         new ThreadFactoryBuilder().setNameFormat("Samza Container Thread-%d").build());
   }
+
+  /**
+   * {@inheritDoc}
+   *
+   * The choice of thread pool is determined based on the following logic
+   *    If job.operator.thread.pool.enabled,
+   *     a. Use {@link #getTaskExecutor(Config)} if job.container.thread.pool.size &gt; 1
+   *     b. Use default single threaded pool otherwise
+   * <b>Note:</b> The default single threaded pool used is a substitute for the scenario where container thread pool is null and
+   * the messages are dispatched on runloop thread. We can't have the stages schedule on the run loop thread and hence
+   * the fallback to use a single threaded executor across all tasks.
+   */
+  @Override
+  public ExecutorService getOperatorExecutor(TaskName taskName, Config config) {
+    ExecutorService taskExecutor = TASK_EXECUTORS.computeIfAbsent(taskName, key -> {
+      final int threadPoolSize = new JobConfig(config).getThreadPoolSize();
+      ExecutorService operatorThreadPool;

Review Comment:
   nit: rename to `operatorExecutor`


