You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/14 02:47:33 UTC
[2/2] git commit: TEZ-904. Committer recovery events should be
out-of-band. (hitesh)
TEZ-904. Committer recovery events should be out-of-band. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f58508a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f58508a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f58508a5
Branch: refs/heads/master
Commit: f58508a5692c817a11b7996ba563ba8a26b85560
Parents: 693f2ca
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Mar 13 18:47:05 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Mar 13 18:47:05 2014 -0700
----------------------------------------------------------------------
.../org/apache/tez/dag/app/DAGAppMaster.java | 77 ++--
.../org/apache/tez/dag/app/RecoveryParser.java | 354 +++++++++++++++----
.../dag/app/dag/event/DAGEventRecoverEvent.java | 37 ++
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 73 +++-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 114 +++---
.../apache/tez/dag/history/HistoryEvent.java | 8 +-
.../tez/dag/history/HistoryEventType.java | 4 +-
.../apache/tez/dag/history/SummaryEvent.java | 11 +
.../history/events/DAGCommitStartedEvent.java | 34 +-
.../dag/history/events/DAGFinishedEvent.java | 33 +-
.../dag/history/events/DAGSubmittedEvent.java | 23 +-
.../events/VertexCommitStartedEvent.java | 41 ++-
.../dag/history/events/VertexFinishedEvent.java | 51 ++-
.../events/VertexGroupCommitFinishedEvent.java | 132 +++++++
.../events/VertexGroupCommitStartedEvent.java | 132 +++++++
.../dag/history/recovery/RecoveryService.java | 181 +++++-----
tez-dag/src/main/proto/HistoryEvents.proto | 21 ++
.../TestHistoryEventsProtoConversion.java | 75 +++-
.../org/apache/tez/test/TestDAGRecovery2.java | 144 ++++++++
.../java/org/apache/tez/test/TestProcessor.java | 5 +
.../apache/tez/test/dag/MultiAttemptDAG.java | 109 ++++++
21 files changed, 1390 insertions(+), 269 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 9df8752..9a01090 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.app;
import static com.google.common.base.Preconditions.checkNotNull;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -99,6 +100,7 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
+import org.apache.tez.dag.app.RecoveryParser.RecoveredDAGData;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Task;
@@ -108,6 +110,7 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
@@ -1371,28 +1374,19 @@ public class DAGAppMaster extends AbstractService {
}
}
- private DAG recoverDAG() throws IOException {
- DAG recoveredDAG = null;
+ private RecoveredDAGData recoverDAG() throws IOException { // now only parses recovery data; dispatching DAG_RECOVER moved to the caller
if (recoveryEnabled) {
if (this.appAttemptID.getAttemptId() > 1) { // recovery is only attempted for attempt ids above 1
+ LOG.info("Recovering data from previous attempts"
+ + ", currentAttemptId=" + this.appAttemptID.getAttemptId());
this.state = DAGAppMasterState.RECOVERING; // not reset in this method; the caller moves the state on afterwards
RecoveryParser recoveryParser = new RecoveryParser(
this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
- recoveredDAG = recoveryParser.parseRecoveryData();
- if (recoveredDAG != null) {
- LOG.info("Found DAG to recover, dagId=" + recoveredDAG.getID());
- _updateLoggers(recoveredDAG, "");
- DAGEvent recoverDAGEvent = new DAGEvent(recoveredDAG.getID(),
- DAGEventType.DAG_RECOVER);
- dagEventDispatcher.handle(recoverDAGEvent);
- this.state = DAGAppMasterState.RUNNING;
- } else {
- LOG.info("No DAG to recover");
- this.state = DAGAppMasterState.IDLE;
- }
+ RecoveredDAGData recoveredDAGData = recoveryParser.parseRecoveryData(); // may itself be null when the parser finds nothing to recover
+ return recoveredDAGData;
}
}
- return recoveredDAG;
+ return null; // recovery disabled, or attempt id not above 1
}
@SuppressWarnings("unchecked")
@@ -1415,20 +1409,56 @@ public class DAGAppMaster extends AbstractService {
this.lastDAGCompletionTime = clock.getTime();
+ RecoveredDAGData recoveredDAGData = recoverDAG();
+
if (!isSession) {
- DAG recoveredDAG = null;
- if (appAttemptID.getAttemptId() != 1) {
- recoveredDAG = recoverDAG();
+ LOG.info("In Non-Session mode.");
+ } else {
+ LOG.info("In Session mode. Waiting for DAG over RPC");
+ this.state = DAGAppMasterState.IDLE;
+ }
+
+ if (recoveredDAGData != null) {
+ if (recoveredDAGData.isCompleted
+ || recoveredDAGData.nonRecoverable) {
+ LOG.info("Found previous DAG in completed or non-recoverable state"
+ + ", dagId=" + recoveredDAGData.recoveredDagID
+ + ", isCompleted=" + recoveredDAGData.isCompleted
+ + ", isNonRecoverable=" + recoveredDAGData.nonRecoverable
+ + ", state=" + (recoveredDAGData.dagState == null ? "null" :
+ recoveredDAGData.dagState)
+ + ", failureReason=" + recoveredDAGData.reason);
+ _updateLoggers(recoveredDAGData.recoveredDAG, "");
+ if (recoveredDAGData.nonRecoverable) {
+ DAGEventRecoverEvent recoverDAGEvent =
+ new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
+ DAGState.FAILED);
+ dagEventDispatcher.handle(recoverDAGEvent);
+ this.state = DAGAppMasterState.RUNNING;
+ } else {
+ DAGEventRecoverEvent recoverDAGEvent =
+ new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
+ recoveredDAGData.dagState);
+ dagEventDispatcher.handle(recoverDAGEvent);
+ this.state = DAGAppMasterState.RUNNING;
+ }
+ } else {
+ LOG.info("Found DAG to recover, dagId=" + recoveredDAGData.recoveredDAG.getID());
+ _updateLoggers(recoveredDAGData.recoveredDAG, "");
+ DAGEvent recoverDAGEvent = new DAGEvent(recoveredDAGData.recoveredDAG.getID(),
+ DAGEventType.DAG_RECOVER);
+ dagEventDispatcher.handle(recoverDAGEvent);
+ this.state = DAGAppMasterState.RUNNING;
}
- if (recoveredDAG == null) {
+ } else {
+ if (!isSession) {
+ // No dag recovered - in non-session, just restart the original DAG
dagCounter.set(0);
startDAG();
}
- } else {
- LOG.info("In Session mode. Waiting for DAG over RPC");
- this.state = DAGAppMasterState.IDLE;
- recoverDAG();
+ }
+ if (isSession) {
this.dagSubmissionTimer = new Timer(true);
this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
@Override
@@ -1436,7 +1466,6 @@ public class DAGAppMaster extends AbstractService {
checkAndHandleSessionTimeout();
}
}, sessionTimeoutInterval, sessionTimeoutInterval / 10);
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 9e59849..7e1feca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -18,6 +18,13 @@
package org.apache.tez.dag.app;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -27,7 +34,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
@@ -37,9 +44,9 @@ import org.apache.tez.dag.history.events.AMLaunchedEvent;
import org.apache.tez.dag.history.events.AMStartedEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
@@ -48,17 +55,15 @@ import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
public class RecoveryParser {
@@ -88,6 +93,15 @@ public class RecoveryParser {
TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
}
+ public static class RecoveredDAGData { // mutable result holder returned by parseRecoveryData()
+ public TezDAGID recoveredDagID = null; // filled from recoveredDAG.getID() once the DAG_SUBMITTED event is replayed
+ public DAGImpl recoveredDAG = null; // DAG rebuilt via dagAppMaster.createDAG() from the submitted plan
+ public DAGState dagState = null; // state read from a DAG_FINISHED event; stays null if none was seen
+ public boolean isCompleted = false; // true when the summary or recovery data shows the DAG finished
+ public boolean nonRecoverable = false; // true when a commit was in flight or a finished vertex cannot be located
+ public String reason = null; // explanation accompanying nonRecoverable; null otherwise
+ }
+
private static void parseSummaryFile(FSDataInputStream inputStream)
throws IOException {
while (inputStream.available() > 0) {
@@ -149,6 +163,12 @@ public class RecoveryParser {
case VERTEX_COMMIT_STARTED:
event = new VertexCommitStartedEvent();
break;
+ case VERTEX_GROUP_COMMIT_STARTED:
+ event = new VertexGroupCommitStartedEvent();
+ break;
+ case VERTEX_GROUP_COMMIT_FINISHED:
+ event = new VertexGroupCommitFinishedEvent();
+ break;
case VERTEX_FINISHED:
event = new VertexFinishedEvent();
break;
@@ -266,18 +286,25 @@ public class RecoveryParser {
return recoveryFS.create(dagRecoveryPath, true, recoveryBufferSize);
}
- private TezDAGID getLastInProgressDAG(Map<TezDAGID, Boolean> seenDAGs) {
- TezDAGID inProgressDAG = null;
- for (Map.Entry<TezDAGID, Boolean> entry : seenDAGs.entrySet()) {
- if (!entry.getValue().booleanValue()) {
+ private DAGSummaryData getLastCompletedOrInProgressDAG( // returns the single in-progress DAG if any, else a completed one, else null
+ Map<TezDAGID, DAGSummaryData> dagSummaryDataMap) {
+ DAGSummaryData inProgressDAG = null;
+ DAGSummaryData lastCompletedDAG = null;
+ for (Map.Entry<TezDAGID, DAGSummaryData> entry : dagSummaryDataMap.entrySet()) { // NOTE(review): the caller builds this map as a HashMap, so visiting order is not dagId order
+ if (!entry.getValue().completed) {
if (inProgressDAG != null) { // at most one DAG may be in progress at a time
throw new RuntimeException("Multiple in progress DAGs seen"
- + ", dagId=" + inProgressDAG
+ + ", dagId=" + inProgressDAG.dagId
+ ", dagId=" + entry.getKey());
}
- inProgressDAG = entry.getKey();
+ inProgressDAG = entry.getValue();
+ } else {
+ lastCompletedDAG = entry.getValue(); // NOTE(review): keeps whichever completed entry is visited last; confirm this is the most recent DAG
}
}
+ if (inProgressDAG == null) {
+ return lastCompletedDAG; // may be null when the map is empty
+ }
return inProgressDAG;
}
@@ -305,8 +332,134 @@ public class RecoveryParser {
return getAttemptRecoveryDataDir(recoveryDataDir, foundPreviousAttempt);
}
+ private static class DAGSummaryData {
+
+ final TezDAGID dagId;
+ boolean completed = false;
+ boolean dagCommitCompleted = true;
+ DAGState dagState;
+ Map<TezVertexID, Boolean> vertexCommitStatus =
+ new HashMap<TezVertexID, Boolean>();
+ Map<String, Boolean> vertexGroupCommitStatus =
+ new HashMap<String, Boolean>();
+ List<HistoryEvent> bufferedSummaryEvents =
+ new ArrayList<HistoryEvent>();
+
+ DAGSummaryData(TezDAGID dagId) {
+ this.dagId = dagId;
+ }
+
+ void handleSummaryEvent(SummaryEventProto proto) throws IOException { // folds one summary-log record into this DAG's completion/commit bookkeeping
+ HistoryEventType eventType =
+ HistoryEventType.values()[proto.getEventType()]; // the proto's event type is used as an index into the enum's values()
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[RECOVERY SUMMARY]"
+ + " dagId=" + proto.getDagId()
+ + ", timestamp=" + proto.getTimestamp()
+ + ", event=" + eventType);
+ }
+ switch (eventType) { // no default branch: event types not listed here are ignored
+ case DAG_SUBMITTED:
+ completed = false;
+ break;
+ case DAG_FINISHED:
+ completed = true;
+ dagCommitCompleted = true; // a finished DAG clears any earlier DAG_COMMIT_STARTED marker
+ DAGFinishedEvent dagFinishedEvent = new DAGFinishedEvent();
+ dagFinishedEvent.fromSummaryProtoStream(proto);
+ dagState = dagFinishedEvent.getState();
+ break;
+ case DAG_COMMIT_STARTED:
+ dagCommitCompleted = false; // stays false until a DAG_FINISHED record is seen
+ break;
+ case VERTEX_COMMIT_STARTED:
+ VertexCommitStartedEvent vertexCommitStartedEvent =
+ new VertexCommitStartedEvent();
+ vertexCommitStartedEvent.fromSummaryProtoStream(proto);
+ vertexCommitStatus.put(
+ vertexCommitStartedEvent.getVertexID(), false); // false = commit started but not yet known to be finished
+ break;
+ case VERTEX_FINISHED:
+ VertexFinishedEvent vertexFinishedEvent =
+ new VertexFinishedEvent();
+ vertexFinishedEvent.fromSummaryProtoStream(proto);
+ if (vertexCommitStatus.containsKey(vertexFinishedEvent.getVertexID())) { // only vertices with a recorded commit start are tracked and buffered
+ vertexCommitStatus.put(
+ vertexFinishedEvent.getVertexID(), true);
+ bufferedSummaryEvents.add(vertexFinishedEvent); // kept so parseRecoveryData() can replay it onto the recovered DAG
+ }
+ break;
+ case VERTEX_GROUP_COMMIT_STARTED:
+ VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent =
+ new VertexGroupCommitStartedEvent();
+ vertexGroupCommitStartedEvent.fromSummaryProtoStream(proto);
+ bufferedSummaryEvents.add(vertexGroupCommitStartedEvent);
+ vertexGroupCommitStatus.put(
+ vertexGroupCommitStartedEvent.getVertexGroupName(), false); // keyed by vertex group name
+ break;
+ case VERTEX_GROUP_COMMIT_FINISHED:
+ VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent =
+ new VertexGroupCommitFinishedEvent();
+ vertexGroupCommitFinishedEvent.fromSummaryProtoStream(proto);
+ bufferedSummaryEvents.add(vertexGroupCommitFinishedEvent);
+ vertexGroupCommitStatus.put(
+ vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
+ break;
+ }
+ }
- public DAG parseRecoveryData() throws IOException {
+ @Override
+ public String toString() { // diagnostic dump of dagId, completion flag and per-vertex / per-group commit status
+ StringBuilder sb = new StringBuilder();
+ sb.append("dagId=").append(dagId);
+ sb.append(", dagCompleted=").append(completed); // dagCommitCompleted and dagState are not part of the output
+ if (!vertexCommitStatus.isEmpty()) {
+ sb.append(", vertexCommitStatuses=[");
+ for (Entry<TezVertexID, Boolean> entry : vertexCommitStatus.entrySet()) {
+ sb.append("{ vertexId=").append(entry.getKey())
+ .append(", committed=").append(entry.getValue()).append("}, "); // every entry, including the last, is followed by ", "
+ }
+ sb.append("]");
+ }
+ if (!vertexGroupCommitStatus.isEmpty()) {
+ sb.append(", vertexGroupCommitStatuses=[");
+ for (Entry<String, Boolean> entry : vertexGroupCommitStatus.entrySet()) {
+ sb.append("{ vertexGroup=").append(entry.getKey())
+ .append(", committed=").append(entry.getValue()).append("}, ");
+ }
+ sb.append("]");
+ }
+ return sb.toString();
+ }
+ }
+
+ private String isDAGRecoverable(DAGSummaryData data) { // despite the name, returns a reason string when NOT recoverable and null when recoverable
+ if (!data.dagCommitCompleted) { // a DAG_COMMIT_STARTED record was seen with no later DAG_FINISHED
+ return "DAG Commit was in progress, not recoverable"
+ + ", dagId=" + data.dagId;
+ }
+ if (!data.vertexCommitStatus.isEmpty()) {
+ for (Entry<TezVertexID, Boolean> entry : data.vertexCommitStatus.entrySet()) {
+ if (!(entry.getValue().booleanValue())) { // false = vertex commit started but no VERTEX_FINISHED recorded
+ return "Vertex Commit was in progress, not recoverable"
+ + ", dagId=" + data.dagId
+ + ", vertexId=" + entry.getKey();
+ }
+ }
+ }
+ if (!data.vertexGroupCommitStatus.isEmpty()) {
+ for (Entry<String, Boolean> entry : data.vertexGroupCommitStatus.entrySet()) {
+ if (!(entry.getValue().booleanValue())) { // false = group commit started but no matching finished record
+ return "Vertex Group Commit was in progress, not recoverable"
+ + ", dagId=" + data.dagId
+ + ", vertexGroup=" + entry.getKey();
+ }
+ }
+ }
+ return null; // no commit found in flight: the DAG is considered recoverable
+ }
+
+ public RecoveredDAGData parseRecoveryData() throws IOException {
Path previousAttemptRecoveryDataDir = getPreviousAttemptRecoveryDataDir();
LOG.info("Using " + previousAttemptRecoveryDataDir.toString()
+ " for recovering data from previous attempt");
@@ -330,8 +483,6 @@ public class RecoveryParser {
FSDataOutputStream newSummaryStream =
getSummaryOutputStream(newSummaryPath);
- Map<TezDAGID, Boolean> seenDAGs = new TreeMap<TezDAGID, Boolean>();
-
FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryPath);
LOG.info("Parsing summary file"
+ ", path=" + summaryPath.toString()
@@ -339,6 +490,8 @@ public class RecoveryParser {
+ ", lastModTime=" + summaryFileStatus.getModificationTime());
int dagCounter = 0;
+ Map<TezDAGID, DAGSummaryData> dagSummaryDataMap =
+ new HashMap<TezDAGID, DAGSummaryData>();
while (summaryStream.available() > 0) {
RecoveryProtos.SummaryEventProto proto =
RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
@@ -354,11 +507,10 @@ public class RecoveryParser {
if (dagCounter < dagId.getId()) {
dagCounter = dagId.getId();
}
- if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
- seenDAGs.put(dagId, false);
- } else if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
- seenDAGs.put(dagId, true);
+ if (!dagSummaryDataMap.containsKey(dagId)) {
+ dagSummaryDataMap.put(dagId, new DAGSummaryData(dagId));
}
+ dagSummaryDataMap.get(dagId).handleSummaryEvent(proto);
proto.writeDelimitedTo(newSummaryStream);
}
newSummaryStream.hsync();
@@ -367,12 +519,35 @@ public class RecoveryParser {
// Set counter for next set of DAGs
dagAppMaster.setDAGCounter(dagCounter);
- TezDAGID lastInProgressDAG = getLastInProgressDAG(seenDAGs);
+ DAGSummaryData lastInProgressDAGData =
+ getLastCompletedOrInProgressDAG(dagSummaryDataMap);
+ if (lastInProgressDAGData == null) {
+ LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
+ return null;
+ }
+ TezDAGID lastInProgressDAG = lastInProgressDAGData.dagId;
if (lastInProgressDAG == null) {
- LOG.info("Nothing to recover as no uncompleted DAGs found");
+ LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
return null;
}
+ LOG.info("Checking if DAG is in recoverable state"
+ + ", dagId=" + lastInProgressDAGData.dagId);
+
+ final RecoveredDAGData recoveredDAGData = new RecoveredDAGData();
+ if (lastInProgressDAGData.completed) {
+ recoveredDAGData.isCompleted = true;
+ recoveredDAGData.dagState = lastInProgressDAGData.dagState;
+ }
+
+ String nonRecoverableReason = isDAGRecoverable(lastInProgressDAGData);
+ if (nonRecoverableReason != null) {
+ LOG.warn("Found last inProgress DAG but not recoverable: "
+ + lastInProgressDAGData);
+ recoveredDAGData.nonRecoverable = true;
+ recoveredDAGData.reason = nonRecoverableReason;
+ }
+
LOG.info("Trying to recover dag from recovery file"
+ ", dagId=" + lastInProgressDAG.toString()
+ ", dataDir=" + previousAttemptRecoveryDataDir
@@ -387,14 +562,13 @@ public class RecoveryParser {
+ ", dagId=" + lastInProgressDAG);
}
- DAGImpl recoveredDAG = null;
-
LOG.info("Copying DAG data into Current Attempt directory"
+ ", filePath=" + getDAGRecoveryFilePath(currentAttemptRecoveryDataDir,
lastInProgressDAG));
FSDataOutputStream newDAGRecoveryStream =
getDAGRecoveryOutputStream(currentAttemptRecoveryDataDir, lastInProgressDAG);
+ boolean skipAllOtherEvents = false;
while (dagRecoveryStream.available() > 0) {
HistoryEvent event;
try {
@@ -403,8 +577,9 @@ public class RecoveryParser {
LOG.warn("Corrupt data found when trying to read next event", ioe);
break;
}
- if (event == null) {
+ if (event == null || skipAllOtherEvents) {
// reached end of data
+ event = null;
break;
}
HistoryEventType eventType = event.getEventType();
@@ -414,9 +589,13 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- recoveredDAG = dagAppMaster.createDAG(((DAGSubmittedEvent) event).getDAGPlan(),
+ recoveredDAGData.recoveredDAG = dagAppMaster.createDAG(((DAGSubmittedEvent) event).getDAGPlan(),
lastInProgressDAG);
- dagAppMaster.setCurrentDAG(recoveredDAG);
+ recoveredDAGData.recoveredDagID = recoveredDAGData.recoveredDAG.getID();
+ dagAppMaster.setCurrentDAG(recoveredDAGData.recoveredDAG);
+ if (recoveredDAGData.nonRecoverable) {
+ skipAllOtherEvents = true;
+ }
break;
}
case DAG_INITIALIZED:
@@ -424,8 +603,8 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
- recoveredDAG.restoreFromEvent(event);
+ assert recoveredDAGData.recoveredDAG != null;
+ recoveredDAGData.recoveredDAG.restoreFromEvent(event);
break;
}
case DAG_STARTED:
@@ -433,8 +612,8 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
- recoveredDAG.restoreFromEvent(event);
+ assert recoveredDAGData.recoveredDAG != null;
+ recoveredDAGData.recoveredDAG.restoreFromEvent(event);
break;
}
case DAG_COMMIT_STARTED:
@@ -442,8 +621,26 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
- recoveredDAG.restoreFromEvent(event);
+ assert recoveredDAGData.recoveredDAG != null;
+ recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+ break;
+ }
+ case VERTEX_GROUP_COMMIT_STARTED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAGData.recoveredDAG != null;
+ recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+ break;
+ }
+ case VERTEX_GROUP_COMMIT_FINISHED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAGData.recoveredDAG != null;
+ recoveredDAGData.recoveredDAG.restoreFromEvent(event);
break;
}
case DAG_FINISHED:
@@ -452,13 +649,16 @@ public class RecoveryParser {
+ ", eventType=" + eventType
+ ", event=" + event.toString());
// If this is seen, nothing to recover
- assert recoveredDAG != null;
- recoveredDAG.restoreFromEvent(event);
- return recoveredDAG;
+ assert recoveredDAGData.recoveredDAG != null;
+ recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+ recoveredDAGData.isCompleted = true;
+ recoveredDAGData.dagState =
+ ((DAGFinishedEvent) event).getState();
+ skipAllOtherEvents = true;
}
case CONTAINER_LAUNCHED:
{
- // Nothing to do?
+ // Nothing to do for now
break;
}
case VERTEX_INITIALIZED:
@@ -466,9 +666,9 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
VertexInitializedEvent vEvent = (VertexInitializedEvent) event;
- Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
v.restoreFromEvent(vEvent);
break;
}
@@ -477,9 +677,9 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
VertexStartedEvent vEvent = (VertexStartedEvent) event;
- Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
v.restoreFromEvent(vEvent);
break;
}
@@ -488,9 +688,9 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
VertexParallelismUpdatedEvent vEvent = (VertexParallelismUpdatedEvent) event;
- Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
v.restoreFromEvent(vEvent);
break;
}
@@ -499,9 +699,9 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
VertexCommitStartedEvent vEvent = (VertexCommitStartedEvent) event;
- Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
v.restoreFromEvent(vEvent);
break;
}
@@ -510,9 +710,9 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
VertexFinishedEvent vEvent = (VertexFinishedEvent) event;
- Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
v.restoreFromEvent(vEvent);
break;
}
@@ -521,9 +721,9 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
TaskStartedEvent tEvent = (TaskStartedEvent) event;
- Task task = recoveredDAG.getVertex(
+ Task task = recoveredDAGData.recoveredDAG.getVertex(
tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
task.restoreFromEvent(tEvent);
break;
@@ -533,9 +733,9 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
TaskFinishedEvent tEvent = (TaskFinishedEvent) event;
- Task task = recoveredDAG.getVertex(
+ Task task = recoveredDAGData.recoveredDAG.getVertex(
tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
task.restoreFromEvent(tEvent);
break;
@@ -545,10 +745,10 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event;
Task task =
- recoveredDAG.getVertex(
+ recoveredDAGData.recoveredDAG.getVertex(
tEvent.getTaskAttemptID().getTaskID().getVertexID())
.getTask(tEvent.getTaskAttemptID().getTaskID());
task.restoreFromEvent(tEvent);
@@ -559,10 +759,10 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event;
Task task =
- recoveredDAG.getVertex(
+ recoveredDAGData.recoveredDAG.getVertex(
tEvent.getTaskAttemptID().getTaskID().getVertexID())
.getTask(tEvent.getTaskAttemptID().getTaskID());
task.restoreFromEvent(tEvent);
@@ -573,10 +773,10 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
VertexDataMovementEventsGeneratedEvent vEvent =
(VertexDataMovementEventsGeneratedEvent) event;
- Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
v.restoreFromEvent(vEvent);
break;
}
@@ -596,6 +796,42 @@ public class RecoveryParser {
newDAGRecoveryStream.hsync();
newDAGRecoveryStream.close();
+ if (!recoveredDAGData.isCompleted
+ && !recoveredDAGData.nonRecoverable) {
+ if (lastInProgressDAGData.bufferedSummaryEvents != null
+ && !lastInProgressDAGData.bufferedSummaryEvents.isEmpty()) {
+ for (HistoryEvent bufferedEvent : lastInProgressDAGData.bufferedSummaryEvents) {
+ assert recoveredDAGData.recoveredDAG != null;
+ switch (bufferedEvent.getEventType()) {
+ case VERTEX_GROUP_COMMIT_STARTED:
+ recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
+ break;
+ case VERTEX_GROUP_COMMIT_FINISHED:
+ recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
+ break;
+ case VERTEX_FINISHED:
+ VertexFinishedEvent vertexFinishedEvent =
+ (VertexFinishedEvent) bufferedEvent;
+ Vertex vertex = recoveredDAGData.recoveredDAG.getVertex(
+ vertexFinishedEvent.getVertexID());
+ if (vertex == null) {
+ recoveredDAGData.nonRecoverable = true;
+ recoveredDAGData.reason = "All state could not be recovered"
+ + ", vertex completed but events not flushed"
+ + ", vertexId=" + vertexFinishedEvent.getVertexID();
+ } else {
+ vertex.restoreFromEvent(vertexFinishedEvent);
+ }
+ break;
+ default:
+ throw new RuntimeException("Invalid data found in buffered summary events"
+ + ", unknown event type "
+ + bufferedEvent.getEventType());
+ }
+ }
+ }
+ }
+
Path dataCopiedFlagPath = new Path(currentAttemptRecoveryDataDir,
dataRecoveredFileFlag);
LOG.info("Finished copying data from previous attempt into current attempt"
@@ -607,7 +843,7 @@ public class RecoveryParser {
flagFile.hsync();
flagFile.close();
- return recoveredDAG;
+ return recoveredDAGData;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
new file mode 100644
index 0000000..e64ad13
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.records.TezDAGID;
+
+public class DAGEventRecoverEvent extends DAGEvent { // DAG_RECOVER event that additionally carries the state the recovered DAG should be moved to
+
+ private final DAGState desiredState; // DAGAppMaster passes DAGState.FAILED for non-recoverable DAGs, else the recorded final state
+
+ public DAGEventRecoverEvent(TezDAGID dagId, DAGState desiredState) { // no null check is performed on desiredState
+ super(dagId, DAGEventType.DAG_RECOVER); // event type is always DAG_RECOVER
+ this.desiredState = desiredState;
+ }
+
+ public DAGState getDesiredState() { // returns the state supplied at construction
+ return desiredState;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 432c189..8fc278f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -54,11 +55,11 @@ import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DAGStatusBuilder;
import org.apache.tez.dag.api.client.ProgressBuilder;
@@ -73,32 +74,35 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.DAGReport;
import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
-import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
-import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
+import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
@@ -357,6 +361,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap();
private DAGState recoveredState = DAGState.NEW;
private boolean recoveryCommitInProgress = false;
+ Map<String, Boolean> recoveredGroupCommits = new HashMap<String, Boolean>();
static class VertexGroupInfo {
String groupName;
@@ -497,18 +502,21 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
recoveredState = DAGState.RUNNING;
return recoveredState;
case DAG_COMMIT_STARTED:
- if (recoveredState != DAGState.RUNNING) {
- throw new RuntimeException("Commit Started Event seen but"
- + " recovered state is not RUNNING"
- + ", recoveredState=" + recoveredState);
- }
recoveryCommitInProgress = true;
return recoveredState;
+ case VERTEX_GROUP_COMMIT_STARTED:
+ VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent =
+ (VertexGroupCommitStartedEvent) historyEvent;
+ recoveredGroupCommits.put(
+ vertexGroupCommitStartedEvent.getVertexGroupName(), false);
+ return recoveredState;
+ case VERTEX_GROUP_COMMIT_FINISHED:
+ VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent =
+ (VertexGroupCommitFinishedEvent) historyEvent;
+ recoveredGroupCommits.put(
+ vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
+ return recoveredState;
case DAG_FINISHED:
- if (!recoveryStartEventSeen) {
- throw new RuntimeException("Finished Event seen but"
- + " no Start Event was encountered earlier");
- }
recoveryCommitInProgress = false;
DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent;
this.finishTime = finishedEvent.getFinishTime();
@@ -722,7 +730,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
if (dagSucceeded && !successfulOutputsAlreadyCommitted) {
// commit all shared outputs
appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
- new DAGCommitStartedEvent(getID())));
+ new DAGCommitStartedEvent(getID(), clock.getTime())));
for (VertexGroupInfo groupInfo : vertexGroups.values()) {
if (failedWhileCommitting) {
break;
@@ -1266,6 +1274,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
@Override
public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
+ if (dagEvent instanceof DAGEventRecoverEvent) {
+ // DAG completed or final end state known
+ DAGEventRecoverEvent recoverEvent = (DAGEventRecoverEvent) dagEvent;
+ dag.recoveredState = recoverEvent.getDesiredState();
+ }
+
switch (dag.recoveredState) {
case NEW:
// send DAG an Init and start events
@@ -1292,8 +1306,19 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
case RUNNING:
// if commit is in progress, DAG should fail as commits are not
// recoverable
- if (dag.recoveryCommitInProgress) {
- // Fail the DAG as we have not seen a completion
+ // A group commit is in progress only if its start was recorded but its
+ // finish was not; groups whose commit finished must not fail the DAG.
+ boolean groupCommitInProgress = false;
+ for (Entry<String, Boolean> entry : dag.recoveredGroupCommits.entrySet()) {
+ if (!entry.getValue().booleanValue()) {
+ LOG.info("Found a pending Vertex Group commit"
+ + ", vertexGroup=" + entry.getKey());
+ groupCommitInProgress = true;
+ }
+ }
+
+ if (groupCommitInProgress || dag.recoveryCommitInProgress) {
+ // Fail the DAG as we have not seen a commit completion
dag.trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
dag.setFinishTime();
// Recover all other data for all vertices
@@ -1314,6 +1339,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
DAGState.FAILED));
return DAGState.FAILED;
}
+
for (Vertex v : dag.vertices.values()) {
if (v.getInputVerticesCount() == 0) {
if (LOG.isDebugEnabled()) {
@@ -1596,10 +1622,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
}
for (VertexGroupInfo groupInfo : commitList) {
+ if (recoveredGroupCommits.containsKey(groupInfo.groupName)) {
+ LOG.info("VertexGroup was already committed as per recovery"
+ + " data, groupName=" + groupInfo.groupName);
+ continue;
+ }
groupInfo.committed = true;
Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
- new DAGCommitStartedEvent(dagId)));
+ new VertexGroupCommitStartedEvent(dagId, groupInfo.groupName,
+ clock.getTime())));
for (String outputName : groupInfo.outputs) {
OutputCommitter committer = v.getOutputCommitters().get(outputName);
LOG.info("Committing output: " + outputName);
@@ -1612,6 +1644,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
if (failedCommit) {
break;
}
+ appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
+ new VertexGroupCommitFinishedEvent(dagId, groupInfo.groupName,
+ clock.getTime())));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3ca9e11..7b3b6b4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -94,6 +94,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
@@ -106,7 +107,6 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.app.dag.event.VertexEventType;
-import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
@@ -131,9 +131,9 @@ import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
@@ -146,7 +146,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
-
/** Implementation of Vertex interface. Maintains the state machines of Vertex.
* The read and write calls use ReadWriteLock for concurrency.
*/
@@ -529,6 +528,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private String logIdentifier;
private boolean recoveryCommitInProgress = false;
+ private boolean summaryCompleteSeen = false;
+ private boolean hasCommitter = false;
+ private boolean vertexCompleteSeen = false;
private Map<String,EdgeManagerDescriptor> recoveredSourceEdgeManagers = null;
// Recovery related flags
@@ -905,20 +907,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
return recoveredState;
case VERTEX_COMMIT_STARTED:
- if (recoveredState != VertexState.RUNNING) {
- throw new RuntimeException("Commit Started Event seen but"
- + " recovered state is not RUNNING"
- + ", recoveredState=" + recoveredState);
- }
recoveryCommitInProgress = true;
+ hasCommitter = true;
return recoveredState;
case VERTEX_FINISHED:
- if (!recoveryStartEventSeen) {
- throw new RuntimeException("Finished Event seen but"
- + " no Started Event was encountered earlier");
+ VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent;
+ if (finishedEvent.isFromSummary()) {
+ summaryCompleteSeen = true;
+ } else {
+ vertexCompleteSeen = true;
}
recoveryCommitInProgress = false;
- VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent;
recoveredState = finishedEvent.getState();
diagnostics.add(finishedEvent.getDiagnostics());
finishTime = finishedEvent.getFinishTime();
@@ -1280,16 +1279,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
LOG.info("Vertex succeeded: " + vertex.logIdentifier);
try {
- if (vertex.outputCommitters != null) {
- vertex.appContext.getHistoryHandler().handle(
- new DAGHistoryEvent(vertex.getDAGId(),
- new VertexCommitStartedEvent(vertex.vertexId)));
- }
if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
// commit only once. Dont commit shared outputs
LOG.info("Invoking committer commit for vertex, vertexId="
+ vertex.logIdentifier);
- if (vertex.outputCommitters != null) {
+ if (vertex.outputCommitters != null
+ && !vertex.outputCommitters.isEmpty()) {
+ boolean firstCommit = true;
for (Entry<String, OutputCommitter> entry : vertex.outputCommitters.entrySet()) {
final OutputCommitter committer = entry.getValue();
final String outputName = entry.getKey();
@@ -1297,6 +1293,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// dont commit shared committers. Will be committed by the DAG
continue;
}
+ if (firstCommit) {
+ // Log commit start event on first actual commit only; clear the
+ // flag so later committers do not emit duplicate start events
+ vertex.appContext.getHistoryHandler().handle(
+ new DAGHistoryEvent(vertex.getDAGId(),
+ new VertexCommitStartedEvent(vertex.vertexId,
+ vertex.clock.getTime())));
+ firstCommit = false;
+ }
vertex.dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
@@ -1806,31 +1811,44 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
case SUCCEEDED:
case FAILED:
case KILLED:
- vertex.tasksNotYetScheduled = false;
- // recover tasks
- if (vertex.tasks != null) {
- TaskState taskState = TaskState.KILLED;
- switch (vertex.recoveredState) {
- case SUCCEEDED:
- taskState = TaskState.SUCCEEDED;
- break;
- case KILLED:
- taskState = TaskState.KILLED;
- break;
- case FAILED:
- taskState = TaskState.FAILED;
- break;
- }
- for (Task task : vertex.tasks.values()) {
- vertex.eventHandler.handle(
- new TaskEventRecoverTask(task.getTaskId(),
- taskState));
- }
- vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
- endState = VertexState.RUNNING;
+ if (vertex.recoveredState == VertexState.SUCCEEDED
+ && vertex.hasCommitter
+ && vertex.summaryCompleteSeen && !vertex.vertexCompleteSeen) {
+ LOG.warn("Cannot recover vertex as all recovery events not"
+ + " found, vertex=" + vertex.logIdentifier
+ + ", hasCommitters=" + vertex.hasCommitter
+ + ", summaryCompletionSeen=" + vertex.summaryCompleteSeen
+ + ", finalCompletionSeen=" + vertex.vertexCompleteSeen);
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.COMMIT_FAILURE);
+ endState = VertexState.FAILED;
} else {
- endState = vertex.recoveredState;
- vertex.finished(endState);
+ vertex.tasksNotYetScheduled = false;
+ // recover tasks
+ if (vertex.tasks != null) {
+ TaskState taskState = TaskState.KILLED;
+ switch (vertex.recoveredState) {
+ case SUCCEEDED:
+ taskState = TaskState.SUCCEEDED;
+ break;
+ case KILLED:
+ taskState = TaskState.KILLED;
+ break;
+ case FAILED:
+ taskState = TaskState.FAILED;
+ break;
+ }
+ for (Task task : vertex.tasks.values()) {
+ vertex.eventHandler.handle(
+ new TaskEventRecoverTask(task.getTaskId(),
+ taskState));
+ }
+ vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+ endState = VertexState.RUNNING;
+ } else {
+ endState = vertex.recoveredState;
+ vertex.finished(endState);
+ }
}
break;
default:
@@ -2318,12 +2336,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
" asked to split by: " + originalSplitSource +
" but was already split by:" + vertex.originalOneToOneSplitSource);
}
- Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING,
- " Unexpected 1-1 split for vertex " + vertex.getVertexId() +
- " in state " + vertex.getState() +
- " . Split in vertex " + originalSplitSource +
- " sent by vertex " + splitEvent.getSenderVertex() +
- " numTasks " + splitEvent.getNumTasks());
+ Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING,
+ " Unexpected 1-1 split for vertex " + vertex.getVertexId() +
+ " in state " + vertex.getState() +
+ " . Split in vertex " + originalSplitSource +
+ " sent by vertex " + splitEvent.getSenderVertex() +
+ " numTasks " + splitEvent.getNumTasks());
LOG.info("Splitting vertex " + vertex.getVertexId() +
" because of split in vertex " + originalSplitSource +
" sent by vertex " + splitEvent.getSenderVertex() +
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
index 78d1208..3f756c0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
@@ -18,16 +18,16 @@
package org.apache.tez.dag.history;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
public interface HistoryEvent {
- HistoryEventType getEventType();
+ public HistoryEventType getEventType();
public JSONObject convertToATSJSON() throws JSONException;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index 7b2087a..219bfe3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -36,5 +36,7 @@ public enum HistoryEventType {
CONTAINER_LAUNCHED,
VERTEX_DATA_MOVEMENT_EVENTS_GENERATED,
DAG_COMMIT_STARTED,
- VERTEX_COMMIT_STARTED
+ VERTEX_COMMIT_STARTED,
+ VERTEX_GROUP_COMMIT_STARTED,
+ VERTEX_GROUP_COMMIT_FINISHED
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
index 587ee3e..eaae813 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
@@ -21,8 +21,19 @@ package org.apache.tez.dag.history;
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+
public interface SummaryEvent {
public void toSummaryProtoStream(OutputStream outputStream) throws IOException;
+ public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException;
+
+ /**
+ * Whether to write this event immediately to the DAG recovery file.
+ * Summary events are always written immediately to the summary file.
+ * @return true if the event should be written to the DAG recovery file immediately
+ */
+ public boolean writeToRecoveryImmediately();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
index 6d5a769..627751a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
@@ -18,26 +18,31 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGCommitStartedProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.dag.utils.ProtoUtils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class DAGCommitStartedEvent implements HistoryEvent {
+public class DAGCommitStartedEvent implements HistoryEvent, SummaryEvent {
private TezDAGID dagID;
+ private long commitStartTime;
public DAGCommitStartedEvent() {
}
- public DAGCommitStartedEvent(TezDAGID dagID) {
+ public DAGCommitStartedEvent(TezDAGID dagID, long commitStartTime) {
this.dagID = dagID;
+ this.commitStartTime = commitStartTime;
}
@Override
@@ -91,4 +96,21 @@ public class DAGCommitStartedEvent implements HistoryEvent {
return dagID;
}
+ @Override
+ public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+ ProtoUtils.toSummaryEventProto(dagID, commitStartTime,
+ getEventType()).writeDelimitedTo(outputStream);
+ }
+
+ @Override
+ public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+ this.dagID = TezDAGID.fromString(proto.getDagId());
+ this.commitStartTime = proto.getTimestamp();
+ }
+
+ @Override
+ public boolean writeToRecoveryImmediately() {
+ return false;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index 38e0702..14381b3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -18,6 +18,10 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.app.dag.DAGState;
@@ -28,15 +32,15 @@ import org.apache.tez.dag.history.ats.EntityTypes;
import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGFinishedProto;
-import org.apache.tez.dag.utils.ProtoUtils;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
@@ -166,8 +170,25 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
@Override
public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
- ProtoUtils.toSummaryEventProto(dagID, finishTime,
- HistoryEventType.DAG_FINISHED).writeDelimitedTo(outputStream);
+ SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+ .setDagId(dagID.toString())
+ .setTimestamp(finishTime)
+ .setEventType(getEventType().ordinal())
+ .setEventPayload(ByteString.copyFrom(Ints.toByteArray(state.ordinal())));
+ builder.build().writeDelimitedTo(outputStream);
+ }
+
+ @Override
+ public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+ this.dagID = TezDAGID.fromString(proto.getDagId());
+ this.finishTime = proto.getTimestamp();
+ this.state = DAGState.values()[
+ Ints.fromByteArray(proto.getEventPayload().toByteArray())];
+ }
+
+ @Override
+ public boolean writeToRecoveryImmediately() {
+ return true;
}
public long getFinishTime() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 853bea7..af83dc8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -18,6 +18,10 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.api.records.DAGProtos;
@@ -29,16 +33,13 @@ import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGSubmittedProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.utils.ProtoUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
- public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
+public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
private TezDAGID dagID;
private long submitTime;
@@ -168,6 +169,17 @@ import java.io.OutputStream;
HistoryEventType.DAG_SUBMITTED).writeDelimitedTo(outputStream);
}
+ @Override
+ public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+ throw new UnsupportedOperationException("Cannot re-initialize event from"
+ + " summary stream");
+ }
+
+ @Override
+ public boolean writeToRecoveryImmediately() {
+ return true;
+ }
+
public String getDAGName() {
if (dagPlan != null && dagPlan.hasName()) {
return dagPlan.getName();
@@ -190,4 +202,5 @@ import java.io.OutputStream;
public long getSubmitTime() {
return submitTime;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
index 564f9ed..387bff1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
@@ -18,26 +18,33 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexCommitStartedProto;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import com.google.protobuf.ByteString;
-public class VertexCommitStartedEvent implements HistoryEvent {
+public class VertexCommitStartedEvent implements HistoryEvent, SummaryEvent {
private TezVertexID vertexID;
+ private long commitStartTime;
public VertexCommitStartedEvent() {
}
- public VertexCommitStartedEvent(TezVertexID vertexId) {
+ public VertexCommitStartedEvent(TezVertexID vertexId, long commitStartTime) {
this.vertexID = vertexId;
+ this.commitStartTime = commitStartTime;
}
@Override
@@ -91,4 +98,28 @@ public class VertexCommitStartedEvent implements HistoryEvent {
return this.vertexID;
}
+ @Override
+ public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+ // Payload is the vertex id string, encoded explicitly as UTF-8
+ SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+ .setDagId(vertexID.getDAGId().toString())
+ .setTimestamp(commitStartTime)
+ .setEventType(getEventType().ordinal())
+ .setEventPayload(ByteString.copyFromUtf8(vertexID.toString()));
+ builder.build().writeDelimitedTo(outputStream);
+ }
+
+ @Override
+ public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+ // Payload carries the vertex id string as UTF-8 bytes (see toSummaryProtoStream)
+ this.vertexID = TezVertexID.fromString(proto.getEventPayload().toStringUtf8());
+ this.commitStartTime = proto.getTimestamp();
+ }
+
+ @Override
+ public boolean writeToRecoveryImmediately() {
+ return false;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 2366eb1..6f07c91 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -18,25 +18,29 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
import org.apache.tez.dag.history.ats.EntityTypes;
import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishStateProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishedProto;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class VertexFinishedEvent implements HistoryEvent {
+public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
private TezVertexID vertexID;
private String vertexName;
@@ -48,6 +52,7 @@ public class VertexFinishedEvent implements HistoryEvent {
private VertexState state;
private String diagnostics;
private TezCounters tezCounters;
+ private boolean fromSummary = false;
public VertexFinishedEvent(TezVertexID vertexId,
String vertexName, long initRequestedTime, long initedTime, long startRequestedTime, long startedTime, long finishTime,
@@ -186,4 +191,63 @@ public class VertexFinishedEvent implements HistoryEvent {
   public TezCounters getTezCounters() {
     return tezCounters;
   }
+
+  /**
+   * Writes the summary record for this event: a length-delimited
+   * {@link SummaryEventProto} whose payload is a {@link VertexFinishStateProto}
+   * holding only the vertex id and the ordinal of the final state.
+   */
+  @Override
+  public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+    VertexFinishStateProto finishStateProto =
+        VertexFinishStateProto.newBuilder()
+            .setState(state.ordinal())
+            .setVertexId(vertexID.toString())
+            .build();
+
+    SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+        .setDagId(vertexID.getDAGId().toString())
+        .setTimestamp(finishTime)
+        .setEventType(getEventType().ordinal())
+        .setEventPayload(finishStateProto.toByteString());
+    builder.build().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Populates vertex id, state and finish time from a summary record and marks
+   * this event as summary-derived. No other field is set by this method.
+   *
+   * @throws IOException if the payload cannot be parsed or carries a state
+   *     ordinal that does not map to a {@link VertexState}
+   */
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    VertexFinishStateProto finishStateProto =
+        VertexFinishStateProto.parseFrom(proto.getEventPayload());
+    // The state travels as an enum ordinal; reject values outside the enum
+    // instead of failing with ArrayIndexOutOfBoundsException.
+    int stateOrdinal = finishStateProto.getState();
+    VertexState[] knownStates = VertexState.values();
+    if (stateOrdinal < 0 || stateOrdinal >= knownStates.length) {
+      throw new IOException("Invalid vertex state ordinal " + stateOrdinal
+          + " in summary event for vertex " + finishStateProto.getVertexId());
+    }
+    this.vertexID = TezVertexID.fromString(finishStateProto.getVertexId());
+    this.state = knownStates[stateOrdinal];
+    this.finishTime = proto.getTimestamp();
+    this.fromSummary = true;
+  }
+
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return false;
+  }
+
+  /**
+   * @return {@code true} if this event was populated by
+   *     {@link #fromSummaryProtoStream(SummaryEventProto)}
+   */
+  public boolean isFromSummary() {
+    return fromSummary;
+  }
 }
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
new file mode 100644
index 0000000..99a5288
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitFinishedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+/**
+ * History event of type {@link HistoryEventType#VERTEX_GROUP_COMMIT_FINISHED},
+ * identified by a DAG id and a vertex group name.
+ *
+ * <p>The event is flagged as a recovery event but not as a history event. It
+ * can be serialized either as a standalone delimited
+ * {@link VertexGroupCommitFinishedProto} or wrapped in a
+ * {@link SummaryEventProto}; only the summary form carries the timestamp.
+ */
+public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEvent {
+
+  /** DAG the vertex group belongs to. */
+  private TezDAGID dagID;
+  /** Name of the vertex group this event refers to. */
+  private String vertexGroupName;
+  /** Timestamp stored in the summary record; not part of {@link #toProto()}. */
+  private long commitFinishTime;
+
+  /** Creates an empty event to be populated by one of the {@code from*} methods. */
+  public VertexGroupCommitFinishedEvent() {
+  }
+
+  /**
+   * Creates a fully populated event.
+   *
+   * @param dagID id of the DAG
+   * @param vertexGroupName name of the vertex group
+   * @param commitFinishTime timestamp written to the summary record
+   */
+  public VertexGroupCommitFinishedEvent(TezDAGID dagID,
+      String vertexGroupName, long commitFinishTime) {
+    this.dagID = dagID;
+    this.vertexGroupName = vertexGroupName;
+    this.commitFinishTime = commitFinishTime;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.VERTEX_GROUP_COMMIT_FINISHED;
+  }
+
+  /** Not implemented yet; always returns {@code null}. */
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+
+  /**
+   * Builds the proto form of this event from the DAG id and vertex group name.
+   * The timestamp is not included.
+   *
+   * @return the populated proto message
+   */
+  public VertexGroupCommitFinishedProto toProto() {
+    return VertexGroupCommitFinishedProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setVertexGroupName(vertexGroupName)
+        .build();
+  }
+
+  /**
+   * Populates the DAG id and vertex group name from the given proto.
+   *
+   * @param proto message to read from
+   */
+  public void fromProto(VertexGroupCommitFinishedProto proto) {
+    this.dagID = TezDAGID.fromString(proto.getDagId());
+    this.vertexGroupName = proto.getVertexGroupName();
+  }
+
+  /** Writes {@link #toProto()} to the stream in length-delimited form. */
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Reads one length-delimited proto from the stream and populates this event.
+   *
+   * @throws IOException if the stream cannot be parsed or holds no more messages
+   */
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexGroupCommitFinishedProto proto =
+        VertexGroupCommitFinishedProto.parseDelimitedFrom(inputStream);
+    // parseDelimitedFrom returns null when the stream is already at EOF;
+    // report that as an I/O error instead of hitting an NPE in fromProto.
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    return "dagId=" + dagID
+        + ", vertexGroup=" + vertexGroupName;
+  }
+
+  public String getVertexGroupName() {
+    return this.vertexGroupName;
+  }
+
+  /**
+   * Writes this event as a length-delimited {@link SummaryEventProto} whose
+   * payload is {@link #toProto()} and whose timestamp is the commit finish time.
+   */
+  @Override
+  public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+    SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setTimestamp(commitFinishTime)
+        .setEventType(getEventType().ordinal())
+        .setEventPayload(toProto().toByteString());
+    builder.build().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Populates this event from a summary record: the payload supplies the DAG id
+   * and vertex group name, the record's timestamp supplies the commit finish time.
+   */
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    VertexGroupCommitFinishedProto vertexGroupCommitFinishedProto =
+        VertexGroupCommitFinishedProto.parseFrom(proto.getEventPayload());
+    fromProto(vertexGroupCommitFinishedProto);
+    this.commitFinishTime = proto.getTimestamp();
+  }
+
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return false;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
new file mode 100644
index 0000000..04d6276
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitStartedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+/**
+ * History event of type {@link HistoryEventType#VERTEX_GROUP_COMMIT_STARTED},
+ * identified by a DAG id and a vertex group name.
+ *
+ * <p>The event is flagged as a recovery event but not as a history event. It
+ * can be serialized either as a standalone delimited
+ * {@link VertexGroupCommitStartedProto} or wrapped in a
+ * {@link SummaryEventProto}; only the summary form carries the timestamp.
+ */
+public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent {
+
+  /** DAG the vertex group belongs to. */
+  private TezDAGID dagID;
+  /** Name of the vertex group this event refers to. */
+  private String vertexGroupName;
+  /** Timestamp stored in the summary record; not part of {@link #toProto()}. */
+  private long commitStartTime;
+
+  /** Creates an empty event to be populated by one of the {@code from*} methods. */
+  public VertexGroupCommitStartedEvent() {
+  }
+
+  /**
+   * Creates a fully populated event.
+   *
+   * @param dagID id of the DAG
+   * @param vertexGroupName name of the vertex group
+   * @param commitStartTime timestamp written to the summary record
+   */
+  public VertexGroupCommitStartedEvent(TezDAGID dagID,
+      String vertexGroupName, long commitStartTime) {
+    this.dagID = dagID;
+    this.vertexGroupName = vertexGroupName;
+    this.commitStartTime = commitStartTime;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.VERTEX_GROUP_COMMIT_STARTED;
+  }
+
+  /** Not implemented yet; always returns {@code null}. */
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+
+  /**
+   * Builds the proto form of this event from the DAG id and vertex group name.
+   * The timestamp is not included.
+   *
+   * @return the populated proto message
+   */
+  public VertexGroupCommitStartedProto toProto() {
+    return VertexGroupCommitStartedProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setVertexGroupName(vertexGroupName)
+        .build();
+  }
+
+  /**
+   * Populates the DAG id and vertex group name from the given proto.
+   *
+   * @param proto message to read from
+   */
+  public void fromProto(VertexGroupCommitStartedProto proto) {
+    this.dagID = TezDAGID.fromString(proto.getDagId());
+    this.vertexGroupName = proto.getVertexGroupName();
+  }
+
+  /** Writes {@link #toProto()} to the stream in length-delimited form. */
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Reads one length-delimited proto from the stream and populates this event.
+   *
+   * @throws IOException if the stream cannot be parsed or holds no more messages
+   */
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexGroupCommitStartedProto proto =
+        VertexGroupCommitStartedProto.parseDelimitedFrom(inputStream);
+    // parseDelimitedFrom returns null when the stream is already at EOF;
+    // report that as an I/O error instead of hitting an NPE in fromProto.
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    return "dagId=" + dagID
+        + ", vertexGroup=" + vertexGroupName;
+  }
+
+  public String getVertexGroupName() {
+    return this.vertexGroupName;
+  }
+
+  /**
+   * Writes this event as a length-delimited {@link SummaryEventProto} whose
+   * payload is {@link #toProto()} and whose timestamp is the commit start time.
+   */
+  @Override
+  public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+    SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setTimestamp(commitStartTime)
+        .setEventType(getEventType().ordinal())
+        .setEventPayload(toProto().toByteString());
+    builder.build().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Populates this event from a summary record: the payload supplies the DAG id
+   * and vertex group name, the record's timestamp supplies the commit start time.
+   */
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    VertexGroupCommitStartedProto vertexGroupCommitStartedProto =
+        VertexGroupCommitStartedProto.parseFrom(proto.getEventPayload());
+    fromProto(vertexGroupCommitStartedProto);
+    this.commitStartTime = proto.getTimestamp();
+  }
+
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return false;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
+
+}