You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/26 19:43:49 UTC
git commit: TEZ-973. Abort additional attempts if recovery fails. (hitesh)
Repository: incubator-tez
Updated Branches:
refs/heads/master f31aba7bc -> 5a6f42a81
TEZ-973. Abort additional attempts if recovery fails. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/5a6f42a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/5a6f42a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/5a6f42a8
Branch: refs/heads/master
Commit: 5a6f42a8118ef47475b4522ae984810616914461
Parents: f31aba7
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Mar 26 11:43:17 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Mar 26 11:43:17 2014 -0700
----------------------------------------------------------------------
.../org/apache/tez/dag/app/DAGAppMaster.java | 29 +++++-
.../org/apache/tez/dag/app/RecoveryParser.java | 57 +++++++++--
.../tez/dag/app/dag/DAGTerminationCause.java | 5 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 101 +++++++++++++------
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 42 +++++---
.../tez/dag/history/HistoryEventHandler.java | 14 ++-
.../dag/history/recovery/RecoveryService.java | 44 +++++++-
.../mapreduce/examples/OrderedWordCount.java | 4 +
8 files changed, 237 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 45a4f18..526bea7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -444,6 +444,11 @@ public class DAGAppMaster extends AbstractService {
+ ", dagState=" + finishEvt.getDAGState());
lastDAGCompletionTime = clock.getTime();
_updateLoggers(currentDAG, "_post");
+ if (this.historyEventHandler.hasRecoveryFailed()) {
+ LOG.warn("Recovery had a fatal error, shutting down session after" +
+ " DAG completion");
+ sessionStopped.set(true);
+ }
switch(finishEvt.getDAGState()) {
case SUCCEEDED:
if (!currentDAG.getName().startsWith(
@@ -451,7 +456,6 @@ public class DAGAppMaster extends AbstractService {
successfulDAGs.incrementAndGet();
}
break;
- case ERROR:
case FAILED:
if (!currentDAG.getName().startsWith(
TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
@@ -464,6 +468,11 @@ public class DAGAppMaster extends AbstractService {
killedDAGs.incrementAndGet();
}
break;
+ case ERROR:
+ if (!currentDAG.getName().startsWith(
+ TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
+ failedDAGs.incrementAndGet();
+ }
default:
LOG.fatal("Received a DAG Finished Event with state="
+ finishEvt.getDAGState()
@@ -481,7 +490,11 @@ public class DAGAppMaster extends AbstractService {
} else {
LOG.info("Session shutting down now.");
this.taskSchedulerEventHandler.setShouldUnregisterFlag();
- state = DAGAppMasterState.SUCCEEDED;
+ if (this.historyEventHandler.hasRecoveryFailed()) {
+ state = DAGAppMasterState.FAILED;
+ } else {
+ state = DAGAppMasterState.SUCCEEDED;
+ }
shutdownHandler.shutdown();
}
}
@@ -1418,7 +1431,17 @@ public class DAGAppMaster extends AbstractService {
this.lastDAGCompletionTime = clock.getTime();
- RecoveredDAGData recoveredDAGData = recoverDAG();
+ RecoveredDAGData recoveredDAGData;
+ try {
+ recoveredDAGData = recoverDAG();
+ } catch (IOException e) {
+ LOG.error("Error occurred when trying to recover data from previous attempt."
+ + " Shutting down AM", e);
+ this.state = DAGAppMasterState.ERROR;
+ this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+ shutdownHandler.shutdown();
+ return;
+ }
if (!isSession) {
LOG.info("In Non-Session mode.");
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 7e1feca..45c98e6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -60,6 +60,7 @@ import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.recovery.RecoveryService;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
@@ -81,7 +82,7 @@ public class RecoveryParser {
public RecoveryParser(DAGAppMaster dagAppMaster,
FileSystem recoveryFS,
Path recoveryDataDir,
- int currentAttemptId) {
+ int currentAttemptId) throws IOException {
this.dagAppMaster = dagAppMaster;
this.recoveryFS = recoveryFS;
this.recoveryDataDir = recoveryDataDir;
@@ -91,6 +92,8 @@ public class RecoveryParser {
recoveryBufferSize = dagAppMaster.getConfig().getInt(
TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
+
+ this.recoveryFS.mkdirs(currentAttemptRecoveryDataDir);
}
public static class RecoveredDAGData {
@@ -308,22 +311,57 @@ public class RecoveryParser {
return inProgressDAG;
}
- private Path getPreviousAttemptRecoveryDataDir() {
+ private Path getPreviousAttemptRecoveryDataDir() throws IOException {
+ LOG.info("Looking for the correct attempt directory to recover from");
int foundPreviousAttempt = -1;
for (int i = currentAttemptId - 1; i > 0; --i) {
Path attemptPath = getAttemptRecoveryDataDir(recoveryDataDir, i);
+ LOG.info("Looking at attempt directory, path=" + attemptPath);
+ Path fatalErrorOccurred = new Path(attemptPath,
+ RecoveryService.RECOVERY_FATAL_OCCURRED_DIR);
+ if (recoveryFS.exists(fatalErrorOccurred)) {
+ throw new IOException("Found that a fatal error occurred in"
+ + " recovery during previous attempt, foundFile="
+ + fatalErrorOccurred.toString());
+ }
+
Path dataRecoveredFile = new Path(attemptPath, dataRecoveredFileFlag);
try {
if (recoveryFS.exists(dataRecoveredFile)) {
+ LOG.info("Found data recovered file in attempt directory"
+ + ", dataRecoveredFile=" + dataRecoveredFile
+ + ", path=" + attemptPath);
foundPreviousAttempt = i;
break;
}
+ LOG.info("Skipping attempt directory as data recovered file does not exist"
+ + ", dataRecoveredFile=" + dataRecoveredFile
+ + ", path=" + attemptPath);
} catch (IOException e) {
LOG.warn("Exception when checking previous attempt dir for "
+ dataRecoveredFile.toString(), e);
}
}
if (foundPreviousAttempt == -1) {
+ // Look for oldest summary file and use that
+ LOG.info("Did not find any attempt dir that had data recovered file."
+ + " Looking for oldest summary file");
+ for (int i = 1; i < currentAttemptId; ++i) {
+ Path attemptPath = getAttemptRecoveryDataDir(recoveryDataDir, i);
+ Path summaryPath = getSummaryPath(attemptPath);
+ if (recoveryFS.exists(summaryPath)) {
+ LOG.info("Found summary file in attempt directory"
+ + ", summaryFile=" + summaryPath
+ + ", path=" + attemptPath);
+ foundPreviousAttempt = i;
+ break;
+ }
+ LOG.info("Skipping attempt directory as no summary file found"
+ + ", summaryFile=" + summaryPath
+ + ", path=" + attemptPath);
+ }
+ }
+ if (foundPreviousAttempt == -1) {
LOG.info("Falling back to first attempt as no other recovered attempts"
+ " found");
foundPreviousAttempt = 1;
@@ -466,6 +504,7 @@ public class RecoveryParser {
if (!recoveryFS.exists(previousAttemptRecoveryDataDir)) {
LOG.info("Nothing to recover as previous attempt data does not exist"
+ ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString());
+ createDataRecoveredFlagFile();
return null;
}
@@ -476,6 +515,7 @@ public class RecoveryParser {
LOG.info("Nothing to recover as summary file does not exist"
+ ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString()
+ ", summaryPath=" + summaryPath.toString());
+ createDataRecoveredFlagFile();
return null;
}
@@ -832,18 +872,23 @@ public class RecoveryParser {
}
}
+ LOG.info("Finished copying data from previous attempt into current attempt");
+ createDataRecoveredFlagFile();
+
+ return recoveredDAGData;
+ }
+
+ private void createDataRecoveredFlagFile() throws IOException {
Path dataCopiedFlagPath = new Path(currentAttemptRecoveryDataDir,
dataRecoveredFileFlag);
- LOG.info("Finished copying data from previous attempt into current attempt"
- + " - setting flag by creating file"
- + ", path=" + dataCopiedFlagPath.toString());
+ LOG.info("Trying to create data recovered flag file"
+ + ", filePath=" + dataCopiedFlagPath.toString());
FSDataOutputStream flagFile =
recoveryFS.create(dataCopiedFlagPath, true, recoveryBufferSize);
flagFile.writeInt(1);
flagFile.hsync();
flagFile.close();
- return recoveredDAGData;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
index f7020da..d01fb2f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
@@ -38,6 +38,9 @@ public enum DAGTerminationCause {
/** DAG failed during output commit. */
COMMIT_FAILURE,
-
+
+ /** DAG failed while trying to write recovery events */
+ RECOVERY_FAILURE,
+
INTERNAL_ERROR
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index ed73433..0e4c504 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -250,7 +250,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// Transitions from TERMINATING state.
.addTransition
(DAGState.TERMINATING,
- EnumSet.of(DAGState.TERMINATING, DAGState.KILLED, DAGState.FAILED),
+ EnumSet.of(DAGState.TERMINATING, DAGState.KILLED, DAGState.FAILED,
+ DAGState.ERROR),
DAGEventType.DAG_VERTEX_COMPLETED,
new VertexCompletedTransition())
.addTransition(DAGState.TERMINATING, DAGState.TERMINATING,
@@ -734,6 +735,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
new DAGCommitStartedEvent(getID(), clock.getTime())));
} catch (IOException e) {
LOG.error("Failed to send commit event to history/recovery handler", e);
+ trySetTerminationCause(DAGTerminationCause.RECOVERY_FAILURE);
return false;
}
for (VertexGroupInfo groupInfo : vertexGroups.values()) {
@@ -887,11 +889,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
finishTime = clock.getTime();
}
- void logJobHistoryFinishedEvent() {
+ void logJobHistoryFinishedEvent() throws IOException {
this.setFinishTime();
DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
finishTime, DAGState.SUCCEEDED, "", getAllCounters());
- this.appContext.getHistoryHandler().handle(
+ this.appContext.getHistoryHandler().handleCriticalEvent(
new DAGHistoryEvent(dagId, finishEvt));
}
@@ -909,12 +911,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
new DAGHistoryEvent(dagId, startEvt));
}
- void logJobHistoryUnsuccesfulEvent(DAGState state) {
+ void logJobHistoryUnsuccesfulEvent(DAGState state) throws IOException {
DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
clock.getTime(), state,
StringUtils.join(LINE_SEPARATOR, getDiagnostics()),
getAllCounters());
- this.appContext.getHistoryHandler().handle(
+ this.appContext.getHistoryHandler().handleCriticalEvent(
new DAGHistoryEvent(dagId, finishEvt));
}
@@ -967,7 +969,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
dag.addDiagnostic(diagnosticMsg);
return dag.finished(DAGState.FAILED);
}
-
+ if(dag.terminationCause == DAGTerminationCause.RECOVERY_FAILURE ){
+ String diagnosticMsg = "DAG failed due to failure in recovery handling." +
+ " failedVertices:" + dag.numFailedVertices +
+ " killedVertices:" + dag.numKilledVertices;
+ LOG.info(diagnosticMsg);
+ dag.addDiagnostic(diagnosticMsg);
+ return dag.finished(DAGState.FAILED);
+ }
+
// catch all
String diagnosticMsg = "All vertices complete, but cannot determine final state of DAG"
+ ", numCompletedVertices=" + dag.numCompletedVertices
@@ -1016,14 +1026,26 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
finalState = DAGState.FAILED;
trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
}
-
- if (finalState == DAGState.SUCCEEDED) {
- logJobHistoryFinishedEvent();
+
+ boolean recoveryError = false;
+ try {
+ if (finalState == DAGState.SUCCEEDED) {
+ logJobHistoryFinishedEvent();
+ } else {
+ logJobHistoryUnsuccesfulEvent(finalState);
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to persist recovery event for DAG completion"
+ + ", dagId=" + dagId
+ + ", finalState=" + finalState);
+ recoveryError = true;
+ }
+
+ if (recoveryError) {
+ eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), DAGState.ERROR));
} else {
- logJobHistoryUnsuccesfulEvent(finalState);
+ eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState));
}
-
- eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState));
LOG.info("DAG: " + getID() + " finished with state: " + finalState);
return finalState;
@@ -1339,10 +1361,17 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
desiredState));
}
- dag.logJobHistoryUnsuccesfulEvent(DAGState.FAILED);
+ DAGState endState = DAGState.FAILED;
+ try {
+ dag.logJobHistoryUnsuccesfulEvent(endState);
+ } catch (IOException e) {
+ LOG.warn("Failed to persist recovery event for DAG completion"
+ + ", dagId=" + dag.dagId
+ + ", finalState=" + endState);
+ }
dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
- DAGState.FAILED));
- return DAGState.FAILED;
+ endState));
+ return endState;
}
for (Vertex v : dag.vertices.values()) {
@@ -1387,9 +1416,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// Error state
LOG.warn("Trying to recover DAG, failed to recover"
+ " from non-handled state" + dag.recoveredState);
+ // Tell AM ERROR so that it can shutdown
dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
DAGState.ERROR));
- return DAGState.ERROR;
+ return DAGState.FAILED;
}
}
@@ -1494,23 +1524,27 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// Task-start has been moved out of InitTransition, so this arc simply
// hardcodes 0 for both map and reduce finished tasks.
private static class KillNewJobTransition
- implements SingleArcTransition<DAGImpl, DAGEvent> {
+ implements SingleArcTransition<DAGImpl, DAGEvent> {
+
@Override
- public void transition(DAGImpl job, DAGEvent event) {
- job.setFinishTime();
- job.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
- job.finished(DAGState.KILLED);
+ public void transition(DAGImpl dag, DAGEvent dagEvent) {
+ dag.setFinishTime();
+ dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
+ dag.finished(DAGState.KILLED);
}
+
}
private static class KillInitedJobTransition
- implements SingleArcTransition<DAGImpl, DAGEvent> {
+ implements SingleArcTransition<DAGImpl, DAGEvent> {
+
@Override
- public void transition(DAGImpl job, DAGEvent event) {
- job.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
- job.addDiagnostic("Job received Kill in INITED state.");
- job.finished(DAGState.KILLED);
+ public void transition(DAGImpl dag, DAGEvent dagEvent) {
+ dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
+ dag.addDiagnostic("Job received Kill in INITED state.");
+ dag.finished(DAGState.KILLED);
}
+
}
private static class DAGKilledTransition
@@ -1610,6 +1644,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private boolean vertexSucceeded(Vertex vertex) {
numSuccessfulVertices++;
boolean failedCommit = false;
+ boolean recoveryFailed = false;
if (!commitAllOutputsOnSuccess) {
// committing successful outputs immediately. check for shared outputs
List<VertexGroupInfo> groupsList = vertexGroupInfo.get(vertex.getName());
@@ -1640,6 +1675,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
clock.getTime())));
} catch (IOException e) {
LOG.error("Failed to send commit recovery event to handler", e);
+ recoveryFailed = true;
failedCommit = true;
}
if (!failedCommit) {
@@ -1662,6 +1698,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
clock.getTime())));
} catch (IOException e) {
LOG.error("Failed to send commit recovery event to handler", e);
+ recoveryFailed = true;
failedCommit = true;
}
}
@@ -1670,10 +1707,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
if (failedCommit) {
LOG.info("Aborting job due to failure in commit.");
- enactKill(DAGTerminationCause.COMMIT_FAILURE,
- VertexTerminationCause.COMMIT_FAILURE);
+ if (!recoveryFailed) {
+ enactKill(DAGTerminationCause.COMMIT_FAILURE,
+ VertexTerminationCause.COMMIT_FAILURE);
+ } else {
+ LOG.info("Recovery failure occurred during commit");
+ enactKill(DAGTerminationCause.RECOVERY_FAILURE,
+ VertexTerminationCause.COMMIT_FAILURE);
+ }
}
-
+
return !failedCommit;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 7f94ed7..6f1398a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1276,12 +1276,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
new DAGHistoryEvent(getDAGId(), finishEvt));
}
- void logJobHistoryVertexFailedEvent(VertexState state) {
+ void logJobHistoryVertexFailedEvent(VertexState state) throws IOException {
VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
vertexName, initTimeRequested, initedTime, startTimeRequested,
startedTime, clock.getTime(), state, StringUtils.join(LINE_SEPARATOR,
getDiagnostics()), getAllCounters(), getVertexStats());
- this.appContext.getHistoryHandler().handle(
+ this.appContext.getHistoryHandler().handleCriticalEvent(
new DAGHistoryEvent(getDAGId(), finishEvt));
}
@@ -1331,10 +1331,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
if (firstCommit) {
// Log commit start event on first actual commit
- vertex.appContext.getHistoryHandler().handle(
- new DAGHistoryEvent(vertex.getDAGId(),
- new VertexCommitStartedEvent(vertex.vertexId,
- vertex.clock.getTime())));
+ try {
+ vertex.appContext.getHistoryHandler().handleCriticalEvent(
+ new DAGHistoryEvent(vertex.getDAGId(),
+ new VertexCommitStartedEvent(vertex.vertexId,
+ vertex.clock.getTime())));
+ } catch (IOException e) {
+ LOG.error("Failed to persist commit start event to recovery, vertexId="
+ + vertex.logIdentifier, e);
+ vertex.trySetTerminationCause(VertexTerminationCause.INTERNAL_ERROR);
+ return vertex.finished(VertexState.FAILED);
+ }
} else {
firstCommit = false;
}
@@ -1430,24 +1437,33 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
case ERROR:
eventHandler.handle(new DAGEvent(getDAGId(),
DAGEventType.INTERNAL_ERROR));
- logJobHistoryVertexFailedEvent(finalState);
+ try {
+ logJobHistoryVertexFailedEvent(finalState);
+ } catch (IOException e) {
+ LOG.error("Failed to send vertex finished event to recovery", e);
+ }
break;
case KILLED:
case FAILED:
eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
finalState, terminationCause));
- logJobHistoryVertexFailedEvent(finalState);
+ try {
+ logJobHistoryVertexFailedEvent(finalState);
+ } catch (IOException e) {
+ LOG.error("Failed to send vertex finished event to recovery", e);
+ }
break;
case SUCCEEDED:
- eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
- finalState));
try {
logJobHistoryVertexFinishedEvent();
+ eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
+ finalState));
} catch (IOException e) {
LOG.error("Failed to send vertex finished event to recovery", e);
- finalState = VertexState.ERROR;
- eventHandler.handle(new DAGEvent(getDAGId(),
- DAGEventType.INTERNAL_ERROR));
+ finalState = VertexState.FAILED;
+ this.terminationCause = VertexTerminationCause.INTERNAL_ERROR;
+ eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
+ finalState));
}
break;
default:
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 413d4ef..4eb094f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -37,7 +37,6 @@ public class HistoryEventHandler extends CompositeService {
private final AppContext context;
private boolean yarnATSEnabled;
- private AtomicBoolean stopped = new AtomicBoolean(false);
private ATSService atsService;
private RecoveryService recoveryService;
private boolean recoveryEnabled;
@@ -114,12 +113,17 @@ public class HistoryEventHandler extends CompositeService {
try {
handleCriticalEvent(event);
} catch (IOException e) {
- throw new RuntimeException(e);
+ LOG.warn("Failed to handle recovery event"
+ + ", eventType=" + event.getHistoryEvent().getEventType(), e);
}
}
-
-
-
+ public boolean hasRecoveryFailed() {
+ if (recoveryEnabled) {
+ return recoveryService.hasRecoveryFailed();
+ } else {
+ return false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 110da65..840ad1d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -48,6 +48,9 @@ public class RecoveryService extends AbstractService {
private static final Log LOG = LogFactory.getLog(RecoveryService.class);
private final AppContext appContext;
+ public static final String RECOVERY_FATAL_OCCURRED_DIR =
+ "RecoveryFatalErrorOccurred";
+
private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
new LinkedBlockingQueue<DAGHistoryEvent>();
private Set<TezDAGID> completedDAGs = new HashSet<TezDAGID>();
@@ -69,6 +72,7 @@ public class RecoveryService extends AbstractService {
private long lastFlushTime = -1;
private int maxUnflushedEvents;
private int flushInterval;
+ private AtomicBoolean recoveryFatalErrorOccurred = new AtomicBoolean(false);
public RecoveryService(AppContext appContext) {
super(RecoveryService.class.getName());
@@ -99,6 +103,13 @@ public class RecoveryService extends AbstractService {
DAGHistoryEvent event;
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+ if (recoveryFatalErrorOccurred.get()) {
+ LOG.error("Recovery failure occurred. Stopping recovery thread."
+ + " Current eventQueueSize=" + eventQueue.size());
+ eventQueue.clear();
+ return;
+ }
+
// Log the size of the event-queue every so often.
if (eventCounter != 0 && eventCounter % 1000 == 0) {
LOG.info("Event queue stats"
@@ -169,6 +180,11 @@ public class RecoveryService extends AbstractService {
return;
}
HistoryEventType eventType = event.getHistoryEvent().getEventType();
+
+ if (recoveryFatalErrorOccurred.get()) {
+ return;
+ }
+
if (!started.get()) {
LOG.warn("Adding event of type " + eventType
+ " to queue as service not started");
@@ -233,9 +249,29 @@ public class RecoveryService extends AbstractService {
}
}
} catch (IOException ioe) {
- LOG.warn("Error handling summary event"
+ LOG.error("Error handling summary event"
+ ", eventType=" + event.getHistoryEvent().getEventType(), ioe);
- throw ioe;
+ Path fatalErrorDir = new Path(recoveryPath, RECOVERY_FATAL_OCCURRED_DIR);
+ try {
+ LOG.error("Adding a flag to ensure next AM attempt does not start up"
+ + ", flagFile=" + fatalErrorDir.toString());
+ recoveryFatalErrorOccurred.set(true);
+ recoveryDirFS.mkdirs(fatalErrorDir);
+ if (recoveryDirFS.exists(fatalErrorDir)) {
+ LOG.error("Recovery failure occurred. Skipping all events");
+ } else {
+ // throw error if fatal error flag could not be set
+ throw ioe;
+ }
+ } catch (IOException e) {
+ LOG.fatal("Failed to create fatal error flag dir "
+ + fatalErrorDir.toString(), e);
+ throw ioe;
+ }
+ if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
+ // Throw error to tell client that dag submission failed
+ throw ioe;
+ }
}
}
} else {
@@ -374,4 +410,8 @@ public class RecoveryService extends AbstractService {
lastFlushTime = currentTime;
}
+ public boolean hasRecoveryFailed() {
+ return recoveryFatalErrorOccurred.get();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5a6f42a8/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 93a3a48..9874f7b 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -509,11 +509,15 @@ public class OrderedWordCount {
+ dagStatus.getDiagnostics());
}
}
+ } catch (Exception e) {
+ LOG.error("Error occurred when submitting/running DAGs", e);
+ throw e;
} finally {
if (!retainStagingDir) {
fs.delete(stagingDir, true);
}
if (useTezSession) {
+ LOG.info("Shutting down session");
tezSession.stop();
}
}