You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2021/01/28 20:33:53 UTC
[tez] branch master updated: TEZ-4269: Re-Work Threadpool in
DAGAppMaster (#92)
This is an automated email from the ASF dual-hosted git repository.
jeagles pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 41ff945 TEZ-4269: Re-Work Threadpool in DAGAppMaster (#92)
41ff945 is described below
commit 41ff94583e6433402e2a7cc183269d2582f0302f
Author: belugabehr <12...@users.noreply.github.com>
AuthorDate: Thu Jan 28 15:33:42 2021 -0500
TEZ-4269: Re-Work Threadpool in DAGAppMaster (#92)
---
.../main/java/org/apache/tez/dag/app/AppContext.java | 5 +----
.../main/java/org/apache/tez/dag/app/DAGAppMaster.java | 17 ++++-------------
2 files changed, 5 insertions(+), 17 deletions(-)
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index fc4ddcf..c9a7083 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -20,7 +20,7 @@ package org.apache.tez.dag.app;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -74,9 +74,6 @@ public interface AppContext {
DAG getCurrentDAG();
- // For testing only!
- ThreadPoolExecutor getThreadPool();
-
ListeningExecutorService getExecService();
void setDAG(DAG dag);
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index ba072a9..dbcefe9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -48,10 +48,10 @@ import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -303,7 +303,6 @@ public class DAGAppMaster extends AbstractService {
private Path tezSystemStagingDir;
private FileSystem recoveryFS;
- private ThreadPoolExecutor rawExecutor;
private ListeningExecutorService execService;
// TODO May not need to be a bidi map
@@ -621,9 +620,9 @@ public class DAGAppMaster extends AbstractService {
TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT_DEFAULT);
// NOTE: LinkedBlockingQueue does not have a capacity Limit and can thus
// occupy large memory chunks when numerous Runables are pending for execution
- rawExecutor = new ThreadPoolExecutor(threadCount, threadCount,
- 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(),
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("App Shared Pool - " + "#%d").build());
+ ExecutorService rawExecutor =
+ Executors.newFixedThreadPool(threadCount, new ThreadFactoryBuilder()
+ .setDaemon(true).setNameFormat("App Shared Pool - #%d").build());
execService = MoreExecutors.listeningDecorator(rawExecutor);
initServices(conf);
@@ -1504,14 +1503,6 @@ public class DAGAppMaster extends AbstractService {
}
@Override
- // For Testing only!
- public ThreadPoolExecutor getThreadPool() {
- synchronized (DAGAppMaster.this) {
- return rawExecutor;
- }
- }
-
- @Override
public ListeningExecutorService getExecService() {
return execService;
}