You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/14 02:47:32 UTC
[1/2] TEZ-904. Committer recovery events should be out-of-band.
(hitesh)
Repository: incubator-tez
Updated Branches:
refs/heads/master 693f2ca92 -> f58508a56
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 5986657..0074a4c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -18,6 +18,16 @@
package org.apache.tez.dag.history.recovery;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -33,16 +43,6 @@ import org.apache.tez.dag.history.SummaryEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.records.TezDAGID;
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
public class RecoveryService extends AbstractService {
private static final Log LOG = LogFactory.getLog(RecoveryService.class);
@@ -120,7 +120,7 @@ public class RecoveryService extends AbstractService {
synchronized (lock) {
try {
++eventsProcessed;
- handleEvent(event);
+ handleRecoveryEvent(event);
} catch (Exception e) {
// TODO handle failures - treat as fatal or ignore?
LOG.warn("Error handling recovery event", e);
@@ -175,26 +175,56 @@ public class RecoveryService extends AbstractService {
return;
}
- if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
- || eventType.equals(HistoryEventType.DAG_FINISHED)) {
- // handle submissions and completion immediately
+ TezDAGID dagId = event.getDagID();
+ if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
+ DAGSubmittedEvent dagSubmittedEvent =
+ (DAGSubmittedEvent) event.getHistoryEvent();
+ String dagName = dagSubmittedEvent.getDAGName();
+ if (dagName != null
+ && dagName.startsWith(
+ TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
+ // Skip recording pre-warm DAG events
+ skippedDAGs.add(dagId);
+ return;
+ }
+ }
+ if (dagId == null || skippedDAGs.contains(dagId)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping event for DAG"
+ + ", eventType=" + eventType
+ + ", dagId=" + (dagId == null ? "null" : dagId.toString())
+ + ", isSkippedDAG=" + (dagId == null ? "null"
+ : skippedDAGs.contains(dagId)));
+ }
+ return;
+ }
+
+ if (event.getHistoryEvent() instanceof SummaryEvent) {
synchronized (lock) {
try {
- handleEvent(event);
+ SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
+ handleSummaryEvent(dagId, eventType, summaryEvent);
summaryStream.hsync();
- if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
- if (outputStreamMap.containsKey(event.getDagID())) {
- doFlush(outputStreamMap.get(event.getDagID()),
- appContext.getClock().getTime(), true);
+ if (summaryEvent.writeToRecoveryImmediately()) {
+ handleRecoveryEvent(event);
+ doFlush(outputStreamMap.get(event.getDagID()),
+ appContext.getClock().getTime(), true);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Queueing Non-immediate Summary/Recovery event of type"
+ + eventType.name());
}
- } else if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
- completedDAGs.add(event.getDagID());
- if (outputStreamMap.containsKey(event.getDagID())) {
+ eventQueue.add(event);
+ }
+ if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
+ LOG.info("DAG completed"
+ + ", dagId=" + event.getDagID()
+ + ", queueSize=" + eventQueue.size());
+ completedDAGs.add(dagId);
+ if (outputStreamMap.containsKey(dagId)) {
try {
- doFlush(outputStreamMap.get(event.getDagID()),
- appContext.getClock().getTime(), true);
- outputStreamMap.get(event.getDagID()).close();
- outputStreamMap.remove(event.getDagID());
+ outputStreamMap.get(dagId).close();
+ outputStreamMap.remove(dagId);
} catch (IOException ioe) {
LOG.warn("Error when trying to flush/close recovery file for"
+ " dag, dagId=" + event.getDagID());
@@ -207,87 +237,71 @@ public class RecoveryService extends AbstractService {
LOG.warn("Error handling recovery event", e);
}
}
- LOG.info("DAG completed"
- + ", dagId=" + event.getDagID()
- + ", queueSize=" + eventQueue.size());
} else {
// All other events just get queued
if (LOG.isDebugEnabled()) {
- LOG.debug("Queueing Recovery event of type " + eventType.name());
+ LOG.debug("Queueing Non-Summary Recovery event of type " + eventType.name());
}
eventQueue.add(event);
}
}
+ private void handleSummaryEvent(TezDAGID dagID,
+ HistoryEventType eventType,
+ SummaryEvent summaryEvent) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Handling summary event"
+ + ", dagID=" + dagID
+ + ", eventType=" + eventType);
+ }
+ try {
+ if (summaryStream == null) {
+ Path summaryPath = new Path(recoveryPath,
+ appContext.getApplicationID()
+ + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+ if (!recoveryDirFS.exists(summaryPath)) {
+ summaryStream = recoveryDirFS.create(summaryPath, false,
+ bufferSize);
+ } else {
+ summaryStream = recoveryDirFS.append(summaryPath, bufferSize);
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing recovery event to summary stream"
+ + ", dagId=" + dagID
+ + ", eventType=" + eventType);
+ }
+ summaryEvent.toSummaryProtoStream(summaryStream);
+ } catch (IOException ioe) {
+ // FIXME handle failures
+ LOG.warn("Failed to write to stream", ioe);
+ }
+
- private void handleEvent(DAGHistoryEvent event) {
+ }
+
+ private void handleRecoveryEvent(DAGHistoryEvent event) {
HistoryEventType eventType = event.getHistoryEvent().getEventType();
if (LOG.isDebugEnabled()) {
LOG.debug("Handling recovery event of type "
+ event.getHistoryEvent().getEventType());
}
- if (event.getDagID() == null) {
- // AM event
- // anything to be done?
- // TODO
- LOG.info("Skipping Recovery Event as DAG is null"
- + ", eventType=" + event.getHistoryEvent().getEventType());
- return;
- }
-
TezDAGID dagID = event.getDagID();
- if (completedDAGs.contains(dagID)
- || skippedDAGs.contains(dagID)) {
- // Skip events for completed and skipped DAGs
+
+ if (completedDAGs.contains(dagID)) {
// no need to recover completed DAGs
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping Recovery Event as either completed or skipped"
+ LOG.debug("Skipping Recovery Event as DAG completed"
+ ", dagId=" + dagID
+ ", completed=" + completedDAGs.contains(dagID)
+ ", skipped=" + skippedDAGs.contains(dagID)
- + ", eventType=" + event.getHistoryEvent().getEventType());
+ + ", eventType=" + eventType);
}
return;
}
try {
- if (summaryStream == null) {
- Path summaryPath = new Path(recoveryPath,
- appContext.getApplicationID()
- + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
- if (!recoveryDirFS.exists(summaryPath)) {
- summaryStream = recoveryDirFS.create(summaryPath, false,
- bufferSize);
- } else {
- summaryStream = recoveryDirFS.append(summaryPath, bufferSize);
- }
- }
-
- if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
- || eventType.equals(HistoryEventType.DAG_FINISHED)) {
- if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
- DAGSubmittedEvent dagSubmittedEvent =
- (DAGSubmittedEvent) event.getHistoryEvent();
- String dagName = dagSubmittedEvent.getDAGName();
- if (dagName != null
- && dagName.startsWith(
- TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
- // Skip recording pre-warm DAG events
- skippedDAGs.add(dagID);
- return;
- }
- }
- SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Writing recovery event to summary stream"
- + ", dagId=" + dagID
- + ", type="
- + event.getHistoryEvent().getEventType());
- }
- summaryEvent.toSummaryProtoStream(summaryStream);
- }
-
if (!outputStreamMap.containsKey(dagID)) {
Path dagFilePath = new Path(recoveryPath,
dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
@@ -313,8 +327,7 @@ public class RecoveryService extends AbstractService {
if (LOG.isDebugEnabled()) {
LOG.debug("Writing recovery event to output stream"
+ ", dagId=" + dagID
- + ", type="
- + event.getHistoryEvent().getEventType());
+ + ", eventType=" + eventType);
}
++unflushedEventsCount;
outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal());
@@ -339,11 +352,9 @@ public class RecoveryService extends AbstractService {
&& ((currentTime - lastFlushTime) >= (flushInterval*1000))) {
doFlush = true;
}
-
if (!doFlush) {
return;
}
-
doFlush(outputStream, currentTime, false);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 65f3aaf..c640baa 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -59,6 +59,7 @@ message DAGStartedProto {
message DAGCommitStartedProto {
optional string dag_id = 1;
}
+
message DAGFinishedProto {
optional string dag_id = 1;
optional int64 finish_time = 2;
@@ -99,6 +100,20 @@ message VertexCommitStartedProto {
optional string vertex_id = 1;
}
+message VertexCommitFinishedProto {
+ optional string vertex_id = 1;
+}
+
+message VertexGroupCommitStartedProto {
+ optional string dag_id = 1;
+ optional string vertex_group_name = 2;
+}
+
+message VertexGroupCommitFinishedProto {
+ optional string dag_id = 1;
+ optional string vertex_group_name = 2;
+}
+
message VertexFinishedProto {
optional string vertex_name = 1;
optional string vertex_id = 2;
@@ -162,4 +177,10 @@ message SummaryEventProto {
optional string dag_id = 1;
optional int64 timestamp = 2;
optional int32 event_type = 3;
+ optional bytes event_payload = 4;
+}
+
+message VertexFinishStateProto {
+ optional string vertex_id = 1;
+ optional int32 state = 2;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index f03863c..2cf3eaf 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -35,10 +35,12 @@ import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.runtime.RuntimeUtils;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -78,6 +80,26 @@ public class TestHistoryEventsProtoConversion {
return deserializedEvent;
}
+ /**
+  * Serializes the given event through its SummaryEvent view and rebuilds a
+  * new instance of the same class from the parsed SummaryEventProto.
+  *
+  * @param historyEvent event to round-trip; it is cast to SummaryEvent
+  * @return the newly created instance after fromSummaryProtoStream was applied
+  * @throws IOException if writing or parsing the delimited proto fails
+  */
+ private HistoryEvent testSummaryProtoConversion(HistoryEvent historyEvent)
+     throws IOException {
+   SummaryEvent summaryView = (SummaryEvent) historyEvent;
+   ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+   summaryView.toSummaryProtoStream(buffer);
+   buffer.flush();
+   buffer.close();
+   // Snapshot once; the same bytes are reported in the log and parsed back.
+   byte[] serialized = buffer.toByteArray();
+   LOG.info("Serialized event to byte array"
+       + ", eventType=" + historyEvent.getEventType()
+       + ", bufLen=" + serialized.length);
+   SummaryEventProto parsedProto = SummaryEventProto.parseDelimitedFrom(
+       new ByteArrayInputStream(serialized));
+   HistoryEvent rebuilt = RuntimeUtils.createClazzInstance(
+       summaryView.getClass().getName());
+   ((SummaryEvent) rebuilt).fromSummaryProtoStream(parsedProto);
+   return rebuilt;
+ }
+
private void logEvents(HistoryEvent event,
HistoryEvent deserializedEvent) {
LOG.info("Initial Event toString: " + event.toString());
@@ -485,7 +507,7 @@ public class TestHistoryEventsProtoConversion {
private void testDAGCommitStartedEvent() throws Exception {
DAGCommitStartedEvent event = new DAGCommitStartedEvent(
- TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1));
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100l);
DAGCommitStartedEvent deserializedEvent =
(DAGCommitStartedEvent) testProtoConversion(event);
Assert.assertEquals(event.getDagID(), deserializedEvent.getDagID());
@@ -495,13 +517,56 @@ public class TestHistoryEventsProtoConversion {
private void testVertexCommitStartedEvent() throws Exception {
VertexCommitStartedEvent event = new VertexCommitStartedEvent(
TezVertexID.getInstance(
- TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1));
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), 100l);
VertexCommitStartedEvent deserializedEvent =
(VertexCommitStartedEvent) testProtoConversion(event);
Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
logEvents(event, deserializedEvent);
}
+ /**
+  * Checks that a VertexGroupCommitStartedEvent keeps its DAG id and vertex
+  * group name through testProtoConversion, and its vertex group name through
+  * testSummaryProtoConversion.
+  */
+ private void testVertexGroupCommitStartedEvent() throws Exception {
+   TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1);
+   VertexGroupCommitStartedEvent event =
+       new VertexGroupCommitStartedEvent(dagId, "fooGroup", 1000344L);
+
+   // Round trip via testProtoConversion: DAG id and group name must match.
+   VertexGroupCommitStartedEvent fromProto =
+       (VertexGroupCommitStartedEvent) testProtoConversion(event);
+   Assert.assertEquals(event.getDagID(), fromProto.getDagID());
+   Assert.assertEquals(event.getVertexGroupName(),
+       fromProto.getVertexGroupName());
+   logEvents(event, fromProto);
+
+   // Round trip via the summary stream: only the group name is compared.
+   VertexGroupCommitStartedEvent fromSummary =
+       (VertexGroupCommitStartedEvent) testSummaryProtoConversion(event);
+   Assert.assertEquals(event.getVertexGroupName(),
+       fromSummary.getVertexGroupName());
+   logEvents(event, fromSummary);
+ }
+
+ /**
+  * Checks that a VertexGroupCommitFinishedEvent keeps its DAG id and vertex
+  * group name through testProtoConversion, and its vertex group name through
+  * testSummaryProtoConversion.
+  */
+ private void testVertexGroupCommitFinishedEvent() throws Exception {
+   TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1);
+   VertexGroupCommitFinishedEvent event =
+       new VertexGroupCommitFinishedEvent(dagId, "fooGroup", 1000344L);
+
+   // Round trip via testProtoConversion: DAG id and group name must match.
+   VertexGroupCommitFinishedEvent fromProto =
+       (VertexGroupCommitFinishedEvent) testProtoConversion(event);
+   Assert.assertEquals(event.getDagID(), fromProto.getDagID());
+   Assert.assertEquals(event.getVertexGroupName(),
+       fromProto.getVertexGroupName());
+   logEvents(event, fromProto);
+
+   // Round trip via the summary stream: only the group name is compared.
+   VertexGroupCommitFinishedEvent fromSummary =
+       (VertexGroupCommitFinishedEvent) testSummaryProtoConversion(event);
+   Assert.assertEquals(event.getVertexGroupName(),
+       fromSummary.getVertexGroupName());
+   logEvents(event, fromSummary);
+ }
+
+
@Test
public void testDefaultProtoConversion() throws Exception {
for (HistoryEventType eventType : HistoryEventType.values()) {
@@ -560,6 +625,12 @@ public class TestHistoryEventsProtoConversion {
case VERTEX_COMMIT_STARTED:
testVertexCommitStartedEvent();
break;
+ case VERTEX_GROUP_COMMIT_STARTED:
+ testVertexGroupCommitStartedEvent();
+ break;
+ case VERTEX_GROUP_COMMIT_FINISHED:
+ testVertexGroupCommitFinishedEvent();
+ break;
default:
throw new Exception("Unhandled Event type in Unit tests: " + eventType);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
new file mode 100644
index 0000000..2716fdd
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.DAGStatus.State;
+import org.apache.tez.test.dag.MultiAttemptDAG;
+import org.apache.tez.test.dag.SimpleVTestDAG;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Random;
+
+public class TestDAGRecovery2 {
+
+ private static final Log LOG = LogFactory.getLog(TestDAGRecovery2.class);
+
+ private static Configuration conf = new Configuration();
+ private static MiniTezCluster miniTezCluster;
+ private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+ + TestDAGRecovery2.class.getName() + "-tmpDir";
+ protected static MiniDFSCluster dfsCluster;
+
+ private static TezSession tezSession = null;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ LOG.info("Starting mini clusters");
+ FileSystem remoteFs = null;
+ try {
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+ dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+ .format(true).racks(null).build();
+ remoteFs = dfsCluster.getFileSystem();
+ } catch (IOException io) {
+ throw new RuntimeException("problem starting mini dfs cluster", io);
+ }
+ if (miniTezCluster == null) {
+ miniTezCluster = new MiniTezCluster(TestDAGRecovery2.class.getName(),
+ 1, 1, 1);
+ Configuration miniTezconf = new Configuration(conf);
+ miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 4);
+ miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+ miniTezCluster.init(miniTezconf);
+ miniTezCluster.start();
+
+ Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
+ .valueOf(new Random().nextInt(100000))));
+ TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+
+ TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+ tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 10);
+ tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+ remoteStagingDir.toString());
+ tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
+ tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
+ tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
+ tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, " -Xmx256m");
+
+ AMConfiguration amConfig = new AMConfiguration(
+ new HashMap<String, String>(), new HashMap<String, LocalResource>(),
+ tezConf, null);
+ TezSessionConfiguration tezSessionConfig =
+ new TezSessionConfiguration(amConfig, tezConf);
+ tezSession = new TezSession("TestDAGRecovery2", tezSessionConfig);
+ tezSession.start();
+ }
+ }
+ void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
+ TezSessionStatus status = tezSession.getSessionStatus();
+ while (status != TezSessionStatus.READY && status != TezSessionStatus.SHUTDOWN) {
+ LOG.info("Waiting for session to be ready. Current: " + status);
+ Thread.sleep(100);
+ status = tezSession.getSessionStatus();
+ }
+ if (status == TezSessionStatus.SHUTDOWN) {
+ throw new TezUncheckedException("Unexpected Session shutdown");
+ }
+ DAGClient dagClient = tezSession.submitDAG(dag);
+ DAGStatus dagStatus = dagClient.getDAGStatus(null);
+ while (!dagStatus.isCompleted()) {
+ LOG.info("Waiting for dag to complete. Sleeping for 500ms."
+ + " DAG name: " + dag.getName()
+ + " DAG appId: " + dagClient.getApplicationId()
+ + " Current state: " + dagStatus.getState());
+ Thread.sleep(100);
+ dagStatus = dagClient.getDAGStatus(null);
+ }
+
+ Assert.assertEquals(finalState, dagStatus.getState());
+ }
+
+ @Test(timeout=120000)
+ public void testBasicRecovery() throws Exception {
+ DAG dag = SimpleVTestDAG.createDAG("FailingCommitterDAG", null);
+ OutputDescriptor od =
+ new OutputDescriptor(MultiAttemptDAG.NoOpOutput.class.getName());
+ od.setUserPayload(new
+ MultiAttemptDAG.FailingOutputCommitter.FailingOutputCommitterConfig(true)
+ .toUserPayload());
+ dag.getVertex("v3").addOutput("FailingOutput", od,
+ MultiAttemptDAG.FailingOutputCommitter.class);
+ runDAGAndVerify(dag, State.FAILED);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
index 086e458..6a25f13 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
@@ -216,6 +216,11 @@ public class TestProcessor implements LogicalIOProcessor {
LOG.info(msg);
}
for (Map.Entry<String, LogicalOutput> entry : outputs.entrySet()) {
+ if (!(entry.getValue() instanceof TestOutput)) {
+ LOG.info("Ignoring non TestOutput: " + entry.getKey()
+ + " outputClass= " + entry.getValue().getClass().getSimpleName());
+ continue;
+ }
LOG.info("Writing output: " + entry.getKey() + " sum= " + sum);
TestOutput output = (TestOutput) entry.getValue();
output.write(sum);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index 65c8383..80d8588 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -18,6 +18,8 @@
package org.apache.tez.test.dag;
+import com.google.common.primitives.Booleans;
+import com.google.common.primitives.Ints;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -30,15 +32,24 @@ import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.client.VertexStatus.State;
import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
+import org.apache.tez.runtime.api.OutputCommitter;
+import org.apache.tez.runtime.api.OutputCommitterContext;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.Writer;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.test.TestInput;
import org.apache.tez.test.TestOutput;
import org.apache.tez.test.TestProcessor;
+import org.apache.tez.test.dag.MultiAttemptDAG.FailingOutputCommitter.FailingOutputCommitterConfig;
import java.util.ArrayList;
import java.util.List;
@@ -56,6 +67,10 @@ public class MultiAttemptDAG {
"tez.multi-attempt-dag.vertex.num-tasks";
public static int MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT = 2;
+ public static String MULTI_ATTEMPT_DAG_USE_FAILING_COMMITTER =
+ "tez.multi-attempt-dag.use-failing-committer";
+ public static boolean MULTI_ATTEMPT_DAG_USE_FAILING_COMMITTER_DEFAULT = false;
+
public static class FailOnAttemptVertexManagerPlugin
implements VertexManagerPlugin {
private int numSourceTasks = 0;
@@ -130,6 +145,100 @@ public class MultiAttemptDAG {
}
}
+ /**
+  * Test committer that can be told, through its user payload, to halt the JVM
+  * when commitOutput() is invoked. setupOutput() and abortOutput() do nothing.
+  */
+ public static class FailingOutputCommitter extends OutputCommitter {
+
+   boolean failOnCommit = false;
+
+   /** Reads the failOnCommit flag from the context's user payload. */
+   @Override
+   public void initialize(OutputCommitterContext context) throws Exception {
+     FailingOutputCommitterConfig config = new FailingOutputCommitterConfig();
+     config.fromUserPayload(context.getUserPayload());
+     failOnCommit = config.failOnCommit;
+   }
+
+   @Override
+   public void setupOutput() throws Exception {
+     // intentionally empty
+   }
+
+   /** Halts the JVM with exit status -1 when failOnCommit is set. */
+   @Override
+   public void commitOutput() throws Exception {
+     if (failOnCommit) {
+       LOG.info("Committer causing AM to shutdown");
+       // halt() rather than exit(): terminates immediately, no shutdown hooks.
+       Runtime.getRuntime().halt(-1);
+     }
+   }
+
+   @Override
+   public void abortOutput(State finalState) throws Exception {
+     // intentionally empty
+   }
+
+   /**
+    * Carries the failOnCommit flag and converts it to and from the 4-byte
+    * payload produced by Ints.toByteArray (1 = fail, 0 = do not fail).
+    */
+   public static class FailingOutputCommitterConfig {
+     boolean failOnCommit;
+
+     public FailingOutputCommitterConfig() {
+       this(false);
+     }
+
+     public FailingOutputCommitterConfig(boolean failOnCommit) {
+       this.failOnCommit = failOnCommit;
+     }
+
+     /** @return the flag encoded as the int 1 or 0 in a byte array */
+     public byte[] toUserPayload() {
+       return Ints.toByteArray((failOnCommit ? 1 : 0));
+     }
+
+     /**
+      * Restores the flag from a payload written by toUserPayload().
+      *
+      * @param userPayload encoded flag; null is treated as "do not fail"
+      */
+     public void fromUserPayload(byte[] userPayload) {
+       // A missing payload falls back to the default instead of passing a
+       // null array to Ints.fromByteArray. Any non-zero int means "fail".
+       failOnCommit = userPayload != null
+           && Ints.fromByteArray(userPayload) != 0;
+     }
+   }
+ }
+
+ /**
+  * LogicalOutput that produces nothing: apart from the memory request made in
+  * initialize(), every method is empty or returns null.
+  */
+ public static class NoOpOutput implements LogicalOutput, MemoryUpdateCallback {
+
+   @Override
+   public void setNumPhysicalOutputs(int numOutputs) {
+     // value is ignored
+   }
+
+   /** Requests an initial memory size of 1 with this object as the callback. */
+   @Override
+   public List<Event> initialize(TezOutputContext outputContext) throws Exception {
+     outputContext.requestInitialMemory(1L, this);
+     return null;
+   }
+
+   @Override
+   public void start() throws Exception {
+     // nothing to start
+   }
+
+   @Override
+   public Writer getWriter() throws Exception {
+     return null;
+   }
+
+   @Override
+   public void handleEvents(List<Event> outputEvents) {
+     // events are dropped
+   }
+
+   @Override
+   public List<Event> close() throws Exception {
+     return null;
+   }
+
+   @Override
+   public void memoryAssigned(long assignedSize) {
+     // assigned size is not used
+   }
+ }
+
public static DAG createDAG(String name,
Configuration conf) throws Exception {
[2/2] git commit: TEZ-904. Committer recovery events should be
out-of-band. (hitesh)
Posted by hi...@apache.org.
TEZ-904. Committer recovery events should be out-of-band. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f58508a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f58508a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f58508a5
Branch: refs/heads/master
Commit: f58508a5692c817a11b7996ba563ba8a26b85560
Parents: 693f2ca
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Mar 13 18:47:05 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Mar 13 18:47:05 2014 -0700
----------------------------------------------------------------------
.../org/apache/tez/dag/app/DAGAppMaster.java | 77 ++--
.../org/apache/tez/dag/app/RecoveryParser.java | 354 +++++++++++++++----
.../dag/app/dag/event/DAGEventRecoverEvent.java | 37 ++
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 73 +++-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 114 +++---
.../apache/tez/dag/history/HistoryEvent.java | 8 +-
.../tez/dag/history/HistoryEventType.java | 4 +-
.../apache/tez/dag/history/SummaryEvent.java | 11 +
.../history/events/DAGCommitStartedEvent.java | 34 +-
.../dag/history/events/DAGFinishedEvent.java | 33 +-
.../dag/history/events/DAGSubmittedEvent.java | 23 +-
.../events/VertexCommitStartedEvent.java | 41 ++-
.../dag/history/events/VertexFinishedEvent.java | 51 ++-
.../events/VertexGroupCommitFinishedEvent.java | 132 +++++++
.../events/VertexGroupCommitStartedEvent.java | 132 +++++++
.../dag/history/recovery/RecoveryService.java | 181 +++++-----
tez-dag/src/main/proto/HistoryEvents.proto | 21 ++
.../TestHistoryEventsProtoConversion.java | 75 +++-
.../org/apache/tez/test/TestDAGRecovery2.java | 144 ++++++++
.../java/org/apache/tez/test/TestProcessor.java | 5 +
.../apache/tez/test/dag/MultiAttemptDAG.java | 109 ++++++
21 files changed, 1390 insertions(+), 269 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 9df8752..9a01090 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.app;
import static com.google.common.base.Preconditions.checkNotNull;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -99,6 +100,7 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
+import org.apache.tez.dag.app.RecoveryParser.RecoveredDAGData;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Task;
@@ -108,6 +110,7 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
@@ -1371,28 +1374,19 @@ public class DAGAppMaster extends AbstractService {
}
}
- private DAG recoverDAG() throws IOException {
- DAG recoveredDAG = null;
+ private RecoveredDAGData recoverDAG() throws IOException { // only parses recovery data; it no longer dispatches DAG_RECOVER or leaves the RECOVERING state itself
if (recoveryEnabled) {
if (this.appAttemptID.getAttemptId() > 1) {
+ LOG.info("Recovering data from previous attempts"
+ + ", currentAttemptId=" + this.appAttemptID.getAttemptId());
this.state = DAGAppMasterState.RECOVERING;
RecoveryParser recoveryParser = new RecoveryParser(
this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
- recoveredDAG = recoveryParser.parseRecoveryData();
- if (recoveredDAG != null) {
- LOG.info("Found DAG to recover, dagId=" + recoveredDAG.getID());
- _updateLoggers(recoveredDAG, "");
- DAGEvent recoverDAGEvent = new DAGEvent(recoveredDAG.getID(),
- DAGEventType.DAG_RECOVER);
- dagEventDispatcher.handle(recoverDAGEvent);
- this.state = DAGAppMasterState.RUNNING;
- } else {
- LOG.info("No DAG to recover");
- this.state = DAGAppMasterState.IDLE;
- }
+ RecoveredDAGData recoveredDAGData = recoveryParser.parseRecoveryData(); // null when the parser finds no DAG in the summary data
+ return recoveredDAGData;
}
}
- return recoveredDAG;
+ return null; // recovery disabled, or attemptId <= 1: nothing to recover
}
@SuppressWarnings("unchecked")
@@ -1415,20 +1409,56 @@ public class DAGAppMaster extends AbstractService {
this.lastDAGCompletionTime = clock.getTime();
+ RecoveredDAGData recoveredDAGData = recoverDAG();
+
if (!isSession) {
- DAG recoveredDAG = null;
- if (appAttemptID.getAttemptId() != 1) {
- recoveredDAG = recoverDAG();
+ LOG.info("In Non-Session mode.");
+ } else {
+ LOG.info("In Session mode. Waiting for DAG over RPC");
+ this.state = DAGAppMasterState.IDLE;
+ }
+
+ if (recoveredDAGData != null) {
+ if (recoveredDAGData.isCompleted
+ || recoveredDAGData.nonRecoverable) {
+ LOG.info("Found previous DAG in completed or non-recoverable state"
+ + ", dagId=" + recoveredDAGData.recoveredDagID
+ + ", isCompleted=" + recoveredDAGData.isCompleted
+ + ", isNonRecoverable=" + recoveredDAGData.nonRecoverable
+ + ", state=" + (recoveredDAGData.dagState == null ? "null" :
+ recoveredDAGData.dagState)
+ + ", failureReason=" + recoveredDAGData.reason);
+ _updateLoggers(recoveredDAGData.recoveredDAG, "");
+ if (recoveredDAGData.nonRecoverable) {
+ DAGEventRecoverEvent recoverDAGEvent =
+ new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
+ DAGState.FAILED);
+ dagEventDispatcher.handle(recoverDAGEvent);
+ this.state = DAGAppMasterState.RUNNING;
+ } else {
+ DAGEventRecoverEvent recoverDAGEvent =
+ new DAGEventRecoverEvent(recoveredDAGData.recoveredDAG.getID(),
+ recoveredDAGData.dagState);
+ dagEventDispatcher.handle(recoverDAGEvent);
+ this.state = DAGAppMasterState.RUNNING;
+ }
+ } else {
+ LOG.info("Found DAG to recover, dagId=" + recoveredDAGData.recoveredDAG.getID());
+ _updateLoggers(recoveredDAGData.recoveredDAG, "");
+ DAGEvent recoverDAGEvent = new DAGEvent(recoveredDAGData.recoveredDAG.getID(),
+ DAGEventType.DAG_RECOVER);
+ dagEventDispatcher.handle(recoverDAGEvent);
+ this.state = DAGAppMasterState.RUNNING;
}
- if (recoveredDAG == null) {
+ } else {
+ if (!isSession) {
+ // No dag recovered - in non-session, just restart the original DAG
dagCounter.set(0);
startDAG();
}
- } else {
- LOG.info("In Session mode. Waiting for DAG over RPC");
- this.state = DAGAppMasterState.IDLE;
- recoverDAG();
+ }
+ if (isSession) {
this.dagSubmissionTimer = new Timer(true);
this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
@Override
@@ -1436,7 +1466,6 @@ public class DAGAppMaster extends AbstractService {
checkAndHandleSessionTimeout();
}
}, sessionTimeoutInterval, sessionTimeoutInterval / 10);
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 9e59849..7e1feca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -18,6 +18,13 @@
package org.apache.tez.dag.app;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -27,7 +34,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
@@ -37,9 +44,9 @@ import org.apache.tez.dag.history.events.AMLaunchedEvent;
import org.apache.tez.dag.history.events.AMStartedEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
@@ -48,17 +55,15 @@ import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
public class RecoveryParser {
@@ -88,6 +93,15 @@ public class RecoveryParser {
TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
}
+ public static class RecoveredDAGData { // mutable result holder returned by parseRecoveryData()
+ public TezDAGID recoveredDagID = null; // copied from recoveredDAG.getID() when the DAG is re-created
+ public DAGImpl recoveredDAG = null; // DAG rebuilt through dagAppMaster.createDAG(...); null until that happens
+ public DAGState dagState = null; // state taken from a DAG_FINISHED event; stays null if none was seen
+ public boolean isCompleted = false; // true once a DAG_FINISHED event was found for this DAG
+ public boolean nonRecoverable = false; // true when a commit was in flight or buffered vertex state could not be restored
+ public String reason = null; // explanation that accompanies nonRecoverable
+ }
+
private static void parseSummaryFile(FSDataInputStream inputStream)
throws IOException {
while (inputStream.available() > 0) {
@@ -149,6 +163,12 @@ public class RecoveryParser {
case VERTEX_COMMIT_STARTED:
event = new VertexCommitStartedEvent();
break;
+ case VERTEX_GROUP_COMMIT_STARTED:
+ event = new VertexGroupCommitStartedEvent();
+ break;
+ case VERTEX_GROUP_COMMIT_FINISHED:
+ event = new VertexGroupCommitFinishedEvent();
+ break;
case VERTEX_FINISHED:
event = new VertexFinishedEvent();
break;
@@ -266,18 +286,25 @@ public class RecoveryParser {
return recoveryFS.create(dagRecoveryPath, true, recoveryBufferSize);
}
- private TezDAGID getLastInProgressDAG(Map<TezDAGID, Boolean> seenDAGs) {
- TezDAGID inProgressDAG = null;
- for (Map.Entry<TezDAGID, Boolean> entry : seenDAGs.entrySet()) {
- if (!entry.getValue().booleanValue()) {
+ private DAGSummaryData getLastCompletedOrInProgressDAG( // returns the single in-progress DAG if any, else a completed one, else null
+ Map<TezDAGID, DAGSummaryData> dagSummaryDataMap) {
+ DAGSummaryData inProgressDAG = null;
+ DAGSummaryData lastCompletedDAG = null; // overwritten by every completed entry visited, so the final value depends on iteration order
+ for (Map.Entry<TezDAGID, DAGSummaryData> entry : dagSummaryDataMap.entrySet()) { // NOTE(review): the caller now passes a HashMap (previously a TreeMap), so "last" is not ordered by dagId - confirm this is intended
+ if (!entry.getValue().completed) {
if (inProgressDAG != null) {
throw new RuntimeException("Multiple in progress DAGs seen"
- + ", dagId=" + inProgressDAG
+ + ", dagId=" + inProgressDAG.dagId
+ ", dagId=" + entry.getKey());
}
- inProgressDAG = entry.getKey();
+ inProgressDAG = entry.getValue();
+ } else {
+ lastCompletedDAG = entry.getValue();
}
}
+ if (inProgressDAG == null) { // no in-progress DAG: fall back to a completed one (may itself be null for an empty map)
+ return lastCompletedDAG;
+ }
return inProgressDAG;
}
@@ -305,8 +332,134 @@ public class RecoveryParser {
return getAttemptRecoveryDataDir(recoveryDataDir, foundPreviousAttempt);
}
+ private static class DAGSummaryData { // per-DAG completion and commit state accumulated from summary-file records
+
+ final TezDAGID dagId;
+ boolean completed = false; // set by DAG_FINISHED, reset by DAG_SUBMITTED
+ boolean dagCommitCompleted = true; // starts true; cleared by DAG_COMMIT_STARTED and set again by DAG_FINISHED
+ DAGState dagState; // state read from the DAG_FINISHED summary record; otherwise left unset
+ Map<TezVertexID, Boolean> vertexCommitStatus =
+ new HashMap<TezVertexID, Boolean>(); // vertexId -> false at VERTEX_COMMIT_STARTED, true once VERTEX_FINISHED follows
+ Map<String, Boolean> vertexGroupCommitStatus =
+ new HashMap<String, Boolean>(); // group name -> false at VERTEX_GROUP_COMMIT_STARTED, true at VERTEX_GROUP_COMMIT_FINISHED
+ List<HistoryEvent> bufferedSummaryEvents =
+ new ArrayList<HistoryEvent>(); // summary events kept so parseRecoveryData() can replay them onto the recovered DAG
+
+ DAGSummaryData(TezDAGID dagId) {
+ this.dagId = dagId;
+ }
+
+ void handleSummaryEvent(SummaryEventProto proto) throws IOException { // folds one summary record into the fields above
+ HistoryEventType eventType =
+ HistoryEventType.values()[proto.getEventType()]; // NOTE(review): decoded by array index; an out-of-range value would throw ArrayIndexOutOfBoundsException - confirm writer and reader share the enum order
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[RECOVERY SUMMARY]"
+ + " dagId=" + proto.getDagId()
+ + ", timestamp=" + proto.getTimestamp()
+ + ", event=" + eventType);
+ }
+ switch (eventType) { // no default branch: event types not listed here are ignored
+ case DAG_SUBMITTED:
+ completed = false;
+ break;
+ case DAG_FINISHED:
+ completed = true;
+ dagCommitCompleted = true; // a finished DAG no longer has a DAG-level commit pending
+ DAGFinishedEvent dagFinishedEvent = new DAGFinishedEvent();
+ dagFinishedEvent.fromSummaryProtoStream(proto);
+ dagState = dagFinishedEvent.getState();
+ break;
+ case DAG_COMMIT_STARTED:
+ dagCommitCompleted = false;
+ break;
+ case VERTEX_COMMIT_STARTED:
+ VertexCommitStartedEvent vertexCommitStartedEvent =
+ new VertexCommitStartedEvent();
+ vertexCommitStartedEvent.fromSummaryProtoStream(proto);
+ vertexCommitStatus.put(
+ vertexCommitStartedEvent.getVertexID(), false);
+ break;
+ case VERTEX_FINISHED:
+ VertexFinishedEvent vertexFinishedEvent =
+ new VertexFinishedEvent();
+ vertexFinishedEvent.fromSummaryProtoStream(proto);
+ if (vertexCommitStatus.containsKey(vertexFinishedEvent.getVertexID())) { // only vertices with a recorded commit start are tracked and buffered
+ vertexCommitStatus.put(
+ vertexFinishedEvent.getVertexID(), true);
+ bufferedSummaryEvents.add(vertexFinishedEvent);
+ }
+ break;
+ case VERTEX_GROUP_COMMIT_STARTED:
+ VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent =
+ new VertexGroupCommitStartedEvent();
+ vertexGroupCommitStartedEvent.fromSummaryProtoStream(proto);
+ bufferedSummaryEvents.add(vertexGroupCommitStartedEvent);
+ vertexGroupCommitStatus.put(
+ vertexGroupCommitStartedEvent.getVertexGroupName(), false);
+ break;
+ case VERTEX_GROUP_COMMIT_FINISHED:
+ VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent =
+ new VertexGroupCommitFinishedEvent();
+ vertexGroupCommitFinishedEvent.fromSummaryProtoStream(proto);
+ bufferedSummaryEvents.add(vertexGroupCommitFinishedEvent);
+ vertexGroupCommitStatus.put(
+ vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
+ break;
+ }
+ }
- public DAG parseRecoveryData() throws IOException {
+ @Override
+ public String toString() { // used in the "not recoverable" warning; omits dagCommitCompleted and dagState
+ StringBuilder sb = new StringBuilder();
+ sb.append("dagId=").append(dagId);
+ sb.append(", dagCompleted=").append(completed);
+ if (!vertexCommitStatus.isEmpty()) {
+ sb.append(", vertexCommitStatuses=[");
+ for (Entry<TezVertexID, Boolean> entry : vertexCommitStatus.entrySet()) {
+ sb.append("{ vertexId=").append(entry.getKey())
+ .append(", committed=").append(entry.getValue()).append("}, "); // leaves a trailing ", " before the closing "]"
+ }
+ sb.append("]");
+ }
+ if (!vertexGroupCommitStatus.isEmpty()) {
+ sb.append(", vertexGroupCommitStatuses=[");
+ for (Entry<String, Boolean> entry : vertexGroupCommitStatus.entrySet()) {
+ sb.append("{ vertexGroup=").append(entry.getKey())
+ .append(", committed=").append(entry.getValue()).append("}, "); // same trailing-separator behaviour as above
+ }
+ sb.append("]");
+ }
+ return sb.toString();
+ }
+ }
+
+ private String isDAGRecoverable(DAGSummaryData data) { // despite the name, returns null when recoverable and a reason string when not
+ if (!data.dagCommitCompleted) { // a DAG commit start was recorded with no DAG_FINISHED after it
+ return "DAG Commit was in progress, not recoverable"
+ + ", dagId=" + data.dagId;
+ }
+ if (!data.vertexCommitStatus.isEmpty()) {
+ for (Entry<TezVertexID, Boolean> entry : data.vertexCommitStatus.entrySet()) {
+ if (!(entry.getValue().booleanValue())) { // commit started for this vertex but no VERTEX_FINISHED was recorded
+ return "Vertex Commit was in progress, not recoverable"
+ + ", dagId=" + data.dagId
+ + ", vertexId=" + entry.getKey();
+ }
+ }
+ }
+ if (!data.vertexGroupCommitStatus.isEmpty()) {
+ for (Entry<String, Boolean> entry : data.vertexGroupCommitStatus.entrySet()) {
+ if (!(entry.getValue().booleanValue())) { // group commit started but its finish record is missing
+ return "Vertex Group Commit was in progress, not recoverable"
+ + ", dagId=" + data.dagId
+ + ", vertexGroup=" + entry.getKey();
+ }
+ }
+ }
+ return null; // no commit was left in flight
+ }
+
+ public RecoveredDAGData parseRecoveryData() throws IOException {
Path previousAttemptRecoveryDataDir = getPreviousAttemptRecoveryDataDir();
LOG.info("Using " + previousAttemptRecoveryDataDir.toString()
+ " for recovering data from previous attempt");
@@ -330,8 +483,6 @@ public class RecoveryParser {
FSDataOutputStream newSummaryStream =
getSummaryOutputStream(newSummaryPath);
- Map<TezDAGID, Boolean> seenDAGs = new TreeMap<TezDAGID, Boolean>();
-
FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryPath);
LOG.info("Parsing summary file"
+ ", path=" + summaryPath.toString()
@@ -339,6 +490,8 @@ public class RecoveryParser {
+ ", lastModTime=" + summaryFileStatus.getModificationTime());
int dagCounter = 0;
+ Map<TezDAGID, DAGSummaryData> dagSummaryDataMap =
+ new HashMap<TezDAGID, DAGSummaryData>();
while (summaryStream.available() > 0) {
RecoveryProtos.SummaryEventProto proto =
RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
@@ -354,11 +507,10 @@ public class RecoveryParser {
if (dagCounter < dagId.getId()) {
dagCounter = dagId.getId();
}
- if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
- seenDAGs.put(dagId, false);
- } else if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
- seenDAGs.put(dagId, true);
+ if (!dagSummaryDataMap.containsKey(dagId)) {
+ dagSummaryDataMap.put(dagId, new DAGSummaryData(dagId));
}
+ dagSummaryDataMap.get(dagId).handleSummaryEvent(proto);
proto.writeDelimitedTo(newSummaryStream);
}
newSummaryStream.hsync();
@@ -367,12 +519,35 @@ public class RecoveryParser {
// Set counter for next set of DAGs
dagAppMaster.setDAGCounter(dagCounter);
- TezDAGID lastInProgressDAG = getLastInProgressDAG(seenDAGs);
+ DAGSummaryData lastInProgressDAGData =
+ getLastCompletedOrInProgressDAG(dagSummaryDataMap);
+ if (lastInProgressDAGData == null) {
+ LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
+ return null;
+ }
+ TezDAGID lastInProgressDAG = lastInProgressDAGData.dagId;
if (lastInProgressDAG == null) {
- LOG.info("Nothing to recover as no uncompleted DAGs found");
+ LOG.info("Nothing to recover as no uncompleted/completed DAGs found");
return null;
}
+ LOG.info("Checking if DAG is in recoverable state"
+ + ", dagId=" + lastInProgressDAGData.dagId);
+
+ final RecoveredDAGData recoveredDAGData = new RecoveredDAGData();
+ if (lastInProgressDAGData.completed) {
+ recoveredDAGData.isCompleted = true;
+ recoveredDAGData.dagState = lastInProgressDAGData.dagState;
+ }
+
+ String nonRecoverableReason = isDAGRecoverable(lastInProgressDAGData);
+ if (nonRecoverableReason != null) {
+ LOG.warn("Found last inProgress DAG but not recoverable: "
+ + lastInProgressDAGData);
+ recoveredDAGData.nonRecoverable = true;
+ recoveredDAGData.reason = nonRecoverableReason;
+ }
+
LOG.info("Trying to recover dag from recovery file"
+ ", dagId=" + lastInProgressDAG.toString()
+ ", dataDir=" + previousAttemptRecoveryDataDir
@@ -387,14 +562,13 @@ public class RecoveryParser {
+ ", dagId=" + lastInProgressDAG);
}
- DAGImpl recoveredDAG = null;
-
LOG.info("Copying DAG data into Current Attempt directory"
+ ", filePath=" + getDAGRecoveryFilePath(currentAttemptRecoveryDataDir,
lastInProgressDAG));
FSDataOutputStream newDAGRecoveryStream =
getDAGRecoveryOutputStream(currentAttemptRecoveryDataDir, lastInProgressDAG);
+ boolean skipAllOtherEvents = false;
while (dagRecoveryStream.available() > 0) {
HistoryEvent event;
try {
@@ -403,8 +577,9 @@ public class RecoveryParser {
LOG.warn("Corrupt data found when trying to read next event", ioe);
break;
}
- if (event == null) {
+ if (event == null || skipAllOtherEvents) {
// reached end of data
+ event = null;
break;
}
HistoryEventType eventType = event.getEventType();
@@ -414,9 +589,13 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- recoveredDAG = dagAppMaster.createDAG(((DAGSubmittedEvent) event).getDAGPlan(),
+ recoveredDAGData.recoveredDAG = dagAppMaster.createDAG(((DAGSubmittedEvent) event).getDAGPlan(),
lastInProgressDAG);
- dagAppMaster.setCurrentDAG(recoveredDAG);
+ recoveredDAGData.recoveredDagID = recoveredDAGData.recoveredDAG.getID();
+ dagAppMaster.setCurrentDAG(recoveredDAGData.recoveredDAG);
+ if (recoveredDAGData.nonRecoverable) {
+ skipAllOtherEvents = true;
+ }
break;
}
case DAG_INITIALIZED:
@@ -424,8 +603,8 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
- recoveredDAG.restoreFromEvent(event);
+ assert recoveredDAGData.recoveredDAG != null;
+ recoveredDAGData.recoveredDAG.restoreFromEvent(event);
break;
}
case DAG_STARTED:
@@ -433,8 +612,8 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
- recoveredDAG.restoreFromEvent(event);
+ assert recoveredDAGData.recoveredDAG != null;
+ recoveredDAGData.recoveredDAG.restoreFromEvent(event);
break;
}
case DAG_COMMIT_STARTED:
@@ -442,8 +621,26 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
- recoveredDAG.restoreFromEvent(event);
+ assert recoveredDAGData.recoveredDAG != null;
+ recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+ break;
+ }
+ case VERTEX_GROUP_COMMIT_STARTED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAGData.recoveredDAG != null;
+ recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+ break;
+ }
+ case VERTEX_GROUP_COMMIT_FINISHED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAGData.recoveredDAG != null;
+ recoveredDAGData.recoveredDAG.restoreFromEvent(event);
break;
}
case DAG_FINISHED:
@@ -452,13 +649,16 @@ public class RecoveryParser {
+ ", eventType=" + eventType
+ ", event=" + event.toString());
// If this is seen, nothing to recover
- assert recoveredDAG != null;
- recoveredDAG.restoreFromEvent(event);
- return recoveredDAG;
+ assert recoveredDAGData.recoveredDAG != null;
+ recoveredDAGData.recoveredDAG.restoreFromEvent(event);
+ recoveredDAGData.isCompleted = true;
+ recoveredDAGData.dagState =
+ ((DAGFinishedEvent) event).getState();
+ skipAllOtherEvents = true;
}
case CONTAINER_LAUNCHED:
{
- // Nothing to do?
+ // Nothing to do for now
break;
}
case VERTEX_INITIALIZED:
@@ -466,9 +666,9 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
VertexInitializedEvent vEvent = (VertexInitializedEvent) event;
- Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
v.restoreFromEvent(vEvent);
break;
}
@@ -477,9 +677,9 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
VertexStartedEvent vEvent = (VertexStartedEvent) event;
- Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
v.restoreFromEvent(vEvent);
break;
}
@@ -488,9 +688,9 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
VertexParallelismUpdatedEvent vEvent = (VertexParallelismUpdatedEvent) event;
- Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
v.restoreFromEvent(vEvent);
break;
}
@@ -499,9 +699,9 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
VertexCommitStartedEvent vEvent = (VertexCommitStartedEvent) event;
- Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
v.restoreFromEvent(vEvent);
break;
}
@@ -510,9 +710,9 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
VertexFinishedEvent vEvent = (VertexFinishedEvent) event;
- Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
v.restoreFromEvent(vEvent);
break;
}
@@ -521,9 +721,9 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
TaskStartedEvent tEvent = (TaskStartedEvent) event;
- Task task = recoveredDAG.getVertex(
+ Task task = recoveredDAGData.recoveredDAG.getVertex(
tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
task.restoreFromEvent(tEvent);
break;
@@ -533,9 +733,9 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
TaskFinishedEvent tEvent = (TaskFinishedEvent) event;
- Task task = recoveredDAG.getVertex(
+ Task task = recoveredDAGData.recoveredDAG.getVertex(
tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
task.restoreFromEvent(tEvent);
break;
@@ -545,10 +745,10 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event;
Task task =
- recoveredDAG.getVertex(
+ recoveredDAGData.recoveredDAG.getVertex(
tEvent.getTaskAttemptID().getTaskID().getVertexID())
.getTask(tEvent.getTaskAttemptID().getTaskID());
task.restoreFromEvent(tEvent);
@@ -559,10 +759,10 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event;
Task task =
- recoveredDAG.getVertex(
+ recoveredDAGData.recoveredDAG.getVertex(
tEvent.getTaskAttemptID().getTaskID().getVertexID())
.getTask(tEvent.getTaskAttemptID().getTaskID());
task.restoreFromEvent(tEvent);
@@ -573,10 +773,10 @@ public class RecoveryParser {
LOG.info("Recovering from event"
+ ", eventType=" + eventType
+ ", event=" + event.toString());
- assert recoveredDAG != null;
+ assert recoveredDAGData.recoveredDAG != null;
VertexDataMovementEventsGeneratedEvent vEvent =
(VertexDataMovementEventsGeneratedEvent) event;
- Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ Vertex v = recoveredDAGData.recoveredDAG.getVertex(vEvent.getVertexID());
v.restoreFromEvent(vEvent);
break;
}
@@ -596,6 +796,42 @@ public class RecoveryParser {
newDAGRecoveryStream.hsync();
newDAGRecoveryStream.close();
+ if (!recoveredDAGData.isCompleted
+ && !recoveredDAGData.nonRecoverable) {
+ if (lastInProgressDAGData.bufferedSummaryEvents != null
+ && !lastInProgressDAGData.bufferedSummaryEvents.isEmpty()) {
+ for (HistoryEvent bufferedEvent : lastInProgressDAGData.bufferedSummaryEvents) {
+ assert recoveredDAGData.recoveredDAG != null;
+ switch (bufferedEvent.getEventType()) {
+ case VERTEX_GROUP_COMMIT_STARTED:
+ recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
+ break;
+ case VERTEX_GROUP_COMMIT_FINISHED:
+ recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
+ break;
+ case VERTEX_FINISHED:
+ VertexFinishedEvent vertexFinishedEvent =
+ (VertexFinishedEvent) bufferedEvent;
+ Vertex vertex = recoveredDAGData.recoveredDAG.getVertex(
+ vertexFinishedEvent.getVertexID());
+ if (vertex == null) {
+ recoveredDAGData.nonRecoverable = true;
+ recoveredDAGData.reason = "All state could not be recovered"
+ + ", vertex completed but events not flushed"
+ + ", vertexId=" + vertexFinishedEvent.getVertexID();
+ } else {
+ vertex.restoreFromEvent(vertexFinishedEvent);
+ }
+ break;
+ default:
+ throw new RuntimeException("Invalid data found in buffered summary events"
+ + ", unknown event type "
+ + bufferedEvent.getEventType());
+ }
+ }
+ }
+ }
+
Path dataCopiedFlagPath = new Path(currentAttemptRecoveryDataDir,
dataRecoveredFileFlag);
LOG.info("Finished copying data from previous attempt into current attempt"
@@ -607,7 +843,7 @@ public class RecoveryParser {
flagFile.hsync();
flagFile.close();
- return recoveredDAG;
+ return recoveredDAGData;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
new file mode 100644
index 0000000..e64ad13
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventRecoverEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.records.TezDAGID;
+
+public class DAGEventRecoverEvent extends DAGEvent { // DAG_RECOVER event that additionally carries a desired DAG state
+
+ private final DAGState desiredState; // stored as given; no validation is performed here
+
+ public DAGEventRecoverEvent(TezDAGID dagId, DAGState desiredState) {
+ super(dagId, DAGEventType.DAG_RECOVER); // event type is always DAG_RECOVER
+ this.desiredState = desiredState;
+ }
+
+ public DAGState getDesiredState() { // returns whatever was passed to the constructor, including null
+ return desiredState;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 432c189..8fc278f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -54,11 +55,11 @@ import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DAGStatusBuilder;
import org.apache.tez.dag.api.client.ProgressBuilder;
@@ -73,32 +74,35 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.DAGReport;
import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.VertexTerminationCause;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
-import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
-import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
+import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
+import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
@@ -357,6 +361,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap();
private DAGState recoveredState = DAGState.NEW;
private boolean recoveryCommitInProgress = false;
+ Map<String, Boolean> recoveredGroupCommits = new HashMap<String, Boolean>();
static class VertexGroupInfo {
String groupName;
@@ -497,18 +502,21 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
recoveredState = DAGState.RUNNING;
return recoveredState;
case DAG_COMMIT_STARTED:
- if (recoveredState != DAGState.RUNNING) {
- throw new RuntimeException("Commit Started Event seen but"
- + " recovered state is not RUNNING"
- + ", recoveredState=" + recoveredState);
- }
recoveryCommitInProgress = true;
return recoveredState;
+ case VERTEX_GROUP_COMMIT_STARTED:
+ VertexGroupCommitStartedEvent vertexGroupCommitStartedEvent =
+ (VertexGroupCommitStartedEvent) historyEvent;
+ recoveredGroupCommits.put(
+ vertexGroupCommitStartedEvent.getVertexGroupName(), false);
+ return recoveredState;
+ case VERTEX_GROUP_COMMIT_FINISHED:
+ VertexGroupCommitFinishedEvent vertexGroupCommitFinishedEvent =
+ (VertexGroupCommitFinishedEvent) historyEvent;
+ recoveredGroupCommits.put(
+ vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
+ return recoveredState;
case DAG_FINISHED:
- if (!recoveryStartEventSeen) {
- throw new RuntimeException("Finished Event seen but"
- + " no Start Event was encountered earlier");
- }
recoveryCommitInProgress = false;
DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent;
this.finishTime = finishedEvent.getFinishTime();
@@ -722,7 +730,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
if (dagSucceeded && !successfulOutputsAlreadyCommitted) {
// commit all shared outputs
appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
- new DAGCommitStartedEvent(getID())));
+ new DAGCommitStartedEvent(getID(), clock.getTime())));
for (VertexGroupInfo groupInfo : vertexGroups.values()) {
if (failedWhileCommitting) {
break;
@@ -1266,6 +1274,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
@Override
public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
+ if (dagEvent instanceof DAGEventRecoverEvent) {
+ // DAG completed or final end state known
+ DAGEventRecoverEvent recoverEvent = (DAGEventRecoverEvent) dagEvent;
+ dag.recoveredState = recoverEvent.getDesiredState();
+ }
+
switch (dag.recoveredState) {
case NEW:
// send DAG an Init and start events
@@ -1292,8 +1306,19 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
case RUNNING:
// if commit is in progress, DAG should fail as commits are not
// recoverable
- if (dag.recoveryCommitInProgress) {
- // Fail the DAG as we have not seen a completion
+ // recoveredGroupCommits maps a vertex group name to false while only its
+ // commit start event was recovered and to true once its commit finish
+ // event was recovered. Only a started-but-unfinished commit is treated
+ // as in progress; finished group commits must not fail the DAG.
+ boolean groupCommitInProgress = false;
+ for (Entry<String, Boolean> entry : dag.recoveredGroupCommits.entrySet()) {
+ if (!entry.getValue().booleanValue()) {
+ LOG.info("Found a pending Vertex Group commit"
+ + ", vertexGroup=" + entry.getKey());
+ groupCommitInProgress = true;
+ }
+ }
+
+ if (groupCommitInProgress || dag.recoveryCommitInProgress) {
+ // Fail the DAG as we have not seen a commit completion
dag.trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
dag.setFinishTime();
// Recover all other data for all vertices
@@ -1314,6 +1339,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
DAGState.FAILED));
return DAGState.FAILED;
}
+
for (Vertex v : dag.vertices.values()) {
if (v.getInputVerticesCount() == 0) {
if (LOG.isDebugEnabled()) {
@@ -1596,10 +1622,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
}
for (VertexGroupInfo groupInfo : commitList) {
+ if (recoveredGroupCommits.containsKey(groupInfo.groupName)) {
+ LOG.info("VertexGroup was already committed as per recovery"
+ + " data, groupName=" + groupInfo.groupName);
+ continue;
+ }
groupInfo.committed = true;
Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
- new DAGCommitStartedEvent(dagId)));
+ new VertexGroupCommitStartedEvent(dagId, groupInfo.groupName,
+ clock.getTime())));
for (String outputName : groupInfo.outputs) {
OutputCommitter committer = v.getOutputCommitters().get(outputName);
LOG.info("Committing output: " + outputName);
@@ -1612,6 +1644,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
if (failedCommit) {
break;
}
+ appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
+ new VertexGroupCommitFinishedEvent(dagId, groupInfo.groupName,
+ clock.getTime())));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3ca9e11..7b3b6b4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -94,6 +94,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
@@ -106,7 +107,6 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.app.dag.event.VertexEventType;
-import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
@@ -131,9 +131,9 @@ import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
@@ -146,7 +146,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
-
/** Implementation of Vertex interface. Maintains the state machines of Vertex.
* The read and write calls use ReadWriteLock for concurrency.
*/
@@ -529,6 +528,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private String logIdentifier;
private boolean recoveryCommitInProgress = false;
+ private boolean summaryCompleteSeen = false;
+ private boolean hasCommitter = false;
+ private boolean vertexCompleteSeen = false;
private Map<String,EdgeManagerDescriptor> recoveredSourceEdgeManagers = null;
// Recovery related flags
@@ -905,20 +907,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
return recoveredState;
case VERTEX_COMMIT_STARTED:
- if (recoveredState != VertexState.RUNNING) {
- throw new RuntimeException("Commit Started Event seen but"
- + " recovered state is not RUNNING"
- + ", recoveredState=" + recoveredState);
- }
recoveryCommitInProgress = true;
+ hasCommitter = true;
return recoveredState;
case VERTEX_FINISHED:
- if (!recoveryStartEventSeen) {
- throw new RuntimeException("Finished Event seen but"
- + " no Started Event was encountered earlier");
+ VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent;
+ if (finishedEvent.isFromSummary()) {
+ summaryCompleteSeen = true;
+ } else {
+ vertexCompleteSeen = true;
}
recoveryCommitInProgress = false;
- VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent;
recoveredState = finishedEvent.getState();
diagnostics.add(finishedEvent.getDiagnostics());
finishTime = finishedEvent.getFinishTime();
@@ -1280,16 +1279,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
LOG.info("Vertex succeeded: " + vertex.logIdentifier);
try {
- if (vertex.outputCommitters != null) {
- vertex.appContext.getHistoryHandler().handle(
- new DAGHistoryEvent(vertex.getDAGId(),
- new VertexCommitStartedEvent(vertex.vertexId)));
- }
if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
// commit only once. Dont commit shared outputs
LOG.info("Invoking committer commit for vertex, vertexId="
+ vertex.logIdentifier);
- if (vertex.outputCommitters != null) {
+ if (vertex.outputCommitters != null
+ && !vertex.outputCommitters.isEmpty()) {
+ boolean firstCommit = true;
for (Entry<String, OutputCommitter> entry : vertex.outputCommitters.entrySet()) {
final OutputCommitter committer = entry.getValue();
final String outputName = entry.getKey();
@@ -1297,6 +1293,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// dont commit shared committers. Will be committed by the DAG
continue;
}
+ if (firstCommit) {
+ // Log commit start event on first actual commit
+ vertex.appContext.getHistoryHandler().handle(
+ new DAGHistoryEvent(vertex.getDAGId(),
+ new VertexCommitStartedEvent(vertex.vertexId,
+ vertex.clock.getTime())));
+ // Clear the flag here so that the remaining committers of this
+ // vertex do not each log another commit start event.
+ firstCommit = false;
+ }
vertex.dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
@@ -1806,31 +1811,44 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
case SUCCEEDED:
case FAILED:
case KILLED:
- vertex.tasksNotYetScheduled = false;
- // recover tasks
- if (vertex.tasks != null) {
- TaskState taskState = TaskState.KILLED;
- switch (vertex.recoveredState) {
- case SUCCEEDED:
- taskState = TaskState.SUCCEEDED;
- break;
- case KILLED:
- taskState = TaskState.KILLED;
- break;
- case FAILED:
- taskState = TaskState.FAILED;
- break;
- }
- for (Task task : vertex.tasks.values()) {
- vertex.eventHandler.handle(
- new TaskEventRecoverTask(task.getTaskId(),
- taskState));
- }
- vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
- endState = VertexState.RUNNING;
+ if (vertex.recoveredState == VertexState.SUCCEEDED
+ && vertex.hasCommitter
+ && vertex.summaryCompleteSeen && !vertex.vertexCompleteSeen) {
+ LOG.warn("Cannot recover vertex as all recovery events not"
+ + " found, vertex=" + vertex.logIdentifier
+ + ", hasCommitters=" + vertex.hasCommitter
+ + ", summaryCompletionSeen=" + vertex.summaryCompleteSeen
+ + ", finalCompletionSeen=" + vertex.vertexCompleteSeen);
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.COMMIT_FAILURE);
+ endState = VertexState.FAILED;
} else {
- endState = vertex.recoveredState;
- vertex.finished(endState);
+ vertex.tasksNotYetScheduled = false;
+ // recover tasks
+ if (vertex.tasks != null) {
+ TaskState taskState = TaskState.KILLED;
+ switch (vertex.recoveredState) {
+ case SUCCEEDED:
+ taskState = TaskState.SUCCEEDED;
+ break;
+ case KILLED:
+ taskState = TaskState.KILLED;
+ break;
+ case FAILED:
+ taskState = TaskState.FAILED;
+ break;
+ }
+ for (Task task : vertex.tasks.values()) {
+ vertex.eventHandler.handle(
+ new TaskEventRecoverTask(task.getTaskId(),
+ taskState));
+ }
+ vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+ endState = VertexState.RUNNING;
+ } else {
+ endState = vertex.recoveredState;
+ vertex.finished(endState);
+ }
}
break;
default:
@@ -2318,12 +2336,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
" asked to split by: " + originalSplitSource +
" but was already split by:" + vertex.originalOneToOneSplitSource);
}
- Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING,
- " Unexpected 1-1 split for vertex " + vertex.getVertexId() +
- " in state " + vertex.getState() +
- " . Split in vertex " + originalSplitSource +
- " sent by vertex " + splitEvent.getSenderVertex() +
- " numTasks " + splitEvent.getNumTasks());
+ Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING,
+ " Unexpected 1-1 split for vertex " + vertex.getVertexId() +
+ " in state " + vertex.getState() +
+ " . Split in vertex " + originalSplitSource +
+ " sent by vertex " + splitEvent.getSenderVertex() +
+ " numTasks " + splitEvent.getNumTasks());
LOG.info("Splitting vertex " + vertex.getVertexId() +
" because of split in vertex " + originalSplitSource +
" sent by vertex " + splitEvent.getSenderVertex() +
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
index 78d1208..3f756c0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
@@ -18,16 +18,16 @@
package org.apache.tez.dag.history;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
public interface HistoryEvent {
- HistoryEventType getEventType();
+ public HistoryEventType getEventType();
public JSONObject convertToATSJSON() throws JSONException;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index 7b2087a..219bfe3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -36,5 +36,7 @@ public enum HistoryEventType {
CONTAINER_LAUNCHED,
VERTEX_DATA_MOVEMENT_EVENTS_GENERATED,
DAG_COMMIT_STARTED,
- VERTEX_COMMIT_STARTED
+ VERTEX_COMMIT_STARTED,
+ VERTEX_GROUP_COMMIT_STARTED,
+ VERTEX_GROUP_COMMIT_FINISHED
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
index 587ee3e..eaae813 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/SummaryEvent.java
@@ -21,8 +21,19 @@ package org.apache.tez.dag.history;
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+
public interface SummaryEvent {
+ /**
+ * Writes the summary form of this event to the given stream.
+ * @param outputStream stream that receives the serialized summary record
+ * @throws IOException if the record cannot be written
+ */
public void toSummaryProtoStream(OutputStream outputStream) throws IOException;
+ /**
+ * Re-initializes this event from a previously written summary record.
+ * Implementations may not support this and throw instead.
+ * @param proto summary record to read the event fields from
+ * @throws IOException if the record cannot be decoded
+ */
+ public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException;
+
+ /**
+ * Whether to write this event immediately to the DAG recovery file.
+ * Summary events are always written immediately to the summary file.
+ * @return true if the event must also be written to the DAG recovery file
+ * right away, false otherwise
+ */
+ public boolean writeToRecoveryImmediately();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
index 6d5a769..627751a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
@@ -18,26 +18,31 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGCommitStartedProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.dag.utils.ProtoUtils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class DAGCommitStartedEvent implements HistoryEvent {
+public class DAGCommitStartedEvent implements HistoryEvent, SummaryEvent {
private TezDAGID dagID;
+ private long commitStartTime;
public DAGCommitStartedEvent() {
}
- public DAGCommitStartedEvent(TezDAGID dagID) {
+ public DAGCommitStartedEvent(TezDAGID dagID, long commitStartTime) {
this.dagID = dagID;
+ this.commitStartTime = commitStartTime;
}
@Override
@@ -91,4 +96,21 @@ public class DAGCommitStartedEvent implements HistoryEvent {
return dagID;
}
+ /**
+ * Writes this event to the summary stream as a delimited record built from
+ * the DAG id, the commit start time and the event type.
+ */
+ @Override
+ public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+ ProtoUtils.toSummaryEventProto(dagID, commitStartTime,
+ getEventType()).writeDelimitedTo(outputStream);
+ }
+
+ /**
+ * Restores the DAG id and the commit start time from a summary record.
+ */
+ @Override
+ public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+ this.dagID = TezDAGID.fromString(proto.getDagId());
+ this.commitStartTime = proto.getTimestamp();
+ }
+
+ /**
+ * @return false, i.e. this event does not ask for an immediate write to the
+ * DAG recovery file
+ */
+ @Override
+ public boolean writeToRecoveryImmediately() {
+ return false;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index 38e0702..14381b3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -18,6 +18,10 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.app.dag.DAGState;
@@ -28,15 +32,15 @@ import org.apache.tez.dag.history.ats.EntityTypes;
import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGFinishedProto;
-import org.apache.tez.dag.utils.ProtoUtils;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.ByteString;
public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
@@ -166,8 +170,25 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
@Override
public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
- ProtoUtils.toSummaryEventProto(dagID, finishTime,
- HistoryEventType.DAG_FINISHED).writeDelimitedTo(outputStream);
+ // The final DAG state travels in the event payload as the bytes of its
+ // enum ordinal; fromSummaryProtoStream decodes it again.
+ // NOTE(review): persisting ordinal() ties the record format to the
+ // declaration order of DAGState - confirm that order is treated as frozen.
+ SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+ .setDagId(dagID.toString())
+ .setTimestamp(finishTime)
+ .setEventType(getEventType().ordinal())
+ .setEventPayload(ByteString.copyFrom(Ints.toByteArray(state.ordinal())));
+ builder.build().writeDelimitedTo(outputStream);
+ }
+
+ /**
+ * Restores the DAG id, finish time and final state from a summary record.
+ * The payload is expected to hold the ordinal of the final DAGState as
+ * written by toSummaryProtoStream.
+ * @param proto summary record to read from
+ * @throws IOException if the payload is too short or does not decode to a
+ * known DAGState
+ */
+ @Override
+ public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+ // Decode and validate the state first so that a truncated or corrupt
+ // record surfaces as an IOException instead of an unchecked exception
+ // and leaves this event unmodified.
+ ByteString payload = proto.getEventPayload();
+ if (payload.size() < Ints.BYTES) {
+ throw new IOException("Invalid DAG finished summary payload"
+ + ", dagId=" + proto.getDagId()
+ + ", payloadSize=" + payload.size());
+ }
+ int stateOrdinal = Ints.fromByteArray(payload.toByteArray());
+ DAGState[] states = DAGState.values();
+ if (stateOrdinal < 0 || stateOrdinal >= states.length) {
+ throw new IOException("Invalid DAG state in summary payload"
+ + ", dagId=" + proto.getDagId()
+ + ", state=" + stateOrdinal);
+ }
+ this.dagID = TezDAGID.fromString(proto.getDagId());
+ this.finishTime = proto.getTimestamp();
+ this.state = states[stateOrdinal];
+ }
+
+ /**
+ * @return true, i.e. this event asks to be written to the DAG recovery file
+ * immediately
+ */
+ @Override
+ public boolean writeToRecoveryImmediately() {
+ return true;
}
public long getFinishTime() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 853bea7..af83dc8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -18,6 +18,10 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.api.records.DAGProtos;
@@ -29,16 +33,13 @@ import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGSubmittedProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.utils.ProtoUtils;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
- public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
+public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
private TezDAGID dagID;
private long submitTime;
@@ -168,6 +169,17 @@ import java.io.OutputStream;
HistoryEventType.DAG_SUBMITTED).writeDelimitedTo(outputStream);
}
+ /**
+ * Not supported for this event: always throws
+ * UnsupportedOperationException instead of reading the summary record.
+ */
+ @Override
+ public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+ throw new UnsupportedOperationException("Cannot re-initialize event from"
+ + " summary stream");
+ }
+
+ /**
+ * @return true, i.e. this event asks to be written to the DAG recovery file
+ * immediately
+ */
+ @Override
+ public boolean writeToRecoveryImmediately() {
+ return true;
+ }
+
public String getDAGName() {
if (dagPlan != null && dagPlan.hasName()) {
return dagPlan.getName();
@@ -190,4 +202,5 @@ import java.io.OutputStream;
public long getSubmitTime() {
return submitTime;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
index 564f9ed..387bff1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
@@ -18,26 +18,33 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexCommitStartedProto;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import com.google.protobuf.ByteString;
-public class VertexCommitStartedEvent implements HistoryEvent {
+public class VertexCommitStartedEvent implements HistoryEvent, SummaryEvent {
private TezVertexID vertexID;
+ private long commitStartTime;
public VertexCommitStartedEvent() {
}
- public VertexCommitStartedEvent(TezVertexID vertexId) {
+ public VertexCommitStartedEvent(TezVertexID vertexId, long commitStartTime) {
this.vertexID = vertexId;
+ this.commitStartTime = commitStartTime;
}
@Override
@@ -91,4 +98,28 @@ public class VertexCommitStartedEvent implements HistoryEvent {
return this.vertexID;
}
+ /**
+ * Writes this event to the summary stream. The record carries the id of
+ * the owning DAG, the commit start time and the event type; the vertex id
+ * is stored in the event payload as UTF-8 text.
+ */
+ @Override
+ public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+ // Encode with a fixed charset rather than the platform default so the
+ // payload decodes identically regardless of the reading JVM's settings.
+ SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+ .setDagId(vertexID.getDAGId().toString())
+ .setTimestamp(commitStartTime)
+ .setEventType(getEventType().ordinal())
+ .setEventPayload(ByteString.copyFromUtf8(vertexID.toString()));
+ builder.build().writeDelimitedTo(outputStream);
+ }
+
+ /**
+ * Restores the vertex id (decoded from the UTF-8 event payload) and the
+ * commit start time from a summary record.
+ */
+ @Override
+ public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+ this.vertexID = TezVertexID.fromString(
+ proto.getEventPayload().toStringUtf8());
+ this.commitStartTime = proto.getTimestamp();
+ }
+
+ /**
+ * @return false, i.e. this event does not ask for an immediate write to the
+ * DAG recovery file
+ */
+ @Override
+ public boolean writeToRecoveryImmediately() {
+ return false;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 2366eb1..6f07c91 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -18,25 +18,29 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
import org.apache.tez.dag.history.ats.EntityTypes;
import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishStateProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishedProto;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class VertexFinishedEvent implements HistoryEvent {
+public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
private TezVertexID vertexID;
private String vertexName;
@@ -48,6 +52,7 @@ public class VertexFinishedEvent implements HistoryEvent {
private VertexState state;
private String diagnostics;
private TezCounters tezCounters;
+ private boolean fromSummary = false;
public VertexFinishedEvent(TezVertexID vertexId,
String vertexName, long initRequestedTime, long initedTime, long startRequestedTime, long startedTime, long finishTime,
@@ -186,4 +191,40 @@ public class VertexFinishedEvent implements HistoryEvent {
public TezCounters getTezCounters() {
return tezCounters;
}
+
+ /**
+ * Writes this event to the summary stream. The record carries the id of
+ * the owning DAG, the finish time and the event type; the vertex id and
+ * the final vertex state are nested in the event payload.
+ */
+ @Override
+ public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+ // NOTE(review): the state is persisted as its enum ordinal, which ties the
+ // record format to the declaration order of VertexState - confirm that
+ // order is treated as frozen.
+ VertexFinishStateProto finishStateProto =
+ VertexFinishStateProto.newBuilder()
+ .setState(state.ordinal())
+ .setVertexId(vertexID.toString())
+ .build();
+
+ SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+ .setDagId(vertexID.getDAGId().toString())
+ .setTimestamp(finishTime)
+ .setEventType(getEventType().ordinal())
+ .setEventPayload(finishStateProto.toByteString());
+ builder.build().writeDelimitedTo(outputStream);
+
+ }
+
+ /**
+ * Restores the vertex id, final state and finish time from a summary record
+ * and marks this event as originating from the summary.
+ * @param proto summary record to read from
+ * @throws IOException if the payload cannot be parsed or carries a state
+ * value that is not a valid VertexState ordinal
+ */
+ @Override
+ public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+ VertexFinishStateProto finishStateProto =
+ VertexFinishStateProto.parseFrom(proto.getEventPayload());
+ // Validate the persisted ordinal before using it as an array index so a
+ // corrupt record fails with an IOException rather than an unchecked
+ // ArrayIndexOutOfBoundsException.
+ int stateOrdinal = finishStateProto.getState();
+ VertexState[] states = VertexState.values();
+ if (stateOrdinal < 0 || stateOrdinal >= states.length) {
+ throw new IOException("Invalid vertex state in summary payload"
+ + ", vertexId=" + finishStateProto.getVertexId()
+ + ", state=" + stateOrdinal);
+ }
+ this.vertexID = TezVertexID.fromString(finishStateProto.getVertexId());
+ this.state = states[stateOrdinal];
+ this.finishTime = proto.getTimestamp();
+ this.fromSummary = true;
+ }
+
+ /**
+ * @return false, i.e. this event does not ask for an immediate write to the
+ * DAG recovery file
+ */
+ @Override
+ public boolean writeToRecoveryImmediately() {
+ return false;
+ }
+
+ /**
+ * @return true if this event was populated through fromSummaryProtoStream,
+ * false otherwise
+ */
+ public boolean isFromSummary() {
+ return fromSummary;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
new file mode 100644
index 0000000..99a5288
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitFinishedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+/**
+ * Event of type {@link HistoryEventType#VERTEX_GROUP_COMMIT_FINISHED},
+ * identified by a DAG id and a vertex group name.
+ *
+ * The event is a recovery event but not a history event. It can be serialized
+ * either as a plain delimited {@code VertexGroupCommitFinishedProto} or as a
+ * {@code SummaryEventProto} envelope around that proto. The commit finish time
+ * is not part of the proto itself; it only travels as the timestamp of the
+ * summary envelope.
+ */
+public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEvent {
+
+  private TezDAGID dagID;
+  private String vertexGroupName;
+  private long commitFinishTime;
+
+  /**
+   * Creates an empty event to be populated by {@link #fromProtoStream} or
+   * {@link #fromSummaryProtoStream}.
+   */
+  public VertexGroupCommitFinishedEvent() {
+  }
+
+  /**
+   * @param dagID id of the DAG the vertex group belongs to
+   * @param vertexGroupName name of the vertex group
+   * @param commitFinishTime time recorded as the summary timestamp
+   */
+  public VertexGroupCommitFinishedEvent(TezDAGID dagID,
+      String vertexGroupName, long commitFinishTime) {
+    this.dagID = dagID;
+    this.vertexGroupName = vertexGroupName;
+    this.commitFinishTime = commitFinishTime;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.VERTEX_GROUP_COMMIT_FINISHED;
+  }
+
+  /**
+   * Not implemented yet.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+
+  /**
+   * Builds the proto form of this event (DAG id and vertex group name only).
+   *
+   * @return the populated proto
+   */
+  public VertexGroupCommitFinishedProto toProto() {
+    return VertexGroupCommitFinishedProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setVertexGroupName(vertexGroupName)
+        .build();
+  }
+
+  /**
+   * Populates the DAG id and vertex group name from {@code proto}.
+   *
+   * @param proto proto to read from
+   */
+  public void fromProto(VertexGroupCommitFinishedProto proto) {
+    this.dagID = TezDAGID.fromString(proto.getDagId());
+    this.vertexGroupName = proto.getVertexGroupName();
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Reads one delimited proto from {@code inputStream} and populates this
+   * event from it.
+   *
+   * @param inputStream stream positioned at a delimited proto
+   * @throws IOException if the stream is already at its end or the proto
+   *         cannot be parsed
+   */
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexGroupCommitFinishedProto proto =
+        VertexGroupCommitFinishedProto.parseDelimitedFrom(inputStream);
+    // parseDelimitedFrom returns null at end of stream; report that as an
+    // I/O problem instead of failing later with a NullPointerException.
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    return "dagId=" + dagID
+        + ", vertexGroup=" + vertexGroupName;
+  }
+
+  public String getVertexGroupName() {
+    return this.vertexGroupName;
+  }
+
+  /**
+   * Writes this event as a delimited {@code SummaryEventProto} whose payload
+   * is {@link #toProto()} and whose timestamp is the commit finish time.
+   *
+   * @param outputStream stream the summary record is written to
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+    SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setTimestamp(commitFinishTime)
+        .setEventType(getEventType().ordinal())
+        .setEventPayload(toProto().toByteString());
+    builder.build().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Populates this event from a summary envelope: the payload supplies the
+   * DAG id and vertex group name, the envelope timestamp supplies the commit
+   * finish time.
+   *
+   * @param proto summary envelope to read from
+   * @throws IOException if the payload cannot be parsed
+   */
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    VertexGroupCommitFinishedProto vertexGroupCommitFinishedProto =
+        VertexGroupCommitFinishedProto.parseFrom(proto.getEventPayload());
+    fromProto(vertexGroupCommitFinishedProto);
+    this.commitFinishTime = proto.getTimestamp();
+  }
+
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return false;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
new file mode 100644
index 0000000..04d6276
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitStartedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+/**
+ * Event of type {@link HistoryEventType#VERTEX_GROUP_COMMIT_STARTED},
+ * identified by a DAG id and a vertex group name.
+ *
+ * The event is a recovery event but not a history event. It can be serialized
+ * either as a plain delimited {@code VertexGroupCommitStartedProto} or as a
+ * {@code SummaryEventProto} envelope around that proto. The commit start time
+ * is not part of the proto itself; it only travels as the timestamp of the
+ * summary envelope.
+ */
+public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent {
+
+  private TezDAGID dagID;
+  private String vertexGroupName;
+  private long commitStartTime;
+
+  /**
+   * Creates an empty event to be populated by {@link #fromProtoStream} or
+   * {@link #fromSummaryProtoStream}.
+   */
+  public VertexGroupCommitStartedEvent() {
+  }
+
+  /**
+   * @param dagID id of the DAG the vertex group belongs to
+   * @param vertexGroupName name of the vertex group
+   * @param commitStartTime time recorded as the summary timestamp
+   */
+  public VertexGroupCommitStartedEvent(TezDAGID dagID,
+      String vertexGroupName, long commitStartTime) {
+    this.dagID = dagID;
+    this.vertexGroupName = vertexGroupName;
+    this.commitStartTime = commitStartTime;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.VERTEX_GROUP_COMMIT_STARTED;
+  }
+
+  /**
+   * Not implemented yet.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    // TODO
+    return null;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+
+  /**
+   * Builds the proto form of this event (DAG id and vertex group name only).
+   *
+   * @return the populated proto
+   */
+  public VertexGroupCommitStartedProto toProto() {
+    return VertexGroupCommitStartedProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setVertexGroupName(vertexGroupName)
+        .build();
+  }
+
+  /**
+   * Populates the DAG id and vertex group name from {@code proto}.
+   *
+   * @param proto proto to read from
+   */
+  public void fromProto(VertexGroupCommitStartedProto proto) {
+    this.dagID = TezDAGID.fromString(proto.getDagId());
+    this.vertexGroupName = proto.getVertexGroupName();
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Reads one delimited proto from {@code inputStream} and populates this
+   * event from it.
+   *
+   * @param inputStream stream positioned at a delimited proto
+   * @throws IOException if the stream is already at its end or the proto
+   *         cannot be parsed
+   */
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexGroupCommitStartedProto proto =
+        VertexGroupCommitStartedProto.parseDelimitedFrom(inputStream);
+    // parseDelimitedFrom returns null at end of stream; report that as an
+    // I/O problem instead of failing later with a NullPointerException.
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    return "dagId=" + dagID
+        + ", vertexGroup=" + vertexGroupName;
+  }
+
+  public String getVertexGroupName() {
+    return this.vertexGroupName;
+  }
+
+  /**
+   * Writes this event as a delimited {@code SummaryEventProto} whose payload
+   * is {@link #toProto()} and whose timestamp is the commit start time.
+   *
+   * @param outputStream stream the summary record is written to
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
+    SummaryEventProto.Builder builder = RecoveryProtos.SummaryEventProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setTimestamp(commitStartTime)
+        .setEventType(getEventType().ordinal())
+        .setEventPayload(toProto().toByteString());
+    builder.build().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Populates this event from a summary envelope: the payload supplies the
+   * DAG id and vertex group name, the envelope timestamp supplies the commit
+   * start time.
+   *
+   * @param proto summary envelope to read from
+   * @throws IOException if the payload cannot be parsed
+   */
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
+    VertexGroupCommitStartedProto vertexGroupCommitStartedProto =
+        VertexGroupCommitStartedProto.parseFrom(proto.getEventPayload());
+    fromProto(vertexGroupCommitStartedProto);
+    this.commitStartTime = proto.getTimestamp();
+  }
+
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return false;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
+
+}