You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2020/08/24 12:44:58 UTC

[tez] branch master updated: TEZ-4213: Bound appContext executor capacity using a configurable property (Panagiotis Garefalakis reviewed by Ashutosh Chauhan, Mustafa Iman, Attila Magyar)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new ef1d69a  TEZ-4213: Bound appContext executor capacity using a configurable property (Panagiotis Garefalakis reviewed by Ashutosh Chauhan, Mustafa Iman, Attila Magyar)
ef1d69a is described below

commit ef1d69aaf48f068aaf7a2c6b4eb72d320d5c2d1a
Author: Panagiotis Garefalakis <pa...@gmail.com>
AuthorDate: Mon Aug 24 14:37:53 2020 +0200

    TEZ-4213: Bound appContext executor capacity using a configurable property (Panagiotis Garefalakis reviewed by Ashutosh Chauhan, Mustafa Iman, Attila Magyar)
    
    Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
 .../org/apache/tez/dag/api/TezConfiguration.java   | 10 +++
 .../java/org/apache/tez/dag/app/AppContext.java    |  6 +-
 .../java/org/apache/tez/dag/app/DAGAppMaster.java  | 22 ++++-
 .../dag/app/dag/RootInputInitializerManager.java   |  4 +-
 .../org/apache/tez/dag/app/TestDAGAppMaster.java   |  4 +-
 .../app/dag/TestRootInputInitializerManager.java   | 94 +++++++++++++++++++++-
 6 files changed, 127 insertions(+), 13 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 7dc412b..d5e5e73 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -847,6 +847,16 @@ public class TezConfiguration extends Configuration {
 
   public static final int TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT_DEFAULT = 10;
 
+  /**
+   * Int value. Upper limit on the number of threads used by app context (vertex management and input init events).
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="integer")
+  public static final String TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT =
+          TEZ_AM_PREFIX + "dag.appcontext.thread-count-limit";
+
+  public static final int TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT_DEFAULT = 10;
+
   /** Int value. The amount of memory in MB to be used by tasks. This applies to all tasks across
    * all vertices. Setting it to the same value for all tasks is helpful for container reuse and
    * thus good for performance typically. */
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 4eb2ae2..fc4ddcf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app;
 
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -72,7 +73,10 @@ public interface AppContext {
   String getUser();
 
   DAG getCurrentDAG();
-  
+
+  // For testing only!
+  ThreadPoolExecutor getThreadPool();
+
   ListeningExecutorService getExecService();
 
   void setDAG(DAG dag);
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index fcfb883..5400668 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -47,9 +47,10 @@ import java.util.Random;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -302,7 +303,7 @@ public class DAGAppMaster extends AbstractService {
   private Path tezSystemStagingDir;
   private FileSystem recoveryFS;
 
-  private ExecutorService rawExecutor;
+  private ThreadPoolExecutor rawExecutor;
   private ListeningExecutorService execService;
 
   // TODO May not need to be a bidi map
@@ -621,8 +622,13 @@ public class DAGAppMaster extends AbstractService {
       }
     }
 
-    rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true)
-        .setNameFormat("App Shared Pool - " + "#%d").build());
+    int threadCount = conf.getInt(TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT,
+            TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT_DEFAULT);
+    // NOTE: LinkedBlockingQueue does not have a capacity limit and can thus
+    // occupy large memory chunks when numerous Runnables are pending for execution
+    rawExecutor = new ThreadPoolExecutor(threadCount, threadCount,
+            60L, TimeUnit.SECONDS, new LinkedBlockingQueue(),
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("App Shared Pool - " + "#%d").build());
     execService = MoreExecutors.listeningDecorator(rawExecutor);
 
     initServices(conf);
@@ -1504,6 +1510,14 @@ public class DAGAppMaster extends AbstractService {
     }
 
     @Override
+    // For testing only!
+    public ThreadPoolExecutor getThreadPool() {
+      synchronized (DAGAppMaster.this) {
+        return rawExecutor;
+      }
+    }
+
+    @Override
     public ListeningExecutorService getExecService() {
       return execService;
     }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 9194c1d..bd4bcd8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -121,7 +121,7 @@ public class RootInputInitializerManager {
    * @param inputs
    * @return
    */
-  private List<InitializerWrapper> createInitializerWrappers(
+  protected List<InitializerWrapper> createInitializerWrappers(
           List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs) {
     String current = null;
     final List<InitializerWrapper> result = Collections.synchronizedList(new ArrayList<>());
@@ -161,7 +161,7 @@ public class RootInputInitializerManager {
    * @param pendingInitializerEvents
    * @param result
    */
-  private void createAndStartInitializing(List<TezEvent> pendingInitializerEvents, List<InitializerWrapper> result) {
+  protected void createAndStartInitializing(List<TezEvent> pendingInitializerEvents, List<InitializerWrapper> result) {
     handleInitializerEvents(pendingInitializerEvents);
     pendingInitializerEvents.clear();
     for (InitializerWrapper inputWrapper : result) {
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
index 4adf310..d8167db 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java
@@ -627,7 +627,7 @@ public class TestDAGAppMaster {
     }
   }
 
-  private static class DAGAppMasterForTest extends DAGAppMaster {
+  public static class DAGAppMasterForTest extends DAGAppMaster {
     private DAGAppMasterShutdownHandler mockShutdown;
     private TaskSchedulerManager mockScheduler = mock(TaskSchedulerManager.class);
 
@@ -638,7 +638,7 @@ public class TestDAGAppMaster {
           new TezDagVersionInfo().getVersion(), createCredentials(), "jobname", null);
     }
 
-    private static Credentials createCredentials() {
+    public static Credentials createCredentials() {
       Credentials creds = new Credentials();
       JobTokenSecretManager jtsm = new JobTokenSecretManager();
       JobTokenIdentifier jtid = new JobTokenIdentifier(new Text());
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
index 01cc37f..000f077 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/TestRootInputInitializerManager.java
@@ -14,7 +14,9 @@
 
 package org.apache.tez.dag.app.dag;
 
+import static org.apache.tez.dag.app.TestDAGAppMaster.DAGAppMasterForTest.createCredentials;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
@@ -25,26 +27,43 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.client.TezApiVersionInfo;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.DAGAppMaster;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -62,20 +81,26 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 public class TestRootInputInitializerManager {
-  ListeningExecutorService execService;
+
+  private static final File TEST_DIR = new File(System.getProperty("test.build.data"),
+          TestRootInputInitializerManager.class.getName()).getAbsoluteFile();
+  private static ListeningExecutorService execService;
 
   @Before
-  public void setUp() throws Exception {
+  public void setUp() {
     ExecutorService rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
             .setDaemon(true).setNameFormat("Test App Shared Pool - " + "#%d").build());
     execService = MoreExecutors.listeningDecorator(rawExecutor);
+    FileUtil.fullyDelete(TEST_DIR);
+    TEST_DIR.mkdirs();
   }
 
   @After
-  public void tearDown() throws Exception {
+  public void tearDown() {
     if (execService != null) {
       execService.shutdownNow();
     }
+    FileUtil.fullyDelete(TEST_DIR);
   }
 
   // Simple testing. No events if task doesn't succeed.
@@ -251,7 +276,68 @@ public class TestRootInputInitializerManager {
     InputInitializerForUgiTest.awaitInitialize();
 
     assertEquals(dagUgi, InputInitializerForUgiTest.ctorUgi);
-    assertEquals(dagUgi, InputInitializerForUgiTest.initializeUgi);
+    assertEquals(dagUgi.getRealUser(), InputInitializerForUgiTest.initializeUgi.getRealUser());
+  }
+
+  @Test (timeout = 10000)
+  public synchronized void testParallelInputInitialization() throws InterruptedException, IOException {
+    // Create Local DAGAppMaster with default conf
+    Configuration conf = new Configuration(true);
+    conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+    conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    FSDataOutputStream sessionJarsPBOutStream =
+            TezCommonUtils.createFileForAM(fs, new Path(TEST_DIR.toString(),
+                    TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
+    DAGProtos.PlanLocalResourcesProto.getDefaultInstance()
+            .writeDelimitedTo(sessionJarsPBOutStream);
+    sessionJarsPBOutStream.close();
+
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+
+    DAGAppMaster am = new DAGAppMaster(attemptId,
+            ContainerId.newContainerId(attemptId, 1),
+            "127.0.0.1", 0, 0, new SystemClock(), 1, true,
+            TEST_DIR.toString(), new String[] {TEST_DIR.toString()},
+            new String[] {TEST_DIR.toString()},
+            new TezApiVersionInfo().getVersion(), createCredentials(),
+            "someuser", null);
+    am.init(conf);
+
+    Vertex vertex = mock(Vertex.class);
+    doReturn(mock(TezVertexID.class)).when(vertex).getVertexId();
+    UserGroupInformation dagUgi = UserGroupInformation.createRemoteUser("fakeuser");
+    StateChangeNotifier stateChangeNotifier = mock(StateChangeNotifier.class);
+    RootInputInitializerManager rootInputInitializerManager =
+            new RootInputInitializerManager(vertex, am.getContext(), dagUgi, stateChangeNotifier);
+
+    List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inlist = new LinkedList();
+    // Make sure we don't have any OOM issue by controlling the capacity of the thread pool
+    // and also block the producer (createInitializerWrappers) when resources are saturated
+    InputDescriptor id = mock(InputDescriptor.class);
+    InputInitializerDescriptor iid = InputInitializerDescriptor.create(InputInitializerForUgiTest.class.getName());
+    for (int i=0; i < 10000; i++) {
+      RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInput =
+              new RootInputLeafOutput<>("InputName"+i, id, iid);
+      inlist.add(rootInput);
+    }
+
+    List<RootInputInitializerManager.InitializerWrapper> initWrappers =
+            rootInputInitializerManager.createInitializerWrappers(inlist);
+
+    int max_thread_size = conf.getInt(TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT,
+            TezConfiguration.TEZ_AM_DAG_APPCONTEXT_THREAD_COUNT_LIMIT_DEFAULT);
+    ThreadPoolExecutor amThreadPool = am.getContext().getThreadPool();
+
+    rootInputInitializerManager.executor.submit(()
+            -> rootInputInitializerManager.createAndStartInitializing(Collections.emptyList(), initWrappers));
+
+    while (am.getContext().getThreadPool().getQueue().size() > 0) {
+      assertTrue(amThreadPool.getPoolSize() <= max_thread_size);
+      Thread.sleep(100);
+    }
   }
 
   public static class InputInitializerForUgiTest extends InputInitializer {