You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/06 00:35:19 UTC
[4/4] git commit: TEZ-847. Support basic AM recovery. (hitesh)
TEZ-847. Support basic AM recovery. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/5b464f27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/5b464f27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/5b464f27
Branch: refs/heads/master
Commit: 5b464f27da273723e422607518c271cd3f040560
Parents: 18290c8
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Mar 5 15:34:53 2014 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Mar 5 15:34:53 2014 -0800
----------------------------------------------------------------------
.../java/org/apache/tez/client/TezSession.java | 2 -
.../apache/tez/dag/api/TezConfiguration.java | 10 +-
.../tez/dag/api/VertexManagerPluginContext.java | 5 +
.../apache/tez/runtime/api/OutputCommitter.java | 18 +
.../tez/runtime/api/OutputCommitterContext.java | 10 +
tez-api/src/main/proto/DAGApiRecords.proto | 15 +-
.../tez/dag/api/client/VertexStatusBuilder.java | 2 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 72 +-
.../org/apache/tez/dag/app/RecoveryParser.java | 613 ++++++++++
.../java/org/apache/tez/dag/app/dag/DAG.java | 3 +
.../java/org/apache/tez/dag/app/dag/Task.java | 3 +
.../org/apache/tez/dag/app/dag/TaskAttempt.java | 6 +-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 5 +
.../org/apache/tez/dag/app/dag/VertexState.java | 1 +
.../tez/dag/app/dag/event/DAGEventType.java | 6 +-
.../dag/app/dag/event/TaskAttemptEventType.java | 3 +
.../dag/app/dag/event/TaskEventRecoverTask.java | 41 +
.../tez/dag/app/dag/event/TaskEventType.java | 6 +-
.../app/dag/event/VertexEventRecoverVertex.java | 36 +
.../app/dag/event/VertexEventRouteEvent.java | 14 +-
.../event/VertexEventSourceVertexRecovered.java | 56 +
.../tez/dag/app/dag/event/VertexEventType.java | 8 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 465 +++++---
.../dag/impl/OutputCommitterContextImpl.java | 10 +-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 87 +-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 234 +++-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 1054 +++++++++++++++---
.../tez/dag/app/dag/impl/VertexManager.java | 30 +-
.../tez/dag/history/HistoryEventType.java | 5 +-
.../apache/tez/dag/history/ats/ATSService.java | 7 -
.../tez/dag/history/events/AMLaunchedEvent.java | 14 +-
.../tez/dag/history/events/AMStartedEvent.java | 9 +
.../history/events/ContainerLaunchedEvent.java | 12 +
.../history/events/DAGCommitStartedEvent.java | 94 ++
.../dag/history/events/DAGFinishedEvent.java | 67 +-
.../dag/history/events/DAGInitializedEvent.java | 7 +
.../tez/dag/history/events/DAGStartedEvent.java | 7 +
.../dag/history/events/DAGSubmittedEvent.java | 11 +
.../events/TaskAttemptFinishedEvent.java | 51 +-
.../history/events/TaskAttemptStartedEvent.java | 17 +-
.../dag/history/events/TaskFinishedEvent.java | 56 +-
.../dag/history/events/TaskStartedEvent.java | 14 +-
.../events/VertexCommitStartedEvent.java | 94 ++
.../VertexDataMovementEventsGeneratedEvent.java | 8 +
.../dag/history/events/VertexFinishedEvent.java | 59 +-
.../history/events/VertexInitializedEvent.java | 78 +-
.../events/VertexParallelismUpdatedEvent.java | 159 +++
.../dag/history/events/VertexStartedEvent.java | 14 +-
.../dag/history/recovery/RecoveryService.java | 171 ++-
.../apache/tez/dag/recovery/RecoveryParser.java | 186 ----
tez-dag/src/main/proto/HistoryEvents.proto | 23 +-
.../dag/api/client/TestVertexStatusBuilder.java | 8 +-
.../TestHistoryEventsProtoConversion.java | 569 ++++++++++
tez-dist/pom.xml | 12 +-
.../mapreduce/committer/MROutputCommitter.java | 34 +-
.../tez/runtime/api/impl/EventMetaData.java | 2 +-
.../apache/tez/runtime/api/impl/InputSpec.java | 4 +
.../vertexmanager/ShuffleVertexManager.java | 4 +-
.../org/apache/tez/test/TestDAGRecovery.java | 135 +++
.../apache/tez/test/dag/MultiAttemptDAG.java | 177 +++
60 files changed, 4259 insertions(+), 664 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index 8f3101e..9e29910 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -116,8 +116,6 @@ public class TezSession {
sessionConfig.getTezConfiguration(), applicationId,
null, sessionName, sessionConfig.getAMConfiguration(),
tezJarResources, sessionCredentials);
- // Set Tez Sessions to not retry on AM crashes
- appContext.setMaxAppAttempts(1);
yarnClient.submitApplication(appContext);
} catch (YarnException e) {
throw new TezException(e);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 142d2d9..29d04ab 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -369,12 +369,20 @@ public class TezConfiguration extends Configuration {
public static final String DAG_RECOVERY_ENABLED =
TEZ_PREFIX + "dag.recovery.enabled";
- public static final boolean DAG_RECOVERY_ENABLED_DEFAULT = false;
+ public static final boolean DAG_RECOVERY_ENABLED_DEFAULT = true;
public static final String DAG_RECOVERY_FILE_IO_BUFFER_SIZE =
TEZ_PREFIX + "dag.recovery.io.buffer.size";
public static final int DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT = 8192;
+  /**
+   * Maximum number of recovery events that may be buffered without being
+   * flushed to the recovery file.
+   * NOTE(review): semantics inferred from the key name; confirm against the
+   * RecoveryService implementation that reads this setting.
+   */
+  public static final String DAG_RECOVERY_MAX_UNFLUSHED_EVENTS =
+      TEZ_PREFIX + "dag.recovery.max.unflushed.events";
+  public static final int DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT = 100;
+
+  /**
+   * Interval, in seconds, at which buffered recovery events are flushed.
+   * NOTE(review): semantics inferred from the key name; confirm against the
+   * RecoveryService implementation that reads this setting.
+   */
+  public static final String DAG_RECOVERY_FLUSH_INTERVAL_SECS =
+      TEZ_PREFIX + "dag.recovery.flush.interval.secs";
+  public static final int DAG_RECOVERY_FLUSH_INTERVAL_SECS_DEFAULT = 30;
+
public static final String DAG_RECOVERY_DATA_DIR_NAME = "recovery";
public static final String DAG_RECOVERY_SUMMARY_FILE_SUFFIX = ".summary";
public static final String DAG_RECOVERY_RECOVER_FILE_SUFFIX = ".recovery";
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index a281b90..76202f8 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -125,4 +125,9 @@ public interface VertexManagerPluginContext {
* @param locationHint
*/
public void setVertexLocationHint(VertexLocationHint locationHint);
+
+  /**
+   * Get the attempt number of the DAG that this vertex belongs to.
+   *
+   * @return DAG attempt number
+   */
+  public int getDAGAttemptNumber();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
index 301d01b..aadbf12 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
@@ -69,4 +69,22 @@ public abstract class OutputCommitter {
public abstract void abortOutput(VertexStatus.State finalState)
throws Exception;
+  /**
+   * Whether the OutputCommitter supports recovery of output from a Task
+   * that completed in a previous DAG attempt.
+   * <p>
+   * The default implementation returns {@code true}, and the default
+   * {@link #recoverTask(int, int)} does nothing. A committer that relies on
+   * both defaults therefore reports recovery as supported without doing any
+   * recovery work. Committers that must act on previously completed task
+   * output should override {@link #recoverTask(int, int)}. Committers that
+   * cannot reuse such output should override this method to return
+   * {@code false}.
+   *
+   * @return true if recovery of completed task output is supported
+   */
+  public boolean isTaskRecoverySupported() {
+    return true;
+  }
+
+  /**
+   * Recover task output from a previous DAG attempt.
+   * <p>
+   * The default implementation is a no-op.
+   *
+   * @param taskIndex Index of the task to be recovered
+   * @param previousDAGAttempt Previous DAG attempt number
+   * @throws java.lang.Exception if the task output could not be recovered
+   */
+  public void recoverTask(int taskIndex, int previousDAGAttempt) throws Exception {
+  }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
index 3132758..b5837e8 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
@@ -18,6 +18,8 @@
package org.apache.tez.runtime.api;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
/**
@@ -63,4 +65,12 @@ public interface OutputCommitterContext {
*/
public byte[] getUserPayload();
+  /**
+   * Get the index, within the DAG, of the vertex this committer context
+   * belongs to.
+   * <p>
+   * Marked unstable/evolving: this accessor may change in future releases.
+   *
+   * @return Vertex index
+   */
+  @Unstable
+  @Evolving
+  public int getVertexIndex();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 5faa7f1..c7a317e 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -171,13 +171,14 @@ message ProgressProto {
enum VertexStatusStateProto {
VERTEX_NEW = 0;
VERTEX_INITIALIZING = 1;
VERTEX_INITED = 2;
VERTEX_RUNNING = 3;
VERTEX_SUCCEEDED = 4;
VERTEX_FAILED = 5;
VERTEX_KILLED = 6;
VERTEX_ERROR = 7;
VERTEX_TERMINATING = 8;
+ VERTEX_RECOVERING = 9; // appended, not inserted: existing enum numbers must stay stable on the wire
}
message VertexStatusProto {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
index 0e693b8..dc24f7f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
@@ -66,6 +66,8 @@ public class VertexStatusBuilder extends VertexStatus {
return VertexStatusStateProto.VERTEX_NEW;
case INITIALIZING:
return VertexStatusStateProto.VERTEX_INITIALIZING;
+ case RECOVERING:
+ return VertexStatusStateProto.VERTEX_NEW;
case INITED:
return VertexStatusStateProto.VERTEX_INITED;
case RUNNING:
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 1ce07fb..c8185b8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -48,6 +48,8 @@ import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -220,6 +222,8 @@ public class DAGAppMaster extends AbstractService {
private FileSystem recoveryFS;
private int recoveryBufferSize;
+ protected boolean isLastAMRetry = false;
+
// DAG Counter
private final AtomicInteger dagCounter = new AtomicInteger();
@@ -262,6 +266,14 @@ public class DAGAppMaster extends AbstractService {
@Override
public synchronized void serviceInit(final Configuration conf) throws Exception {
+ int maxAppAttempts = 1;
+ String maxAppAttemptsEnv = System.getenv(
+ ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
+ if (maxAppAttemptsEnv != null) {
+ maxAppAttempts = Integer.valueOf(maxAppAttemptsEnv);
+ }
+ isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
+
this.amConf = conf;
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
@@ -481,6 +493,11 @@ public class DAGAppMaster extends AbstractService {
}
}
+  /**
+   * Install the given DAG as the one currently managed by this AM and also
+   * register it with the AM's app context.
+   * <p>
+   * Called by RecoveryParser after rebuilding a DAG from recovery data.
+   *
+   * @param currentDAG the DAG to make current
+   */
+  public void setCurrentDAG(DAG currentDAG) {
+    this.currentDAG = currentDAG;
+    context.setDAG(currentDAG);
+  }
+
private class DAGAppMasterEventHandler implements
EventHandler<DAGAppMasterEvent> {
@Override
@@ -539,7 +556,7 @@ public class DAGAppMaster extends AbstractService {
}
/** Create and initialize (but don't start) a single dag. */
- protected DAG createDAG(DAGPlan dagPB, TezDAGID dagId) {
+ DAGImpl createDAG(DAGPlan dagPB, TezDAGID dagId) {
if (dagId == null) {
dagId = TezDAGID.getInstance(appAttemptID.getApplicationId(),
dagCounter.incrementAndGet());
@@ -566,7 +583,7 @@ public class DAGAppMaster extends AbstractService {
TokenCache.setSessionToken(sessionToken, dagCredentials);
// create single dag
- DAG newDag =
+ DAGImpl newDag =
new DAGImpl(dagId, dagConf, dagPB, dispatcher.getEventHandler(),
taskAttemptListener, dagCredentials, clock,
appMasterUgi.getShortUserName(),
@@ -989,6 +1006,7 @@ public class DAGAppMaster extends AbstractService {
return TezSessionStatus.INITIALIZING;
case IDLE:
return TezSessionStatus.READY;
+ case RECOVERING:
case RUNNING:
return TezSessionStatus.RUNNING;
case ERROR:
@@ -1328,6 +1346,31 @@ public class DAGAppMaster extends AbstractService {
}
}
+  /**
+   * Attempt to recover an in-progress DAG from a previous AM attempt's
+   * recovery data.
+   * <p>
+   * Does nothing unless recovery is enabled and this is not the first AM
+   * attempt. The AM state becomes RECOVERING while recovery data is parsed.
+   * If a DAG is found, a DAG_RECOVER event is dispatched and the state becomes
+   * RUNNING. Otherwise the state becomes IDLE.
+   *
+   * @return the recovered DAG, or null if there was nothing to recover
+   * @throws IOException if reading the recovery data fails
+   */
+  private DAG recoverDAG() throws IOException {
+    if (!recoveryEnabled || this.appAttemptID.getAttemptId() <= 1) {
+      return null;
+    }
+
+    this.state = DAGAppMasterState.RECOVERING;
+    RecoveryParser parser = new RecoveryParser(
+        this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
+    DAG dag = parser.parseRecoveryData();
+
+    if (dag == null) {
+      LOG.info("No DAG to recover");
+      this.state = DAGAppMasterState.IDLE;
+      return null;
+    }
+
+    LOG.info("Found DAG to recover, dagId=" + dag.getID());
+    _updateLoggers(dag, "");
+    dagEventDispatcher.handle(
+        new DAGEvent(dag.getID(), DAGEventType.DAG_RECOVER));
+    this.state = DAGAppMasterState.RUNNING;
+    return dag;
+  }
+
+ @SuppressWarnings("unchecked")
@Override
public synchronized void serviceStart() throws Exception {
@@ -1348,18 +1391,18 @@ public class DAGAppMaster extends AbstractService {
this.lastDAGCompletionTime = clock.getTime();
if (!isSession) {
- startDAG();
+ DAG recoveredDAG = null;
+ if (appAttemptID.getAttemptId() != 1) {
+ recoveredDAG = recoverDAG();
+ }
+ if (recoveredDAG == null) {
+ dagCounter.set(0);
+ startDAG();
+ }
} else {
LOG.info("In Session mode. Waiting for DAG over RPC");
this.state = DAGAppMasterState.IDLE;
-
- if (recoveryEnabled) {
- if (this.appAttemptID.getAttemptId() > 0) {
- // Recovery data and copy over into new recovery dir
- this.state = DAGAppMasterState.IDLE;
- // TODO
- }
- }
+ recoverDAG();
this.dagSubmissionTimer = new Timer(true);
this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
@@ -1570,6 +1613,9 @@ public class DAGAppMaster extends AbstractService {
}
appMaster.stop();
+
+
+
}
}
@@ -1672,4 +1718,8 @@ public class DAGAppMaster extends AbstractService {
private void sendEvent(Event<?> event) {
dispatcher.getEventHandler().handle(event);
}
+
+  /**
+   * Reset the counter used to assign ids to newly created DAGs.
+   * <p>
+   * RecoveryParser sets this to the highest DAG id seen in the previous
+   * attempt's summary, so that {@code incrementAndGet()} in createDAG yields
+   * ids that do not collide with earlier DAGs.
+   *
+   * @param dagCounter the value the counter is set to
+   */
+  synchronized void setDAGCounter(int dagCounter) {
+    this.dagCounter.set(dagCounter);
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
new file mode 100644
index 0000000..9e59849
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class RecoveryParser {
+
+  private static final Log LOG = LogFactory.getLog(RecoveryParser.class);
+
+  // AM being recovered; supplies config, attempt id, DAG creation and the
+  // DAG id counter.
+  private final DAGAppMaster dagAppMaster;
+  private final FileSystem recoveryFS;
+  // Root recovery directory; each AM attempt writes into a subdirectory
+  // named after its attempt number.
+  private final Path recoveryDataDir;
+  private final Path currentAttemptRecoveryDataDir;
+  private final int recoveryBufferSize;
+  private final int currentAttemptId;
+
+  // Name of the marker file whose presence in an attempt's directory makes
+  // that attempt eligible as the source of recovery data.
+  private static final String dataRecoveredFileFlag = "dataRecovered";
+
+  /**
+   * Create a parser that recovers data written by earlier AM attempts into
+   * the recovery directory of the current attempt.
+   *
+   * @param dagAppMaster AM being recovered
+   * @param recoveryFS file system holding the recovery data
+   * @param recoveryDataDir root recovery directory (parent of per-attempt dirs)
+   * @param currentAttemptId attempt number of the running AM
+   */
+  public RecoveryParser(DAGAppMaster dagAppMaster,
+      FileSystem recoveryFS,
+      Path recoveryDataDir,
+      int currentAttemptId) {
+    this.dagAppMaster = dagAppMaster;
+    this.recoveryFS = recoveryFS;
+    this.recoveryDataDir = recoveryDataDir;
+    this.currentAttemptId = currentAttemptId;
+    this.currentAttemptRecoveryDataDir =
+        getAttemptRecoveryDataDir(recoveryDataDir, currentAttemptId);
+
+    Configuration amConf = dagAppMaster.getConfig();
+    this.recoveryBufferSize = amConf.getInt(
+        TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
+        TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
+  }
+
+  /**
+   * Debug helper: decode every summary event in the stream and log it.
+   * <p>
+   * Stops at end of stream. An event-type ordinal that does not map to a
+   * known {@link HistoryEventType} is logged as {@code UNKNOWN(n)} instead of
+   * aborting the dump.
+   *
+   * @param inputStream stream positioned at the start of a summary file
+   * @throws IOException if reading or decoding a summary record fails
+   */
+  private static void parseSummaryFile(FSDataInputStream inputStream)
+      throws IOException {
+    HistoryEventType[] eventTypes = HistoryEventType.values();
+    while (inputStream.available() > 0) {
+      RecoveryProtos.SummaryEventProto proto =
+          RecoveryProtos.SummaryEventProto.parseDelimitedFrom(inputStream);
+      if (proto == null) {
+        // parseDelimitedFrom returns null when the stream is already at EOF
+        break;
+      }
+      int ordinal = proto.getEventType();
+      String eventName = (ordinal >= 0 && ordinal < eventTypes.length)
+          ? eventTypes[ordinal].toString()
+          : "UNKNOWN(" + ordinal + ")";
+      LOG.info("[SUMMARY]"
+          + " dagId=" + proto.getDagId()
+          + ", timestamp=" + proto.getTimestamp()
+          + ", event=" + eventName);
+    }
+  }
+
+  /**
+   * Read the next history event from a DAG recovery stream.
+   * <p>
+   * Each record is an int event-type ordinal followed by the event's own
+   * serialized form, which is consumed by {@code fromProtoStream}.
+   * This method never returns null: every path either returns a populated
+   * event or throws.
+   *
+   * @param inputStream stream positioned at the start of a record
+   * @return the decoded event
+   * @throws IOException if the ordinal is out of range or not a supported
+   *     event type, or if reading fails. {@code readInt()} throws
+   *     {@link java.io.EOFException} when fewer than four bytes remain.
+   */
+  private static HistoryEvent getNextEvent(FSDataInputStream inputStream)
+      throws IOException {
+    int eventTypeOrdinal = inputStream.readInt();
+    if (eventTypeOrdinal < 0 || eventTypeOrdinal >=
+        HistoryEventType.values().length) {
+      // The ordinal does not map to any HistoryEventType, so the stream is
+      // corrupt or not positioned at a record boundary.
+      throw new IOException("Corrupt data found when trying to read next event type"
+          + ", eventTypeOrdinal=" + eventTypeOrdinal);
+    }
+    HistoryEventType eventType = HistoryEventType.values()[eventTypeOrdinal];
+    HistoryEvent event;
+    // Instantiate an empty event of the recorded type; its fields are
+    // populated by fromProtoStream() below.
+    switch (eventType) {
+      case AM_LAUNCHED:
+        event = new AMLaunchedEvent();
+        break;
+      case AM_STARTED:
+        event = new AMStartedEvent();
+        break;
+      case DAG_SUBMITTED:
+        event = new DAGSubmittedEvent();
+        break;
+      case DAG_INITIALIZED:
+        event = new DAGInitializedEvent();
+        break;
+      case DAG_STARTED:
+        event = new DAGStartedEvent();
+        break;
+      case DAG_COMMIT_STARTED:
+        event = new DAGCommitStartedEvent();
+        break;
+      case DAG_FINISHED:
+        event = new DAGFinishedEvent();
+        break;
+      case CONTAINER_LAUNCHED:
+        event = new ContainerLaunchedEvent();
+        break;
+      case VERTEX_INITIALIZED:
+        event = new VertexInitializedEvent();
+        break;
+      case VERTEX_STARTED:
+        event = new VertexStartedEvent();
+        break;
+      case VERTEX_PARALLELISM_UPDATED:
+        event = new VertexParallelismUpdatedEvent();
+        break;
+      case VERTEX_COMMIT_STARTED:
+        event = new VertexCommitStartedEvent();
+        break;
+      case VERTEX_FINISHED:
+        event = new VertexFinishedEvent();
+        break;
+      case TASK_STARTED:
+        event = new TaskStartedEvent();
+        break;
+      case TASK_FINISHED:
+        event = new TaskFinishedEvent();
+        break;
+      case TASK_ATTEMPT_STARTED:
+        event = new TaskAttemptStartedEvent();
+        break;
+      case TASK_ATTEMPT_FINISHED:
+        event = new TaskAttemptFinishedEvent();
+        break;
+      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+        event = new VertexDataMovementEventsGeneratedEvent();
+        break;
+      default:
+        // A valid HistoryEventType that has no recovery decoder.
+        throw new IOException("Invalid data found, unknown event type "
+            + eventType);
+
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Parsing event from input stream"
+          + ", eventType=" + eventType);
+    }
+    event.fromProtoStream(inputStream);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Parsed event from input stream"
+          + ", eventType=" + eventType
+          + ", event=" + event.toString());
+    }
+    return event;
+  }
+
+
+
+
+
+  /**
+   * Debug helper: decode every history event in a DAG recovery stream and
+   * log it, until the stream reports no more available bytes.
+   *
+   * @param inputStream stream positioned at the start of a DAG recovery file
+   * @throws IOException if any record cannot be read or decoded
+   */
+  private static void parseDAGRecoveryFile(FSDataInputStream inputStream)
+      throws IOException {
+    while (inputStream.available() > 0) {
+      HistoryEvent parsed = getNextEvent(inputStream);
+      HistoryEventType parsedType = parsed.getEventType();
+      LOG.info("Parsed event from recovery stream"
+          + ", eventType=" + parsedType
+          + ", event=" + parsed);
+    }
+  }
+
+  /**
+   * Resolve the recovery directory of a given AM attempt.
+   *
+   * @param baseDir root recovery directory
+   * @param attemptId AM attempt number
+   * @return {@code baseDir/<attemptId>}
+   */
+  private Path getAttemptRecoveryDataDir(Path baseDir,
+      int attemptId) {
+    String attemptDirName = String.valueOf(attemptId);
+    return new Path(baseDir, attemptDirName);
+  }
+
+  /**
+   * Command-line debugging entry point that dumps recovery data to the log.
+   * <p>
+   * Usage: {@code RecoveryParser <summaryFilePath> [<dagRecoveryFilePath> ...]}.
+   * The summary file is parsed first, then each DAG recovery file in the order
+   * given. Paths are resolved against the default FileSystem of a new
+   * {@link Configuration}. Every opened stream is closed after it is parsed.
+   * With no arguments, a usage message is printed and the JVM exits with
+   * status 1.
+   *
+   * @param argv summary file path followed by zero or more DAG recovery paths
+   * @throws IOException if a file cannot be opened or parsed
+   */
+  public static void main(String[] argv) throws IOException {
+    if (argv.length < 1) {
+      System.err.println("Usage: RecoveryParser <summaryFilePath>"
+          + " [<dagRecoveryFilePath> ...]");
+      System.exit(1);
+    }
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+
+    String summaryPath = argv[0];
+    LOG.info("Parsing Summary file " + summaryPath);
+    FSDataInputStream summaryStream = fs.open(new Path(summaryPath));
+    try {
+      parseSummaryFile(summaryStream);
+    } finally {
+      summaryStream.close();
+    }
+
+    for (int i = 1; i < argv.length; ++i) {
+      String dagPath = argv[i];
+      LOG.info("Parsing DAG recovery file " + dagPath);
+      FSDataInputStream dagStream = fs.open(new Path(dagPath));
+      try {
+        parseDAGRecoveryFile(dagStream);
+      } finally {
+        dagStream.close();
+      }
+    }
+  }
+
+  /**
+   * Resolve the summary file inside an attempt's recovery directory. The file
+   * is named after the application id plus the configured summary suffix.
+   *
+   * @param attemptDataDir an attempt's recovery directory
+   * @return path of the summary file within that directory
+   */
+  private Path getSummaryPath(Path attemptDataDir) {
+    String appId = dagAppMaster.getAttemptID().getApplicationId().toString();
+    String summaryFileName =
+        appId + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX;
+    return new Path(attemptDataDir, summaryFileName);
+  }
+
+ private FSDataOutputStream getSummaryOutputStream(Path summaryPath)
+ throws IOException {
+ return recoveryFS.create(summaryPath, true, recoveryBufferSize);
+ }
+
+  /**
+   * Open a summary file for reading, if it exists.
+   *
+   * @param summaryPath path of the summary file
+   * @return input stream over the file, or null if the file does not exist
+   * @throws IOException if the existence check or the open fails
+   */
+  private FSDataInputStream getSummaryStream(Path summaryPath)
+      throws IOException {
+    return recoveryFS.exists(summaryPath)
+        ? recoveryFS.open(summaryPath, recoveryBufferSize)
+        : null;
+  }
+
+  /**
+   * Resolve the recovery file of a DAG inside an attempt's recovery
+   * directory. The file is named after the DAG id plus the configured
+   * recovery-file suffix.
+   *
+   * @param attemptDataDir an attempt's recovery directory
+   * @param dagID id of the DAG
+   * @return path of the DAG's recovery file
+   */
+  private Path getDAGRecoveryFilePath(Path attemptDataDir,
+      TezDAGID dagID) {
+    String recoveryFileName = dagID.toString()
+        + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX;
+    return new Path(attemptDataDir, recoveryFileName);
+  }
+
+  /**
+   * Open a DAG's recovery file for reading, if it exists.
+   *
+   * @param attemptDataDir an attempt's recovery directory
+   * @param dagID id of the DAG
+   * @return input stream over the file, or null if the file does not exist
+   * @throws IOException if the existence check or the open fails
+   */
+  private FSDataInputStream getDAGRecoveryStream(Path attemptDataDir,
+      TezDAGID dagID)
+      throws IOException {
+    Path recoveryFile = getDAGRecoveryFilePath(attemptDataDir, dagID);
+    if (recoveryFS.exists(recoveryFile)) {
+      return recoveryFS.open(recoveryFile, recoveryBufferSize);
+    }
+    return null;
+  }
+
+  /**
+   * Create a DAG's recovery file for writing, replacing any existing file.
+   *
+   * @param recoveryDataDir an attempt's recovery directory
+   * @param dagID id of the DAG
+   * @return output stream using the configured recovery buffer size
+   * @throws IOException if the file cannot be created
+   */
+  private FSDataOutputStream getDAGRecoveryOutputStream(Path recoveryDataDir,
+      TezDAGID dagID)
+      throws IOException {
+    // Use the shared path helper so the file written here is always the same
+    // file that getDAGRecoveryStream() reads and that parseRecoveryData()
+    // logs as the copy target.
+    Path dagRecoveryPath = getDAGRecoveryFilePath(recoveryDataDir, dagID);
+    return recoveryFS.create(dagRecoveryPath, true, recoveryBufferSize);
+  }
+
+  /**
+   * Find the single DAG that was submitted but never finished.
+   *
+   * @param seenDAGs DAG ids mapped to {@code true} once a DAG_FINISHED
+   *     summary event was seen for them, {@code false} if only DAG_SUBMITTED
+   *     was seen
+   * @return id of the in-progress DAG, or null if every seen DAG finished
+   * @throws IllegalStateException if more than one DAG is in progress
+   */
+  private TezDAGID getLastInProgressDAG(Map<TezDAGID, Boolean> seenDAGs) {
+    TezDAGID inProgressDAG = null;
+    for (Map.Entry<TezDAGID, Boolean> entry : seenDAGs.entrySet()) {
+      if (entry.getValue().booleanValue()) {
+        // This DAG already finished; nothing to recover for it.
+        continue;
+      }
+      if (inProgressDAG != null) {
+        // An AM runs at most one DAG at a time, so two unfinished DAGs means
+        // the summary data is inconsistent.
+        throw new IllegalStateException("Multiple in progress DAGs seen"
+            + ", dagId=" + inProgressDAG
+            + ", dagId=" + entry.getKey());
+      }
+      inProgressDAG = entry.getKey();
+    }
+    return inProgressDAG;
+  }
+
+  /**
+   * Choose the recovery directory of a previous AM attempt to recover from.
+   * <p>
+   * Scans attempts from the most recent earlier attempt down to attempt 1.
+   * It returns the first directory containing the data-recovered marker file.
+   * Errors while checking a candidate are logged, and that candidate is
+   * skipped. If no candidate has the marker, attempt 1's directory is used.
+   *
+   * @return recovery directory of the chosen previous attempt
+   */
+  private Path getPreviousAttemptRecoveryDataDir() {
+    for (int attempt = currentAttemptId - 1; attempt >= 1; attempt--) {
+      Path candidateDir = getAttemptRecoveryDataDir(recoveryDataDir, attempt);
+      Path markerFile = new Path(candidateDir, dataRecoveredFileFlag);
+      boolean hasMarker = false;
+      try {
+        hasMarker = recoveryFS.exists(markerFile);
+      } catch (IOException e) {
+        LOG.warn("Exception when checking previous attempt dir for "
+            + markerFile.toString(), e);
+      }
+      if (hasMarker) {
+        return candidateDir;
+      }
+    }
+
+    LOG.info("Falling back to first attempt as no other recovered attempts"
+        + " found");
+    return getAttemptRecoveryDataDir(recoveryDataDir, 1);
+  }
+
+
+ public DAG parseRecoveryData() throws IOException {
+ Path previousAttemptRecoveryDataDir = getPreviousAttemptRecoveryDataDir();
+ LOG.info("Using " + previousAttemptRecoveryDataDir.toString()
+ + " for recovering data from previous attempt");
+ if (!recoveryFS.exists(previousAttemptRecoveryDataDir)) {
+ LOG.info("Nothing to recover as previous attempt data does not exist"
+ + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString());
+ return null;
+ }
+
+ Path summaryPath = getSummaryPath(previousAttemptRecoveryDataDir);
+ FSDataInputStream summaryStream = getSummaryStream(
+ summaryPath);
+ if (summaryStream == null) {
+ LOG.info("Nothing to recover as summary file does not exist"
+ + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString()
+ + ", summaryPath=" + summaryPath.toString());
+ return null;
+ }
+
+ Path newSummaryPath = getSummaryPath(currentAttemptRecoveryDataDir);
+ FSDataOutputStream newSummaryStream =
+ getSummaryOutputStream(newSummaryPath);
+
+ Map<TezDAGID, Boolean> seenDAGs = new TreeMap<TezDAGID, Boolean>();
+
+ FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryPath);
+ LOG.info("Parsing summary file"
+ + ", path=" + summaryPath.toString()
+ + ", len=" + summaryFileStatus.getLen()
+ + ", lastModTime=" + summaryFileStatus.getModificationTime());
+
+ int dagCounter = 0;
+ while (summaryStream.available() > 0) {
+ RecoveryProtos.SummaryEventProto proto =
+ RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
+ HistoryEventType eventType =
+ HistoryEventType.values()[proto.getEventType()];
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[RECOVERY SUMMARY]"
+ + " dagId=" + proto.getDagId()
+ + ", timestamp=" + proto.getTimestamp()
+ + ", event=" + eventType);
+ }
+ TezDAGID dagId = TezDAGID.fromString(proto.getDagId());
+ if (dagCounter < dagId.getId()) {
+ dagCounter = dagId.getId();
+ }
+ if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
+ seenDAGs.put(dagId, false);
+ } else if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
+ seenDAGs.put(dagId, true);
+ }
+ proto.writeDelimitedTo(newSummaryStream);
+ }
+ newSummaryStream.hsync();
+ newSummaryStream.close();
+
+ // Set counter for next set of DAGs
+ dagAppMaster.setDAGCounter(dagCounter);
+
+ TezDAGID lastInProgressDAG = getLastInProgressDAG(seenDAGs);
+ if (lastInProgressDAG == null) {
+ LOG.info("Nothing to recover as no uncompleted DAGs found");
+ return null;
+ }
+
+ LOG.info("Trying to recover dag from recovery file"
+ + ", dagId=" + lastInProgressDAG.toString()
+ + ", dataDir=" + previousAttemptRecoveryDataDir
+ + ", intoCurrentDir=" + currentAttemptRecoveryDataDir);
+
+ FSDataInputStream dagRecoveryStream = getDAGRecoveryStream(
+ previousAttemptRecoveryDataDir, lastInProgressDAG);
+ if (dagRecoveryStream == null) {
+ // Could not find data to recover
+ // Error out
+ throw new IOException("Could not find recovery data for last in progress DAG"
+ + ", dagId=" + lastInProgressDAG);
+ }
+
+ DAGImpl recoveredDAG = null;
+
+ LOG.info("Copying DAG data into Current Attempt directory"
+ + ", filePath=" + getDAGRecoveryFilePath(currentAttemptRecoveryDataDir,
+ lastInProgressDAG));
+ FSDataOutputStream newDAGRecoveryStream =
+ getDAGRecoveryOutputStream(currentAttemptRecoveryDataDir, lastInProgressDAG);
+
+ while (dagRecoveryStream.available() > 0) {
+ HistoryEvent event;
+ try {
+ event = getNextEvent(dagRecoveryStream);
+ } catch (IOException ioe) {
+ LOG.warn("Corrupt data found when trying to read next event", ioe);
+ break;
+ }
+ if (event == null) {
+ // reached end of data
+ break;
+ }
+ HistoryEventType eventType = event.getEventType();
+ switch (eventType) {
+ case DAG_SUBMITTED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ recoveredDAG = dagAppMaster.createDAG(((DAGSubmittedEvent) event).getDAGPlan(),
+ lastInProgressDAG);
+ dagAppMaster.setCurrentDAG(recoveredDAG);
+ break;
+ }
+ case DAG_INITIALIZED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ recoveredDAG.restoreFromEvent(event);
+ break;
+ }
+ case DAG_STARTED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ recoveredDAG.restoreFromEvent(event);
+ break;
+ }
+ case DAG_COMMIT_STARTED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ recoveredDAG.restoreFromEvent(event);
+ break;
+ }
+ case DAG_FINISHED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ // If this is seen, nothing to recover
+ assert recoveredDAG != null;
+ recoveredDAG.restoreFromEvent(event);
+ return recoveredDAG;
+ }
+ case CONTAINER_LAUNCHED:
+ {
+ // Nothing to do?
+ break;
+ }
+ case VERTEX_INITIALIZED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ VertexInitializedEvent vEvent = (VertexInitializedEvent) event;
+ Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ v.restoreFromEvent(vEvent);
+ break;
+ }
+ case VERTEX_STARTED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ VertexStartedEvent vEvent = (VertexStartedEvent) event;
+ Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ v.restoreFromEvent(vEvent);
+ break;
+ }
+ case VERTEX_PARALLELISM_UPDATED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ VertexParallelismUpdatedEvent vEvent = (VertexParallelismUpdatedEvent) event;
+ Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ v.restoreFromEvent(vEvent);
+ break;
+ }
+ case VERTEX_COMMIT_STARTED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ VertexCommitStartedEvent vEvent = (VertexCommitStartedEvent) event;
+ Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ v.restoreFromEvent(vEvent);
+ break;
+ }
+ case VERTEX_FINISHED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ VertexFinishedEvent vEvent = (VertexFinishedEvent) event;
+ Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ v.restoreFromEvent(vEvent);
+ break;
+ }
+ case TASK_STARTED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ TaskStartedEvent tEvent = (TaskStartedEvent) event;
+ Task task = recoveredDAG.getVertex(
+ tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
+ task.restoreFromEvent(tEvent);
+ break;
+ }
+ case TASK_FINISHED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ TaskFinishedEvent tEvent = (TaskFinishedEvent) event;
+ Task task = recoveredDAG.getVertex(
+ tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
+ task.restoreFromEvent(tEvent);
+ break;
+ }
+ case TASK_ATTEMPT_STARTED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event;
+ Task task =
+ recoveredDAG.getVertex(
+ tEvent.getTaskAttemptID().getTaskID().getVertexID())
+ .getTask(tEvent.getTaskAttemptID().getTaskID());
+ task.restoreFromEvent(tEvent);
+ break;
+ }
+ case TASK_ATTEMPT_FINISHED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event;
+ Task task =
+ recoveredDAG.getVertex(
+ tEvent.getTaskAttemptID().getTaskID().getVertexID())
+ .getTask(tEvent.getTaskAttemptID().getTaskID());
+ task.restoreFromEvent(tEvent);
+ break;
+ }
+ case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ VertexDataMovementEventsGeneratedEvent vEvent =
+ (VertexDataMovementEventsGeneratedEvent) event;
+ Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ v.restoreFromEvent(vEvent);
+ break;
+ }
+ default:
+ throw new RuntimeException("Invalid data found, unknown event type "
+ + eventType);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[DAG RECOVERY]"
+ + " dagId=" + lastInProgressDAG
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ }
+ newDAGRecoveryStream.writeInt(eventType.ordinal());
+ event.toProtoStream(newDAGRecoveryStream);
+ }
+ newDAGRecoveryStream.hsync();
+ newDAGRecoveryStream.close();
+
+ Path dataCopiedFlagPath = new Path(currentAttemptRecoveryDataDir,
+ dataRecoveredFileFlag);
+ LOG.info("Finished copying data from previous attempt into current attempt"
+ + " - setting flag by creating file"
+ + ", path=" + dataCopiedFlagPath.toString());
+ FSDataOutputStream flagFile =
+ recoveryFS.create(dataCopiedFlagPath, true, recoveryBufferSize);
+ flagFile.writeInt(1);
+ flagFile.hsync();
+ flagFile.close();
+
+ return recoveredDAG;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 8117619..45fb50a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -31,6 +31,7 @@ import org.apache.tez.dag.api.client.DAGStatusBuilder;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
@@ -84,4 +85,6 @@ public interface DAG {
Credentials getCredentials();
UserGroupInformation getDagUGI();
+
+ DAGState restoreFromEvent(HistoryEvent historyEvent);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index 8243b70..ac96681 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -24,6 +24,8 @@ import java.util.Map;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.oldrecords.TaskReport;
import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.runtime.api.impl.TezEvent;
@@ -80,4 +82,5 @@ public interface Task {
public List<String> getDiagnostics();
+ TaskState restoreFromEvent(HistoryEvent historyEvent);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index 0cc9163..2af1232 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -27,6 +27,7 @@ import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -36,7 +37,7 @@ import org.apache.tez.dag.records.TezVertexID;
* Read only view of TaskAttempt.
*/
public interface TaskAttempt {
-
+
public static class TaskAttemptStatus {
public TaskAttemptState state;
public DAGCounter localityCounter;
@@ -118,4 +119,7 @@ public interface TaskAttempt {
public Task getTask();
public boolean getIsRescheduled();
+
+ TaskAttemptState restoreFromEvent(HistoryEvent event);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 9e7a0a7..807e9b4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -37,6 +37,8 @@ import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.impl.Edge;
import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.OutputCommitter;
@@ -114,4 +116,7 @@ public interface Vertex extends Comparable<Vertex> {
// TODO remove this once RootInputVertexManager is fixed to not use
// internal apis
AppContext getAppContext();
+
+ VertexState restoreFromEvent(HistoryEvent event);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
index 5a7af0a..7130c7a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
@@ -27,4 +27,5 @@ public enum VertexState {
KILLED,
ERROR,
TERMINATING,
+ RECOVERING,
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
index 476c688..741dcfa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
@@ -26,7 +26,7 @@ public enum DAGEventType {
//Producer:Client
DAG_KILL,
- //Producer:MRAppMaster
+ //Producer:AM
DAG_INIT,
DAG_START,
@@ -44,4 +44,8 @@ public enum DAGEventType {
DAG_DIAGNOSTIC_UPDATE,
INTERNAL_ERROR,
DAG_COUNTER_UPDATE,
+
+ // Event to trigger recovery
+ // Producer:AM
+ DAG_RECOVER
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index 018bd3d..db3fd3b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -60,5 +60,8 @@ public enum TaskAttemptEventType {
// Producer: consumer destination vertex
TA_OUTPUT_FAILED,
+
+ // Recovery
+ TA_RECOVER,
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java
new file mode 100644
index 0000000..e7e59e3
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.records.TezTaskID;
+
+/**
+ * Task event of type {@link TaskEventType#T_RECOVER}, optionally carrying the
+ * state the task is expected to be recovered into.
+ */
+public class TaskEventRecoverTask extends TaskEvent {
+
+  /**
+   * Requested target state; {@code null} when none was supplied.
+   * Presumably the state the task should end up in after recovery
+   * (NOTE(review): confirm against the T_RECOVER handler in TaskImpl).
+   */
+  private final TaskState desiredState;
+
+  /**
+   * @param taskID id of the task this event is addressed to
+   * @param desiredState requested target state; may be {@code null}
+   */
+  public TaskEventRecoverTask(TezTaskID taskID, TaskState desiredState) {
+    super(taskID, TaskEventType.T_RECOVER);
+    this.desiredState = desiredState;
+  }
+
+  /**
+   * Creates a recover event with no requested target state
+   * ({@link #getDesiredState()} returns {@code null}).
+   *
+   * @param taskID id of the task this event is addressed to
+   */
+  public TaskEventRecoverTask(TezTaskID taskID) {
+    this(taskID, null);
+  }
+
+  /** @return the requested target state, or {@code null} if none was given */
+  public TaskState getDesiredState() {
+    return desiredState;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
index a0b99a9..4830ae0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
@@ -40,5 +40,9 @@ public enum TaskEventType {
T_ATTEMPT_OUTPUT_CONSUMABLE,
T_ATTEMPT_FAILED,
T_ATTEMPT_SUCCEEDED,
- T_ATTEMPT_KILLED
+ T_ATTEMPT_KILLED,
+
+ // Recovery event
+ T_RECOVER
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java
new file mode 100644
index 0000000..34e45fe
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java
@@ -0,0 +1,36 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.records.TezVertexID;
+
+/**
+ * Vertex event of type {@link VertexEventType#V_RECOVER} carrying the state
+ * the vertex is requested to be recovered into.
+ */
+public class VertexEventRecoverVertex extends VertexEvent {
+
+  /**
+   * Requested target state as supplied by the sender; not validated here
+   * and may be {@code null} if a caller passes {@code null}.
+   */
+  private final VertexState desiredState;
+
+  /**
+   * @param vertexId id of the vertex this event is addressed to
+   * @param desiredState requested target state for the recovered vertex
+   */
+  public VertexEventRecoverVertex(TezVertexID vertexId, VertexState desiredState) {
+    super(vertexId, VertexEventType.V_RECOVER);
+    this.desiredState = desiredState;
+  }
+
+  /** @return the requested target state passed to the constructor */
+  public VertexState getDesiredState() {
+    return desiredState;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
index a872ae2..69195db 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
@@ -27,13 +27,25 @@ public class VertexEventRouteEvent extends VertexEvent {
final List<TezEvent> events;
+ final boolean recovered;
+
public VertexEventRouteEvent(TezVertexID vertexId, List<TezEvent> events) {
+ this(vertexId, events, false);
+ }
+
+ public VertexEventRouteEvent(TezVertexID vertexId, List<TezEvent> events,
+ boolean recovered) {
super(vertexId, VertexEventType.V_ROUTE_EVENT);
this.events = events;
+ this.recovered = recovered;
}
-
+
public List<TezEvent> getEvents() {
return events;
}
+
+  /**
+   * @return {@code true} when this route event was built with the
+   *         {@code recovered} flag set; {@code false} for events created via
+   *         the two-argument constructor
+   */
+  public boolean isRecovered() {
+    return this.recovered;
+  }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
new file mode 100644
index 0000000..5e61369
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+
+import java.util.List;
+
+/**
+ * Vertex event of type {@link VertexEventType#V_SOURCE_VERTEX_RECOVERED},
+ * sent to a vertex to report the recovered state of one of its source
+ * vertices together with that source's completed task attempts.
+ */
+public class VertexEventSourceVertexRecovered extends VertexEvent {
+
+  /** Recovered state of the source vertex. */
+  private final VertexState sourceVertexState;
+  /** Id of the source vertex whose recovery is being reported. */
+  private final TezVertexID sourceVertexID;
+  /**
+   * Completed task attempts of the source vertex. Stored by reference (not
+   * copied), so callers should not mutate the list after constructing the event.
+   */
+  private final List<TezTaskAttemptID> completedTaskAttempts;
+
+  /**
+   * @param vertexID id of the vertex this event is addressed to
+   * @param sourceVertexID id of the recovered source vertex
+   * @param sourceVertexState recovered state of the source vertex
+   * @param completedTaskAttempts completed attempts of the source vertex;
+   *        kept by reference
+   */
+  public VertexEventSourceVertexRecovered(TezVertexID vertexID,
+      TezVertexID sourceVertexID,
+      VertexState sourceVertexState,
+      List<TezTaskAttemptID> completedTaskAttempts) {
+    super(vertexID, VertexEventType.V_SOURCE_VERTEX_RECOVERED);
+    this.sourceVertexState = sourceVertexState;
+    this.sourceVertexID = sourceVertexID;
+    this.completedTaskAttempts = completedTaskAttempts;
+  }
+
+  /** @return the recovered state of the source vertex */
+  public VertexState getSourceVertexState() {
+    return sourceVertexState;
+  }
+
+  /** @return the id of the recovered source vertex */
+  public TezVertexID getSourceVertexID() {
+    return sourceVertexID;
+  }
+
+  /** @return the list passed to the constructor (same instance, not a copy) */
+  public List<TezTaskAttemptID> getCompletedTaskAttempts() {
+    return completedTaskAttempts;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 0cf14eb..69952d9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -56,5 +56,11 @@ public enum VertexEventType {
//Producer: VertexInputInitializer
V_ROOT_INPUT_INITIALIZED,
V_ROOT_INPUT_FAILED,
-
+
+ // Recover Event, Producer:DAG
+ V_RECOVER,
+
+ // Recover Event, Producer:Vertex
+ V_SOURCE_VERTEX_RECOVERED
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 04ed223..432c189 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -90,12 +90,15 @@ import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
@@ -157,6 +160,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private final List<String> diagnostics = new ArrayList<String>();
+ // Recovery related flags
+ boolean recoveryInitEventSeen = false;
+ boolean recoveryStartEventSeen = false;
+
private static final DiagnosticsUpdateTransition
DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
private static final InternalErrorTransition
@@ -176,6 +183,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
.addTransition(DAGState.NEW, DAGState.NEW,
DAGEventType.DAG_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(DAGState.NEW,
+ EnumSet.of(DAGState.NEW, DAGState.INITED, DAGState.RUNNING,
+ DAGState.SUCCEEDED, DAGState.FAILED, DAGState.KILLED,
+ DAGState.ERROR, DAGState.TERMINATING),
+ DAGEventType.DAG_RECOVER,
+ new RecoverTransition())
.addTransition(DAGState.NEW, DAGState.NEW,
DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition
@@ -342,7 +355,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
Map<String, VertexGroupInfo> vertexGroups = Maps.newHashMap();
Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap();
-
+ private DAGState recoveredState = DAGState.NEW;
+ private boolean recoveryCommitInProgress = false;
+
static class VertexGroupInfo {
String groupName;
Set<String> groupMembers;
@@ -466,6 +481,46 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
@Override
+  public DAGState restoreFromEvent(HistoryEvent historyEvent) {
+    // Replays one DAG-level history event read back during recovery and
+    // returns the DAG state recovered so far. Ordering is validated:
+    // INITIALIZED must precede STARTED, COMMIT_STARTED requires RUNNING,
+    // and a SUCCEEDED finish requires a prior STARTED event.
+    // Any other event type is rejected with a RuntimeException.
+    switch (historyEvent.getEventType()) {
+      case DAG_INITIALIZED:
+        // Re-run initialization using the recorded init time.
+        recoveredState = initializeDAG((DAGInitializedEvent) historyEvent);
+        recoveryInitEventSeen = true;
+        return recoveredState;
+      case DAG_STARTED:
+        if (!recoveryInitEventSeen) {
+          throw new RuntimeException("Started Event seen but"
+              + " no Init Event was encountered earlier");
+        }
+        recoveryStartEventSeen = true;
+        this.startTime = ((DAGStartedEvent) historyEvent).getStartTime();
+        recoveredState = DAGState.RUNNING;
+        return recoveredState;
+      case DAG_COMMIT_STARTED:
+        if (recoveredState != DAGState.RUNNING) {
+          throw new RuntimeException("Commit Started Event seen but"
+              + " recovered state is not RUNNING"
+              + ", recoveredState=" + recoveredState);
+        }
+        // Commit began but no finish event has been seen (yet).
+        recoveryCommitInProgress = true;
+        return recoveredState;
+      case DAG_FINISHED: {
+        DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent;
+        // A DAG may reach a terminal non-SUCCEEDED state without ever
+        // starting: initializeDAG() fails a zero-vertex DAG through
+        // finished(DAGState.FAILED), and the finishing path logs a
+        // DAGFinishedEvent for every final state. Only a SUCCEEDED DAG must
+        // have been started. (TODO confirm for kill-before-start as well.)
+        if (!recoveryStartEventSeen
+            && finishedEvent.getState() == DAGState.SUCCEEDED) {
+          throw new RuntimeException("Finished Event seen but"
+              + " no Start Event was encountered earlier");
+        }
+        recoveryCommitInProgress = false;
+        this.finishTime = finishedEvent.getFinishTime();
+        recoveredState = finishedEvent.getState();
+        return recoveredState;
+      }
+      default:
+        throw new RuntimeException("Unexpected event received for restoring"
+            + " state, eventType=" + historyEvent.getEventType());
+    }
+  }
+
+ @Override
public TezCounters getAllCounters() {
readLock.lock();
@@ -666,6 +721,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
boolean failedWhileCommitting = false;
if (dagSucceeded && !successfulOutputsAlreadyCommitted) {
// commit all shared outputs
+ appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
+ new DAGCommitStartedEvent(getID())));
for (VertexGroupInfo groupInfo : vertexGroups.values()) {
if (failedWhileCommitting) {
break;
@@ -820,7 +877,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
void logJobHistoryFinishedEvent() {
this.setFinishTime();
DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
- finishTime, DAGStatus.State.SUCCEEDED, "", getAllCounters());
+ finishTime, DAGState.SUCCEEDED, "", getAllCounters());
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(dagId, finishEvt));
}
@@ -839,7 +896,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
new DAGHistoryEvent(dagId, startEvt));
}
- void logJobHistoryUnsuccesfulEvent(DAGStatus.State state) {
+ void logJobHistoryUnsuccesfulEvent(DAGState state) {
DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
clock.getTime(), state,
StringUtils.join(LINE_SEPARATOR, getDiagnostics()),
@@ -947,11 +1004,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
}
- DAGStatus.State logState = getDAGStatusFromState(finalState);
- if (logState == DAGStatus.State.SUCCEEDED) {
+ if (finalState == DAGState.SUCCEEDED) {
logJobHistoryFinishedEvent();
} else {
- logJobHistoryUnsuccesfulEvent(logState);
+ logJobHistoryUnsuccesfulEvent(finalState);
}
eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState));
@@ -1047,179 +1103,294 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
return null;
}
- // TODO Recovery
- /*
- @Override
- public List<AMInfo> getAMInfos() {
- return amInfos;
+ public DAGState initializeDAG() { // non-recovery entry point (called from InitTransition)
+ return initializeDAG(null); // null event: fresh init, initTime taken from the clock
}
- */
-
- private static class InitTransition
- implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
- /**
- * Note that this transition method is called directly (and synchronously)
- * by MRAppMaster's init() method (i.e., no RPC, no thread-switching;
- * just plain sequential call within AM context), so we can trigger
- * modifications in AM state from here (at least, if AM is written that
- * way; MR version is).
- */
- @Override
- public DAGState transition(DAGImpl dag, DAGEvent event) {
- // TODO Metrics
- //dag.metrics.submittedJob(dag);
- //dag.metrics.preparingJob(dag);
+ DAGState initializeDAG(DAGInitializedEvent event) { // event == null for a fresh init; non-null presumably when restoring from recovery data (confirm)
+ if (event != null) {
+ initTime = event.getInitTime();
+ } else {
+ initTime = clock.getTime();
+ }
- dag.initTime = dag.clock.getTime();
- dag.commitAllOutputsOnSuccess = dag.conf.getBoolean(
- TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
- TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT);
+ commitAllOutputsOnSuccess = conf.getBoolean(
+ TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+ TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT);
+
+ // If we have no vertices, fail the dag
+ numVertices = getJobPlan().getVertexCount();
+ if (numVertices == 0) {
+ addDiagnostic("No vertices for dag");
+ trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
+ if (event != null) { // non-null event: report FAILED directly, skipping finished()
+ return DAGState.FAILED;
+ }
+ return finished(DAGState.FAILED);
+ }
- // If we have no vertices, fail the dag
- dag.numVertices = dag.getJobPlan().getVertexCount();
- if (dag.numVertices == 0) {
- dag.addDiagnostic("No vertices for dag");
- dag.trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
- return dag.finished(DAGState.FAILED);
+ if (jobPlan.getVertexGroupsCount() > 0) {
+ for (PlanVertexGroupInfo groupInfo : jobPlan.getVertexGroupsList()) {
+ vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
}
-
- if (dag.jobPlan.getVertexGroupsCount() > 0) {
- for (PlanVertexGroupInfo groupInfo : dag.jobPlan.getVertexGroupsList()) {
- dag.vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
- }
- for (VertexGroupInfo groupInfo : dag.vertexGroups.values()) {
- for (String vertexName : groupInfo.groupMembers) {
- List<VertexGroupInfo> groupList = dag.vertexGroupInfo.get(vertexName);
- if (groupList == null) {
- groupList = Lists.newLinkedList();
- dag.vertexGroupInfo.put(vertexName, groupList);
- }
- groupList.add(groupInfo);
+ for (VertexGroupInfo groupInfo : vertexGroups.values()) {
+ for (String vertexName : groupInfo.groupMembers) {
+ List<VertexGroupInfo> groupList = vertexGroupInfo.get(vertexName);
+ if (groupList == null) {
+ groupList = Lists.newLinkedList();
+ vertexGroupInfo.put(vertexName, groupList);
}
+ groupList.add(groupInfo);
}
}
+ }
- // create the vertices`
- for (int i=0; i < dag.numVertices; ++i) {
- String vertexName = dag.getJobPlan().getVertex(i).getName();
- VertexImpl v = createVertex(dag, vertexName, i);
- dag.addVertex(v);
- }
+ // create the vertices
+ for (int i=0; i < numVertices; ++i) {
+ String vertexName = getJobPlan().getVertex(i).getName();
+ VertexImpl v = createVertex(this, vertexName, i);
+ addVertex(v);
+ }
- createDAGEdges(dag);
- Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(dag.getJobPlan().getEdgeList());
+ createDAGEdges(this);
+ Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(getJobPlan().getEdgeList());
- // setup the dag
- for (Vertex v : dag.vertices.values()) {
- parseVertexEdges(dag, edgePlans, v);
- }
+ // set up the dag: wire each vertex to its input and output edges
+ for (Vertex v : vertices.values()) {
+ parseVertexEdges(this, edgePlans, v);
+ }
- // Initialize the edges, now that the payload and vertices have been set.
- for (Edge e : dag.edges.values()) {
- e.initialize();
- }
+ // Initialize the edges, now that the payload and vertices have been set.
+ for (Edge e : edges.values()) {
+ e.initialize();
+ }
- assignDAGScheduler(dag);
-
- for (Map.Entry<String, VertexGroupInfo> entry : dag.vertexGroups.entrySet()) {
- String groupName = entry.getKey();
- VertexGroupInfo groupInfo = entry.getValue();
- if (!groupInfo.outputs.isEmpty()) {
- // shared outputs
- for (String vertexName : groupInfo.groupMembers) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting shared outputs for group: " + groupName +
- " on vertex: " + vertexName);
- }
- Vertex v = dag.getVertex(vertexName);
- v.addSharedOutputs(groupInfo.outputs);
+ assignDAGScheduler(this);
+
+ for (Map.Entry<String, VertexGroupInfo> entry : vertexGroups.entrySet()) {
+ String groupName = entry.getKey();
+ VertexGroupInfo groupInfo = entry.getValue();
+ if (!groupInfo.outputs.isEmpty()) {
+ // shared outputs: register the group's outputs on every member vertex
+ for (String vertexName : groupInfo.groupMembers) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting shared outputs for group: " + groupName +
+ " on vertex: " + vertexName);
}
+ Vertex v = getVertex(vertexName);
+ v.addSharedOutputs(groupInfo.outputs);
}
}
+ }
+ return DAGState.INITED;
+ }
- // TODO Metrics
- //dag.metrics.endPreparingJob(dag);
- dag.logJobHistoryInitedEvent();
- return DAGState.INITED;
-
+ private static void createDAGEdges(DAGImpl dag) { // static like the sibling helpers: uses only the dag argument
+ for (EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) {
+ EdgeProperty edgeProperty = DagTypeConverters
+ .createEdgePropertyMapFromDAGPlan(edgePlan);
+
+ // If CUSTOM without an edge manager, set up a fake edge manager. Avoid
+ // referencing the fake edge manager within the API module.
+ if (edgeProperty.getDataMovementType() == DataMovementType.CUSTOM
+ && edgeProperty.getEdgeManagerDescriptor() == null) {
+ EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(
+ NullEdgeManager.class.getName());
+ EdgeProperty ep = new EdgeProperty(edgeDesc, edgeProperty.getDataSourceType(),
+ edgeProperty.getSchedulingType(), edgeProperty.getEdgeSource(),
+ edgeProperty.getEdgeDestination());
+ edgeProperty = ep;
+ }
+ // the edge manager may also be set via the API when using a custom edge type
+ dag.edges.put(edgePlan.getId(),
+ new Edge(edgeProperty, dag.getEventHandler()));
}
+ }
- private void createDAGEdges(DAGImpl dag) {
- for (EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) {
- EdgeProperty edgeProperty = DagTypeConverters
- .createEdgePropertyMapFromDAGPlan(edgePlan);
-
- // If CUSTOM without an edge manager, setup a fake edge manager. Avoid
- // referencing the fake edge manager within the API module.
- if (edgeProperty.getDataMovementType() == DataMovementType.CUSTOM
- && edgeProperty.getEdgeManagerDescriptor() == null) {
- EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(
- NullEdgeManager.class.getName());
- EdgeProperty ep = new EdgeProperty(edgeDesc, edgeProperty.getDataSourceType(),
- edgeProperty.getSchedulingType(), edgeProperty.getEdgeSource(),
- edgeProperty.getEdgeDestination());
- edgeProperty = ep;
- }
-
- // edge manager may be also set via API when using custom edge type
- dag.edges.put(edgePlan.getId(),
- new Edge(edgeProperty, dag.getEventHandler()));
- }
+ private static void assignDAGScheduler(DAGImpl dag) {
+ LOG.info("Using Natural order dag scheduler");
+ dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
+ }
+
+ private static VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
+ TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId);
+
+ VertexPlan vertexPlan = dag.getJobPlan().getVertex(vId);
+ VertexLocationHint vertexLocationHint = DagTypeConverters
+ .convertFromDAGPlan(vertexPlan.getTaskLocationHintList());
+
+ VertexImpl v = new VertexImpl(
+ vertexId, vertexPlan, vertexName, dag.conf,
+ dag.eventHandler, dag.taskAttemptListener,
+ dag.clock, dag.taskHeartbeatHandler,
+ !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,
+ dag.vertexGroups);
+ return v;
+ }
+
+ // hooks up this VertexImpl to input and output EdgeProperties
+ private static void parseVertexEdges(DAGImpl dag, Map<String, EdgePlan> edgePlans, Vertex vertex) {
+ VertexPlan vertexPlan = vertex.getVertexPlan();
+
+ Map<Vertex, Edge> inVertices =
+ new HashMap<Vertex, Edge>();
+
+ Map<Vertex, Edge> outVertices =
+ new HashMap<Vertex, Edge>();
+
+ for(String inEdgeId : vertexPlan.getInEdgeIdList()){
+ EdgePlan edgePlan = edgePlans.get(inEdgeId);
+ Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());
+ Edge edge = dag.edges.get(inEdgeId);
+ edge.setSourceVertex(inVertex);
+ edge.setDestinationVertex(vertex);
+ inVertices.put(inVertex, edge);
}
- private void assignDAGScheduler(DAGImpl dag) {
- LOG.info("Using Natural order dag scheduler");
- dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
+ for(String outEdgeId : vertexPlan.getOutEdgeIdList()){
+ EdgePlan edgePlan = edgePlans.get(outEdgeId);
+ Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());
+ Edge edge = dag.edges.get(outEdgeId);
+ edge.setSourceVertex(vertex);
+ edge.setDestinationVertex(outVertex);
+ outVertices.put(outVertex, edge);
}
- private VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
- TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId);
+ vertex.setInputVertices(inVertices);
+ vertex.setOutputVertices(outVertices);
+ }
- VertexPlan vertexPlan = dag.getJobPlan().getVertex(vId);
- VertexLocationHint vertexLocationHint = DagTypeConverters
- .convertFromDAGPlan(vertexPlan.getTaskLocationHintList());
+ private static class RecoverTransition
+ implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
+
+ @Override
+ public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
+ switch (dag.recoveredState) {
+ case NEW:
+ // send DAG an Init and start events
+ dag.eventHandler.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_INIT));
+ dag.eventHandler.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_START));
+ return DAGState.NEW;
+ case INITED:
+ // DAG inited but not started
+ // This implies vertices need to be sent init event
+ // Root vertices need to be sent start event
+ // The vertices may already have been sent these events but the
+ // DAG start may not have been persisted
+ for (Vertex v : dag.vertices.values()) {
+ if (v.getInputVerticesCount() == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending Running Recovery event to root vertex "
+ + v.getName());
+ }
+ dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
+ VertexState.RUNNING));
+ }
+ }
+ return DAGState.RUNNING;
+ case RUNNING:
+ // if commit is in progress, DAG should fail as commits are not
+ // recoverable
+ if (dag.recoveryCommitInProgress) {
+ // Fail the DAG as we have not seen a completion
+ dag.trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
+ dag.setFinishTime();
+ // Recover all other data for all vertices
+ // send recover event to all vertices with a final end state.
+ // dag.recoveredState is RUNNING in this branch, so the KILLED /
+ // FAILED mappings applied to completed DAGs (see the SUCCEEDED /
+ // ERROR / FAILED / KILLED case below) can never match here: every
+ // vertex is recovered as SUCCEEDED.
+ // NOTE(review): confirm SUCCEEDED is the intended vertex state
+ // even though the DAG itself is being failed.
+ VertexState desiredState = VertexState.SUCCEEDED;
+ for (Vertex v : dag.vertices.values()) {
+ dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
+ desiredState));
+ }
+ dag.logJobHistoryUnsuccesfulEvent(DAGState.FAILED);
+ dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
+ DAGState.FAILED));
+ return DAGState.FAILED;
+ }
+ for (Vertex v : dag.vertices.values()) {
+ if (v.getInputVerticesCount() == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending Running Recovery event to root vertex "
+ + v.getName());
+ }
+ dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
+ VertexState.RUNNING));
+ }
+ }
+ return DAGState.RUNNING;
+ case SUCCEEDED:
+ case ERROR:
+ case FAILED:
+ case KILLED:
+ // Completed
+
+ // Recover all other data for all vertices
+ // send recover event to all vertices with a final end state
+ for (Vertex v : dag.vertices.values()) {
+ VertexState desiredState = VertexState.SUCCEEDED;
+ if (dag.recoveredState.equals(DAGState.KILLED)) {
+ desiredState = VertexState.KILLED;
+ } else if (EnumSet.of(DAGState.ERROR, DAGState.FAILED).contains(
+ dag.recoveredState)) {
+ desiredState = VertexState.FAILED;
+ }
+ dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
+ desiredState));
+ }
- VertexImpl v = new VertexImpl(
- vertexId, vertexPlan, vertexName, dag.conf,
- dag.eventHandler, dag.taskAttemptListener,
- dag.clock, dag.taskHeartbeatHandler,
- !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,
- dag.vertexGroups);
- return v;
+ // Let us inform AM of completion
+ dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
+ dag.recoveredState));
+
+ LOG.info("Recovered DAG: " + dag.getID() + " finished with state: "
+ + dag.recoveredState);
+ return dag.recoveredState;
+ default:
+ // Unhandled recovered state: report ERROR to the AM
+ LOG.warn("Trying to recover DAG, failed to recover"
+ + " from non-handled state: " + dag.recoveredState);
+ dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
+ DAGState.ERROR));
+ return DAGState.ERROR;
+ }
}
- // hooks up this VertexImpl to input and output EdgeProperties
- private void parseVertexEdges(DAGImpl dag, Map<String, EdgePlan> edgePlans, Vertex vertex) {
- VertexPlan vertexPlan = vertex.getVertexPlan();
+ }
- Map<Vertex, Edge> inVertices =
- new HashMap<Vertex, Edge>();
+ private static class InitTransition
+ implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
- Map<Vertex, Edge> outVertices =
- new HashMap<Vertex, Edge>();
+ /**
+ * Initializes the DAG via {@link DAGImpl#initializeDAG()}. If that yields
+ * INITED, the DAG-initialized history event is logged and INITED returned;
+ * any other state (e.g. the result of finished(FAILED) when the plan has
+ * no vertices) is returned unchanged and nothing is logged.
+ * NOTE(review): old note said this runs synchronously from AM init; re-verify.
+ */
+ @Override
+ public DAGState transition(DAGImpl dag, DAGEvent event) {
+ // TODO Metrics
+ //dag.metrics.submittedJob(dag);
+ //dag.metrics.preparingJob(dag);
- for(String inEdgeId : vertexPlan.getInEdgeIdList()){
- EdgePlan edgePlan = edgePlans.get(inEdgeId);
- Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());
- Edge edge = dag.edges.get(inEdgeId);
- edge.setSourceVertex(inVertex);
- edge.setDestinationVertex(vertex);
- inVertices.put(inVertex, edge);
+ DAGState state = dag.initializeDAG();
+ if (state != DAGState.INITED) {
+ return state;
}
- for(String outEdgeId : vertexPlan.getOutEdgeIdList()){
- EdgePlan edgePlan = edgePlans.get(outEdgeId);
- Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());
- Edge edge = dag.edges.get(outEdgeId);
- edge.setSourceVertex(vertex);
- edge.setDestinationVertex(outVertex);
- outVertices.put(outVertex, edge);
- }
+ // TODO Metrics
+ //dag.metrics.endPreparingJob(dag);
+ dag.logJobHistoryInitedEvent();
+ return DAGState.INITED;
+
- vertex.setInputVertices(inVertices);
- vertex.setOutputVertices(outVertices);
}
} // end of InitTransition
@@ -1427,6 +1598,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
for (VertexGroupInfo groupInfo : commitList) {
groupInfo.committed = true;
Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
+ appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
+ new DAGCommitStartedEvent(dagId)));
for (String outputName : groupInfo.outputs) {
OutputCommitter committer = v.getOutputCommitters().get(outputName);
LOG.info("Committing output: " + outputName);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
index 070bd75..ff9e401 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
@@ -31,13 +31,15 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
private final String vertexName;
private final String outputName;
private final byte[] userPayload;
+ private final int vertexIdx;
public OutputCommitterContextImpl(ApplicationId applicationId,
int dagAttemptNumber,
String dagName,
String vertexName,
String outputName,
- @Nullable byte[] userPayload) {
+ @Nullable byte[] userPayload,
+ int vertexIdx) {
checkNotNull(applicationId, "applicationId is null");
checkNotNull(dagName, "dagName is null");
checkNotNull(vertexName, "vertexName is null");
@@ -48,6 +50,7 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
this.vertexName = vertexName;
this.outputName = outputName;
this.userPayload = userPayload;
+ this.vertexIdx = vertexIdx;
}
@Override
@@ -80,4 +83,9 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
return userPayload;
}
+ /** Returns the vertex index passed to the constructor as {@code vertexIdx}. */
+ @Override
+ public int getVertexIndex() {
+ return vertexIdx;
+ }
}