You are viewing a plain text version of this content. The canonical link for it (the "here" hyperlink) is not preserved in this plain-text copy.
Posted to commits@tez.apache.org by ab...@apache.org on 2020/08/24 12:44:58 UTC
[tez] branch master updated: TEZ-4213: Bound appContext executor
capacity using a configurable property (Panagiotis Garefalakis reviewed by
Ashutosh Chauhan, Mustafa Iman, Attila Magyar)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new ef1d69a TEZ-4213: Bound appContext executor capacity using a configurable property (Panagiotis Garefalakis reviewed by Ashutosh Chauhan, Mustafa Iman, Attila Magyar)
ef1d69a is described below
commit ef1d69aaf48f068aaf7a2c6b4eb72d320d5c2d1a
Author: Panagiotis Garefalakis <pa...@gmail.com>
AuthorDate: Mon Aug 24 14:37:53 2020 +0200
TEZ-4213: Bound appContext executor capacity using a configurable property (Panagiotis Garefalakis reviewed by Ashutosh Chauhan, Mustafa Iman, Attila Magyar)
Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
.../org/apache/tez/dag/api/TezConfiguration.java | 10 +++
.../java/org/apache/tez/dag/app/AppContext.java | 6 +-
.../java/org/apache/tez/dag/app/DAGAppMaster.java | 22 ++++-
.../dag/app/dag/RootInputInitializerManager.java | 4 +-
.../org/apache/tez/dag/app/TestDAGAppMaster.java | 4 +-
.../app/dag/TestRootInputInitializerManager.java | 94 +++++++++++++++++++++-
6 files changed, 127 insertions(+), 13 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 7dc412b..d5e5e73 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -847,6 +847,16 @@ public class TezConfiguration extends Configuration {
public static final int TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT_DEFAULT = 10;
+ /**
+ * Int value. Size of the AM's fixed-size shared app context thread pool (vertex management and input init events); must be > 0.
+ */
+ @ConfigurationScope(Scope.AM)
+ @ConfigurationProperty(type="integer")
+ public static final String TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT =
+ TEZ_AM_PREFIX + "dag.appcontext.thread-count-limit";
+
+ public static final int TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT_DEFAULT = 10;
+
/** Int value. The amount of memory in MB to be used by tasks. This applies to all tasks across
* all vertices. Setting it to the same value for all tasks is helpful for container reuse and
* thus good for performance typically. */
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 4eb2ae2..fc4ddcf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -72,7 +73,10 @@ public interface AppContext {
String getUser();
DAG getCurrentDAG();
-
+
+ // Raw AM shared pool that backs getExecService(); exposed for tests only!
+ ThreadPoolExecutor getThreadPool();
+
ListeningExecutorService getExecService();
void setDAG(DAG dag);
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index fcfb883..5400668 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -47,9 +47,10 @@ import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -302,7 +303,7 @@ public class DAGAppMaster extends AbstractService {
private Path tezSystemStagingDir;
private FileSystem recoveryFS;
- private ExecutorService rawExecutor;
+ private ThreadPoolExecutor rawExecutor;
private ListeningExecutorService execService;
// TODO May not need to be a bidi map
@@ -621,8 +622,13 @@ public class DAGAppMaster extends AbstractService {
}
}
- rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("App Shared Pool - " + "#%d").build());
+ int threadCount = conf.getInt(TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT,
+ TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT_DEFAULT);
+ // Fixed-size pool. NOTE: the LinkedBlockingQueue is unbounded, so many pending Runnables can still use a lot of memory.
+ rawExecutor = new ThreadPoolExecutor(threadCount, threadCount,
+ 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("App Shared Pool - " + "#%d").build());
+ rawExecutor.allowCoreThreadTimeOut(true); // core == max, so without this the 60s keep-alive would never apply
execService = MoreExecutors.listeningDecorator(rawExecutor);
initServices(conf);
@@ -1504,6 +1510,14 @@ public class DAGAppMaster extends AbstractService {
}
@Override
+ // For Testing only!
+ public ThreadPoolExecutor getThreadPool() {
+ synchronized (DAGAppMaster.this) {
+ return rawExecutor;
+ }
+ }
+
+ @Override
public ListeningExecutorService getExecService() {
return execService;
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 9194c1d..bd4bcd8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -121,7 +121,7 @@ public class RootInputInitializerManager {
* @param inputs
* @return
*/
- private List<InitializerWrapper> createInitializerWrappers(
+ List<InitializerWrapper> createInitializerWrappers( // package-private: visible for same-package tests
List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs) {
String current = null;
final List<InitializerWrapper> result = Collections.synchronizedList(new ArrayList<>());
@@ -161,7 +161,7 @@ public class RootInputInitializerManager {
* @param pendingInitializerEvents
* @param result
*/
- private void createAndStartInitializing(List<TezEvent> pendingInitializerEvents, List<InitializerWrapper> result) {
+ void createAndStartInitializing(List<TezEvent> pendingInitializerEvents, List<InitializerWrapper> result) { // visible for tests
handleInitializerEvents(pendingInitializerEvents);
pendingInitializerEvents.clear();
for (InitializerWrapper inputWrapper : result) {
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
index 4adf310..d8167db 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
@@ -627,7 +627,7 @@ public class TestDAGAppMaster {
}
}
- private static class DAGAppMasterForTest extends DAGAppMaster {
+ public static class DAGAppMasterForTest extends DAGAppMaster { // public: reused by tests in other packages
private DAGAppMasterShutdownHandler mockShutdown;
private TaskSchedulerManager mockScheduler = mock(TaskSchedulerManager.class);
@@ -638,7 +638,7 @@ public class TestDAGAppMaster {
new TezDagVersionInfo().getVersion(), createCredentials(), "jobname", null);
}
- private static Credentials createCredentials() {
+ public static Credentials createCredentials() { // public: static-imported by TestRootInputInitializerManager
Credentials creds = new Credentials();
JobTokenSecretManager jtsm = new JobTokenSecretManager();
JobTokenIdentifier jtid = new JobTokenIdentifier(new Text());
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
index 01cc37f..000f077 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
@@ -14,7 +14,9 @@
package org.apache.tez.dag.app.dag;
+import static org.apache.tez.dag.app.TestDAGAppMaster.DAGAppMasterForTest.createCredentials;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
@@ -25,26 +27,43 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.client.TezApiVersionInfo;
+import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -62,20 +81,26 @@ import org.junit.Test;
import org.mockito.ArgumentCaptor;
public class TestRootInputInitializerManager {
- ListeningExecutorService execService;
+
+ private static final File TEST_DIR = new File(System.getProperty("test.build.data"),
+ TestRootInputInitializerManager.class.getName()).getAbsoluteFile();
+ private static ListeningExecutorService execService;
@Before
- public void setUp() throws Exception {
+ public void setUp() {
ExecutorService rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("Test App Shared Pool - " + "#%d").build());
execService = MoreExecutors.listeningDecorator(rawExecutor);
+ FileUtil.fullyDelete(TEST_DIR);
+ TEST_DIR.mkdirs();
}
@After
- public void tearDown() throws Exception {
+ public void tearDown() {
if (execService != null) {
execService.shutdownNow();
}
+ FileUtil.fullyDelete(TEST_DIR);
}
// Simple testing. No events if task doesn't succeed.
@@ -251,7 +276,68 @@ public class TestRootInputInitializerManager {
InputInitializerForUgiTest.awaitInitialize();
assertEquals(dagUgi, InputInitializerForUgiTest.ctorUgi);
- assertEquals(dagUgi, InputInitializerForUgiTest.initializeUgi);
+ assertEquals(dagUgi.getRealUser(), InputInitializerForUgiTest.initializeUgi.getRealUser());
+ }
+
+ @Test (timeout = 10000)
+ public void testParallelInputInitialization() throws InterruptedException, IOException {
+ // Create a local-mode DAGAppMaster with an otherwise default configuration
+ Configuration conf = new Configuration(true);
+ conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
+
+ FileSystem fs = FileSystem.getLocal(conf);
+ try (FSDataOutputStream sessionJarsPBOutStream = TezCommonUtils.createFileForAM(fs,
+ new Path(TEST_DIR.toString(), TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME))) {
+ DAGProtos.PlanLocalResourcesProto.getDefaultInstance()
+ .writeDelimitedTo(sessionJarsPBOutStream);
+ }
+
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+
+ DAGAppMaster am = new DAGAppMaster(attemptId,
+ ContainerId.newContainerId(attemptId, 1),
+ "127.0.0.1", 0, 0, new SystemClock(), 1, true,
+ TEST_DIR.toString(), new String[] {TEST_DIR.toString()},
+ new String[] {TEST_DIR.toString()},
+ new TezApiVersionInfo().getVersion(), createCredentials(),
+ "someuser", null);
+ am.init(conf);
+
+ Vertex vertex = mock(Vertex.class);
+ doReturn(mock(TezVertexID.class)).when(vertex).getVertexId();
+ UserGroupInformation dagUgi = UserGroupInformation.createRemoteUser("fakeuser");
+ StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class);
+ RootInputInitializerManager rootInputInitializerManager =
+ new RootInputInitializerManager(vertex, am.getContext(), dagUgi, stateChangeNotifier);
+
+ List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inlist = new LinkedList<>();
+ // Submit far more initializers than the AM pool has threads, then check (below) that
+ // the pool never grows past its configured thread-count limit while the backlog drains.
+ InputDescriptor id = mock(InputDescriptor.class);
+ InputInitializerDescriptor iid = InputInitializerDescriptor.create(InputInitializerForUgiTest.class.getName());
+ for (int i = 0; i < 10000; i++) {
+ RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInput =
+ new RootInputLeafOutput<>("InputName" + i, id, iid);
+ inlist.add(rootInput);
+ }
+
+ List<RootInputInitializerManager.InitializerWrapper> initWrappers =
+ rootInputInitializerManager.createInitializerWrappers(inlist);
+
+ int maxThreadSize = conf.getInt(TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT,
+ TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT_DEFAULT);
+ ThreadPoolExecutor amThreadPool = am.getContext().getThreadPool();
+
+ java.util.concurrent.Future<?> producer = rootInputInitializerManager.executor.submit(()
+ -> rootInputInitializerManager.createAndStartInitializing(Collections.emptyList(), initWrappers));
+
+ // An empty queue alone is not enough: the producer may not have submitted its initializers yet.
+ while (!producer.isDone() || amThreadPool.getQueue().size() > 0) {
+ assertTrue(amThreadPool.getPoolSize() <= maxThreadSize);
+ Thread.sleep(100);
+ }
}
public static class InputInitializerForUgiTest extends InputInitializer {