You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by dc...@apache.org on 2022/03/10 06:38:48 UTC

[samza] branch master updated: SAMZA-2720: Allow custom task executor to be injected and used. (#1588)

This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a90277  SAMZA-2720: Allow custom task executor to be injected and used. (#1588)
3a90277 is described below

commit 3a902771579917952d1155d5c6ac75cec84f09dc
Author: Christopher Zhang <ch...@berkeley.edu>
AuthorDate: Wed Mar 9 22:37:10 2022 -0800

    SAMZA-2720: Allow custom task executor to be injected and used. (#1588)
    
    * SAMZA-2720: Allow custom task executor to be injected and used.
    
    * run ./gradlew wrapper
    
    * iterate with feedback
    
    * fix variable names and imports
---
 gradle/wrapper/gradle-wrapper.properties           |  1 -
 .../org/apache/samza/task/TaskExecutorFactory.java | 35 ++++++++++
 .../java/org/apache/samza/config/JobConfig.java    |  6 ++
 .../samza/task/DefaultTaskExecutorFactory.java     | 40 ++++++++++++
 .../apache/samza/container/SamzaContainer.scala    |  5 +-
 .../samza/task/TestDefaultTaskExecutorFactory.java | 74 ++++++++++++++++++++++
 6 files changed, 158 insertions(+), 3 deletions(-)

diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 83639a3..44e7c4d 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,4 +1,3 @@
-#Mon Jun 01 15:50:38 PDT 2020
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
 distributionUrl=https\://services.gradle.org/distributions/gradle-5.2.1-bin.zip
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskExecutorFactory.java b/samza-api/src/main/java/org/apache/samza/task/TaskExecutorFactory.java
new file mode 100644
index 0000000..85fcc04
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskExecutorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import java.util.concurrent.ExecutorService;
+import org.apache.samza.config.Config;
+
+
+/**
+ * Factory for creating the {@link ExecutorService} used when running tasks in multi-thread mode.
+ */
+public interface TaskExecutorFactory {
+
+  /**
+   * @param config configuration from which the executor's settings are read
+   * @return the executor used to run tasks
+   */
+  ExecutorService getTaskExecutor(Config config);
+}
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 266201b..59163b9 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -63,6 +63,8 @@ public class JobConfig extends MapConfig {
   public static final String JOB_CONTAINER_COUNT = "job.container.count";
   static final int DEFAULT_JOB_CONTAINER_COUNT = 1;
   public static final String JOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size";
+  public static final String JOB_CONTAINER_TASK_EXECUTOR_FACTORY = "job.container.task.executor.factory";
+  public static final String DEFAULT_JOB_CONTAINER_TASK_EXECUTOR_FACTORY = "org.apache.samza.task.DefaultTaskExecutorFactory";
   // num commit threads == min(max(2 * num tasks in container, thread pool size), max thread pool size)
   public static final String COMMIT_THREAD_POOL_SIZE = "job.container.commit.thread.pool.size";
   static final int DEFAULT_COMMIT_THREAD_POOL_SIZE = 2;
@@ -355,6 +357,10 @@ public class JobConfig extends MapConfig {
     }
   }
 
+  public String getTaskExecutorFactory() { // TaskExecutorFactory class name; default is DefaultTaskExecutorFactory
+    return get(JOB_CONTAINER_TASK_EXECUTOR_FACTORY, DEFAULT_JOB_CONTAINER_TASK_EXECUTOR_FACTORY);
+  }
+
   public int getCommitThreadPoolSize() {
     return getInt(COMMIT_THREAD_POOL_SIZE, DEFAULT_COMMIT_THREAD_POOL_SIZE);
   }
diff --git a/samza-core/src/main/java/org/apache/samza/task/DefaultTaskExecutorFactory.java b/samza-core/src/main/java/org/apache/samza/task/DefaultTaskExecutorFactory.java
new file mode 100644
index 0000000..0fe5f25
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/DefaultTaskExecutorFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+
+
+/**
+ * Default factory: runs tasks in multi-thread mode on a fixed pool of {@link JobConfig#getThreadPoolSize()} threads.
+ */
+public class DefaultTaskExecutorFactory implements TaskExecutorFactory {
+
+  @Override
+  public ExecutorService getTaskExecutor(Config config) {
+    int threadPoolSize = new JobConfig(config).getThreadPoolSize();
+
+    return Executors.newFixedThreadPool(threadPoolSize, // rejects sizes <= 0 with IllegalArgumentException
+        new ThreadFactoryBuilder().setNameFormat("Samza Container Thread-%d").build());
+  }
+}
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index e1cbd54..da364f1 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -457,8 +457,9 @@ object SamzaContainer extends Logging {
     samzaContainerMetrics.containerThreadPoolSize.set(threadPoolSize)
 
     val taskThreadPool = if (threadPoolSize > 0) {
-      Executors.newFixedThreadPool(threadPoolSize,
-        new ThreadFactoryBuilder().setNameFormat("Samza Container Thread-%d").build())
+      val taskExecutorFactoryClassName = jobConfig.getTaskExecutorFactory
+      val taskExecutorFactory = ReflectionUtil.getObj(taskExecutorFactoryClassName, classOf[TaskExecutorFactory])
+      taskExecutorFactory.getTaskExecutor(config)
     } else {
       null
     }
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestDefaultTaskExecutorFactory.java b/samza-core/src/test/java/org/apache/samza/task/TestDefaultTaskExecutorFactory.java
new file mode 100644
index 0000000..0f7b449
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/task/TestDefaultTaskExecutorFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.util.ReflectionUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.config.JobConfig.JOB_CONTAINER_THREAD_POOL_SIZE;
+import static org.apache.samza.config.JobConfig.JOB_CONTAINER_TASK_EXECUTOR_FACTORY;
+
+
+/**
+ * Tests {@link DefaultTaskExecutorFactory} and reflective loading of a configured {@link TaskExecutorFactory}.
+ */
+public class TestDefaultTaskExecutorFactory {
+
+  @Test
+  public void testGetTaskExecutor() { // the pool's core size must equal the configured thread pool size
+    DefaultTaskExecutorFactory factory = new DefaultTaskExecutorFactory();
+
+    Map<String, String> mapConfig = new HashMap<>();
+    int poolSize = 12;
+    mapConfig.put(JOB_CONTAINER_THREAD_POOL_SIZE, String.valueOf(poolSize));
+    Config config = new MapConfig(mapConfig);
+
+    ExecutorService executor = factory.getTaskExecutor(config); // cast below relies on this being a ThreadPoolExecutor
+
+    Assert.assertEquals(poolSize, ((ThreadPoolExecutor) executor).getCorePoolSize());
+  }
+
+  @Test
+  public void testGetTaskExecutorFactory() { // the factory class named in config must be the one instantiated
+    Map<String, String> mapConfig = new HashMap<>();
+    mapConfig.put(JOB_CONTAINER_TASK_EXECUTOR_FACTORY, MockTaskExecutorFactory.class.getName());
+    JobConfig config = new JobConfig(new MapConfig(mapConfig));
+
+    String taskExecutorFactoryClassName = config.getTaskExecutorFactory();
+    TaskExecutorFactory taskExecutorFactory = ReflectionUtil.getObj(taskExecutorFactoryClassName, TaskExecutorFactory.class);
+
+    Assert.assertTrue(taskExecutorFactory instanceof MockTaskExecutorFactory);
+  }
+
+  public static class MockTaskExecutorFactory implements TaskExecutorFactory { // stub target for reflective loading
+
+    @Override
+    public ExecutorService getTaskExecutor(Config config) {
+      return null; // no executor needed: the test only checks the factory's type
+    }
+  }
+}