You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/27 19:32:02 UTC
git commit: TEZ-408. Add support for multiple DAGs within the same AM. (hitesh)
Updated Branches:
refs/heads/master fcbf0ddaf -> 849f12579
TEZ-408. Add support for multiple DAGs within the same AM. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/849f1257
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/849f1257
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/849f1257
Branch: refs/heads/master
Commit: 849f12579654bdf168c9f14bb68eb9205e7787b1
Parents: fcbf0dd
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Sep 27 10:31:42 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Sep 27 10:31:42 2013 -0700
----------------------------------------------------------------------
.../org/apache/tez/client/TezClientUtils.java | 7 +-
.../java/org/apache/tez/client/TezSession.java | 17 +-
.../common/counters/FileSystemCounterGroup.java | 2 -
.../common/counters/FrameworkCounterGroup.java | 8 +-
.../org/apache/tez/dag/api/TezConstants.java | 6 +-
.../apache/hadoop/mapred/YarnTezDagChild.java | 12 +-
.../java/org/apache/tez/dag/app/AppContext.java | 2 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 247 ++++++++++++------
.../apache/tez/dag/app/DAGAppMasterState.java | 1 +
.../tez/dag/app/dag/TaskTerminationCause.java | 4 +-
.../dag/event/DAGAppMasterEventDAGFinished.java | 12 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 12 +-
.../app/rm/container/AMContainerHelpers.java | 26 +-
.../dag/app/rm/container/AMContainerImpl.java | 4 +-
.../mapreduce/examples/OrderedWordCount.java | 253 +++++++++++--------
.../org/apache/tez/mapreduce/TestMRRJobs.java | 5 +-
.../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 114 ++++++++-
17 files changed, 503 insertions(+), 229 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 7c6a5ed..8689385 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -261,6 +261,10 @@ public class TezClientUtils {
TezConfiguration.DEFAULT_TEZ_AM_JAVA_OPTS));
vargs.add(TezConfiguration.TEZ_APPLICATION_MASTER_CLASS);
+ if (dag == null) {
+ vargs.add("--" + TezConstants.TEZ_SESSION_MODE_CLI_OPTION);
+ }
+
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
File.separator + ApplicationConstants.STDOUT);
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
@@ -396,9 +400,6 @@ public class TezClientUtils {
textPath, LocalResourceType.FILE,
LocalResourceVisibility.APPLICATION));
}
- } else {
- Apps.addToEnvironment(environment,
- TezConstants.TEZ_AM_IS_SESSION_ENV, "set");
}
Map<ApplicationAccessType, String> acls
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index acf523d..dd7fcab 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -88,6 +88,8 @@ public class TezSession {
sessionConfig.getTezConfiguration(), applicationId,
null, sessionName, sessionConfig.getAMConfiguration(),
tezJarResources);
+ // Set Tez Sessions to not retry on AM crashes
+ appContext.setMaxAppAttempts(1);
tezConfPBLRsrc = appContext.getAMContainerSpec().getLocalResources().get(
TezConfiguration.TEZ_PB_BINARY_CONF_NAME);
yarnClient.submitApplication(appContext);
@@ -121,6 +123,7 @@ public class TezSession {
DAGClientAMProtocolBlockingPB proxy;
while (true) {
+ // FIXME implement a max time to wait for submit
proxy = TezClientUtils.getAMProxy(yarnClient,
sessionConfig.getYarnConfiguration(), applicationId);
if (proxy != null) {
@@ -150,17 +153,19 @@ public class TezSession {
LOG.info("Shutting down Tez Session"
+ ", sessionName=" + sessionName
+ ", applicationId=" + applicationId);
- DAGClientAMProtocolBlockingPB proxy = TezClientUtils.getAMProxy(yarnClient,
- sessionConfig.getYarnConfiguration(), applicationId);
- if (proxy != null) {
- try {
+ try {
+ DAGClientAMProtocolBlockingPB proxy = TezClientUtils.getAMProxy(
+ yarnClient, sessionConfig.getYarnConfiguration(), applicationId);
+ if (proxy != null) {
ShutdownSessionRequestProto request =
ShutdownSessionRequestProto.newBuilder().build();
proxy.shutdownSession(null, request);
return;
- } catch (ServiceException e) {
- LOG.info("Failed to shutdown Tez Session via proxy", e);
}
+ } catch (TezException e) {
+ LOG.info("Failed to shutdown Tez Session via proxy", e);
+ } catch (ServiceException e) {
+ LOG.info("Failed to shutdown Tez Session via proxy", e);
}
LOG.info("Could not connect to AM, killing session via YARN"
+ ", sessionName=" + sessionName
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
index d4b167a..bb2dc8b 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
@@ -232,7 +232,6 @@ public abstract class FileSystemCounterGroup<C extends TezCounter>
}
@Override
- @SuppressWarnings("unchecked")
public void incrAllCounters(CounterGroupBase<C> other) {
if (checkNotNull(other.getUnderlyingGroup(), "other group")
instanceof FileSystemCounterGroup<?>) {
@@ -255,7 +254,6 @@ public abstract class FileSystemCounterGroup<C extends TezCounter>
WritableUtils.writeVInt(out, numSetCounters(entry.getValue()));
for (Object counter : entry.getValue()) {
if (counter == null) continue;
- @SuppressWarnings("unchecked")
FSCounter c = (FSCounter) ((TezCounter)counter).getUnderlyingCounter();
WritableUtils.writeVInt(out, c.key.ordinal()); // key
WritableUtils.writeVLong(out, c.getValue()); // value
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java b/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
index 42fb636..a99e5a4 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
@@ -26,8 +26,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.WritableUtils;
@@ -44,8 +42,7 @@ import com.google.common.collect.Iterators;
@InterfaceAudience.Private
public abstract class FrameworkCounterGroup<T extends Enum<T>,
C extends TezCounter> implements CounterGroupBase<C> {
- private static final Log LOG = LogFactory.getLog(FrameworkCounterGroup.class);
-
+
private final Class<T> enumClass; // for Enum.valueOf
private final Object[] counters; // local casts are OK and save a class ref
private String displayName = null;
@@ -106,7 +103,6 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
}
}
- @SuppressWarnings("unchecked")
public FrameworkCounterGroup(Class<T> enumClass) {
this.enumClass = enumClass;
T[] enums = enumClass.getEnumConstants();
@@ -194,8 +190,8 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
return n;
}
+ @SuppressWarnings("rawtypes")
@Override
- @SuppressWarnings("unchecked")
public void incrAllCounters(CounterGroupBase<C> other) {
if (checkNotNull(other, "other counter group")
instanceof FrameworkCounterGroup<?, ?>) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
index 5463d65..5109a0f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
@@ -23,7 +23,9 @@ package org.apache.tez.dag.api;
*/
public class TezConstants {
- // Env variable names
- public static final String TEZ_AM_IS_SESSION_ENV = "TEZ_AM_IS_SESSION";
+ /**
+ * Command-line argument to be set when running the Tez AM in session mode.
+ */
+ public static final String TEZ_SESSION_MODE_CLI_OPTION = "session";
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 3092837..a20b774 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -368,7 +368,7 @@ public class YarnTezDagChild {
for (int idle = 0; null == containerTask; ++idle) {
long sleepTimeMilliSecs = Math.min(idle * 10, getTaskMaxSleepTime);
LOG.info("Sleeping for " + sleepTimeMilliSecs
- + "ms before retrying again. Got null now.");
+ + "ms before retrying getTask again. Got null now.");
MILLISECONDS.sleep(sleepTimeMilliSecs);
containerTask = umbilical.getTask(containerContext);
}
@@ -404,7 +404,7 @@ public class YarnTezDagChild {
}
lastVertexId = newVertexId;
updateLoggers(currentTaskAttemptID);
-
+
currentTask = createLogicalTask(attemptNumber, taskSpec,
defaultConf, tezUmbilical, serviceConsumerMetadata);
} finally {
@@ -426,9 +426,15 @@ public class YarnTezDagChild {
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
+ LOG.info("Initializing task"
+ + ", taskAttemptId=" + currentTaskAttemptID);
currentTask.initialize();
if (!currentTask.hadFatalError()) {
+ LOG.info("Running task"
+ + ", taskAttemptId=" + currentTaskAttemptID);
currentTask.run();
+ LOG.info("Closing task"
+ + ", taskAttemptId=" + currentTaskAttemptID);
currentTask.close();
}
LOG.info("Task completed"
@@ -525,7 +531,7 @@ public class YarnTezDagChild {
String [] localDirs = StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name()));
conf.setStrings(TezJobConfig.LOCAL_DIRS, localDirs);
LOG.info("LocalDirs for child: " + Arrays.toString(localDirs));
-
+
return new LogicalIOProcessorRuntimeTask(taskSpec, attemptNum, conf,
tezUmbilical, serviceConsumerMetadata);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 64f8965..26c0992 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -42,7 +42,7 @@ public interface AppContext {
DAGAppMaster getAppMaster();
- Configuration getConf();
+ Configuration getAMConf();
ApplicationId getApplicationID();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index b846992..88d1a30 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -35,6 +35,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -140,6 +143,7 @@ public class DAGAppMaster extends AbstractService {
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
private Clock clock;
+ private final boolean isSession;
private long appsStartTime;
private final long startTime;
private final long appSubmitTime;
@@ -152,7 +156,7 @@ public class DAGAppMaster extends AbstractService {
private AMContainerMap containers;
private AMNodeMap nodes;
private AppContext context;
- private Configuration conf;
+ private Configuration amConf;
private Dispatcher dispatcher;
private ContainerLauncher containerLauncher;
private TaskCleaner taskCleaner;
@@ -178,20 +182,31 @@ public class DAGAppMaster extends AbstractService {
private Credentials fsTokens = new Credentials(); // Filled during init
private UserGroupInformation currentUser; // Will be setup during init
+ private AtomicBoolean sessionStopped = new AtomicBoolean(false);
+
+ // DAG Counter
+ private final AtomicInteger dagCounter = new AtomicInteger();
+
+ // Session counters
+ private final AtomicInteger submittedDAGs = new AtomicInteger();
+ private final AtomicInteger successfulDAGs = new AtomicInteger();
+ private final AtomicInteger failedDAGs = new AtomicInteger();
+ private final AtomicInteger killedDAGs = new AtomicInteger();
+
// must be LinkedHashMap to preserve order of service addition
Map<Service, ServiceWithDependency> services =
new LinkedHashMap<Service, ServiceWithDependency>();
public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
- long appSubmitTime) {
+ long appSubmitTime, boolean isSession) {
this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
- new SystemClock(), appSubmitTime);
+ new SystemClock(), appSubmitTime, isSession);
}
public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
- Clock clock, long appSubmitTime) {
+ Clock clock, long appSubmitTime, boolean isSession) {
super(DAGAppMaster.class.getName());
this.clock = clock;
this.startTime = clock.getTime();
@@ -202,6 +217,7 @@ public class DAGAppMaster extends AbstractService {
this.nmPort = nmPort;
this.nmHttpPort = nmHttpPort;
this.state = DAGAppMasterState.NEW;
+ this.isSession = isSession;
// TODO Metrics
//this.metrics = DAGAppMetrics.create();
LOG.info("Created DAGAppMaster for application " + applicationAttemptId);
@@ -212,7 +228,7 @@ public class DAGAppMaster extends AbstractService {
this.state = DAGAppMasterState.INITED;
- this.conf = conf;
+ this.amConf = conf;
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
downloadTokensAndSetupUGI(conf);
@@ -307,10 +323,47 @@ public class DAGAppMaster extends AbstractService {
}
break;
case DAG_FINISHED:
- setStateOnDAGCompletion();
- LOG.info("Shutting down on completion of dag:" +
- ((DAGAppMasterEventDAGFinished)event).getDAGId().toString());
- shutdownHandler.shutdown();
+ DAGAppMasterEventDAGFinished finishEvt =
+ (DAGAppMasterEventDAGFinished) event;
+ if (!isSession) {
+ setStateOnDAGCompletion();
+ LOG.info("Shutting down on completion of dag:" +
+ finishEvt.getDAGId().toString());
+ shutdownHandler.shutdown();
+ } else {
+ LOG.info("DAG completed, dagId="
+ + finishEvt.getDAGId().toString()
+ + ", dagState=" + finishEvt.getDAGState());
+ switch(finishEvt.getDAGState()) {
+ case SUCCEEDED:
+ successfulDAGs.incrementAndGet();
+ break;
+ case ERROR:
+ case FAILED:
+ failedDAGs.incrementAndGet();
+ break;
+ case KILLED:
+ killedDAGs.incrementAndGet();
+ break;
+ default:
+ LOG.fatal("Received a DAG Finished Event with state="
+ + finishEvt.getDAGState()
+ + ". Error. Shutting down.");
+ state = DAGAppMasterState.ERROR;
+ shutdownHandler.shutdown();
+ break;
+ }
+ if (!state.equals(DAGAppMasterState.ERROR)) {
+ if (!sessionStopped.get()) {
+ LOG.info("Waiting for next DAG to be submitted.");
+ state = DAGAppMasterState.IDLE;
+ } else {
+ LOG.info("Session shutting down now.");
+ state = DAGAppMasterState.SUCCEEDED;
+ shutdownHandler.shutdown();
+ }
+ }
+ }
break;
default:
throw new TezUncheckedException(
@@ -373,10 +426,21 @@ public class DAGAppMaster extends AbstractService {
/** Create and initialize (but don't start) a single dag. */
protected DAG createDAG(DAGPlan dagPB) {
- TezDAGID dagId = new TezDAGID(appAttemptID.getApplicationId(), 1);
- // create single job
+ TezDAGID dagId = new TezDAGID(appAttemptID.getApplicationId(),
+ dagCounter.incrementAndGet());
+
+ Iterator<PlanKeyValuePair> iter =
+ dagPB.getDagKeyValues().getConfKeyValuesList().iterator();
+ Configuration dagConf = new Configuration(amConf);
+
+ while (iter.hasNext()) {
+ PlanKeyValuePair keyValPair = iter.next();
+ dagConf.set(keyValPair.getKey(), keyValPair.getValue());
+ }
+
+ // create single dag
DAG newDag =
- new DAGImpl(dagId, conf, dagPB, dispatcher.getEventHandler(),
+ new DAGImpl(dagId, dagConf, dagPB, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
currentUser.getShortUserName(),
taskHeartbeatHandler, context);
@@ -524,8 +588,15 @@ public class DAGAppMaster extends AbstractService {
}
public List<String> getDiagnostics() {
- if(currentDAG != null) {
- return currentDAG.getDiagnostics();
+ if (!isSession) {
+ if(currentDAG != null) {
+ return currentDAG.getDiagnostics();
+ }
+ } else {
+ return Collections.singletonList("Session stats:"
+ + "submittedDAGs=" + submittedDAGs.get()
+ + ", successfulDAGs=" + successfulDAGs.get()
+ + ", failedDAGs=" + failedDAGs.get());
}
return null;
}
@@ -539,37 +610,74 @@ public class DAGAppMaster extends AbstractService {
private synchronized void setStateOnDAGCompletion() {
DAGAppMasterState oldState = state;
- if(state == DAGAppMasterState.RUNNING) {
- switch(currentDAG.getState()) {
- case SUCCEEDED:
- state = DAGAppMasterState.SUCCEEDED;
- break;
- case FAILED:
- state = DAGAppMasterState.FAILED;
- break;
- case KILLED:
- state = DAGAppMasterState.KILLED;
- break;
- case ERROR:
- state = DAGAppMasterState.ERROR;
- break;
- default:
- state = DAGAppMasterState.ERROR;
- break;
- }
+ if(isSession) {
+ return;
+ }
+ switch(currentDAG.getState()) {
+ case SUCCEEDED:
+ state = DAGAppMasterState.SUCCEEDED;
+ break;
+ case FAILED:
+ state = DAGAppMasterState.FAILED;
+ break;
+ case KILLED:
+ state = DAGAppMasterState.KILLED;
+ break;
+ case ERROR:
+ state = DAGAppMasterState.ERROR;
+ break;
+ default:
+ state = DAGAppMasterState.ERROR;
+ break;
}
LOG.info("On DAG completion. Old state: "
+ oldState + " new state: " + state);
}
+ synchronized String submitDAGToAppMaster(DAGPlan dagPlan)
+ throws TezException {
+ if(currentDAG != null
+ && !state.equals(DAGAppMasterState.IDLE)) {
+ throw new TezException("App master already running a DAG");
+ }
+ if (state.equals(DAGAppMasterState.ERROR)
+ || sessionStopped.get()) {
+ throw new TezException("AM unable to accept new DAG submissions."
+ + " In the process of shutting down");
+ }
+
+ // RPC server runs in the context of the job user as it was started in
+ // the job user's UGI context
+ LOG.info("Starting DAG submitted via RPC");
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing DAG plan to: "
+ + TezConfiguration.TEZ_PB_PLAN_TEXT_NAME);
+
+ File outFile = new File(TezConfiguration.TEZ_PB_PLAN_TEXT_NAME);
+ try {
+ PrintWriter printWriter = new PrintWriter(outFile);
+ String dagPbString = dagPlan.toString();
+ printWriter.println(dagPbString);
+ printWriter.close();
+ } catch (IOException e) {
+ throw new TezException("Failed to write TEZ_PLAN to "
+ + outFile.toString(), e);
+ }
+ }
+
+ submittedDAGs.incrementAndGet();
+ startDAG(dagPlan);
+ return currentDAG.getID().toString();
+ }
+
public class DAGClientHandler {
public List<String> getAllDAGs() throws TezException {
return Collections.singletonList(currentDAG.getID().toString());
}
- public DAGStatus getDAGStatus(String dagIdStr)
- throws TezException {
+ public DAGStatus getDAGStatus(String dagIdStr) throws TezException {
return getDAG(dagIdStr).getDAGStatus();
}
@@ -588,6 +696,7 @@ public class DAGAppMaster extends AbstractService {
if(dagId == null) {
throw new TezException("Bad dagId: " + dagIdStr);
}
+
if(currentDAG == null) {
throw new TezException("No running dag at present");
}
@@ -607,29 +716,7 @@ public class DAGAppMaster extends AbstractService {
}
public synchronized String submitDAG(DAGPlan dagPlan) throws TezException {
- if(currentDAG != null) {
- throw new TezException("App master already running a DAG");
- }
- // RPC server runs in the context of the job user as it was started in
- // the job user's UGI context
- LOG.info("Starting DAG submitted via RPC");
- if (LOG.isDebugEnabled()) {
- LOG.debug("Writing DAG plan to: " + TezConfiguration.TEZ_PB_PLAN_TEXT_NAME);
-
- File outFile = new File(TezConfiguration.TEZ_PB_PLAN_TEXT_NAME);
- try {
- PrintWriter printWriter = new PrintWriter(outFile);
- String dagPbString = dagPlan.toString();
- printWriter.println(dagPbString);
- printWriter.close();
- } catch (IOException e) {
- throw new TezException("Failed to write TEZ_PLAN to " + outFile.toString(), e);
- }
-
-
- }
- startDAG(dagPlan);
- return currentDAG.getID().toString();
+ return submitDAGToAppMaster(dagPlan);
}
public synchronized void shutdownAM() {
@@ -640,8 +727,12 @@ public class DAGAppMaster extends AbstractService {
LOG.info("Sending a kill event to the current DAG"
+ ", dagId=" + currentDAG.getID());
sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.DAG_KILL));
+ sessionStopped.set(true);
} else {
LOG.info("No current running DAG, shutting down the AM");
+ if (isSession && !state.equals(DAGAppMasterState.ERROR)) {
+ state = DAGAppMasterState.SUCCEEDED;
+ }
shutdownHandler.shutdown();
}
}
@@ -665,7 +756,7 @@ public class DAGAppMaster extends AbstractService {
}
@Override
- public Configuration getConf() {
+ public Configuration getAMConf() {
return conf;
}
@@ -944,7 +1035,7 @@ public class DAGAppMaster extends AbstractService {
startServices();
super.serviceStart();
- this.state = DAGAppMasterState.RUNNING;
+ this.state = DAGAppMasterState.IDLE;
// metrics system init is really init & start.
// It's more test friendly to put it here.
@@ -955,6 +1046,12 @@ public class DAGAppMaster extends AbstractService {
startTime, appsStartTime, appSubmitTime);
dispatcher.getEventHandler().handle(
new DAGHistoryEvent(startEvent));
+
+ if (!isSession) {
+ startDAG();
+ } else {
+ LOG.info("In Session mode. Waiting for DAG over RPC");
+ }
}
@Override
@@ -1049,10 +1146,18 @@ public class DAGAppMaster extends AbstractService {
// the objects myself.
conf.setBoolean("fs.automatic.close", false);
+ // Command line options
+ Options opts = new Options();
+ opts.addOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION,
+ false, "Run Tez Application Master in Session mode");
+
+ CommandLine cliParser = new GnuParser().parse(opts, args);
+
DAGAppMaster appMaster =
new DAGAppMaster(applicationAttemptId, containerId, nodeHostString,
Integer.parseInt(nodePortString),
- Integer.parseInt(nodeHttpPortString), appSubmitTime);
+ Integer.parseInt(nodeHttpPortString), appSubmitTime,
+ cliParser.hasOption(TezConstants.TEZ_SESSION_MODE_CLI_OPTION));
ShutdownHookManager.get().addShutdownHook(
new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
@@ -1086,12 +1191,13 @@ public class DAGAppMaster extends AbstractService {
LOG.info("DAGAppMaster received a signal. Signaling TaskScheduler");
appMaster.taskSchedulerEventHandler.setSignalled(true);
}
+
if (EnumSet.of(DAGAppMasterState.NEW, DAGAppMasterState.INITED,
- DAGAppMasterState.RUNNING).contains(appMaster.state)) {
- // DAG not in a final state. Must have receive a KILL signal
+ DAGAppMasterState.IDLE, DAGAppMasterState.RUNNING)
+ .contains(appMaster.state)) {
+ // DAG not in a final state. Must have receive a KILL signal
appMaster.state = DAGAppMasterState.KILLED;
}
-
appMaster.stop();
}
}
@@ -1127,13 +1233,6 @@ public class DAGAppMaster extends AbstractService {
}
}
- Iterator<PlanKeyValuePair> iter =
- dagPlan.getDagKeyValues().getConfKeyValuesList().iterator();
- while (iter.hasNext()) {
- PlanKeyValuePair keyValPair = iter.next();
- conf.set(keyValPair.getKey(), keyValPair.getValue());
- }
-
// Job name is the same as the app name until we support multiple dags
// for an app later
appName = dagPlan.getName();
@@ -1145,6 +1244,8 @@ public class DAGAppMaster extends AbstractService {
private void startDAG(DAG dag) {
currentDAG = dag;
+ this.state = DAGAppMasterState.RUNNING;
+
// End of creating the job.
((RunningAppContext) context).setDAG(currentDAG);
@@ -1177,12 +1278,6 @@ public class DAGAppMaster extends AbstractService {
public Object run() throws Exception {
appMaster.init(conf);
appMaster.start();
- String submitDAGOverRpc = System.getenv(TezConstants.TEZ_AM_IS_SESSION_ENV);
- if(submitDAGOverRpc == null || submitDAGOverRpc.isEmpty()) {
- appMaster.startDAG();
- } else {
- LOG.info("Waiting for DAG over RPC");
- }
return null;
}
});
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterState.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterState.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterState.java
index f598eeb..a410c0a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterState.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterState.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app;
public enum DAGAppMasterState {
NEW,
INITED,
+ IDLE,
RUNNING,
SUCCEEDED,
FAILED,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
index 6736d2a..321dd01 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskTerminationCause.java
@@ -24,8 +24,8 @@ package org.apache.tez.dag.app.dag;
public enum TaskTerminationCause {
/** DAG was killed */
- DAG_KILL,
-
+ DAG_KILL,
+
/** Other vertex failed causing DAG to fail thus killing the parent vertex */
OTHER_VERTEX_FAILURE,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDAGFinished.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDAGFinished.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDAGFinished.java
index cad26d1..f58dace 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDAGFinished.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventDAGFinished.java
@@ -18,17 +18,25 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.records.TezDAGID;
public class DAGAppMasterEventDAGFinished extends DAGAppMasterEvent {
private final TezDAGID dagId;
+ private final DAGState dagState;
- public DAGAppMasterEventDAGFinished(TezDAGID dagId) {
+ public DAGAppMasterEventDAGFinished(TezDAGID dagId,
+ DAGState dagState) {
super(DAGAppMasterEventType.DAG_FINISHED);
this.dagId = dagId;
+ this.dagState = dagState;
}
-
+
public TezDAGID getDAGId() {
return dagId;
}
+
+ public DAGState getDAGState() {
+ return dagState;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index abde208..a8d7c30 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -713,7 +713,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
*/
if (finishTime == 0) setFinishTime();
- eventHandler.handle(new DAGAppMasterEventDAGFinished(getID()));
+ eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState));
// TODO Metrics
/*
@@ -869,7 +869,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
return dag.finished(DAGState.FAILED);
}
}
-
+
private void createDAGEdges(DAGImpl dag) {
for (EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) {
EdgeProperty edgeProperty = DagTypeConverters
@@ -1156,7 +1156,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
}
-
+
private static class VertexReRunningTransition implements
SingleArcTransition<DAGImpl, DAGEvent> {
@Override
@@ -1165,7 +1165,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
job.numCompletedVertices--;
job.vertexReRunning(vertex);
-
+
LOG.info("Vertex " + vertex.getVertexId() + " re-running."
+ ", numCompletedVertices=" + job.numCompletedVertices
@@ -1175,13 +1175,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
+ ", numVertices=" + job.numVertices);
}
}
-
+
private void vertexSucceeded(Vertex vertex) {
numSuccessfulVertices++;
// TODO: Metrics
//job.metrics.completedTask(task);
}
-
+
private void vertexReRunning(Vertex vertex) {
numSuccessfulVertices--;
addDiagnostic("Vertex re-running " + vertex.getVertexId());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 11e8aeb..be7d99d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.utils.TezRuntimeChildJVM;
import org.apache.tez.runtime.library.common.security.TokenCache;
import org.apache.tez.runtime.library.common.shuffle.server.ShuffleHandler;
@@ -55,7 +56,10 @@ public class AMContainerHelpers {
private static final Log LOG = LogFactory.getLog(AMContainerHelpers.class);
private static Object commonContainerSpecLock = new Object();
- private static ContainerLaunchContext commonContainerSpec = null;
+ private static TezDAGID lastDAGID = null;
+ private static Map<TezDAGID, ContainerLaunchContext> commonContainerSpecs =
+ new HashMap<TezDAGID, ContainerLaunchContext>();
+
/**
* Create a {@link LocalResource} record with all the given parameters.
@@ -134,6 +138,7 @@ public class AMContainerHelpers {
@VisibleForTesting
public static ContainerLaunchContext createContainerLaunchContext(
+ TezDAGID tezDAGID,
Map<ApplicationAccessType, String> acls,
ContainerId containerId,
Map<String, LocalResource> localResources,
@@ -142,10 +147,23 @@ public class AMContainerHelpers {
TaskAttemptListener taskAttemptListener, Credentials credentials,
boolean shouldProfile, AppContext appContext) {
+ ContainerLaunchContext commonContainerSpec = null;
synchronized (commonContainerSpecLock) {
- if (commonContainerSpec == null) {
- commonContainerSpec = createCommonContainerLaunchContext(
- acls, credentials);
+ if (!commonContainerSpecs.containsKey(tezDAGID)) {
+ commonContainerSpec =
+ createCommonContainerLaunchContext(acls, credentials);
+ commonContainerSpecs.put(tezDAGID, commonContainerSpec);
+ } else {
+ commonContainerSpec = commonContainerSpecs.get(tezDAGID);
+ }
+
+ // Ensure that we remove container specs for previous DAGs to reduce
+ // memory footprint
+ if (lastDAGID == null) {
+ lastDAGID = tezDAGID;
+ } else if (!lastDAGID.equals(tezDAGID)) {
+ commonContainerSpecs.remove(lastDAGID);
+ lastDAGID = tezDAGID;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 4731a24..9303b62 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -71,7 +71,8 @@ public class AMContainerImpl implements AMContainer {
private final TaskAttemptListener taskAttemptListener;
protected final EventHandler eventHandler;
- private final List<TezTaskAttemptID> completedAttempts = new LinkedList<TezTaskAttemptID>();
+ private final List<TezTaskAttemptID> completedAttempts =
+ new LinkedList<TezTaskAttemptID>();
// TODO Maybe this should be pulled from the TaskAttempt.s
private final Map<TezTaskAttemptID, TaskSpec> remoteTaskMap =
@@ -330,6 +331,7 @@ public class AMContainerImpl implements AMContainer {
ContainerContext containerContext = event.getContainerContext();
container.clc = AMContainerHelpers.createContainerLaunchContext(
+ container.appContext.getCurrentDAGID(),
container.appContext.getApplicationACLs(),
container.getContainerId(),
containerContext.getLocalResources(),
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index cad79f5..9d54a98 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -82,6 +82,12 @@ import org.apache.tez.runtime.library.output.OnFileSortedOutput;
/**
* An MRR job built on top of word count to return words sorted by
* their frequency of occurrence.
+ *
+ * Use -DUSE_TEZ_SESSION=true to run jobs in session mode (the default).
+ * If multiple input/output pairs are provided, this job will process each pair
+ * as a separate DAG in a sequential manner.
+ * Use -DINTER_JOB_SLEEP_INTERVAL=<N> where N is the sleep interval in seconds
+ * between the sequential DAGs.
*/
public class OrderedWordCount {
@@ -136,74 +142,9 @@ public class OrderedWordCount {
}
}
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
- if (otherArgs.length != 2) {
- System.err.println("Usage: wordcount <in> <out>");
- System.exit(2);
- }
- String inputPath = otherArgs[0];
- String outputPath = otherArgs[1];
-
- boolean useTezSession = conf.getBoolean("USE_TEZ_SESSION", true);
-
- UserGroupInformation.setConfiguration(conf);
- String user = UserGroupInformation.getCurrentUser().getShortUserName();
-
- TezConfiguration tezConf = new TezConfiguration(conf);
- TezClient tezClient = new TezClient(tezConf);
- ApplicationId appId = tezClient.createApplication();
-
- FileSystem fs = FileSystem.get(conf);
- if (fs.exists(new Path(outputPath))) {
- throw new FileAlreadyExistsException("Output directory " + outputPath +
- " already exists");
- }
-
- String stagingDirStr = Path.SEPARATOR + "user" + Path.SEPARATOR
- + user + Path.SEPARATOR+ ".staging" + Path.SEPARATOR
- + Path.SEPARATOR + appId.toString();
- Path stagingDir = new Path(stagingDirStr);
- tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
- stagingDir = fs.makeQualified(stagingDir);
- TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
-
- tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS,
- MRHelpers.getMRAMJavaOpts(conf));
-
- String jarPath = ClassUtil.findContainingJar(OrderedWordCount.class);
- if (jarPath == null) {
- throw new TezUncheckedException("Could not find any jar containing"
- + " OrderedWordCount.class in the classpath");
- }
- Path remoteJarPath = fs.makeQualified(
- new Path(stagingDir, "dag_job.jar"));
- fs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
- FileStatus jarFileStatus = fs.getFileStatus(remoteJarPath);
-
- Map<String, LocalResource> commonLocalResources =
- new TreeMap<String, LocalResource>();
- LocalResource dagJarLocalRsrc = LocalResource.newInstance(
- ConverterUtils.getYarnUrlFromPath(remoteJarPath),
- LocalResourceType.FILE,
- LocalResourceVisibility.APPLICATION,
- jarFileStatus.getLen(),
- jarFileStatus.getModificationTime());
- commonLocalResources.put("dag_job.jar", dagJarLocalRsrc);
-
- TezSession tezSession = null;
- AMConfiguration amConfig = new AMConfiguration("default", null,
- commonLocalResources, tezConf, null);
- if (useTezSession) {
- LOG.info("Creating Tez Session");
- TezSessionConfiguration sessionConfig =
- new TezSessionConfiguration(amConfig, tezConf);
- tezSession = new TezSession("OrderedWordCountSession",
- sessionConfig);
- tezSession.start();
- LOG.info("Created Tez Session");
- }
+ private static DAG createDAG(FileSystem fs, Configuration conf,
+ Map<String, LocalResource> commonLocalResources, Path stagingDir,
+ int dagIndex, String inputPath, String outputPath) throws Exception {
Configuration mapStageConf = new JobConf(conf);
mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
@@ -303,62 +244,170 @@ public class OrderedWordCount {
finalReduceVertex.setTaskEnvironment(reduceEnv);
vertices.add(finalReduceVertex);
- DAG dag = new DAG("OrderedWordCount");
+ DAG dag = new DAG("OrderedWordCount" + dagIndex);
for (int i = 0; i < vertices.size(); ++i) {
dag.addVertex(vertices.get(i));
if (i != 0) {
dag.addEdge(new Edge(vertices.get(i-1),
vertices.get(i), new EdgeProperty(
DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
+ SchedulingType.SEQUENTIAL,
new OutputDescriptor(
OnFileSortedOutput.class.getName()),
new InputDescriptor(
ShuffledMergedInputLegacy.class.getName()))));
}
}
+ return dag;
+ }
- DAGClient dagClient;
+ public static void main(String[] args) throws Exception {
+ Configuration conf = new Configuration();
+ String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+ boolean useTezSession = conf.getBoolean("USE_TEZ_SESSION", true);
+ long interJobSleepTimeout = conf.getInt("INTER_JOB_SLEEP_INTERVAL", 0)
+ * 1000;
+ if (((otherArgs.length%2) != 0)
+ || (!useTezSession && otherArgs.length != 2)) {
+ System.err.println("Usage: wordcount <in> <out>");
+ System.err.println("Usage (In Session Mode):"
+ + " wordcount <in1> <out1> ... <inN> <outN>");
+ System.exit(2);
+ }
+ List<String> inputPaths = new ArrayList<String>();
+ List<String> outputPaths = new ArrayList<String>();
+
+ for (int i = 0; i < otherArgs.length; i+=2) {
+ inputPaths.add(otherArgs[i]);
+ outputPaths.add(otherArgs[i+1]);
+ }
+
+ UserGroupInformation.setConfiguration(conf);
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+
+ TezConfiguration tezConf = new TezConfiguration(conf);
+ TezClient tezClient = new TezClient(tezConf);
+ ApplicationId appId = tezClient.createApplication();
+
+ FileSystem fs = FileSystem.get(conf);
+
+ String stagingDirStr = Path.SEPARATOR + "user" + Path.SEPARATOR
+ + user + Path.SEPARATOR+ ".staging" + Path.SEPARATOR
+ + Path.SEPARATOR + appId.toString();
+ Path stagingDir = new Path(stagingDirStr);
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
+ stagingDir = fs.makeQualified(stagingDir);
+ TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
+
+ tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS,
+ MRHelpers.getMRAMJavaOpts(conf));
+
+ String jarPath = ClassUtil.findContainingJar(OrderedWordCount.class);
+ if (jarPath == null) {
+ throw new TezUncheckedException("Could not find any jar containing"
+ + " OrderedWordCount.class in the classpath");
+ }
+ Path remoteJarPath = fs.makeQualified(
+ new Path(stagingDir, "dag_job.jar"));
+ fs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
+ FileStatus jarFileStatus = fs.getFileStatus(remoteJarPath);
+
+ Map<String, LocalResource> commonLocalResources =
+ new TreeMap<String, LocalResource>();
+ LocalResource dagJarLocalRsrc = LocalResource.newInstance(
+ ConverterUtils.getYarnUrlFromPath(remoteJarPath),
+ LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION,
+ jarFileStatus.getLen(),
+ jarFileStatus.getModificationTime());
+ commonLocalResources.put("dag_job.jar", dagJarLocalRsrc);
+
+ TezSession tezSession = null;
+ AMConfiguration amConfig = new AMConfiguration("default", null,
+ commonLocalResources, tezConf, null);
if (useTezSession) {
- LOG.info("Submitting DAG to Tez Session");
- dagClient = tezSession.submitDAG(dag);
- LOG.info("Submitted DAG to Tez Session");
- } else {
- LOG.info("Submitting DAG as a new Tez Application");
- dagClient = tezClient.submitDAGApplication(dag, amConfig);
+ LOG.info("Creating Tez Session");
+ TezSessionConfiguration sessionConfig =
+ new TezSessionConfiguration(amConfig, tezConf);
+ tezSession = new TezSession("OrderedWordCountSession", appId,
+ sessionConfig);
+ tezSession.start();
+ LOG.info("Created Tez Session");
}
DAGStatus dagStatus = null;
try {
- while (true) {
- dagStatus = dagClient.getDAGStatus();
- if(dagStatus.getState() == DAGStatus.State.RUNNING ||
- dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
- dagStatus.getState() == DAGStatus.State.FAILED ||
- dagStatus.getState() == DAGStatus.State.KILLED ||
- dagStatus.getState() == DAGStatus.State.ERROR) {
- break;
+ for (int dagIndex = 1; dagIndex <= inputPaths.size(); ++dagIndex) {
+ if (dagIndex != 1
+ && interJobSleepTimeout > 0) {
+ try {
+ LOG.info("Sleeping between jobs, sleepInterval="
+ + (interJobSleepTimeout/1000));
+ Thread.sleep(interJobSleepTimeout);
+ } catch (InterruptedException e) {
+ LOG.info("Main thread interrupted. Breaking out of job loop");
+ break;
+ }
}
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- // continue;
+
+ String inputPath = inputPaths.get(dagIndex-1);
+ String outputPath = outputPaths.get(dagIndex-1);
+
+ if (fs.exists(new Path(outputPath))) {
+ throw new FileAlreadyExistsException("Output directory "
+ + outputPath + " already exists");
+ }
+ LOG.info("Running OrderedWordCount DAG"
+ + ", dagIndex=" + dagIndex
+ + ", inputPath=" + inputPath
+ + ", outputPath=" + outputPath);
+
+ DAG dag = createDAG(fs, conf, commonLocalResources, stagingDir,
+ dagIndex, inputPath, outputPath);
+
+ DAGClient dagClient;
+ if (useTezSession) {
+ LOG.info("Submitting DAG to Tez Session, dagIndex=" + dagIndex);
+ dagClient = tezSession.submitDAG(dag);
+ LOG.info("Submitted DAG to Tez Session, dagIndex=" + dagIndex);
+ } else {
+ LOG.info("Submitting DAG as a new Tez Application");
+ dagClient = tezClient.submitDAGApplication(dag, amConfig);
}
- }
- while (dagStatus.getState() == DAGStatus.State.RUNNING) {
- try {
- ExampleDriver.printMRRDAGStatus(dagStatus);
+ while (true) {
+ dagStatus = dagClient.getDAGStatus();
+ if(dagStatus.getState() == DAGStatus.State.RUNNING ||
+ dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
+ dagStatus.getState() == DAGStatus.State.FAILED ||
+ dagStatus.getState() == DAGStatus.State.KILLED ||
+ dagStatus.getState() == DAGStatus.State.ERROR) {
+ break;
+ }
try {
- Thread.sleep(1000);
+ Thread.sleep(500);
} catch (InterruptedException e) {
// continue;
}
- dagStatus = dagClient.getDAGStatus();
- } catch (TezException e) {
- LOG.fatal("Failed to get application progress. Exiting");
- System.exit(-1);
}
+
+ while (dagStatus.getState() == DAGStatus.State.RUNNING) {
+ try {
+ ExampleDriver.printMRRDAGStatus(dagStatus);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // continue;
+ }
+ dagStatus = dagClient.getDAGStatus();
+ } catch (TezException e) {
+ LOG.fatal("Failed to get application progress. Exiting");
+ System.exit(-1);
+ }
+ }
+ ExampleDriver.printMRRDAGStatus(dagStatus);
+ LOG.info("DAG " + dagIndex + " completed. "
+ + "FinalState=" + dagStatus.getState());
}
} finally {
fs.delete(stagingDir, true);
@@ -367,9 +416,11 @@ public class OrderedWordCount {
}
}
- ExampleDriver.printMRRDAGStatus(dagStatus);
- LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
- System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
+ if (!useTezSession) {
+ ExampleDriver.printMRRDAGStatus(dagStatus);
+ LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
+ System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
index 2b9a35f..274d150 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
@@ -78,6 +78,7 @@ public class TestMRRJobs {
@BeforeClass
public static void setup() throws IOException {
try {
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.format(true).racks(null).build();
@@ -109,11 +110,11 @@ public class TestMRRJobs {
LOG.info("APP_JAR_HDFS: " + APP_JAR_HDFS);
LOG.info("YARN_SITE_XML: " + YARN_SITE_XML);
LOG.info("YARN_SITE_XML_HDFS: " + YARN_SITE_XML_HDFS);
-
+
localFs.copyFromLocalFile(new Path(MiniMRRTezCluster.APPJAR), APP_JAR);
localFs.setPermission(APP_JAR, new FsPermission("700"));
localFs.copyFromLocalFile(mrrTezCluster.getConfigFilePath(), YARN_SITE_XML);
-
+
remoteFs.copyFromLocalFile(new Path(MiniMRRTezCluster.APPJAR), APP_JAR_HDFS);
remoteFs.copyFromLocalFile(mrrTezCluster.getConfigFilePath(), YARN_SITE_XML_HDFS);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/849f1257/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 7280a1f..1c66deb 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -202,13 +203,78 @@ public class TestMRRJobsDAGApi {
// Submits a DAG to AM via RPC after AM has started
@Test(timeout = 60000)
- public void testMRRSleepJobPlanViaRPC() throws IOException,
+ public void testMRRSleepJobViaSession() throws IOException,
InterruptedException, TezException, ClassNotFoundException, YarnException {
State finalState = testMRRSleepJobDagSubmitCore(true, false, false);
Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
}
+ // Submits multiple DAGs, one after another, to a single session AM via RPC
+ @Test(timeout = 120000)
+ public void testMultipleMRRSleepJobViaSession() throws IOException,
+ InterruptedException, TezException, ClassNotFoundException, YarnException {
+ Map<String, String> commonEnv = createCommonEnv();
+ Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
+ .valueOf(new Random().nextInt(100000))));
+ remoteFs.mkdirs(remoteStagingDir);
+ TezConfiguration tezConf = new TezConfiguration(
+ mrrTezCluster.getConfig());
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+ remoteStagingDir.toString());
+ LocalResource appJarLr = createLocalResource(remoteFs,
+ remoteFs.makeQualified(APP_JAR_HDFS), LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION);
+ LocalResource yarnSiteLr = createLocalResource(remoteFs,
+ remoteFs.makeQualified(YARN_SITE_XML_HDFS), LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION);
+ Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
+ commonLocalResources.put(APP_JAR.getName(), appJarLr);
+
+ Map<String, LocalResource> amLocalResources =
+ new HashMap<String, LocalResource>();
+ amLocalResources.put("yarn-site.xml", yarnSiteLr);
+ amLocalResources.putAll(commonLocalResources);
+
+ AMConfiguration amConfig = new AMConfiguration(
+ "default", commonEnv, amLocalResources,
+ tezConf, null);
+ TezSessionConfiguration tezSessionConfig =
+ new TezSessionConfiguration(amConfig, tezConf);
+ TezSession tezSession = new TezSession("testsession", tezSessionConfig);
+ tezSession.start();
+
+ State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
+ tezSession);
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+ finalState = testMRRSleepJobDagSubmitCore(true, false, false,
+ tezSession);
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+ ApplicationId appId = tezSession.getApplicationId();
+ tezSession.stop();
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(mrrTezCluster.getConfig());
+ yarnClient.start();
+
+ while (true) {
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ if (appReport.getYarnApplicationState().equals(
+ YarnApplicationState.FINISHED)
+ || appReport.getYarnApplicationState().equals(
+ YarnApplicationState.FAILED)
+ || appReport.getYarnApplicationState().equals(
+ YarnApplicationState.KILLED)) {
+ break;
+ }
+ }
+
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ Assert.assertEquals(YarnApplicationState.FINISHED,
+ appReport.getYarnApplicationState());
+ Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
+ appReport.getFinalApplicationStatus());
+ }
+
// Submits a simple 5 stage sleep job using tez session. Then kills it.
@Test(timeout = 60000)
public void testMRRSleepJobDagSubmitAndKillViaRPC() throws IOException,
@@ -231,7 +297,27 @@ public class TestMRRJobsDAGApi {
boolean dagViaRPC,
boolean killDagWhileRunning,
boolean closeSessionBeforeSubmit) throws IOException,
- InterruptedException, TezException, ClassNotFoundException, YarnException {
+ InterruptedException, TezException, ClassNotFoundException,
+ YarnException {
+ return testMRRSleepJobDagSubmitCore(dagViaRPC, killDagWhileRunning,
+ closeSessionBeforeSubmit, null);
+ }
+
+ private Map<String, String> createCommonEnv() {
+ Map<String, String> commonEnv = new HashMap<String, String>();
+ Apps.addToEnvironment(commonEnv, Environment.CLASSPATH.name(), ".");
+ Apps.addToEnvironment(commonEnv, Environment.CLASSPATH.name(),
+ System.getProperty("java.class.path"));
+ return commonEnv;
+ }
+
+ public State testMRRSleepJobDagSubmitCore(
+ boolean dagViaRPC,
+ boolean killDagWhileRunning,
+ boolean closeSessionBeforeSubmit,
+ TezSession reUseTezSession) throws IOException,
+ InterruptedException, TezException, ClassNotFoundException,
+ YarnException {
LOG.info("\n\n\nStarting testMRRSleepJobDagSubmit().");
if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
@@ -325,11 +411,7 @@ public class TestMRRJobsDAGApi {
Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
commonLocalResources.put(APP_JAR.getName(), appJarLr);
- Map<String, String> commonEnv = new HashMap<String, String>();
- // TODO Use utility method post TEZ-205.
- Apps.addToEnvironment(commonEnv, Environment.CLASSPATH.name(), ".");
- Apps.addToEnvironment(commonEnv, Environment.CLASSPATH.name(),
- System.getProperty("java.class.path"));
+ Map<String, String> commonEnv = createCommonEnv();
// TODO Use utility method post TEZ-205.
Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
@@ -368,7 +450,7 @@ public class TestMRRJobsDAGApi {
OnFileSortedOutput.class.getName()), new InputDescriptor(
ShuffledMergedInputLegacy.class.getName())));
Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty(
- DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL, new OutputDescriptor(
OnFileSortedOutput.class.getName()), new InputDescriptor(
ShuffledMergedInputLegacy.class.getName())));
@@ -389,6 +471,7 @@ public class TestMRRJobsDAGApi {
TezClient tezClient = new TezClient(tezConf);
DAGClient dagClient = null;
TezSession tezSession = null;
+ boolean reuseSession = reUseTezSession != null;
TezSessionConfiguration tezSessionConfig;
AMConfiguration amConfig = new AMConfiguration(
"default", commonEnv, amLocalResources,
@@ -397,9 +480,13 @@ public class TestMRRJobsDAGApi {
// TODO Use utility method post TEZ-205 to figure out AM arguments etc.
dagClient = tezClient.submitDAGApplication(dag, amConfig);
} else {
- tezSessionConfig = new TezSessionConfiguration(amConfig, tezConf);
- tezSession = new TezSession("testsession", tezSessionConfig);
- tezSession.start();
+ if (reuseSession) {
+ tezSession = reUseTezSession;
+ } else {
+ tezSessionConfig = new TezSessionConfiguration(amConfig, tezConf);
+ tezSession = new TezSession("testsession", tezSessionConfig);
+ tezSession.start();
+ }
}
if (dagViaRPC && closeSessionBeforeSubmit) {
@@ -429,7 +516,7 @@ public class TestMRRJobsDAGApi {
+ ", finalAppStatus=" + appReport.getFinalApplicationStatus());
Assert.assertEquals(YarnApplicationState.FINISHED,
appState);
- Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
+ Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
appReport.getFinalApplicationStatus());
break;
}
@@ -460,6 +547,9 @@ public class TestMRRJobsDAGApi {
}
dagStatus = dagClient.getDAGStatus();
}
+ if (dagViaRPC && !reuseSession) {
+ tezSession.stop();
+ }
return dagStatus.getState();
}