You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by dc...@apache.org on 2022/03/10 06:38:48 UTC
[samza] branch master updated: SAMZA-2720: Allow custom task executor to be injected and used. (#1588)
This is an automated email from the ASF dual-hosted git repository.
dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 3a90277 SAMZA-2720: Allow custom task executor to be injected and used. (#1588)
3a90277 is described below
commit 3a902771579917952d1155d5c6ac75cec84f09dc
Author: Christopher Zhang <ch...@berkeley.edu>
AuthorDate: Wed Mar 9 22:37:10 2022 -0800
SAMZA-2720: Allow custom task executor to be injected and used. (#1588)
* SAMZA-2720: Allow custom task executor to be injected and used.
* run ./gradlew wrapper
* iterate with feedback
* fix variable names and imports
---
gradle/wrapper/gradle-wrapper.properties | 1 -
.../org/apache/samza/task/TaskExecutorFactory.java | 35 ++++++++++
.../java/org/apache/samza/config/JobConfig.java | 6 ++
.../samza/task/DefaultTaskExecutorFactory.java | 40 ++++++++++++
.../apache/samza/container/SamzaContainer.scala | 5 +-
.../samza/task/TestDefaultTaskExecutorFactory.java | 74 ++++++++++++++++++++++
6 files changed, 158 insertions(+), 3 deletions(-)
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 83639a3..44e7c4d 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,4 +1,3 @@
-#Mon Jun 01 15:50:38 PDT 2020
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.2.1-bin.zip
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskExecutorFactory.java b/samza-api/src/main/java/org/apache/samza/task/TaskExecutorFactory.java
new file mode 100644
index 0000000..85fcc04
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskExecutorFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import java.util.concurrent.ExecutorService;
+import org.apache.samza.config.Config;
+
+
+/**
+ * Factory for creating the {@link ExecutorService} used when running tasks in multi-thread mode.
+ */
+public interface TaskExecutorFactory {
+
+ /**
+ * @param config job configuration from which the executor's settings (e.g. its size) are read
+ * @return the executor on which tasks are run
+ */
+ ExecutorService getTaskExecutor(Config config);
+}
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 266201b..59163b9 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -63,6 +63,8 @@ public class JobConfig extends MapConfig {
public static final String JOB_CONTAINER_COUNT = "job.container.count";
static final int DEFAULT_JOB_CONTAINER_COUNT = 1;
public static final String JOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size";
+ public static final String JOB_CONTAINER_TASK_EXECUTOR_FACTORY = "job.container.task.executor.factory";
+ public static final String DEFAULT_JOB_CONTAINER_TASK_EXECUTOR_FACTORY = "org.apache.samza.task.DefaultTaskExecutorFactory";
// num commit threads == min(max(2 * num tasks in container, thread pool size), max thread pool size)
public static final String COMMIT_THREAD_POOL_SIZE = "job.container.commit.thread.pool.size";
static final int DEFAULT_COMMIT_THREAD_POOL_SIZE = 2;
@@ -355,6 +357,10 @@ public class JobConfig extends MapConfig {
}
}
+ public String getTaskExecutorFactory() { // fully-qualified class name of the TaskExecutorFactory to load
+ return get(JOB_CONTAINER_TASK_EXECUTOR_FACTORY, DEFAULT_JOB_CONTAINER_TASK_EXECUTOR_FACTORY);
+ }
+
public int getCommitThreadPoolSize() {
return getInt(COMMIT_THREAD_POOL_SIZE, DEFAULT_COMMIT_THREAD_POOL_SIZE);
}
diff --git a/samza-core/src/main/java/org/apache/samza/task/DefaultTaskExecutorFactory.java b/samza-core/src/main/java/org/apache/samza/task/DefaultTaskExecutorFactory.java
new file mode 100644
index 0000000..0fe5f25
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/DefaultTaskExecutorFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+
+
+/**
+ * Default {@link TaskExecutorFactory} for multi-thread mode: a fixed-size pool sized by {@link JobConfig#getThreadPoolSize()}.
+ */
+public class DefaultTaskExecutorFactory implements TaskExecutorFactory {
+
+ @Override
+ public ExecutorService getTaskExecutor(Config config) {
+ int threadPoolSize = new JobConfig(config).getThreadPoolSize();
+ // Executors.newFixedThreadPool throws IllegalArgumentException unless the size is positive.
+ return Executors.newFixedThreadPool(threadPoolSize,
+ new ThreadFactoryBuilder().setNameFormat("Samza Container Thread-%d").build()); // %d: per-thread sequence number
+ }
+}
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index e1cbd54..da364f1 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -457,8 +457,9 @@ object SamzaContainer extends Logging {
samzaContainerMetrics.containerThreadPoolSize.set(threadPoolSize)
val taskThreadPool = if (threadPoolSize > 0) {
- Executors.newFixedThreadPool(threadPoolSize,
- new ThreadFactoryBuilder().setNameFormat("Samza Container Thread-%d").build())
+ val taskExecutorFactoryClassName = jobConfig.getTaskExecutorFactory
+ val taskExecutorFactory = ReflectionUtil.getObj(taskExecutorFactoryClassName, classOf[TaskExecutorFactory])
+ taskExecutorFactory.getTaskExecutor(config)
} else {
null
}
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestDefaultTaskExecutorFactory.java b/samza-core/src/test/java/org/apache/samza/task/TestDefaultTaskExecutorFactory.java
new file mode 100644
index 0000000..0f7b449
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/task/TestDefaultTaskExecutorFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.util.ReflectionUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.samza.config.JobConfig.JOB_CONTAINER_THREAD_POOL_SIZE;
+import static org.apache.samza.config.JobConfig.JOB_CONTAINER_TASK_EXECUTOR_FACTORY;
+
+
+/**
+ * Tests {@link DefaultTaskExecutorFactory} and loading a custom factory named by {@link JobConfig#getTaskExecutorFactory()}.
+ */
+public class TestDefaultTaskExecutorFactory {
+
+ @Test
+ public void testGetTaskExecutor() {
+ DefaultTaskExecutorFactory factory = new DefaultTaskExecutorFactory();
+
+ Map<String, String> mapConfig = new HashMap<>();
+ int poolSize = 12;
+ mapConfig.put(JOB_CONTAINER_THREAD_POOL_SIZE, String.valueOf(poolSize));
+ Config config = new MapConfig(mapConfig);
+
+ ExecutorService executor = factory.getTaskExecutor(config);
+ // The cast relies on Executors.newFixedThreadPool returning a ThreadPoolExecutor.
+ Assert.assertEquals(poolSize, ((ThreadPoolExecutor) executor).getCorePoolSize());
+ }
+
+ @Test
+ public void testGetTaskExecutorFactory() {
+ Map<String, String> mapConfig = new HashMap<>();
+ mapConfig.put(JOB_CONTAINER_TASK_EXECUTOR_FACTORY, MockTaskExecutorFactory.class.getName());
+ JobConfig config = new JobConfig(new MapConfig(mapConfig));
+
+ String taskExecutorFactoryClassName = config.getTaskExecutorFactory();
+ TaskExecutorFactory taskExecutorFactory = ReflectionUtil.getObj(taskExecutorFactoryClassName, TaskExecutorFactory.class);
+ // The configured class name must produce the mock, not the default factory.
+ Assert.assertTrue(taskExecutorFactory instanceof MockTaskExecutorFactory);
+ }
+
+ public static class MockTaskExecutorFactory implements TaskExecutorFactory {
+
+ @Override
+ public ExecutorService getTaskExecutor(Config config) {
+ return null; // never invoked here; the test only checks which factory class is instantiated
+ }
+ }
+}