You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:43:09 UTC

[31/43] tez git commit: TEZ-2125. Create a task communicator for local mode. Allow tasks to run in the AM. (sseth)

TEZ-2125. Create a task communicator for local mode. Allow tasks to run
in the AM. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a45ef858
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a45ef858
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a45ef858

Branch: refs/heads/TEZ-2003
Commit: a45ef85833df44750296430f9b85f42574795578
Parents: c47951a
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Feb 20 16:12:52 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 8 14:41:30 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 25 +++++---
 .../dag/app/TaskAttemptListenerImpTezDag.java   | 18 ++++--
 .../dag/app/TezLocalTaskCommunicatorImpl.java   | 46 ++++++++++++++
 .../tez/dag/app/TezTaskCommunicatorImpl.java    | 67 ++++++++------------
 .../app/launcher/ContainerLauncherRouter.java   | 17 +++--
 .../app/launcher/LocalContainerLauncher.java    | 31 ++++++---
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  2 +
 .../apache/tez/dag/app/MockDAGAppMaster.java    |  3 +-
 .../app/TestTaskAttemptListenerImplTezDag.java  |  2 +-
 .../tez/service/impl/ContainerRunnerImpl.java   |  2 +-
 .../tez/tests/TestExternalTezServices.java      | 57 +++++++++++++----
 .../org/apache/tez/runtime/task/TezChild.java   | 34 +++++-----
 13 files changed, 204 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a45ef858/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 1a2264c..76496c9 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -5,5 +5,6 @@ ALL CHANGES:
   TEZ-2117. Add a manager for ContainerLaunchers running in the AM.
   TEZ-2122. Setup pluggable components at AM/Vertex level.
   TEZ-2123. Fix component managers to use pluggable components. (Enable hybrid mode)
+  TEZ-2125. Create a task communicator for local mode. Allow tasks to run in the AM.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/a45ef858/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 89b6506..701eca8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -465,7 +465,7 @@ public class DAGAppMaster extends AbstractService {
 
     //service to handle requests to TaskUmbilicalProtocol
     taskAttemptListener = createTaskAttemptListener(context,
-        taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorClassIdentifiers);
+        taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorClassIdentifiers, isLocal);
     addIfService(taskAttemptListener, true);
 
     containerSignatureMatcher = createContainerSignatureMatcher();
@@ -531,7 +531,7 @@ public class DAGAppMaster extends AbstractService {
         taskSchedulerEventHandler);
     addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);
 
-    this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherClassIdentifiers);
+    this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherClassIdentifiers, isLocal);
     addIfService(containerLauncherRouter, true);
     dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter);
 
@@ -1038,9 +1038,13 @@ public class DAGAppMaster extends AbstractService {
   }
 
   protected TaskAttemptListener createTaskAttemptListener(AppContext context,
-      TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, String[] taskCommunicatorClasses) {
+                                                          TaskHeartbeatHandler thh,
+                                                          ContainerHeartbeatHandler chh,
+                                                          String[] taskCommunicatorClasses,
+                                                          boolean isLocal) {
     TaskAttemptListener lis =
-        new TaskAttemptListenerImpTezDag(context, thh, chh,jobTokenSecretManager, taskCommunicatorClasses);
+        new TaskAttemptListenerImpTezDag(context, thh, chh, jobTokenSecretManager,
+            taskCommunicatorClasses, isLocal);
     return lis;
   }
 
@@ -1061,10 +1065,12 @@ public class DAGAppMaster extends AbstractService {
     return chh;
   }
 
-  protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf, String []containerLauncherClasses) throws
+  protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf,
+                                                                  String[] containerLauncherClasses,
+                                                                  boolean isLocal) throws
       UnknownHostException {
-    return  new ContainerLauncherRouter(conf, context, taskAttemptListener, workingDirectory, containerLauncherClasses);
-
+    return new ContainerLauncherRouter(conf, context, taskAttemptListener, workingDirectory,
+        containerLauncherClasses, isLocal);
   }
 
   public ApplicationId getAppID() {
@@ -2331,9 +2337,8 @@ public class DAGAppMaster extends AbstractService {
     StringBuilder sb = new StringBuilder();
     sb.append("AM Level configured ").append(component).append(": ");
     for (int i = 0; i < classIdentifiers.length; i++) {
-      sb.append("[").append(i).append(":").append(map.inverse().get(i)).append(":")
-          .append(taskSchedulers.inverse().get(i)).append(
-          "]");
+      sb.append("[").append(i).append(":").append(map.inverse().get(i))
+          .append(":").append(classIdentifiers[i]).append("]");
       if (i != classIdentifiers.length - 1) {
         sb.append(",");
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/a45ef858/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index d21b7d0..8346839 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -99,13 +99,20 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
                                       TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
                                       // TODO TEZ-2003 pre-merge. Remove reference to JobTokenSecretManager.
                                       JobTokenSecretManager jobTokenSecretManager,
-                                      String [] taskCommunicatorClassIdentifiers) {
+                                      String [] taskCommunicatorClassIdentifiers,
+                                      boolean isPureLocalMode) {
     super(TaskAttemptListenerImpTezDag.class.getName());
     this.context = context;
     this.taskHeartbeatHandler = thh;
     this.containerHeartbeatHandler = chh;
     if (taskCommunicatorClassIdentifiers == null || taskCommunicatorClassIdentifiers.length == 0) {
-      taskCommunicatorClassIdentifiers = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+      if (isPureLocalMode) {
+        taskCommunicatorClassIdentifiers =
+            new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT};
+      } else {
+        taskCommunicatorClassIdentifiers =
+            new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+      }
     }
     this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length];
     for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) {
@@ -131,11 +138,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier) {
-    if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) ||
-        taskCommClassIdentifier
-            .equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
+    if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
       LOG.info("Using Default Task Communicator");
       return new TezTaskCommunicatorImpl(this);
+    } else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
+      LOG.info("Using Default Local Task Communicator");
+      return new TezLocalTaskCommunicatorImpl(this);
     } else {
       LOG.info("Using TaskCommunicator: " + taskCommClassIdentifier);
       Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>) ReflectionUtils

http://git-wip-us.apache.org/repos/asf/tez/blob/a45ef858/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java
new file mode 100644
index 0000000..3704cc4
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+public class TezLocalTaskCommunicatorImpl extends TezTaskCommunicatorImpl {
+
+  private static final Log LOG = LogFactory.getLog(TezLocalTaskCommunicatorImpl.class);
+
+  public TezLocalTaskCommunicatorImpl(
+      TaskCommunicatorContext taskCommunicatorContext) {
+    super(taskCommunicatorContext);
+  }
+
+  @Override
+  protected void startRpcServer() {
+    try {
+      this.address = new InetSocketAddress(InetAddress.getLocalHost(), 0);
+    } catch (UnknownHostException e) {
+      throw new TezUncheckedException(e);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Not starting TaskAttemptListener RPC in LocalMode");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a45ef858/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 258c927..0bf1b5d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -15,10 +15,8 @@
 package org.apache.tez.dag.app;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
-import java.net.UnknownHostException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -76,7 +74,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
   private final TezTaskUmbilicalProtocol taskUmbilical;
   private final String tokenIdentifier;
   private final Token<JobTokenIdentifier> sessionToken;
-  private InetSocketAddress address;
+  protected InetSocketAddress address;
   private Server server;
 
   public static final class ContainerInfo {
@@ -120,10 +118,8 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
     this.sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials());
   }
 
-
   @Override
   public void serviceStart() {
-
     startRpcServer();
   }
 
@@ -134,43 +130,32 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
 
   protected void startRpcServer() {
     Configuration conf = getConfig();
-    if (!conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT)) {
-      try {
-        JobTokenSecretManager jobTokenSecretManager =
-            new JobTokenSecretManager();
-        jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken);
-
-        server = new RPC.Builder(conf)
-            .setProtocol(TezTaskUmbilicalProtocol.class)
-            .setBindAddress("0.0.0.0")
-            .setPort(0)
-            .setInstance(taskUmbilical)
-            .setNumHandlers(
-                conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
-                    TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT))
-            .setSecretManager(jobTokenSecretManager).build();
-
-        // Enable service authorization?
-        if (conf.getBoolean(
-            CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
-            false)) {
-          refreshServiceAcls(conf, new TezAMPolicyProvider());
-        }
-
-        server.start();
-        this.address = NetUtils.getConnectAddress(server);
-      } catch (IOException e) {
-        throw new TezUncheckedException(e);
-      }
-    } else {
-      try {
-        this.address = new InetSocketAddress(InetAddress.getLocalHost(), 0);
-      } catch (UnknownHostException e) {
-        throw new TezUncheckedException(e);
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not starting TaskAttemptListener RPC in LocalMode");
+    try {
+      JobTokenSecretManager jobTokenSecretManager =
+          new JobTokenSecretManager();
+      jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken);
+
+      server = new RPC.Builder(conf)
+          .setProtocol(TezTaskUmbilicalProtocol.class)
+          .setBindAddress("0.0.0.0")
+          .setPort(0)
+          .setInstance(taskUmbilical)
+          .setNumHandlers(
+              conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
+                  TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT))
+          .setSecretManager(jobTokenSecretManager).build();
+
+      // Enable service authorization?
+      if (conf.getBoolean(
+          CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
+          false)) {
+        refreshServiceAcls(conf, new TezAMPolicyProvider());
       }
+
+      server.start();
+      this.address = NetUtils.getConnectAddress(server);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a45ef858/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index 4f9b5bf..70b0cbc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
@@ -49,17 +48,24 @@ public class ContainerLauncherRouter extends AbstractService
   public ContainerLauncherRouter(Configuration conf, AppContext context,
                                  TaskAttemptListener taskAttemptListener,
                                  String workingDirectory,
-                                 String[] containerLauncherClassIdentifiers) throws UnknownHostException {
+                                 String[] containerLauncherClassIdentifiers,
+                                 boolean isPureLocalMode) throws UnknownHostException {
     super(ContainerLauncherRouter.class.getName());
 
     if (containerLauncherClassIdentifiers == null || containerLauncherClassIdentifiers.length == 0) {
-      containerLauncherClassIdentifiers = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+      if (isPureLocalMode) {
+        containerLauncherClassIdentifiers =
+            new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT};
+      } else {
+        containerLauncherClassIdentifiers =
+            new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+      }
     }
     containerLaunchers = new ContainerLauncher[containerLauncherClassIdentifiers.length];
 
     for (int i = 0; i < containerLauncherClassIdentifiers.length; i++) {
       containerLaunchers[i] = createContainerLauncher(containerLauncherClassIdentifiers[i], context,
-          taskAttemptListener, workingDirectory, conf);
+          taskAttemptListener, workingDirectory, isPureLocalMode, conf);
     }
   }
 
@@ -67,6 +73,7 @@ public class ContainerLauncherRouter extends AbstractService
                                                     AppContext context,
                                                     TaskAttemptListener taskAttemptListener,
                                                     String workingDirectory,
+                                                    boolean isPureLocalMode,
                                                     Configuration conf) throws
       UnknownHostException {
     if (containerLauncherClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
@@ -76,7 +83,7 @@ public class ContainerLauncherRouter extends AbstractService
         .equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
       LOG.info("Creating LocalContainerLauncher");
       return
-          new LocalContainerLauncher(context, taskAttemptListener, workingDirectory);
+          new LocalContainerLauncher(context, taskAttemptListener, workingDirectory, isPureLocalMode);
     } else {
       LOG.info("Creating container launcher : " + containerLauncherClassIdentifier);
       Class<? extends ContainerLauncher> containerLauncherClazz =

http://git-wip-us.apache.org/repos/asf/tez/blob/a45ef858/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 9a38732..18b2e35 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -36,6 +36,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -90,9 +91,10 @@ public class LocalContainerLauncher extends AbstractService implements
   private final AtomicBoolean serviceStopped = new AtomicBoolean(false);
   private final String workingDirectory;
   private final TaskAttemptListener tal;
-  private final Map<String, String> localEnv = new HashMap<String, String>();
+  private final Map<String, String> localEnv;
   private final ExecutionContext executionContext;
   private int numExecutors;
+  private final boolean isPureLocalMode;
 
   private final ConcurrentHashMap<ContainerId, ListenableFuture<TezChild.ContainerExecutionResult>>
       runningContainers =
@@ -112,16 +114,26 @@ public class LocalContainerLauncher extends AbstractService implements
 
   public LocalContainerLauncher(AppContext context,
                                 TaskAttemptListener taskAttemptListener,
-                                String workingDirectory) throws UnknownHostException {
+                                String workingDirectory,
+                                boolean isPureLocalMode) throws UnknownHostException {
     super(LocalContainerLauncher.class.getName());
     this.context = context;
     this.tal = taskAttemptListener;
 
     this.workingDirectory = workingDirectory;
-    AuxiliaryServiceHelper.setServiceDataIntoEnv(
-        ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv);
-    executionContext = new ExecutionContextImpl(InetAddress.getLocalHost().getHostName());
-    // User cannot be set here since it isn't available till a DAG is running.
+    this.isPureLocalMode = isPureLocalMode;
+    if (isPureLocalMode) {
+      localEnv = Maps.newHashMap();
+      AuxiliaryServiceHelper.setServiceDataIntoEnv(
+          ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv);
+    } else {
+      localEnv = System.getenv();
+    }
+
+    // Check if the hostname is set in the environment before overriding it.
+    String host = isPureLocalMode ? InetAddress.getLocalHost().getHostName() :
+        System.getenv(Environment.NM_HOST.name());
+    executionContext = new ExecutionContextImpl(host);
   }
 
   @Override
@@ -338,7 +350,9 @@ public class LocalContainerLauncher extends AbstractService implements
       InterruptedException, TezException, IOException {
     Map<String, String> containerEnv = new HashMap<String, String>();
     containerEnv.putAll(localEnv);
-    containerEnv.put(Environment.USER.name(), context.getUser());
+    // Use the user from env if it's available.
+    String user = isPureLocalMode ? System.getenv(Environment.USER.name()) : context.getUser();
+    containerEnv.put(Environment.USER.name(), user);
 
     long memAvailable;
     synchronized (this) { // needed to fix findbugs Inconsistent synchronization warning
@@ -347,8 +361,7 @@ public class LocalContainerLauncher extends AbstractService implements
     TezChild tezChild =
         TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier,
             attemptNumber, localDirs, workingDirectory, containerEnv, "", executionContext, credentials,
-            memAvailable, context.getUser());
-    tezChild.setUmbilical(tezTaskUmbilicalProtocol);
+            memAvailable, context.getUser(), tezTaskUmbilicalProtocol);
     return tezChild;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a45ef858/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 5a0ace8..5a8e9fe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -434,6 +434,8 @@ public class TaskSchedulerEventHandler extends AbstractService
       } else {
         customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);
       }
+      LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerClasses[i] + "]=" +
+          customAppIdIdentifier);
       taskSchedulers[i] = createTaskScheduler(host, port,
           trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/a45ef858/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 59efb87..4f014a4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -511,7 +511,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
   // use mock container launcher for tests
   @Override
   protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf,
-                                                                  String[] containerLaunchers)
+                                                                  String[] containerLaunchers,
+                                                                  boolean isLocal)
       throws UnknownHostException {
     return new ContainerLauncherRouter(containerLauncher);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/a45ef858/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 98fcddc..0cf1959 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -119,7 +119,7 @@ public class TestTaskAttemptListenerImplTezDag {
     doReturn(container).when(amContainer).getContainer();
 
     taskAttemptListener = new TaskAttemptListenerImplForTest(appContext,
-        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null);
+        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null, false);
 
     taskSpec = mock(TaskSpec.class);
     doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();

http://git-wip-us.apache.org/repos/asf/tez/blob/a45ef858/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
index 4a6ce33..25d6030 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -282,7 +282,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
               request.getContainerIdString(),
               request.getTokenIdentifier(), request.getAppAttemptNumber(), workingDir, localDirs,
               envMap, objectRegistry, pid,
-              executionContext, credentials, memoryAvailable, request.getUser());
+              executionContext, credentials, memoryAvailable, request.getUser(), null);
       ContainerExecutionResult result = tezChild.run();
       LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
           sw.stop().elapsedMillis());

http://git-wip-us.apache.org/repos/asf/tez/blob/a45ef858/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
index 9c149c6..01c2080 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -40,6 +40,7 @@ import org.apache.tez.service.MiniTezTestServiceCluster;
 import org.apache.tez.test.MiniTezCluster;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestExternalTezServices {
@@ -120,26 +121,23 @@ public class TestExternalTezServices {
 
     confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
         TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
-//        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
+        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
         EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskSchedulerService.class.getName());
 
     confForJobs.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
         TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
-//        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
+        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
         EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceNoOpContainerLauncher.class.getName());
 
     confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS,
         TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
-//        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
+        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
         EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskCommunicatorImpl.class.getName());
 
     // Default all jobs to run via the service. Individual tests override this on a per vertex/dag level.
-    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
-    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
-    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME,
-        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME);
+    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME);
+    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME);
 
     // Setup various executor sets
     PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME,
@@ -232,18 +230,55 @@ public class TestExternalTezServices {
 
   @Test(timeout = 60000)
   public void testMixed1() throws Exception { // M-ExtService, R-containers
-    int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 3 for num reducers.
+    int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 0 for num reducers.
     runJoinValidate("Mixed1", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH,
         PROPS_EXT_SERVICE_PUSH, PROPS_REGULAR_CONTAINERS);
   }
 
   @Test(timeout = 60000)
   public void testMixed2() throws Exception { // M-Containers, R-ExtService
-    int expectedExternalSubmissions = 0 + 3; //4 for 4 src files, 3 for num reducers.
+    int expectedExternalSubmissions = 0 + 3; // 3 for num reducers.
     runJoinValidate("Mixed2", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
         PROPS_REGULAR_CONTAINERS, PROPS_EXT_SERVICE_PUSH);
   }
 
+  @Test(timeout = 60000)
+  public void testMixed3() throws Exception { // M - service, R-AM
+    int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 0 for num reducers (in-AM).
+    runJoinValidate("Mixed3", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH,
+        PROPS_EXT_SERVICE_PUSH, PROPS_IN_AM);
+  }
+
+  @Test(timeout = 60000)
+  public void testMixed4() throws Exception { // M - containers, R-AM
+    int expectedExternalSubmissions = 0 + 0; // Nothing in external service.
+    runJoinValidate("Mixed4", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
+        PROPS_REGULAR_CONTAINERS, PROPS_IN_AM);
+  }
+
+  @Test(timeout = 60000)
+  public void testMixed5() throws Exception { // M1 - containers, M2-extservice, R-AM
+    int expectedExternalSubmissions = 2 + 0; // 2 for M2
+    runJoinValidate("Mixed5", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
+        PROPS_EXT_SERVICE_PUSH, PROPS_IN_AM);
+  }
+
+
+  @Ignore // Re-activate this after the AM registers the shuffle token with the launcher.
+  @Test(timeout = 60000)
+  public void testMixed6() throws Exception { // M - AM, R - Service
+    int expectedExternalSubmissions = 0 + 3; // 3 for R in service
+    runJoinValidate("Mixed6", expectedExternalSubmissions, PROPS_IN_AM,
+        PROPS_IN_AM, PROPS_EXT_SERVICE_PUSH);
+  }
+
+  @Test(timeout = 60000)
+  public void testMixed7() throws Exception { // M - AM, R - Containers
+    int expectedExternalSubmissions = 0; // Nothing in ext service
+    runJoinValidate("Mixed7", expectedExternalSubmissions, PROPS_IN_AM,
+        PROPS_IN_AM, PROPS_REGULAR_CONTAINERS);
+  }
+
 
   private void runJoinValidate(String name, int extExpectedCount, Map<String, String> lhsProps,
                                Map<String, String> rhsProps,

http://git-wip-us.apache.org/repos/asf/tez/blob/a45ef858/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 3cba3ce..7615f08 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -64,6 +64,7 @@ import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.runtime.api.ExecutionContext;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,7 +94,6 @@ public class TezChild {
   private final int amHeartbeatInterval;
   private final long sendCounterInterval;
   private final int maxEventsToGet;
-  private final boolean isLocal;
   private final String workingDir;
 
   private final ListeningExecutorService executor;
@@ -108,9 +108,10 @@ public class TezChild {
   private final String user;
 
   private Multimap<String, String> startedInputsMap = HashMultimap.create();
+  private final boolean ownUmbilical;
 
+  private final TezTaskUmbilicalProtocol umbilical;
   private TaskReporter taskReporter;
-  private TezTaskUmbilicalProtocol umbilical;
   private int taskCount = 0;
   private TezVertexID lastVertexID;
 
@@ -119,7 +120,7 @@ public class TezChild {
       Map<String, String> serviceProviderEnvMap,
       ObjectRegistryImpl objectRegistry, String pid,
       ExecutionContext executionContext,
-      Credentials credentials, long memAvailable, String user)
+      Credentials credentials, long memAvailable, String user, TezTaskUmbilicalProtocol umbilical)
       throws IOException, InterruptedException {
     this.defaultConf = conf;
     this.containerIdString = containerIdentifier;
@@ -133,6 +134,8 @@ public class TezChild {
     this.memAvailable = memAvailable;
     this.user = user;
 
+    LOG.info("TezChild created with umbilical: " + umbilical);
+
     getTaskMaxSleepTime = defaultConf.getInt(
         TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
         TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
@@ -161,25 +164,27 @@ public class TezChild {
       }
     }
 
-    this.isLocal = defaultConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
-        TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
     UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(tokenIdentifier);
     Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
 
     serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
         TezCommonUtils.convertJobTokenToBytes(jobToken));
 
-    if (!isLocal) {
+    if (umbilical == null) {
       final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, port);
       SecurityUtil.setTokenService(jobToken, address);
       taskOwner.addToken(jobToken);
-      umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
+      this.umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
         @Override
         public TezTaskUmbilicalProtocol run() throws Exception {
           return RPC.getProxy(TezTaskUmbilicalProtocol.class,
               TezTaskUmbilicalProtocol.versionID, address, defaultConf);
         }
       });
+      ownUmbilical = true;
+    } else {
+      this.umbilical = umbilical;
+      ownUmbilical = false;
     }
   }
   
@@ -353,7 +358,7 @@ public class TezChild {
       if (taskReporter != null) {
         taskReporter.shutdown();
       }
-      if (!isLocal) {
+      if (ownUmbilical) {
         RPC.stopProxy(umbilical);
         // TODO Temporary change. Revert. Ideally, move this over to the main method in TezChild if possible.
 //        LogManager.shutdown();
@@ -361,12 +366,6 @@ public class TezChild {
     }
   }
 
-  public void setUmbilical(TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol){
-    if(tezTaskUmbilicalProtocol != null){
-      this.umbilical = tezTaskUmbilicalProtocol;
-    }
-  }
-
   public static class ContainerExecutionResult {
     public static enum ExitStatus {
       SUCCESS(0),
@@ -412,7 +411,8 @@ public class TezChild {
   public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier,
       String tokenIdentifier, int attemptNumber, String[] localDirs, String workingDirectory,
       Map<String, String> serviceProviderEnvMap, @Nullable String pid,
-      ExecutionContext executionContext, Credentials credentials, long memAvailable, String user)
+      ExecutionContext executionContext, Credentials credentials, long memAvailable, String user,
+      TezTaskUmbilicalProtocol tezUmbilical)
       throws IOException, InterruptedException, TezException {
 
     // Pull in configuration specified for the session.
@@ -425,7 +425,7 @@ public class TezChild {
 
     return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier,
         attemptNumber, workingDirectory, localDirs, serviceProviderEnvMap, objectRegistry, pid,
-        executionContext, credentials, memAvailable, user);
+        executionContext, credentials, memAvailable, user, tezUmbilical);
   }
 
   public static void main(String[] args) throws IOException, InterruptedException, TezException {
@@ -459,7 +459,7 @@ public class TezChild {
         tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()),
         System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())),
         credentials, Runtime.getRuntime().maxMemory(), System
-            .getenv(ApplicationConstants.Environment.USER.toString()));
+            .getenv(ApplicationConstants.Environment.USER.toString()), null);
     tezChild.run();
   }