You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:33 UTC
[31/50] [abbrv] tez git commit: TEZ-2125. Create a task communicator for local mode. Allow tasks to run in the AM. (sseth)
TEZ-2125. Create a task communicator for local mode. Allow tasks to run
in the AM. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f7862e84
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f7862e84
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f7862e84
Branch: refs/heads/TEZ-2003
Commit: f7862e840f8e9ceefda4928c81a636ab6589bdab
Parents: 25980c1
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Feb 20 16:12:52 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 6 00:13:29 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 25 +++++---
.../dag/app/TaskAttemptListenerImpTezDag.java | 18 ++++--
.../dag/app/TezLocalTaskCommunicatorImpl.java | 46 ++++++++++++++
.../tez/dag/app/TezTaskCommunicatorImpl.java | 67 ++++++++------------
.../app/launcher/ContainerLauncherRouter.java | 17 +++--
.../app/launcher/LocalContainerLauncher.java | 31 ++++++---
.../dag/app/rm/TaskSchedulerEventHandler.java | 2 +
.../apache/tez/dag/app/MockDAGAppMaster.java | 3 +-
.../app/TestTaskAttemptListenerImplTezDag.java | 2 +-
.../tez/service/impl/ContainerRunnerImpl.java | 2 +-
.../tez/tests/TestExternalTezServices.java | 57 +++++++++++++----
.../org/apache/tez/runtime/task/TezChild.java | 34 +++++-----
13 files changed, 204 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/f7862e84/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 1a2264c..76496c9 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -5,5 +5,6 @@ ALL CHANGES:
TEZ-2117. Add a manager for ContainerLaunchers running in the AM.
TEZ-2122. Setup pluggable components at AM/Vertex level.
TEZ-2123. Fix component managers to use pluggable components. (Enable hybrid mode)
+ TEZ-2125. Create a task communicator for local mode. Allow tasks to run in the AM.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/f7862e84/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 89b6506..701eca8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -465,7 +465,7 @@ public class DAGAppMaster extends AbstractService {
//service to handle requests to TaskUmbilicalProtocol
taskAttemptListener = createTaskAttemptListener(context,
- taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorClassIdentifiers);
+ taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorClassIdentifiers, isLocal);
addIfService(taskAttemptListener, true);
containerSignatureMatcher = createContainerSignatureMatcher();
@@ -531,7 +531,7 @@ public class DAGAppMaster extends AbstractService {
taskSchedulerEventHandler);
addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);
- this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherClassIdentifiers);
+ this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherClassIdentifiers, isLocal);
addIfService(containerLauncherRouter, true);
dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter);
@@ -1038,9 +1038,13 @@ public class DAGAppMaster extends AbstractService {
}
protected TaskAttemptListener createTaskAttemptListener(AppContext context,
- TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, String[] taskCommunicatorClasses) {
+ TaskHeartbeatHandler thh,
+ ContainerHeartbeatHandler chh,
+ String[] taskCommunicatorClasses,
+ boolean isLocal) {
TaskAttemptListener lis =
- new TaskAttemptListenerImpTezDag(context, thh, chh,jobTokenSecretManager, taskCommunicatorClasses);
+ new TaskAttemptListenerImpTezDag(context, thh, chh, jobTokenSecretManager,
+ taskCommunicatorClasses, isLocal);
return lis;
}
@@ -1061,10 +1065,12 @@ public class DAGAppMaster extends AbstractService {
return chh;
}
- protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf, String []containerLauncherClasses) throws
+ protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf,
+ String[] containerLauncherClasses,
+ boolean isLocal) throws
UnknownHostException {
- return new ContainerLauncherRouter(conf, context, taskAttemptListener, workingDirectory, containerLauncherClasses);
-
+ return new ContainerLauncherRouter(conf, context, taskAttemptListener, workingDirectory,
+ containerLauncherClasses, isLocal);
}
public ApplicationId getAppID() {
@@ -2331,9 +2337,8 @@ public class DAGAppMaster extends AbstractService {
StringBuilder sb = new StringBuilder();
sb.append("AM Level configured ").append(component).append(": ");
for (int i = 0; i < classIdentifiers.length; i++) {
- sb.append("[").append(i).append(":").append(map.inverse().get(i)).append(":")
- .append(taskSchedulers.inverse().get(i)).append(
- "]");
+ sb.append("[").append(i).append(":").append(map.inverse().get(i))
+ .append(":").append(classIdentifiers[i]).append("]");
if (i != classIdentifiers.length - 1) {
sb.append(",");
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f7862e84/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 71b0d2a..1f0bb0e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -99,13 +99,20 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
// TODO TEZ-2003 pre-merge. Remove reference to JobTokenSecretManager.
JobTokenSecretManager jobTokenSecretManager,
- String [] taskCommunicatorClassIdentifiers) {
+ String [] taskCommunicatorClassIdentifiers,
+ boolean isPureLocalMode) {
super(TaskAttemptListenerImpTezDag.class.getName());
this.context = context;
this.taskHeartbeatHandler = thh;
this.containerHeartbeatHandler = chh;
if (taskCommunicatorClassIdentifiers == null || taskCommunicatorClassIdentifiers.length == 0) {
- taskCommunicatorClassIdentifiers = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+ if (isPureLocalMode) {
+ taskCommunicatorClassIdentifiers =
+ new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT};
+ } else {
+ taskCommunicatorClassIdentifiers =
+ new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+ }
}
this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length];
for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) {
@@ -131,11 +138,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier) {
- if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) ||
- taskCommClassIdentifier
- .equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
+ if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
LOG.info("Using Default Task Communicator");
return new TezTaskCommunicatorImpl(this);
+ } else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
+ LOG.info("Using Default Local Task Communicator");
+ return new TezLocalTaskCommunicatorImpl(this);
} else {
LOG.info("Using TaskCommunicator: " + taskCommClassIdentifier);
Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>) ReflectionUtils
http://git-wip-us.apache.org/repos/asf/tez/blob/f7862e84/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java
new file mode 100644
index 0000000..3704cc4
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+public class TezLocalTaskCommunicatorImpl extends TezTaskCommunicatorImpl {
+
+ private static final Log LOG = LogFactory.getLog(TezLocalTaskCommunicatorImpl.class);
+
+ public TezLocalTaskCommunicatorImpl(
+ TaskCommunicatorContext taskCommunicatorContext) {
+ super(taskCommunicatorContext);
+ }
+
+ @Override
+ protected void startRpcServer() {
+ try {
+ this.address = new InetSocketAddress(InetAddress.getLocalHost(), 0);
+ } catch (UnknownHostException e) {
+ throw new TezUncheckedException(e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not starting TaskAttemptListener RPC in LocalMode");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/f7862e84/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 258c927..0bf1b5d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -15,10 +15,8 @@
package org.apache.tez.dag.app;
import java.io.IOException;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
-import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -76,7 +74,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
private final TezTaskUmbilicalProtocol taskUmbilical;
private final String tokenIdentifier;
private final Token<JobTokenIdentifier> sessionToken;
- private InetSocketAddress address;
+ protected InetSocketAddress address;
private Server server;
public static final class ContainerInfo {
@@ -120,10 +118,8 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
this.sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials());
}
-
@Override
public void serviceStart() {
-
startRpcServer();
}
@@ -134,43 +130,32 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
protected void startRpcServer() {
Configuration conf = getConfig();
- if (!conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT)) {
- try {
- JobTokenSecretManager jobTokenSecretManager =
- new JobTokenSecretManager();
- jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken);
-
- server = new RPC.Builder(conf)
- .setProtocol(TezTaskUmbilicalProtocol.class)
- .setBindAddress("0.0.0.0")
- .setPort(0)
- .setInstance(taskUmbilical)
- .setNumHandlers(
- conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
- TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT))
- .setSecretManager(jobTokenSecretManager).build();
-
- // Enable service authorization?
- if (conf.getBoolean(
- CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
- false)) {
- refreshServiceAcls(conf, new TezAMPolicyProvider());
- }
-
- server.start();
- this.address = NetUtils.getConnectAddress(server);
- } catch (IOException e) {
- throw new TezUncheckedException(e);
- }
- } else {
- try {
- this.address = new InetSocketAddress(InetAddress.getLocalHost(), 0);
- } catch (UnknownHostException e) {
- throw new TezUncheckedException(e);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Not starting TaskAttemptListener RPC in LocalMode");
+ try {
+ JobTokenSecretManager jobTokenSecretManager =
+ new JobTokenSecretManager();
+ jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken);
+
+ server = new RPC.Builder(conf)
+ .setProtocol(TezTaskUmbilicalProtocol.class)
+ .setBindAddress("0.0.0.0")
+ .setPort(0)
+ .setInstance(taskUmbilical)
+ .setNumHandlers(
+ conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
+ TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT))
+ .setSecretManager(jobTokenSecretManager).build();
+
+ // Enable service authorization?
+ if (conf.getBoolean(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
+ false)) {
+ refreshServiceAcls(conf, new TezAMPolicyProvider());
}
+
+ server.start();
+ this.address = NetUtils.getConnectAddress(server);
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f7862e84/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index 4f9b5bf..70b0cbc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
@@ -49,17 +48,24 @@ public class ContainerLauncherRouter extends AbstractService
public ContainerLauncherRouter(Configuration conf, AppContext context,
TaskAttemptListener taskAttemptListener,
String workingDirectory,
- String[] containerLauncherClassIdentifiers) throws UnknownHostException {
+ String[] containerLauncherClassIdentifiers,
+ boolean isPureLocalMode) throws UnknownHostException {
super(ContainerLauncherRouter.class.getName());
if (containerLauncherClassIdentifiers == null || containerLauncherClassIdentifiers.length == 0) {
- containerLauncherClassIdentifiers = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+ if (isPureLocalMode) {
+ containerLauncherClassIdentifiers =
+ new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT};
+ } else {
+ containerLauncherClassIdentifiers =
+ new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+ }
}
containerLaunchers = new ContainerLauncher[containerLauncherClassIdentifiers.length];
for (int i = 0; i < containerLauncherClassIdentifiers.length; i++) {
containerLaunchers[i] = createContainerLauncher(containerLauncherClassIdentifiers[i], context,
- taskAttemptListener, workingDirectory, conf);
+ taskAttemptListener, workingDirectory, isPureLocalMode, conf);
}
}
@@ -67,6 +73,7 @@ public class ContainerLauncherRouter extends AbstractService
AppContext context,
TaskAttemptListener taskAttemptListener,
String workingDirectory,
+ boolean isPureLocalMode,
Configuration conf) throws
UnknownHostException {
if (containerLauncherClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
@@ -76,7 +83,7 @@ public class ContainerLauncherRouter extends AbstractService
.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
LOG.info("Creating LocalContainerLauncher");
return
- new LocalContainerLauncher(context, taskAttemptListener, workingDirectory);
+ new LocalContainerLauncher(context, taskAttemptListener, workingDirectory, isPureLocalMode);
} else {
LOG.info("Creating container launcher : " + containerLauncherClassIdentifier);
Class<? extends ContainerLauncher> containerLauncherClazz =
http://git-wip-us.apache.org/repos/asf/tez/blob/f7862e84/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index 9a38732..18b2e35 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -36,6 +36,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -90,9 +91,10 @@ public class LocalContainerLauncher extends AbstractService implements
private final AtomicBoolean serviceStopped = new AtomicBoolean(false);
private final String workingDirectory;
private final TaskAttemptListener tal;
- private final Map<String, String> localEnv = new HashMap<String, String>();
+ private final Map<String, String> localEnv;
private final ExecutionContext executionContext;
private int numExecutors;
+ private final boolean isPureLocalMode;
private final ConcurrentHashMap<ContainerId, ListenableFuture<TezChild.ContainerExecutionResult>>
runningContainers =
@@ -112,16 +114,26 @@ public class LocalContainerLauncher extends AbstractService implements
public LocalContainerLauncher(AppContext context,
TaskAttemptListener taskAttemptListener,
- String workingDirectory) throws UnknownHostException {
+ String workingDirectory,
+ boolean isPureLocalMode) throws UnknownHostException {
super(LocalContainerLauncher.class.getName());
this.context = context;
this.tal = taskAttemptListener;
this.workingDirectory = workingDirectory;
- AuxiliaryServiceHelper.setServiceDataIntoEnv(
- ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv);
- executionContext = new ExecutionContextImpl(InetAddress.getLocalHost().getHostName());
- // User cannot be set here since it isn't available till a DAG is running.
+ this.isPureLocalMode = isPureLocalMode;
+ if (isPureLocalMode) {
+ localEnv = Maps.newHashMap();
+ AuxiliaryServiceHelper.setServiceDataIntoEnv(
+ ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv);
+ } else {
+ localEnv = System.getenv();
+ }
+
+ // Check if the hostname is set in the environment before overriding it.
+ String host = isPureLocalMode ? InetAddress.getLocalHost().getHostName() :
+ System.getenv(Environment.NM_HOST.name());
+ executionContext = new ExecutionContextImpl(host);
}
@Override
@@ -338,7 +350,9 @@ public class LocalContainerLauncher extends AbstractService implements
InterruptedException, TezException, IOException {
Map<String, String> containerEnv = new HashMap<String, String>();
containerEnv.putAll(localEnv);
- containerEnv.put(Environment.USER.name(), context.getUser());
+ // Use the user from env if it's available.
+ String user = isPureLocalMode ? System.getenv(Environment.USER.name()) : context.getUser();
+ containerEnv.put(Environment.USER.name(), user);
long memAvailable;
synchronized (this) { // needed to fix findbugs Inconsistent synchronization warning
@@ -347,8 +361,7 @@ public class LocalContainerLauncher extends AbstractService implements
TezChild tezChild =
TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier,
attemptNumber, localDirs, workingDirectory, containerEnv, "", executionContext, credentials,
- memAvailable, context.getUser());
- tezChild.setUmbilical(tezTaskUmbilicalProtocol);
+ memAvailable, context.getUser(), tezTaskUmbilicalProtocol);
return tezChild;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f7862e84/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 5a0ace8..5a8e9fe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -434,6 +434,8 @@ public class TaskSchedulerEventHandler extends AbstractService
} else {
customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);
}
+ LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerClasses[i] + "]=" +
+ customAppIdIdentifier);
taskSchedulers[i] = createTaskScheduler(host, port,
trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f7862e84/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index a466bc6..73e93c5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -516,7 +516,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
// use mock container launcher for tests
@Override
protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf,
- String[] containerLaunchers)
+ String[] containerLaunchers,
+ boolean isLocal)
throws UnknownHostException {
return new ContainerLauncherRouter(containerLauncher);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f7862e84/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index bffb5b9..7a365bd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -117,7 +117,7 @@ public class TestTaskAttemptListenerImplTezDag {
doReturn(container).when(amContainer).getContainer();
taskAttemptListener = new TaskAttemptListenerImplForTest(appContext,
- mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null);
+ mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null, false);
taskSpec = mock(TaskSpec.class);
doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();
http://git-wip-us.apache.org/repos/asf/tez/blob/f7862e84/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
index 4a6ce33..25d6030 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -282,7 +282,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
request.getContainerIdString(),
request.getTokenIdentifier(), request.getAppAttemptNumber(), workingDir, localDirs,
envMap, objectRegistry, pid,
- executionContext, credentials, memoryAvailable, request.getUser());
+ executionContext, credentials, memoryAvailable, request.getUser(), null);
ContainerExecutionResult result = tezChild.run();
LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
sw.stop().elapsedMillis());
http://git-wip-us.apache.org/repos/asf/tez/blob/f7862e84/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
index 9c149c6..01c2080 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -40,6 +40,7 @@ import org.apache.tez.service.MiniTezTestServiceCluster;
import org.apache.tez.test.MiniTezCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
public class TestExternalTezServices {
@@ -120,26 +121,23 @@ public class TestExternalTezServices {
confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
-// TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskSchedulerService.class.getName());
confForJobs.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
-// TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceNoOpContainerLauncher.class.getName());
confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS,
TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
-// TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskCommunicatorImpl.class.getName());
// Default all jobs to run via the service. Individual tests override this on a per vertex/dag level.
- confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME,
- TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
- confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME,
- TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
- confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME,
- TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+ confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME);
+ confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME);
+ confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME);
// Setup various executor sets
PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME,
@@ -232,18 +230,55 @@ public class TestExternalTezServices {
@Test(timeout = 60000)
public void testMixed1() throws Exception { // M-ExtService, R-containers
- int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 3 for num reducers.
+ int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 0 for num reducers.
runJoinValidate("Mixed1", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH,
PROPS_EXT_SERVICE_PUSH, PROPS_REGULAR_CONTAINERS);
}
@Test(timeout = 60000)
public void testMixed2() throws Exception { // M-Containers, R-ExtService
- int expectedExternalSubmissions = 0 + 3; //4 for 4 src files, 3 for num reducers.
+ int expectedExternalSubmissions = 0 + 3; // 3 for num reducers.
runJoinValidate("Mixed2", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
PROPS_REGULAR_CONTAINERS, PROPS_EXT_SERVICE_PUSH);
}
+ @Test(timeout = 60000)
+ public void testMixed3() throws Exception { // M - service, R-AM
+ int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 0 for num reducers (in-AM).
+ runJoinValidate("Mixed3", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH,
+ PROPS_EXT_SERVICE_PUSH, PROPS_IN_AM);
+ }
+
+ @Test(timeout = 60000)
+ public void testMixed4() throws Exception { // M - containers, R-AM
+ int expectedExternalSubmissions = 0 + 0; // Nothing in external service.
+ runJoinValidate("Mixed4", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
+ PROPS_REGULAR_CONTAINERS, PROPS_IN_AM);
+ }
+
+ @Test(timeout = 60000)
+ public void testMixed5() throws Exception { // M1 - containers, M2-extservice, R-AM
+ int expectedExternalSubmissions = 2 + 0; // 2 for M2
+ runJoinValidate("Mixed5", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
+ PROPS_EXT_SERVICE_PUSH, PROPS_IN_AM);
+ }
+
+
+ @Ignore // Re-activate this after the AM registers the shuffle token with the launcher.
+ @Test(timeout = 60000)
+ public void testMixed6() throws Exception { // M - AM, R - Service
+ int expectedExternalSubmissions = 0 + 3; // 3 for R in service
+ runJoinValidate("Mixed6", expectedExternalSubmissions, PROPS_IN_AM,
+ PROPS_IN_AM, PROPS_EXT_SERVICE_PUSH);
+ }
+
+ @Test(timeout = 60000)
+ public void testMixed7() throws Exception { // M - AM, R - Containers
+ int expectedExternalSubmissions = 0; // Nothing in ext service
+ runJoinValidate("Mixed7", expectedExternalSubmissions, PROPS_IN_AM,
+ PROPS_IN_AM, PROPS_REGULAR_CONTAINERS);
+ }
+
private void runJoinValidate(String name, int extExpectedCount, Map<String, String> lhsProps,
Map<String, String> rhsProps,
http://git-wip-us.apache.org/repos/asf/tez/blob/f7862e84/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 3cba3ce..7615f08 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -64,6 +64,7 @@ import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,7 +94,6 @@ public class TezChild {
private final int amHeartbeatInterval;
private final long sendCounterInterval;
private final int maxEventsToGet;
- private final boolean isLocal;
private final String workingDir;
private final ListeningExecutorService executor;
@@ -108,9 +108,10 @@ public class TezChild {
private final String user;
private Multimap<String, String> startedInputsMap = HashMultimap.create();
+ private final boolean ownUmbilical;
+ private final TezTaskUmbilicalProtocol umbilical;
private TaskReporter taskReporter;
- private TezTaskUmbilicalProtocol umbilical;
private int taskCount = 0;
private TezVertexID lastVertexID;
@@ -119,7 +120,7 @@ public class TezChild {
Map<String, String> serviceProviderEnvMap,
ObjectRegistryImpl objectRegistry, String pid,
ExecutionContext executionContext,
- Credentials credentials, long memAvailable, String user)
+ Credentials credentials, long memAvailable, String user, TezTaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException {
this.defaultConf = conf;
this.containerIdString = containerIdentifier;
@@ -133,6 +134,8 @@ public class TezChild {
this.memAvailable = memAvailable;
this.user = user;
+ LOG.info("TezChild created with umbilical: " + umbilical);
+
getTaskMaxSleepTime = defaultConf.getInt(
TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
@@ -161,25 +164,27 @@ public class TezChild {
}
}
- this.isLocal = defaultConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
- TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(tokenIdentifier);
Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
TezCommonUtils.convertJobTokenToBytes(jobToken));
- if (!isLocal) {
+ if (umbilical == null) {
final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, port);
SecurityUtil.setTokenService(jobToken, address);
taskOwner.addToken(jobToken);
- umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
+ this.umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
@Override
public TezTaskUmbilicalProtocol run() throws Exception {
return RPC.getProxy(TezTaskUmbilicalProtocol.class,
TezTaskUmbilicalProtocol.versionID, address, defaultConf);
}
});
+ ownUmbilical = true;
+ } else {
+ this.umbilical = umbilical;
+ ownUmbilical = false;
}
}
@@ -353,7 +358,7 @@ public class TezChild {
if (taskReporter != null) {
taskReporter.shutdown();
}
- if (!isLocal) {
+ if (ownUmbilical) {
RPC.stopProxy(umbilical);
// TODO Temporary change. Revert. Ideally, move this over to the main method in TezChild if possible.
// LogManager.shutdown();
@@ -361,12 +366,6 @@ public class TezChild {
}
}
- public void setUmbilical(TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol){
- if(tezTaskUmbilicalProtocol != null){
- this.umbilical = tezTaskUmbilicalProtocol;
- }
- }
-
public static class ContainerExecutionResult {
public static enum ExitStatus {
SUCCESS(0),
@@ -412,7 +411,8 @@ public class TezChild {
public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier,
String tokenIdentifier, int attemptNumber, String[] localDirs, String workingDirectory,
Map<String, String> serviceProviderEnvMap, @Nullable String pid,
- ExecutionContext executionContext, Credentials credentials, long memAvailable, String user)
+ ExecutionContext executionContext, Credentials credentials, long memAvailable, String user,
+ TezTaskUmbilicalProtocol tezUmbilical)
throws IOException, InterruptedException, TezException {
// Pull in configuration specified for the session.
@@ -425,7 +425,7 @@ public class TezChild {
return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier,
attemptNumber, workingDirectory, localDirs, serviceProviderEnvMap, objectRegistry, pid,
- executionContext, credentials, memAvailable, user);
+ executionContext, credentials, memAvailable, user, tezUmbilical);
}
public static void main(String[] args) throws IOException, InterruptedException, TezException {
@@ -459,7 +459,7 @@ public class TezChild {
tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()),
System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())),
credentials, Runtime.getRuntime().maxMemory(), System
- .getenv(ApplicationConstants.Environment.USER.toString()));
+ .getenv(ApplicationConstants.Environment.USER.toString()), null);
tezChild.run();
}