You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/06 00:35:16 UTC
[1/4] TEZ-847. Support basic AM recovery. (hitesh)
Repository: incubator-tez
Updated Branches:
refs/heads/master 18290c848 -> 5b464f27d
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
new file mode 100644
index 0000000..f03863c
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -0,0 +1,569 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.RuntimeUtils;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestHistoryEventsProtoConversion {
+
+ private static final Log LOG = LogFactory.getLog(
+ TestHistoryEventsProtoConversion.class);
+
+
+ private HistoryEvent testProtoConversion(HistoryEvent event) throws IOException {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ HistoryEvent deserializedEvent = null;
+ event.toProtoStream(os);
+ os.flush();
+ os.close();
+ deserializedEvent = RuntimeUtils.createClazzInstance(
+ event.getClass().getName());
+ LOG.info("Serialized event to byte array"
+ + ", eventType=" + event.getEventType()
+ + ", bufLen=" + os.toByteArray().length);
+ deserializedEvent.fromProtoStream(
+ new ByteArrayInputStream(os.toByteArray()));
+ return deserializedEvent;
+ }
+
+ private void logEvents(HistoryEvent event,
+ HistoryEvent deserializedEvent) {
+ LOG.info("Initial Event toString: " + event.toString());
+ LOG.info("Deserialized Event toString: " + deserializedEvent.toString());
+ }
+
+ private void testAMLaunchedEvent() throws Exception {
+ AMLaunchedEvent event = new AMLaunchedEvent(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, 1), 1),
+ 100, 100);
+ AMLaunchedEvent deserializedEvent = (AMLaunchedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getApplicationAttemptId(),
+ deserializedEvent.getApplicationAttemptId());
+ Assert.assertEquals(event.getAppSubmitTime(),
+ deserializedEvent.getAppSubmitTime());
+ Assert.assertEquals(event.getLaunchTime(),
+ deserializedEvent.getLaunchTime());
+ logEvents(event, deserializedEvent);
+ }
+
+ private void testAMStartedEvent() throws Exception {
+ AMStartedEvent event = new AMStartedEvent(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, 1), 1), 100);
+ AMStartedEvent deserializedEvent = (AMStartedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getApplicationAttemptId(),
+ deserializedEvent.getApplicationAttemptId());
+ Assert.assertEquals(event.getStartTime(),
+ deserializedEvent.getStartTime());
+ logEvents(event, deserializedEvent);
+ }
+
+ private void testDAGSubmittedEvent() throws Exception {
+ DAGSubmittedEvent event = new DAGSubmittedEvent(TezDAGID.getInstance(
+ ApplicationId.newInstance(0, 1), 1), 1001l,
+ DAGPlan.newBuilder().setName("foo").build(),
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, 1), 1));
+ DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getApplicationAttemptId(),
+ deserializedEvent.getApplicationAttemptId());
+ Assert.assertEquals(event.getDagID(),
+ deserializedEvent.getDagID());
+ Assert.assertEquals(event.getDAGName(),
+ deserializedEvent.getDAGName());
+ Assert.assertEquals(event.getSubmitTime(),
+ deserializedEvent.getSubmitTime());
+ Assert.assertEquals(event.getDAGPlan(),
+ deserializedEvent.getDAGPlan());
+ logEvents(event, deserializedEvent);
+ }
+
+ private void testDAGInitializedEvent() throws Exception {
+ DAGInitializedEvent event = new DAGInitializedEvent(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l);
+ DAGInitializedEvent deserializedEvent = (DAGInitializedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getDagID(),
+ deserializedEvent.getDagID());
+ Assert.assertEquals(event.getInitTime(), deserializedEvent.getInitTime());
+ logEvents(event, deserializedEvent);
+ }
+
+ private void testDAGStartedEvent() throws Exception {
+ DAGStartedEvent event = new DAGStartedEvent(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l);
+ DAGStartedEvent deserializedEvent = (DAGStartedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getDagID(),
+ deserializedEvent.getDagID());
+ Assert.assertEquals(event.getStartTime(), deserializedEvent.getStartTime());
+ logEvents(event, deserializedEvent);
+ }
+
+ private void testDAGFinishedEvent() throws Exception {
+ {
+ DAGFinishedEvent event = new DAGFinishedEvent(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
+ DAGState.FAILED, null, null);
+ DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(
+ event.getDagID(),
+ deserializedEvent.getDagID());
+ Assert.assertEquals(event.getState(), deserializedEvent.getState());
+ Assert.assertNotEquals(event.getStartTime(), deserializedEvent.getStartTime());
+ Assert.assertEquals(event.getFinishTime(), deserializedEvent.getFinishTime());
+ Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
+ Assert.assertEquals(event.getTezCounters(), deserializedEvent.getTezCounters());
+ logEvents(event, deserializedEvent);
+ }
+ {
+ TezCounters tezCounters = new TezCounters();
+ tezCounters.addGroup("foo", "bar");
+ tezCounters.getGroup("foo").addCounter("c1", "c1", 100);
+ tezCounters.getGroup("foo").findCounter("c1").increment(1);
+ DAGFinishedEvent event = new DAGFinishedEvent(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
+ DAGState.FAILED, "bad diagnostics", tezCounters);
+ DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(
+ event.getDagID(),
+ deserializedEvent.getDagID());
+ Assert.assertEquals(event.getState(), deserializedEvent.getState());
+ Assert.assertNotEquals(event.getStartTime(), deserializedEvent.getStartTime());
+ Assert.assertEquals(event.getFinishTime(), deserializedEvent.getFinishTime());
+ Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
+ Assert.assertEquals(event.getTezCounters(), deserializedEvent.getTezCounters());
+ Assert.assertEquals(101,
+ deserializedEvent.getTezCounters().getGroup("foo").findCounter("c1").getValue());
+ logEvents(event, deserializedEvent);
+ }
+ }
+
+ private void testVertexInitializedEvent() throws Exception {
+ VertexInitializedEvent event = new VertexInitializedEvent(
+ TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+ "vertex1", 1000l, 15000l, 100, "procName", null);
+ VertexInitializedEvent deserializedEvent = (VertexInitializedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+ Assert.assertEquals(event.getInitRequestedTime(),
+ deserializedEvent.getInitRequestedTime());
+ Assert.assertEquals(event.getInitedTime(),
+ deserializedEvent.getInitedTime());
+ Assert.assertEquals(event.getNumTasks(),
+ deserializedEvent.getNumTasks());
+ Assert.assertEquals(event.getAdditionalInputs(),
+ deserializedEvent.getAdditionalInputs());
+ Assert.assertNull(deserializedEvent.getProcessorName());
+ logEvents(event, deserializedEvent);
+ }
+
+ private void testVertexStartedEvent() throws Exception {
+ VertexStartedEvent event = new VertexStartedEvent(
+ TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+ 145553l, 12334455l);
+ VertexStartedEvent deserializedEvent = (VertexStartedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+ Assert.assertEquals(event.getStartRequestedTime(),
+ deserializedEvent.getStartRequestedTime());
+ Assert.assertEquals(event.getStartTime(),
+ deserializedEvent.getStartTime());
+ logEvents(event, deserializedEvent);
+ }
+
+ private void testVertexParallelismUpdatedEvent() throws Exception {
+ {
+ VertexParallelismUpdatedEvent event =
+ new VertexParallelismUpdatedEvent(
+ TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+ 100, null, null);
+ VertexParallelismUpdatedEvent deserializedEvent = (VertexParallelismUpdatedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+ Assert.assertEquals(event.getNumTasks(), deserializedEvent.getNumTasks());
+ Assert.assertEquals(event.getSourceEdgeManagers(),
+ deserializedEvent.getSourceEdgeManagers());
+ Assert.assertEquals(event.getVertexLocationHint(),
+ deserializedEvent.getVertexLocationHint());
+ logEvents(event, deserializedEvent);
+ }
+ {
+ Map<String,EdgeManagerDescriptor> sourceEdgeManagers
+ = new LinkedHashMap<String, EdgeManagerDescriptor>();
+ sourceEdgeManagers.put("foo", new EdgeManagerDescriptor("bar"));
+ sourceEdgeManagers.put("foo1", new EdgeManagerDescriptor("bar1").setUserPayload(
+ new String("payload").getBytes()));
+ VertexParallelismUpdatedEvent event =
+ new VertexParallelismUpdatedEvent(
+ TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+ 100, new VertexLocationHint(Arrays.asList(new TaskLocationHint(
+ new HashSet<String>(Arrays.asList("h1")),
+ new HashSet<String>(Arrays.asList("r1"))))),
+ sourceEdgeManagers);
+
+ VertexParallelismUpdatedEvent deserializedEvent = (VertexParallelismUpdatedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+ Assert.assertEquals(event.getNumTasks(), deserializedEvent.getNumTasks());
+ Assert.assertEquals(event.getSourceEdgeManagers().size(),
+ deserializedEvent.getSourceEdgeManagers().size());
+ Assert.assertEquals(event.getSourceEdgeManagers().get("foo").getClassName(),
+ deserializedEvent.getSourceEdgeManagers().get("foo").getClassName());
+ Assert.assertArrayEquals(event.getSourceEdgeManagers().get("foo").getUserPayload(),
+ deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload());
+ Assert.assertEquals(event.getSourceEdgeManagers().get("foo1").getClassName(),
+ deserializedEvent.getSourceEdgeManagers().get("foo1").getClassName());
+ Assert.assertArrayEquals(event.getSourceEdgeManagers().get("foo1").getUserPayload(),
+ deserializedEvent.getSourceEdgeManagers().get("foo1").getUserPayload());
+ Assert.assertEquals(event.getVertexLocationHint(),
+ deserializedEvent.getVertexLocationHint());
+ logEvents(event, deserializedEvent);
+ }
+ }
+
+ private void testVertexFinishedEvent() throws Exception {
+ {
+ VertexFinishedEvent event =
+ new VertexFinishedEvent(TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+ "vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
+ null, null);
+ VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+ Assert.assertEquals(event.getFinishTime(),
+ deserializedEvent.getFinishTime());
+ Assert.assertEquals(event.getState(), deserializedEvent.getState());
+ Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
+ Assert.assertEquals(event.getTezCounters(), deserializedEvent.getTezCounters());
+ logEvents(event, deserializedEvent);
+ }
+ {
+ VertexFinishedEvent event =
+ new VertexFinishedEvent(TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+ "vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
+ "diagnose", new TezCounters());
+ VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+ Assert.assertEquals(event.getFinishTime(),
+ deserializedEvent.getFinishTime());
+ Assert.assertEquals(event.getState(), deserializedEvent.getState());
+ Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
+ Assert.assertEquals(event.getTezCounters(), deserializedEvent.getTezCounters());
+ logEvents(event, deserializedEvent);
+ }
+ }
+
+ private void testTaskStartedEvent() throws Exception {
+ TaskStartedEvent event = new TaskStartedEvent(
+ TezTaskID.getInstance(TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1),
+ "vertex1", 1000l, 100000l);
+ TaskStartedEvent deserializedEvent = (TaskStartedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getTaskID(), deserializedEvent.getTaskID());
+ Assert.assertEquals(event.getScheduledTime(),
+ deserializedEvent.getScheduledTime());
+ Assert.assertEquals(event.getStartTime(),
+ deserializedEvent.getStartTime());
+ logEvents(event, deserializedEvent);
+ }
+
+ private void testTaskFinishedEvent() throws Exception {
+ {
+ TaskFinishedEvent event = new TaskFinishedEvent(
+ TezTaskID.getInstance(TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1),
+ "vertex1", 11000l, 1000000l, null, TaskState.FAILED, null);
+ TaskFinishedEvent deserializedEvent = (TaskFinishedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getTaskID(), deserializedEvent.getTaskID());
+ Assert.assertEquals(event.getFinishTime(),
+ deserializedEvent.getFinishTime());
+ Assert.assertEquals(event.getState(),
+ deserializedEvent.getState());
+ Assert.assertEquals(event.getTezCounters(),
+ deserializedEvent.getTezCounters());
+ Assert.assertEquals(event.getSuccessfulAttemptID(),
+ deserializedEvent.getSuccessfulAttemptID());
+ logEvents(event, deserializedEvent);
+ }
+ {
+ TaskFinishedEvent event = new TaskFinishedEvent(
+ TezTaskID.getInstance(TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1),
+ "vertex1", 11000l, 1000000l,
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
+ TaskState.FAILED, new TezCounters());
+ TaskFinishedEvent deserializedEvent = (TaskFinishedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getTaskID(), deserializedEvent.getTaskID());
+ Assert.assertEquals(event.getFinishTime(),
+ deserializedEvent.getFinishTime());
+ Assert.assertEquals(event.getState(),
+ deserializedEvent.getState());
+ Assert.assertEquals(event.getTezCounters(),
+ deserializedEvent.getTezCounters());
+ Assert.assertEquals(event.getSuccessfulAttemptID(),
+ deserializedEvent.getSuccessfulAttemptID());
+ logEvents(event, deserializedEvent);
+ }
+ }
+
+ private void testTaskAttemptStartedEvent() throws Exception {
+ TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
+ "vertex1", 10009l, ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance(
+ "host1", 19999), "inProgress", "Completed");
+ TaskAttemptStartedEvent deserializedEvent = (TaskAttemptStartedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getTaskAttemptID(),
+ deserializedEvent.getTaskAttemptID());
+ Assert.assertEquals(event.getContainerId(),
+ deserializedEvent.getContainerId());
+ Assert.assertEquals(event.getNodeId(),
+ deserializedEvent.getNodeId());
+ Assert.assertEquals(event.getStartTime(),
+ deserializedEvent.getStartTime());
+ logEvents(event, deserializedEvent);
+ }
+
+ private void testTaskAttemptFinishedEvent() throws Exception {
+ {
+ TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
+ "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
+ null, null);
+ TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getTaskAttemptID(),
+ deserializedEvent.getTaskAttemptID());
+ Assert.assertEquals(event.getFinishTime(),
+ deserializedEvent.getFinishTime());
+ Assert.assertEquals(event.getDiagnostics(),
+ deserializedEvent.getDiagnostics());
+ Assert.assertEquals(event.getState(),
+ deserializedEvent.getState());
+ Assert.assertEquals(event.getCounters(),
+ deserializedEvent.getCounters());
+ logEvents(event, deserializedEvent);
+ }
+ {
+ TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
+ "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
+ "diagnose", new TezCounters());
+ TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getTaskAttemptID(),
+ deserializedEvent.getTaskAttemptID());
+ Assert.assertEquals(event.getFinishTime(),
+ deserializedEvent.getFinishTime());
+ Assert.assertEquals(event.getDiagnostics(),
+ deserializedEvent.getDiagnostics());
+ Assert.assertEquals(event.getState(),
+ deserializedEvent.getState());
+ Assert.assertEquals(event.getCounters(),
+ deserializedEvent.getCounters());
+ logEvents(event, deserializedEvent);
+ }
+ }
+
+ private void testContainerLaunchedEvent() throws Exception {
+ ContainerLaunchedEvent event = new ContainerLaunchedEvent(
+ ContainerId.newInstance(ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, 1), 1), 1001), 100034566,
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, 1), 1));
+ ContainerLaunchedEvent deserializedEvent = (ContainerLaunchedEvent)
+ testProtoConversion(event);
+ Assert.assertEquals(event.getContainerId(),
+ deserializedEvent.getContainerId());
+ Assert.assertEquals(event.getLaunchTime(),
+ deserializedEvent.getLaunchTime());
+ Assert.assertEquals(event.getApplicationAttemptId(),
+ deserializedEvent.getApplicationAttemptId());
+ logEvents(event, deserializedEvent);
+ }
+
+ private void testVertexDataMovementEventsGeneratedEvent() throws Exception {
+ VertexDataMovementEventsGeneratedEvent event;
+ try {
+ event = new VertexDataMovementEventsGeneratedEvent(
+ TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), null);
+ Assert.fail("Invalid creation should have errored out");
+ } catch (RuntimeException e) {
+ // Expected
+ }
+ List<TezEvent> events =
+ Arrays.asList(new TezEvent(new DataMovementEvent(1, null), new EventMetaData(
+ EventProducerConsumerType.SYSTEM, "foo", "bar", null)));
+ event = new VertexDataMovementEventsGeneratedEvent(
+ TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), events);
+ VertexDataMovementEventsGeneratedEvent deserializedEvent =
+ (VertexDataMovementEventsGeneratedEvent) testProtoConversion(event);
+ Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+ Assert.assertEquals(1,
+ deserializedEvent.getTezEvents().size());
+ Assert.assertEquals(event.getTezEvents().get(0).getEventType(),
+ deserializedEvent.getTezEvents().get(0).getEventType());
+ logEvents(event, deserializedEvent);
+ }
+
+ private void testDAGCommitStartedEvent() throws Exception {
+ DAGCommitStartedEvent event = new DAGCommitStartedEvent(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1));
+ DAGCommitStartedEvent deserializedEvent =
+ (DAGCommitStartedEvent) testProtoConversion(event);
+ Assert.assertEquals(event.getDagID(), deserializedEvent.getDagID());
+ logEvents(event, deserializedEvent);
+ }
+
+ private void testVertexCommitStartedEvent() throws Exception {
+ VertexCommitStartedEvent event = new VertexCommitStartedEvent(
+ TezVertexID.getInstance(
+ TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1));
+ VertexCommitStartedEvent deserializedEvent =
+ (VertexCommitStartedEvent) testProtoConversion(event);
+ Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+ logEvents(event, deserializedEvent);
+ }
+
+ @Test
+ public void testDefaultProtoConversion() throws Exception {
+ for (HistoryEventType eventType : HistoryEventType.values()) {
+ switch (eventType) {
+ case AM_LAUNCHED:
+ testAMLaunchedEvent();
+ break;
+ case AM_STARTED:
+ testAMStartedEvent();
+ break;
+ case DAG_SUBMITTED:
+ testDAGSubmittedEvent();
+ break;
+ case DAG_INITIALIZED:
+ testDAGInitializedEvent();
+ break;
+ case DAG_STARTED:
+ testDAGStartedEvent();
+ break;
+ case DAG_FINISHED:
+ testDAGFinishedEvent();
+ break;
+ case VERTEX_INITIALIZED:
+ testVertexInitializedEvent();
+ break;
+ case VERTEX_STARTED:
+ testVertexStartedEvent();
+ break;
+ case VERTEX_PARALLELISM_UPDATED:
+ testVertexParallelismUpdatedEvent();
+ break;
+ case VERTEX_FINISHED:
+ testVertexFinishedEvent();
+ break;
+ case TASK_STARTED:
+ testTaskStartedEvent();
+ break;
+ case TASK_FINISHED:
+ testTaskFinishedEvent();
+ break;
+ case TASK_ATTEMPT_STARTED:
+ testTaskAttemptStartedEvent();
+ break;
+ case TASK_ATTEMPT_FINISHED:
+ testTaskAttemptFinishedEvent();
+ break;
+ case CONTAINER_LAUNCHED:
+ testContainerLaunchedEvent();
+ break;
+ case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+ testVertexDataMovementEventsGeneratedEvent();
+ break;
+ case DAG_COMMIT_STARTED:
+ testDAGCommitStartedEvent();
+ break;
+ case VERTEX_COMMIT_STARTED:
+ testVertexCommitStartedEvent();
+ break;
+ default:
+ throw new Exception("Unhandled Event type in Unit tests: " + eventType);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dist/pom.xml b/tez-dist/pom.xml
index edcfd9f..b315368 100644
--- a/tez-dist/pom.xml
+++ b/tez-dist/pom.xml
@@ -87,11 +87,11 @@
</configuration>
<executions>
<execution>
- <id>package-tez-full</id>
+ <id>package-tez</id>
<configuration>
- <finalName>tez-${project.version}-full</finalName>
+ <finalName>tez-${project.version}</finalName>
<descriptors>
- <descriptor>src/main/assembly/tez-dist-full.xml</descriptor>
+ <descriptor>src/main/assembly/tez-dist.xml</descriptor>
</descriptors>
<formats>
<format>${package.format}</format>
@@ -103,11 +103,11 @@
</goals>
</execution>
<execution>
- <id>package-tez</id>
+ <id>package-tez-full</id>
<configuration>
- <finalName>tez-${project.version}</finalName>
+ <finalName>tez-${project.version}-full</finalName>
<descriptors>
- <descriptor>src/main/assembly/tez-dist.xml</descriptor>
+ <descriptor>src/main/assembly/tez-dist-full.xml</descriptor>
</descriptors>
<formats>
<format>${package.format}</format>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
index 03f28be..e919516 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
@@ -47,10 +47,12 @@ public class MROutputCommitter extends OutputCommitter {
private static final Log LOG = LogFactory.getLog(MROutputCommitter.class);
+ private OutputCommitterContext context;
private org.apache.hadoop.mapreduce.OutputCommitter committer = null;
private JobContext jobContext = null;
private volatile boolean initialized = false;
private JobConf jobConf = null;
+ private boolean newApiCommitter;
@Override
public void initialize(OutputCommitterContext context) throws IOException {
@@ -66,7 +68,8 @@ public class MROutputCommitter extends OutputCommitter {
jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
context.getDAGAttemptNumber());
- committer = getOutputCommitter(context);
+ this.context = context;
+ committer = getOutputCommitter(this.context);
jobContext = getJobContextFromVertexContext(context);
initialized = true;
}
@@ -101,7 +104,7 @@ public class MROutputCommitter extends OutputCommitter {
getOutputCommitter(OutputCommitterContext context) {
org.apache.hadoop.mapreduce.OutputCommitter committer = null;
- boolean newApiCommitter = false;
+ newApiCommitter = false;
if (jobConf.getBoolean("mapred.reducer.new-api", false)
|| jobConf.getBoolean("mapred.mapper.new-api", false)) {
newApiCommitter = true;
@@ -118,8 +121,8 @@ public class MROutputCommitter extends OutputCommitter {
TaskAttemptID taskAttemptID = new TaskAttemptID(
Long.toString(context.getApplicationId().getClusterTimestamp()),
context.getApplicationId().getId(),
- (jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
- false) ? TaskType.MAP : TaskType.REDUCE),
+ ((jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false) ?
+ TaskType.MAP : TaskType.REDUCE)),
0, context.getDAGAttemptNumber());
TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf,
@@ -179,6 +182,29 @@ public class MROutputCommitter extends OutputCommitter {
}
+  /**
+   * Reports whether the wrapped MapReduce committer can recover tasks.
+   *
+   * @throws RuntimeException if {@link #initialize} has not completed yet
+   */
+  @Override
+  public boolean isTaskRecoverySupported() {
+    if (initialized) {
+      return committer.isRecoverySupported();
+    }
+    throw new RuntimeException("Committer not initialized");
+  }
+ @Override
+ public void recoverTask(int taskIndex, int attemptId) throws IOException {
+ if (!initialized) {
+ throw new RuntimeException("Committer not initialized");
+ }
+ TaskAttemptID taskAttemptID = new TaskAttemptID(
+ Long.toString(context.getApplicationId().getClusterTimestamp())
+ + String.valueOf(context.getVertexIndex()),
+ context.getApplicationId().getId(),
+ ((jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false) ?
+ TaskType.MAP : TaskType.REDUCE)),
+ taskIndex, attemptId);
+ TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf,
+ taskAttemptID);
+ committer.recoverTask(taskContext);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
index c05ec57..159934a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
@@ -135,7 +135,7 @@ public class EventMetaData implements Writable {
return "{ producerConsumerType=" + producerConsumerType
+ ", taskVertexName=" + taskVertexName
+ ", edgeVertexName=" + edgeVertexName
- + ", taskAttemptId=" + taskAttemptID
+ + ", taskAttemptId=" + (taskAttemptID == null? "null" : taskAttemptID)
+ " }";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
index 1e9265a..1ee7b44 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
@@ -50,6 +50,10 @@ public class InputSpec implements Writable {
return inputDescriptor;
}
+  /** Replaces the input descriptor held by this spec. */
+  public void setInputDescriptor(InputDescriptor descriptor) {
+    inputDescriptor = descriptor;
+  }
+
public int getPhysicalEdgeCount() {
return physicalEdgeCount;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index f4e1957..e0e19c9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -454,7 +454,7 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
context.scheduleVertexTasks(scheduledTasks);
}
- void schedulePendingTasks() {
+ void schedulePendingTasks() {
int numPendingTasks = pendingTasks.size();
if (numPendingTasks == 0) {
return;
@@ -526,7 +526,7 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
}
this.context = context;
-
+
this.slowStartMinSrcCompletionFraction = conf
.getFloat(
ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
new file mode 100644
index 0000000..29b6b5e
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.test.dag.MultiAttemptDAG;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Random;
+
+public class TestDAGRecovery {
+
+  private static final Log LOG = LogFactory.getLog(TestDAGRecovery.class);
+
+  /** Delay between successive session/DAG status polls, in milliseconds. */
+  private static final long STATUS_POLL_INTERVAL_MS = 100;
+
+  private static final Configuration conf = new Configuration();
+  private static MiniTezCluster miniTezCluster;
+  private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestDAGRecovery.class.getName() + "-tmpDir";
+  protected static MiniDFSCluster dfsCluster;
+
+  private static TezSession tezSession = null;
+
+  /**
+   * Starts a 3-datanode MiniDFSCluster, a MiniTezCluster that uses it as its
+   * default filesystem, and a TezSession shared by every test in this class.
+   * YARN and the Tez AM are both allowed 4 attempts so the DAG can survive
+   * deliberate AM crashes. DAG_RECOVERY_MAX_UNFLUSHED_EVENTS is set to 0,
+   * presumably so every recovery event is flushed as soon as it is written.
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    LOG.info("Starting mini clusters");
+    FileSystem remoteFs = null;
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+          .format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+    if (miniTezCluster == null) {
+      miniTezCluster = new MiniTezCluster(TestDAGRecovery.class.getName(),
+          1, 1, 1);
+      Configuration miniTezconf = new Configuration(conf);
+      miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 4);
+      miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+      miniTezCluster.init(miniTezconf);
+      miniTezCluster.start();
+
+      Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
+          .valueOf(new Random().nextInt(100000))));
+      TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+
+      TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+      tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
+      tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+          remoteStagingDir.toString());
+      tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
+      tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
+      tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
+      tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, " -Xmx256m");
+
+      AMConfiguration amConfig = new AMConfiguration(
+          new HashMap<String, String>(), new HashMap<String, LocalResource>(),
+          tezConf, null);
+      TezSessionConfiguration tezSessionConfig =
+          new TezSessionConfiguration(amConfig, tezConf);
+      tezSession = new TezSession("TestDAGRecovery", tezSessionConfig);
+      tezSession.start();
+    }
+  }
+
+  /**
+   * Stops the TezSession and both mini clusters started in {@link #setup()}.
+   * The clusters are shut down even if stopping the session fails, so the AM,
+   * YARN and HDFS daemons do not outlive this test class.
+   */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    try {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    } finally {
+      tezSession = null;
+      if (miniTezCluster != null) {
+        miniTezCluster.stop();
+        miniTezCluster = null;
+      }
+      if (dfsCluster != null) {
+        dfsCluster.shutdown();
+        dfsCluster = null;
+      }
+    }
+  }
+
+  /**
+   * Waits for the shared session to become READY, submits {@code dag}, polls
+   * until the DAG completes and asserts that it ended in {@code finalState}.
+   *
+   * @throws TezUncheckedException if the session shuts down before it is ready
+   */
+  void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
+    TezSessionStatus status = tezSession.getSessionStatus();
+    while (status != TezSessionStatus.READY && status != TezSessionStatus.SHUTDOWN) {
+      LOG.info("Waiting for session to be ready. Current: " + status);
+      Thread.sleep(STATUS_POLL_INTERVAL_MS);
+      status = tezSession.getSessionStatus();
+    }
+    if (status == TezSessionStatus.SHUTDOWN) {
+      throw new TezUncheckedException("Unexpected Session shutdown");
+    }
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      // Report the interval actually slept, not a hard-coded number.
+      LOG.info("Waiting for dag to complete. Sleeping for "
+          + STATUS_POLL_INTERVAL_MS + "ms."
+          + " DAG name: " + dag.getName()
+          + " DAG appId: " + dagClient.getApplicationId()
+          + " Current state: " + dagStatus.getState());
+      Thread.sleep(STATUS_POLL_INTERVAL_MS);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+
+    Assert.assertEquals(finalState, dagStatus.getState());
+  }
+
+  /**
+   * Runs {@link MultiAttemptDAG}, whose vertex managers halt the AM on early
+   * attempts, and expects the DAG to still finish SUCCEEDED.
+   */
+  @Test(timeout=120000)
+  public void testBasicRecovery() throws Exception {
+    DAG dag = MultiAttemptDAG.createDAG("TestBasicRecovery", null);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
new file mode 100644
index 0000000..65c8383
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test.dag;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.test.TestInput;
+import org.apache.tez.test.TestOutput;
+import org.apache.tez.test.TestProcessor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Builds a three-vertex chain (v1 -> v2 -> v3) whose vertex managers crash the
+ * AM on purpose. Once its source tasks have completed, vertex vN schedules its
+ * tasks only on DAG attempt N and halts the JVM on any earlier attempt.
+ */
+public class MultiAttemptDAG {
+
+  private static final Log LOG =
+      LogFactory.getLog(MultiAttemptDAG.class);
+
+  static final Resource defaultResource = Resource.newInstance(100, 0);
+  public static final String MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS =
+      "tez.multi-attempt-dag.vertex.num-tasks";
+  public static final int MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT = 2;
+
+  /**
+   * Vertex manager that waits until every source task has completed. It then
+   * compares the current DAG attempt number with the attempt number stored in
+   * its user payload as a decimal string:
+   * <ul>
+   *   <li>payload &gt; current attempt: halts the JVM with status -1;</li>
+   *   <li>payload == current attempt: schedules all tasks of the vertex;</li>
+   *   <li>payload &lt; current attempt: does nothing.</li>
+   * </ul>
+   * The decision is taken at most once per plugin instance.
+   */
+  public static class FailOnAttemptVertexManagerPlugin
+      implements VertexManagerPlugin {
+    private int numSourceTasks = 0;
+    private AtomicInteger numCompletions = new AtomicInteger();
+    private VertexManagerPluginContext context;
+    private boolean tasksScheduled = false;
+
+    @Override
+    public void initialize(VertexManagerPluginContext context) {
+      this.context = context;
+      for (String input :
+          context.getInputVertexEdgeProperties().keySet()) {
+        LOG.info("Adding sourceTasks for Vertex " + input);
+        numSourceTasks += context.getVertexNumTasks(input);
+        LOG.info("Current numSourceTasks=" + numSourceTasks);
+      }
+    }
+
+    @Override
+    public void onVertexStarted(Map<String, List<Integer>> completions) {
+      if (completions != null) {
+        for (Entry<String, List<Integer>> entry : completions.entrySet()) {
+          LOG.info("Received completion events on vertexStarted"
+              + ", vertex=" + entry.getKey()
+              + ", completions=" + entry.getValue().size());
+          numCompletions.addAndGet(entry.getValue().size());
+        }
+      }
+      maybeScheduleTasks();
+    }
+
+    /**
+     * Once all source tasks have completed, either crashes the AM or schedules
+     * every task of this vertex, depending on the DAG attempt number. The
+     * decision runs only once.
+     */
+    private synchronized void maybeScheduleTasks() {
+      if (numCompletions.get() < numSourceTasks || tasksScheduled) {
+        return;
+      }
+      tasksScheduled = true;
+      String payload = new String(context.getUserPayload());
+      int successAttemptId = Integer.parseInt(payload);
+      int currentAttempt = context.getDAGAttemptNumber();
+      LOG.info("Checking whether to crash AM or schedule tasks"
+          + ", successfulAttemptID=" + successAttemptId
+          + ", currentAttempt=" + currentAttempt);
+      if (successAttemptId > currentAttempt) {
+        // Kill the AM abruptly (Runtime.halt skips shutdown hooks) to force a
+        // new AM attempt.
+        Runtime.getRuntime().halt(-1);
+      } else if (successAttemptId == currentAttempt) {
+        LOG.info("Scheduling tasks for vertex=" + context.getVertexName());
+        int numTasks = context.getVertexNumTasks(context.getVertexName());
+        List<Integer> scheduledTasks = new ArrayList<Integer>(numTasks);
+        for (int i = 0; i < numTasks; ++i) {
+          scheduledTasks.add(i);
+        }
+        context.scheduleVertexTasks(scheduledTasks);
+      }
+      // NOTE(review): when currentAttempt > successAttemptId nothing is
+      // scheduled; presumably the vertex already completed on an earlier
+      // attempt and is restored by recovery. Confirm.
+    }
+
+    @Override
+    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+      LOG.info("Received completion events for source task"
+          + ", vertex=" + srcVertexName
+          + ", taskIdx=" + taskId);
+      numCompletions.incrementAndGet();
+      maybeScheduleTasks();
+    }
+
+    @Override
+    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+      // Nothing to do
+    }
+
+    @Override
+    public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) {
+      // Do nothing
+    }
+  }
+
+  /**
+   * Builds a {@link FailOnAttemptVertexManagerPlugin} descriptor whose payload
+   * is the decimal attempt number on which the vertex is allowed to succeed.
+   */
+  private static VertexManagerPluginDescriptor createFailOnAttemptPlugin(
+      int successAttempt) {
+    VertexManagerPluginDescriptor descriptor = new VertexManagerPluginDescriptor(
+        FailOnAttemptVertexManagerPlugin.class.getName());
+    descriptor.setUserPayload(String.valueOf(successAttempt).getBytes());
+    return descriptor;
+  }
+
+  /** Builds the persisted, sequential scatter-gather edge used between vertices. */
+  private static EdgeProperty createScatterGatherEdgeProperty(byte[] payload) {
+    return new EdgeProperty(DataMovementType.SCATTER_GATHER,
+        DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        TestOutput.getOutputDesc(payload),
+        TestInput.getInputDesc(payload));
+  }
+
+  /**
+   * Creates the v1 -> v2 -> v3 DAG. Each vertex runs {@code TestProcessor} and
+   * uses a {@link FailOnAttemptVertexManagerPlugin} that succeeds on attempt 1,
+   * 2 and 3 respectively.
+   *
+   * @param name DAG name
+   * @param conf optional configuration. When non-null, it supplies the
+   *             per-vertex task count ({@link #MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS})
+   *             and is serialized as the processor/input/output payload.
+   */
+  public static DAG createDAG(String name,
+      Configuration conf) throws Exception {
+    byte[] payload = null;
+    int taskCount = MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT;
+    if (conf != null) {
+      taskCount = conf.getInt(MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS, MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT);
+      payload = TezUtils.createUserPayloadFromConf(conf);
+    }
+    DAG dag = new DAG(name);
+    Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v3 = new Vertex("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+
+    // Make each vertex manager fail on appropriate attempt
+    v1.setVertexManagerPlugin(createFailOnAttemptPlugin(1));
+    v2.setVertexManagerPlugin(createFailOnAttemptPlugin(2));
+    v3.setVertexManagerPlugin(createFailOnAttemptPlugin(3));
+    dag.addVertex(v1).addVertex(v2).addVertex(v3);
+    dag.addEdge(new Edge(v1, v2, createScatterGatherEdgeProperty(payload)));
+    dag.addEdge(new Edge(v2, v3, createScatterGatherEdgeProperty(payload)));
+    return dag;
+  }
+
+  /** Same as {@link #createDAG(String, Configuration)} with the name "SimpleVTestDAG". */
+  public static DAG createDAG(Configuration conf) throws Exception {
+    return createDAG("SimpleVTestDAG", conf);
+  }
+
+}
[2/4] TEZ-847. Support basic AM recovery. (hitesh)
Posted by hi...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
new file mode 100644
index 0000000..6d5a769
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGCommitStartedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Recovery event marking the start of a DAG's commit phase. It is flagged as a
+ * recovery event ({@link #isRecoveryEvent()} returns {@code true}) but not as a
+ * history event ({@link #isHistoryEvent()} returns {@code false}). Its only
+ * state is the DAG id.
+ */
+public class DAGCommitStartedEvent implements HistoryEvent {
+
+  private TezDAGID dagID;
+
+  /** Creates an empty event; populate it with {@link #fromProto} or {@link #fromProtoStream}. */
+  public DAGCommitStartedEvent() {
+  }
+
+  /** @param dagID the DAG whose commit has started */
+  public DAGCommitStartedEvent(TezDAGID dagID) {
+    this.dagID = dagID;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.DAG_COMMIT_STARTED;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  /** ATS conversion is not implemented yet; always returns {@code null}. */
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    // TODO
+    return null;
+  }
+
+  /** Builds the protobuf form of this event; the DAG id must be non-null. */
+  public DAGCommitStartedProto toProto() {
+    DAGCommitStartedProto.Builder builder = DAGCommitStartedProto.newBuilder();
+    builder.setDagId(dagID.toString());
+    return builder.build();
+  }
+
+  /** Restores this event's state from its protobuf form. */
+  public void fromProto(DAGCommitStartedProto proto) {
+    dagID = TezDAGID.fromString(proto.getDagId());
+  }
+
+  /** Writes this event as a single length-delimited protobuf message. */
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    DAGCommitStartedProto proto = toProto();
+    proto.writeDelimitedTo(outputStream);
+  }
+
+  /** Reads one length-delimited protobuf message and populates this event from it. */
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    fromProto(DAGCommitStartedProto.parseDelimitedFrom(inputStream));
+  }
+
+  @Override
+  public String toString() {
+    return "dagID=" + dagID;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index cc9c3ad..38e0702 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -20,7 +20,7 @@ package org.apache.tez.dag.history.events;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
@@ -43,7 +43,7 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
private TezDAGID dagID;
private long startTime;
private long finishTime;
- private DAGStatus.State state;
+ private DAGState state;
private String diagnostics;
private TezCounters tezCounters;
@@ -51,7 +51,7 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
}
public DAGFinishedEvent(TezDAGID dagId, long startTime,
- long finishTime, DAGStatus.State state,
+ long finishTime, DAGState state,
String diagnostics, TezCounters counters) {
this.dagID = dagId;
this.startTime = startTime;
@@ -111,22 +111,33 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
}
+  /**
+   * Converts this event to its protobuf form. Diagnostics and counters are
+   * written only when non-null; startTime is not written.
+   */
public DAGFinishedProto toProto() {
- return DAGFinishedProto.newBuilder()
- .setDagId(dagID.toString())
+ DAGFinishedProto.Builder builder = DAGFinishedProto.newBuilder();
+
+    // state is stored by ordinal and read back via DAGState.values()[...] in
+    // fromProto, so the order of DAGState constants must remain stable.
+ builder.setDagId(dagID.toString())
.setState(state.ordinal())
- .setDiagnostics(diagnostics)
- .setFinishTime(finishTime)
- .setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters))
- .build();
+ .setFinishTime(finishTime);
+
+    // Optional fields are guarded because protobuf builder setters reject null.
+ if (diagnostics != null) {
+ builder.setDiagnostics(diagnostics);
+ }
+ if (tezCounters != null) {
+ builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
+ }
+
+ return builder.build();
}
+  /**
+   * Populates this event from its protobuf form. Diagnostics and counters are
+   * assigned only when present in the message; startTime is not restored.
+   */
public void fromProto(DAGFinishedProto proto) {
this.dagID = TezDAGID.fromString(proto.getDagId());
this.finishTime = proto.getFinishTime();
- this.state = DAGStatus.State.values()[proto.getState()];
- this.diagnostics = proto.getDiagnostics();
- this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
- proto.getCounters());
+    // NOTE(review): an out-of-range ordinal (e.g. from a log written with a
+    // different DAGState ordering) throws ArrayIndexOutOfBoundsException here.
+ this.state = DAGState.values()[proto.getState()];
+ if (proto.hasDiagnostics()) {
+ this.diagnostics = proto.getDiagnostics();
+ }
+ if (proto.hasCounters()) {
+ this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
+ proto.getCounters());
+ }
}
@Override
@@ -148,9 +159,9 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
+ ", timeTaken=" + (finishTime - startTime)
+ ", status=" + state.name()
+ ", diagnostics=" + diagnostics
- + ", counters="
- + tezCounters.toString()
- .replaceAll("\\n", ", ").replaceAll("\\s+", " ");
+ + ", counters=" + ((tezCounters == null) ? "null" :
+ (tezCounters.toString()
+ .replaceAll("\\n", ", ").replaceAll("\\s+", " ")));
}
@Override
@@ -159,4 +170,28 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
HistoryEventType.DAG_FINISHED).writeDelimitedTo(outputStream);
}
+  /** @return the id of the DAG this event describes */
+  public TezDAGID getDagID() {
+    return this.dagID;
+  }
+
+  /** @return the recorded start time; note that fromProto does not restore it */
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  /** @return the recorded finish time */
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  /** @return the DAG state recorded in this event */
+  public DAGState getState() {
+    return this.state;
+  }
+
+  /** @return the diagnostics, or {@code null} if none were recorded */
+  public String getDiagnostics() {
+    return this.diagnostics;
+  }
+
+  /** @return the DAG counters, or {@code null} if none were recorded */
+  public TezCounters getTezCounters() {
+    return this.tezCounters;
+  }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
index 20479e6..9b001b6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
@@ -94,4 +94,11 @@ public class DAGInitializedEvent implements HistoryEvent {
fromProto(proto);
}
+  /** @return the recorded DAG initialization time */
+  public long getInitTime() {
+    return initTime;
+  }
+
+  /** @return the id of the DAG this event describes */
+  public TezDAGID getDagID() {
+    return this.dagID;
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
index 4574753..a1bcdf2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
@@ -113,4 +113,11 @@ public class DAGStartedEvent implements HistoryEvent {
+ ", startTime=" + startTime;
}
+  /** @return the recorded DAG start time */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /** @return the id of the DAG this event describes */
+  public TezDAGID getDagID() {
+    return this.dagID;
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 3d24105..853bea7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -179,4 +179,15 @@ import java.io.OutputStream;
return this.dagPlan;
}
+  /** @return the id of the submitted DAG */
+  public TezDAGID getDagID() {
+    return this.dagID;
+  }
+
+  /** @return the application attempt recorded with this submission */
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return this.applicationAttemptId;
+  }
+
+  /** @return the recorded submission time */
+  public long getSubmitTime() {
+    return this.submitTime;
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 97f3be3..ecb6818 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -110,22 +110,31 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
}
+  /**
+   * Converts this event to its protobuf form. Diagnostics and counters are
+   * written only when non-null; startTime is not written.
+   */
public TaskAttemptFinishedProto toProto() {
- return TaskAttemptFinishedProto.newBuilder()
- .setTaskAttemptId(taskAttemptId.toString())
+ TaskAttemptFinishedProto.Builder builder =
+ TaskAttemptFinishedProto.newBuilder();
+    // state is stored by ordinal; fromProto maps it back with
+    // TaskAttemptState.values()[...], so the enum order must remain stable.
+ builder.setTaskAttemptId(taskAttemptId.toString())
.setState(state.ordinal())
- .setDiagnostics(diagnostics)
- .setFinishTime(finishTime)
- .setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters))
- .build();
+ .setFinishTime(finishTime);
+    // Optional fields are guarded because protobuf builder setters reject null.
+ if (diagnostics != null) {
+ builder.setDiagnostics(diagnostics);
+ }
+ if (tezCounters != null) {
+ builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
+ }
+ return builder.build();
}
+  /**
+   * Populates this event from its protobuf form. Diagnostics and counters are
+   * assigned only when present in the message; startTime is not restored.
+   */
public void fromProto(TaskAttemptFinishedProto proto) {
this.taskAttemptId = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
this.finishTime = proto.getFinishTime();
this.state = TaskAttemptState.values()[proto.getState()];
- this.diagnostics = proto.getDiagnostics();
- this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
+ if (proto.hasDiagnostics()) {
+ this.diagnostics = proto.getDiagnostics();
+ }
+ if (proto.hasCounters()) {
+ this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
proto.getCounters());
+ }
}
@Override
@@ -149,9 +158,29 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
+ ", timeTaken=" + (finishTime - startTime)
+ ", status=" + state.name()
+ ", diagnostics=" + diagnostics
- + ", counters="
- + tezCounters.toString()
- .replaceAll("\\n", ", ").replaceAll("\\s+", " ");
+ + ", counters=" + (tezCounters == null ? "null" :
+ tezCounters.toString()
+ .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
+ }
+
+  /** @return the task attempt this event describes */
+  public TezTaskAttemptID getTaskAttemptID() {
+    return this.taskAttemptId;
+  }
+
+  /** @return the attempt state recorded in this event */
+  public TaskAttemptState getState() {
+    return this.state;
+  }
+
+  /** @return the recorded finish time */
+  public long getFinishTime() {
+    return this.finishTime;
+  }
+
+  /** @return the diagnostics, or {@code null} if none were recorded */
+  public String getDiagnostics() {
+    return this.diagnostics;
+  }
+
+  /** @return the attempt counters, or {@code null} if none were recorded */
+  public TezCounters getCounters() {
+    return this.tezCounters;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index 11a1c62..ba91db8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -112,7 +112,7 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
+  /**
+   * Task attempt start events are flagged as recovery events; this patch
+   * changes the returned value from false to true.
+   */
@Override
public boolean isRecoveryEvent() {
- return false;
+ return true;
}
@Override
@@ -158,4 +158,19 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
+ ", completedLogs=" + completedLogsUrl;
}
+  /** @return the task attempt this event describes */
+  public TezTaskAttemptID getTaskAttemptID() {
+    return taskAttemptId;
+  }
+
+  /** @return the recorded start time */
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  /** @return the container the attempt was recorded as running in */
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+  /** @return the node the attempt was recorded as running on */
+  public NodeId getNodeId() {
+    return this.nodeId;
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
index dac2f9a..74c804c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.history.events;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.oldrecords.TaskState;
@@ -26,6 +27,7 @@ import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.ats.EntityTypes;
import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskFinishedProto;
import org.codehaus.jettison.json.JSONArray;
@@ -44,9 +46,11 @@ public class TaskFinishedEvent implements HistoryEvent {
private long finishTime;
private TaskState state;
private TezCounters tezCounters;
+ private TezTaskAttemptID successfulAttemptID;
public TaskFinishedEvent(TezTaskID taskID,
String vertexName, long startTime, long finishTime,
+ TezTaskAttemptID successfulAttemptID,
TaskState state, TezCounters counters) {
this.vertexName = vertexName;
this.taskID = taskID;
@@ -103,20 +107,31 @@ public class TaskFinishedEvent implements HistoryEvent {
}
+  /**
+   * Converts this event to its protobuf form. Counters and the successful
+   * attempt id are written only when non-null; vertexName and startTime are
+   * not written.
+   *
+   * NOTE(review): the constructor hunk in this patch adds a
+   * successfulAttemptID parameter, but no matching assignment is visible.
+   * Events built through the constructor may therefore keep a null
+   * successfulAttemptID that is never persisted here. Confirm that the
+   * constructor sets this.successfulAttemptID.
+   */
public TaskFinishedProto toProto() {
- return TaskFinishedProto.newBuilder()
- .setTaskId(taskID.toString())
+ TaskFinishedProto.Builder builder = TaskFinishedProto.newBuilder();
+ builder.setTaskId(taskID.toString())
.setState(state.ordinal())
- .setFinishTime(finishTime)
- .setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters))
- .build();
+ .setFinishTime(finishTime);
+    // Optional fields are guarded because protobuf builder setters reject null.
+ if (tezCounters != null) {
+ builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
+ }
+ if (successfulAttemptID != null) {
+ builder.setSuccessfulTaskAttemptId(successfulAttemptID.toString());
+ }
+ return builder.build();
}
+  /**
+   * Populates this event from its protobuf form. Counters and the successful
+   * attempt id are assigned only when present in the message; vertexName and
+   * startTime are not restored.
+   */
public void fromProto(TaskFinishedProto proto) {
this.taskID = TezTaskID.fromString(proto.getTaskId());
this.finishTime = proto.getFinishTime();
this.state = TaskState.values()[proto.getState()];
- this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
- proto.getCounters());
+ if (proto.hasCounters()) {
+ this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
+ proto.getCounters());
+ }
+ if (proto.hasSuccessfulTaskAttemptId()) {
+ this.successfulAttemptID =
+ TezTaskAttemptID.fromString(proto.getSuccessfulTaskAttemptId());
+ }
}
@Override
@@ -138,9 +153,30 @@ public class TaskFinishedEvent implements HistoryEvent {
+ ", finishTime=" + finishTime
+ ", timeTaken=" + (finishTime - startTime)
+ ", status=" + state.name()
- + ", counters="
- + tezCounters.toString()
- .replaceAll("\\n", ", ").replaceAll("\\s+", " ");
+ + ", successfulAttemptID=" + (successfulAttemptID == null ? "null" :
+ successfulAttemptID.toString())
+ + ", counters=" + ( tezCounters == null ? "null" :
+ tezCounters.toString()
+ .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
}
+  /** @return the task this event describes */
+  public TezTaskID getTaskID() {
+    return this.taskID;
+  }
+
+  /** @return the task state recorded in this event */
+  public TaskState getState() {
+    return this.state;
+  }
+
+  /** @return the recorded finish time */
+  public long getFinishTime() {
+    return this.finishTime;
+  }
+
+  /** @return the task counters, or {@code null} if none were recorded */
+  public TezCounters getTezCounters() {
+    return this.tezCounters;
+  }
+
+  /**
+   * @return the attempt that succeeded, or {@code null} if none was recorded.
+   *         NOTE(review): the constructor hunk in this patch does not visibly
+   *         assign this field, so this may only be non-null after fromProto.
+   */
+  public TezTaskAttemptID getSuccessfulAttemptID() {
+    return this.successfulAttemptID;
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
index 9efa357..c2a380b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
@@ -91,7 +91,7 @@ public class TaskStartedEvent implements HistoryEvent {
+  /**
+   * Task start events are flagged as recovery events; this patch changes the
+   * returned value from false to true.
+   */
@Override
public boolean isRecoveryEvent() {
- return false;
+ return true;
}
@Override
@@ -132,4 +132,16 @@ public class TaskStartedEvent implements HistoryEvent {
+ ", launchTime=" + startTime;
}
+  /** @return the task this event describes */
+  public TezTaskID getTaskID() {
+    return this.taskID;
+  }
+
+  /** @return the recorded scheduling time */
+  public long getScheduledTime() {
+    return this.scheduledTime;
+  }
+
+  /** @return the recorded start (launch) time */
+  public long getStartTime() {
+    return this.startTime;
+  }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
new file mode 100644
index 0000000..564f9ed
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexCommitStartedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Recovery event recording that a commit has started for a vertex
+ * ({@link HistoryEventType#VERTEX_COMMIT_STARTED}).
+ * <p>
+ * It is written to the recovery stream only: {@link #isRecoveryEvent()} is
+ * true and {@link #isHistoryEvent()} is false.
+ */
+public class VertexCommitStartedEvent implements HistoryEvent {
+
+ private TezVertexID vertexID;
+
+ /** Creates an empty event, to be populated via {@link #fromProto} or {@link #fromProtoStream}. */
+ public VertexCommitStartedEvent() {
+ }
+
+ /** @param vertexId the vertex whose commit has started. */
+ public VertexCommitStartedEvent(TezVertexID vertexId) {
+ this.vertexID = vertexId;
+ }
+
+ @Override
+ public HistoryEventType getEventType() {
+ return HistoryEventType.VERTEX_COMMIT_STARTED;
+ }
+
+ @Override
+ public JSONObject convertToATSJSON() throws JSONException {
+ // TODO - no ATS representation yet; this event is not a history event.
+ return null;
+ }
+
+ @Override
+ public boolean isRecoveryEvent() {
+ return true;
+ }
+
+ @Override
+ public boolean isHistoryEvent() {
+ return false;
+ }
+
+ /** Serializes this event; requires a non-null vertex ID. */
+ public VertexCommitStartedProto toProto() {
+ return VertexCommitStartedProto.newBuilder()
+ .setVertexId(vertexID.toString())
+ .build();
+ }
+
+ /** Restores this event's state from {@code proto}. */
+ public void fromProto(VertexCommitStartedProto proto) {
+ this.vertexID = TezVertexID.fromString(proto.getVertexId());
+ }
+
+ /** Writes this event to {@code outputStream} as a length-delimited proto. */
+ @Override
+ public void toProtoStream(OutputStream outputStream) throws IOException {
+ toProto().writeDelimitedTo(outputStream);
+ }
+
+ /**
+ * Reads one length-delimited proto from {@code inputStream}.
+ *
+ * @throws IOException if the stream holds no further message, or on read failure.
+ */
+ @Override
+ public void fromProtoStream(InputStream inputStream) throws IOException {
+ VertexCommitStartedProto proto = VertexCommitStartedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ // parseDelimitedFrom returns null when the stream is already at EOF;
+ // fail with an IOException instead of an NPE inside fromProto().
+ throw new IOException("No data found in stream");
+ }
+ fromProto(proto);
+ }
+
+ @Override
+ public String toString() {
+ return "vertexId=" + vertexID;
+ }
+
+ /** @return the vertex whose commit has started. */
+ public TezVertexID getVertexID() {
+ return this.vertexID;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
index 7c2a16f..035c9ca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
@@ -206,4 +206,12 @@ public class VertexDataMovementEventsGeneratedEvent implements HistoryEvent {
}
+ /** @return the vertex ID recorded with this event. */
+ public TezVertexID getVertexID() {
+ return this.vertexID;
+ }
+
+ /** @return the recorded TezEvents; this is the internal list, not a copy. */
+ public List<TezEvent> getTezEvents() {
+ return this.events;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 0f2b8a1..2366eb1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -20,7 +20,7 @@ package org.apache.tez.dag.history.events;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.ats.EntityTypes;
@@ -45,13 +45,13 @@ public class VertexFinishedEvent implements HistoryEvent {
private long startRequestedTime;
private long startTime;
private long finishTime;
- private VertexStatus.State state;
+ // Serialized by ordinal in toProto()/fromProto(), so VertexState constants must not be reordered.
+ private VertexState state;
private String diagnostics;
private TezCounters tezCounters;
public VertexFinishedEvent(TezVertexID vertexId,
String vertexName, long initRequestedTime, long initedTime, long startRequestedTime, long startedTime, long finishTime,
- VertexStatus.State state, String diagnostics,
+ VertexState state, String diagnostics,
TezCounters counters) {
this.vertexName = vertexName;
this.vertexID = vertexId;
@@ -111,24 +111,32 @@ public class VertexFinishedEvent implements HistoryEvent {
}
public VertexFinishedProto toProto() {
- return VertexFinishedProto.newBuilder()
- .setVertexName(vertexName)
+ VertexFinishedProto.Builder builder = VertexFinishedProto.newBuilder();
+ builder.setVertexName(vertexName)
.setVertexId(vertexID.toString())
.setState(state.ordinal())
- .setDiagnostics(diagnostics)
- .setFinishTime(finishTime)
- .setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters))
- .build();
+ .setFinishTime(finishTime);
+ // diagnostics and counters are optional: only set them when non-null.
+ if (diagnostics != null) {
+ builder.setDiagnostics(diagnostics);
+ }
+ if (tezCounters != null) {
+ builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
+ }
+ return builder.build();
}
+ // NOTE(review): toProto()/fromProto() do not carry initRequestedTime, initedTime,
+ // startRequestedTime or startTime, so fromProto() does not restore them - confirm intended.
public void fromProto(VertexFinishedProto proto) {
this.vertexName = proto.getVertexName();
this.vertexID = TezVertexID.fromString(proto.getVertexId());
this.finishTime = proto.getFinishTime();
- this.state = VertexStatus.State.values()[proto.getState()];
- this.diagnostics = proto.getDiagnostics();
- this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
- proto.getCounters());
+ this.state = VertexState.values()[proto.getState()];
+ // Optional fields absent from the proto leave the current values untouched.
+ if (proto.hasDiagnostics()) {
+ this.diagnostics = proto.getDiagnostics();
+ }
+ if (proto.hasCounters()) {
+ this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
+ proto.getCounters());
+ }
}
@Override
@@ -154,9 +162,28 @@ public class VertexFinishedEvent implements HistoryEvent {
+ ", timeTaken=" + (finishTime - startTime)
+ ", status=" + state.name()
+ ", diagnostics=" + diagnostics
- + ", counters="
- + tezCounters.toString()
- .replaceAll("\\n", ", ").replaceAll("\\s+", " ");
+ + ", counters=" + ( tezCounters == null ? "null" :
+ tezCounters.toString()
+ .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
}
+ /** @return the ID of the finished vertex. */
+ public TezVertexID getVertexID() {
+ return this.vertexID;
+ }
+
+ /** @return the final vertex state. */
+ public VertexState getState() {
+ return this.state;
+ }
+
+ /** @return the vertex finish time. */
+ public long getFinishTime() {
+ return this.finishTime;
+ }
+
+ /** @return the diagnostics message, or null if none was recorded. */
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+
+ /** @return the vertex counters, or null if none were recorded. */
+ public TezCounters getTezCounters() {
+ return tezCounters;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
index 21c7587..e9e4a8c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
@@ -18,12 +18,19 @@
package org.apache.tez.dag.history.events;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
+import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.ats.EntityTypes;
import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexInitializedProto;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -31,28 +38,35 @@ import org.codehaus.jettison.json.JSONObject;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.LinkedHashMap;
+import java.util.Map;
public class VertexInitializedEvent implements HistoryEvent {
+ // NOTE(review): LOG is not referenced in the hunks shown here - confirm it is needed.
+ private static final Log LOG = LogFactory.getLog(VertexInitializedEvent.class);
+
private TezVertexID vertexID;
private String vertexName;
private long initRequestedTime;
private long initedTime;
- private long numTasks;
+ private int numTasks;
private String processorName;
+ // Additional root inputs; fromProto() keys them by entity name. May be null.
+ private Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> additionalInputs;
public VertexInitializedEvent() {
}
public VertexInitializedEvent(TezVertexID vertexId,
String vertexName, long initRequestedTime, long initedTime,
- long numTasks, String processorName) {
+ int numTasks, String processorName,
+ Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> additionalInputs) {
this.vertexName = vertexName;
this.vertexID = vertexId;
this.initRequestedTime = initRequestedTime;
this.initedTime = initedTime;
this.numTasks = numTasks;
this.processorName = processorName;
+ this.additionalInputs = additionalInputs;
}
@Override
@@ -107,8 +121,23 @@ public class VertexInitializedEvent implements HistoryEvent {
}
public RecoveryProtos.VertexInitializedProto toProto() {
- return RecoveryProtos.VertexInitializedProto.newBuilder()
- .setVertexId(vertexID.toString())
+ VertexInitializedProto.Builder builder = VertexInitializedProto.newBuilder();
+ // Only the map values are serialized; fromProto() rebuilds the keys from each
+ // input's entity name, so a round trip assumes keys equal those names.
+ if (additionalInputs != null
+ && !additionalInputs.isEmpty()) {
+ for (RootInputLeafOutputDescriptor<InputDescriptor> input :
+ additionalInputs.values()) {
+ RootInputLeafOutputProto.Builder inputBuilder
+ = RootInputLeafOutputProto.newBuilder();
+ inputBuilder.setName(input.getEntityName());
+ if (input.getInitializerClassName() != null) {
+ inputBuilder.setInitializerClassName(input.getInitializerClassName());
+ }
+ inputBuilder.setEntityDescriptor(
+ DagTypeConverters.convertToDAGPlan(input.getDescriptor()));
+ builder.addInputs(inputBuilder.build());
+ }
+ }
+ return builder.setVertexId(vertexID.toString())
.setVertexName(vertexName)
.setInitRequestedTime(initRequestedTime)
.setInitTime(initedTime)
@@ -122,6 +151,20 @@ public class VertexInitializedEvent implements HistoryEvent {
this.initRequestedTime = proto.getInitRequestedTime();
this.initedTime = proto.getInitTime();
this.numTasks = proto.getNumTasks();
+ // LinkedHashMap keeps the inputs in serialized order; when the proto has no
+ // inputs, additionalInputs is left unchanged.
+ if (proto.getInputsCount() > 0) {
+ this.additionalInputs =
+ new LinkedHashMap<String, RootInputLeafOutputDescriptor<InputDescriptor>>();
+ for (RootInputLeafOutputProto inputProto : proto.getInputsList()) {
+ RootInputLeafOutputDescriptor<InputDescriptor> input =
+ new RootInputLeafOutputDescriptor<InputDescriptor>(
+ inputProto.getName(),
+ DagTypeConverters.convertInputDescriptorFromDAGPlan(
+ inputProto.getEntityDescriptor()),
+ inputProto.hasInitializerClassName() ?
+ inputProto.getInitializerClassName() : null);
+ additionalInputs.put(input.getEntityName(), input);
+ }
+ }
}
@Override
@@ -143,7 +186,32 @@ public class VertexInitializedEvent implements HistoryEvent {
+ ", initRequestedTime=" + initRequestedTime
+ ", initedTime=" + initedTime
+ ", numTasks=" + numTasks
- + ", processorName=" + processorName;
+ + ", processorName=" + processorName
+ + ", additionalInputsCount="
+ + (additionalInputs != null ? additionalInputs.size() : 0);
+ }
+
+ /** @return the ID of the initialized vertex. */
+ public TezVertexID getVertexID() {
+ return this.vertexID;
+ }
+
+ /** @return the time vertex initialization was requested. */
+ public long getInitRequestedTime() {
+ return initRequestedTime;
+ }
+
+ /** @return the time the vertex finished initializing. */
+ public long getInitedTime() {
+ return initedTime;
+ }
+
+ /** @return the number of tasks recorded for the vertex. */
+ public int getNumTasks() {
+ return numTasks;
}
+ /** @return the additional root inputs keyed by name (internal map, not a copy), or null. */
+ public Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> getAdditionalInputs() {
+ return additionalInputs;
+ }
+
+ /** @return the processor name recorded for the vertex. */
+ public String getProcessorName() {
+ return processorName;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
new file mode 100644
index 0000000..43cc787
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.EdgeManagerDescriptorProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexParallelismUpdatedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Recovery event recording a parallelism update for a vertex
+ * ({@link HistoryEventType#VERTEX_PARALLELISM_UPDATED}): the new task count
+ * and, optionally, a new location hint and edge managers keyed by edge name.
+ * <p>
+ * Recovery-only: {@link #isHistoryEvent()} is false and
+ * {@link #convertToATSJSON()} is unsupported.
+ */
+public class VertexParallelismUpdatedEvent implements HistoryEvent {
+
+ private TezVertexID vertexID;
+ private int numTasks;
+ // Optional; null when no location hint was supplied.
+ private VertexLocationHint vertexLocationHint;
+ // Optional; keyed by the edge name stored in EdgeManagerDescriptorProto.
+ private Map<String, EdgeManagerDescriptor> sourceEdgeManagers;
+
+ /** Creates an empty event, to be populated via {@link #fromProto} or {@link #fromProtoStream}. */
+ public VertexParallelismUpdatedEvent() {
+ }
+
+ /**
+ * @param vertexID the vertex whose parallelism changed
+ * @param numTasks the new number of tasks
+ * @param vertexLocationHint new location hint, or null
+ * @param sourceEdgeManagers edge managers keyed by edge name, or null
+ */
+ public VertexParallelismUpdatedEvent(TezVertexID vertexID,
+ int numTasks, VertexLocationHint vertexLocationHint,
+ Map<String, EdgeManagerDescriptor> sourceEdgeManagers) {
+ this.vertexID = vertexID;
+ this.numTasks = numTasks;
+ this.vertexLocationHint = vertexLocationHint;
+ this.sourceEdgeManagers = sourceEdgeManagers;
+ }
+
+ @Override
+ public HistoryEventType getEventType() {
+ return HistoryEventType.VERTEX_PARALLELISM_UPDATED;
+ }
+
+ @Override
+ public JSONObject convertToATSJSON() throws JSONException {
+ throw new UnsupportedOperationException("VertexParallelismUpdatedEvent"
+ + " not a History event");
+ }
+
+ @Override
+ public boolean isRecoveryEvent() {
+ return true;
+ }
+
+ @Override
+ public boolean isHistoryEvent() {
+ return false;
+ }
+
+ /** Serializes this event; the location hint and edge managers are written only when non-null. */
+ public VertexParallelismUpdatedProto toProto() {
+ VertexParallelismUpdatedProto.Builder builder =
+ VertexParallelismUpdatedProto.newBuilder();
+ builder.setVertexId(vertexID.toString())
+ .setNumTasks(numTasks);
+ if (vertexLocationHint != null) {
+ builder.setVertexLocationHint(DagTypeConverters.convertVertexLocationHintToProto(
+ this.vertexLocationHint));
+ }
+ if (sourceEdgeManagers != null) {
+ for (Entry<String, EdgeManagerDescriptor> entry :
+ sourceEdgeManagers.entrySet()) {
+ EdgeManagerDescriptorProto.Builder edgeMgrBuilder =
+ EdgeManagerDescriptorProto.newBuilder();
+ edgeMgrBuilder.setEdgeName(entry.getKey());
+ edgeMgrBuilder.setEntityDescriptor(
+ DagTypeConverters.convertToDAGPlan(entry.getValue()));
+ builder.addEdgeManagerDescriptors(edgeMgrBuilder.build());
+ }
+ }
+ return builder.build();
+ }
+
+ /**
+ * Restores this event from {@code proto}. An empty edge-manager list leaves
+ * {@code sourceEdgeManagers} untouched (null on a freshly constructed event).
+ */
+ public void fromProto(VertexParallelismUpdatedProto proto) {
+ this.vertexID = TezVertexID.fromString(proto.getVertexId());
+ this.numTasks = proto.getNumTasks();
+ if (proto.hasVertexLocationHint()) {
+ this.vertexLocationHint = DagTypeConverters.convertVertexLocationHintFromProto(
+ proto.getVertexLocationHint());
+ }
+ if (proto.getEdgeManagerDescriptorsCount() > 0) {
+ this.sourceEdgeManagers = new HashMap<String, EdgeManagerDescriptor>(
+ proto.getEdgeManagerDescriptorsCount());
+ for (EdgeManagerDescriptorProto edgeManagerProto :
+ proto.getEdgeManagerDescriptorsList()) {
+ EdgeManagerDescriptor edgeManagerDescriptor =
+ DagTypeConverters.convertEdgeManagerDescriptorFromDAGPlan(
+ edgeManagerProto.getEntityDescriptor());
+ sourceEdgeManagers.put(edgeManagerProto.getEdgeName(),
+ edgeManagerDescriptor);
+ }
+ }
+ }
+
+ @Override
+ public void toProtoStream(OutputStream outputStream) throws IOException {
+ toProto().writeDelimitedTo(outputStream);
+ }
+
+ /**
+ * Reads one length-delimited proto from {@code inputStream}.
+ *
+ * @throws IOException if the stream holds no further message, or on read failure.
+ */
+ @Override
+ public void fromProtoStream(InputStream inputStream) throws IOException {
+ VertexParallelismUpdatedProto proto = VertexParallelismUpdatedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ // parseDelimitedFrom returns null when the stream is already at EOF;
+ // fail with an IOException instead of an NPE inside fromProto().
+ throw new IOException("No data found in stream");
+ }
+ fromProto(proto);
+ }
+
+ @Override
+ public String toString() {
+ return "vertexId=" + vertexID
+ + ", numTasks=" + numTasks
+ + ", vertexLocationHint=" +
+ (vertexLocationHint == null? "null" : vertexLocationHint)
+ + ", edgeManagersCount=" +
+ (sourceEdgeManagers == null? "null" : sourceEdgeManagers.size());
+ }
+
+ public TezVertexID getVertexID() {
+ return this.vertexID;
+ }
+
+ public int getNumTasks() {
+ return numTasks;
+ }
+
+ /** @return the location hint, or null if none was recorded. */
+ public VertexLocationHint getVertexLocationHint() {
+ return vertexLocationHint;
+ }
+
+ /** @return the edge managers keyed by edge name (the internal map, not a copy), or null. */
+ public Map<String, EdgeManagerDescriptor> getSourceEdgeManagers() {
+ return sourceEdgeManagers;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
index 6bb383d..e6023f1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
@@ -88,7 +88,7 @@ public class VertexStartedEvent implements HistoryEvent {
@Override
public boolean isRecoveryEvent() {
- return false;
+ // Now flagged as a recovery event (assumed: consumed by RecoveryService for AM recovery - confirm).
+ return true;
}
@Override
@@ -128,4 +128,16 @@ public class VertexStartedEvent implements HistoryEvent {
+ ", startedTime=" + startTime;
}
+ /** @return the ID of the vertex that started. */
+ public TezVertexID getVertexID() {
+ return this.vertexID;
+ }
+
+ /** @return the time the vertex start was requested. */
+ public long getStartRequestedTime() {
+ return startRequestedTime;
+ }
+
+ /** @return the vertex start time (reported as startedTime by toString()). */
+ public long getStartTime() {
+ return startTime;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 807ad81..5986657 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -34,9 +34,11 @@ import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.records.TezDAGID;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,6 +51,7 @@ public class RecoveryService extends AbstractService {
private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
new LinkedBlockingQueue<DAGHistoryEvent>();
private Set<TezDAGID> completedDAGs = new HashSet<TezDAGID>();
+ // DAGs whose events are never recorded (pre-warm DAGs are added when their DAG_SUBMITTED is seen).
+ private Set<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
private Thread eventHandlingThread;
private AtomicBoolean stopped = new AtomicBoolean(false);
@@ -60,9 +63,12 @@ public class RecoveryService extends AbstractService {
Path recoveryPath;
Map<TezDAGID, FSDataOutputStream> outputStreamMap = new
HashMap<TezDAGID, FSDataOutputStream>();
- // FSDataOutputStream metaInfoStream;
private int bufferSize;
private FSDataOutputStream summaryStream;
+ // Flush bookkeeping, shared across all per-DAG output streams (see maybeFlush()).
+ private int unflushedEventsCount = 0;
+ private long lastFlushTime = -1;
+ // Batched-flush thresholds: an event count, and an interval in seconds.
+ private int maxUnflushedEvents;
+ private int flushInterval;
public RecoveryService(AppContext appContext) {
super(RecoveryService.class.getName());
@@ -76,11 +82,17 @@ public class RecoveryService extends AbstractService {
recoveryDirFS = FileSystem.get(recoveryPath.toUri(), conf);
bufferSize = conf.getInt(TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
+
+ flushInterval = conf.getInt(TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS,
+ TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS_DEFAULT);
+ maxUnflushedEvents = conf.getInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS,
+ TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT);
}
@Override
public void serviceStart() {
LOG.info("Starting RecoveryService");
+ // Start the flush-interval clock from service start.
+ lastFlushTime = appContext.getClock().getTime();
eventHandlingThread = new Thread(new Runnable() {
@Override
public void run() {
@@ -128,18 +140,21 @@ public class RecoveryService extends AbstractService {
if (eventHandlingThread != null) {
eventHandlingThread.interrupt();
}
+
+ // hsync rather than flush so buffered recovery data is persisted (Hadoop Syncable) before close.
+ // NOTE(review): if hsync() throws, close() is skipped and the stream stays open.
if (summaryStream != null) {
try {
- summaryStream.flush();
+ LOG.info("Closing Summary Stream");
+ summaryStream.hsync();
summaryStream.close();
} catch (IOException ioe) {
LOG.warn("Error when closing summary stream", ioe);
}
}
- for (FSDataOutputStream outputStream : outputStreamMap.values()) {
+ // Same hsync-then-close for every still-open per-DAG recovery stream.
+ for (Entry<TezDAGID, FSDataOutputStream> entry : outputStreamMap.entrySet()) {
try {
- outputStream.flush();
- outputStream.close();
+ LOG.info("Closing Output Stream for DAG " + entry.getKey());
+ entry.getValue().hsync();
+ entry.getValue().close();
} catch (IOException ioe) {
LOG.warn("Error when closing output stream", ioe);
}
@@ -152,38 +167,44 @@ public class RecoveryService extends AbstractService {
+ event.getHistoryEvent().getEventType());
return;
}
+ HistoryEventType eventType = event.getHistoryEvent().getEventType();
if (!started.get()) {
+ LOG.warn("Adding event of type " + eventType
+ + " to queue as service not started");
eventQueue.add(event);
return;
}
- HistoryEventType eventType = event.getHistoryEvent().getEventType();
+
if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
|| eventType.equals(HistoryEventType.DAG_FINISHED)) {
// handle submissions and completion immediately
synchronized (lock) {
try {
handleEvent(event);
- summaryStream.hsync();
+ summaryStream.hsync();
if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
- outputStreamMap.get(event.getDagID()).flush();
+ // No stream exists for skipped (e.g. pre-warm) DAGs; sync=true forces an hsync.
+ if (outputStreamMap.containsKey(event.getDagID())) {
+ doFlush(outputStreamMap.get(event.getDagID()),
+ appContext.getClock().getTime(), true);
+ }
} else if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
completedDAGs.add(event.getDagID());
if (outputStreamMap.containsKey(event.getDagID())) {
try {
- outputStreamMap.get(event.getDagID()).flush();
+ doFlush(outputStreamMap.get(event.getDagID()),
+ appContext.getClock().getTime(), true);
outputStreamMap.get(event.getDagID()).close();
outputStreamMap.remove(event.getDagID());
} catch (IOException ioe) {
LOG.warn("Error when trying to flush/close recovery file for"
+ " dag, dagId=" + event.getDagID());
+ // FIXME handle error ?
+ // NOTE(review): ioe is not passed to LOG.warn, and if close() fails the
+ // entry is never removed from outputStreamMap.
}
- } else {
- // TODO this is an error
}
}
} catch (Exception e) {
- // TODO handle failures - treat as fatal or ignore?
- LOG.warn("Error handling recovery event", e);
+ // FIXME handle failures
+ LOG.warn("Error handling recovery event", e);
}
}
LOG.info("DAG completed"
@@ -191,6 +212,9 @@ public class RecoveryService extends AbstractService {
+ ", queueSize=" + eventQueue.size());
} else {
// All other events just get queued
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Queueing Recovery event of type " + eventType.name());
+ }
eventQueue.add(event);
}
}
@@ -206,27 +230,42 @@ public class RecoveryService extends AbstractService {
// AM event
// anything to be done?
// TODO
+ LOG.info("Skipping Recovery Event as DAG is null"
+ + ", eventType=" + event.getHistoryEvent().getEventType());
return;
}
TezDAGID dagID = event.getDagID();
- if (completedDAGs.contains(dagID)) {
- // Skip events for completed DAGs
+ if (completedDAGs.contains(dagID)
+ || skippedDAGs.contains(dagID)) {
+ // Skip events for completed and skipped DAGs
// no need to recover completed DAGs
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping Recovery Event as either completed or skipped"
+ + ", dagId=" + dagID
+ + ", completed=" + completedDAGs.contains(dagID)
+ + ", skipped=" + skippedDAGs.contains(dagID)
+ + ", eventType=" + event.getHistoryEvent().getEventType());
+ }
return;
}
try {
- if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
- || eventType.equals(HistoryEventType.DAG_FINISHED)) {
- if (summaryStream == null) {
- Path summaryPath = new Path(recoveryPath,
- appContext.getApplicationID()
- + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+ // Open the per-application summary file lazily: append if it already exists
+ // (presumably written by an earlier AM attempt), otherwise create it.
+ if (summaryStream == null) {
+ Path summaryPath = new Path(recoveryPath,
+ appContext.getApplicationID()
+ + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+ if (!recoveryDirFS.exists(summaryPath)) {
summaryStream = recoveryDirFS.create(summaryPath, false,
bufferSize);
+ } else {
+ summaryStream = recoveryDirFS.append(summaryPath, bufferSize);
}
+ }
+
+ if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
+ || eventType.equals(HistoryEventType.DAG_FINISHED)) {
if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
DAGSubmittedEvent dagSubmittedEvent =
(DAGSubmittedEvent) event.getHistoryEvent();
@@ -235,33 +274,97 @@ public class RecoveryService extends AbstractService {
&& dagName.startsWith(
TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
// Skip recording pre-warm DAG events
+ skippedDAGs.add(dagID);
return;
}
- Path dagFilePath = new Path(recoveryPath,
- dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
- FSDataOutputStream outputStream =
- recoveryDirFS.create(dagFilePath, false, bufferSize);
- outputStreamMap.put(dagID, outputStream);
}
+ // DAG_SUBMITTED/DAG_FINISHED also go to the summary stream; the cast assumes
+ // both event classes implement SummaryEvent.
+ SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing recovery event to summary stream"
+ + ", dagId=" + dagID
+ + ", type="
+ + event.getHistoryEvent().getEventType());
+ }
+ summaryEvent.toSummaryProtoStream(summaryStream);
+ }
- if (outputStreamMap.containsKey(dagID)) {
- SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
- summaryEvent.toSummaryProtoStream(summaryStream);
+ // Lazily open this DAG's recovery file, appending if it already exists.
+ if (!outputStreamMap.containsKey(dagID)) {
+ Path dagFilePath = new Path(recoveryPath,
+ dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+ FSDataOutputStream outputStream;
+ if (recoveryDirFS.exists(dagFilePath)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Opening DAG recovery file in append mode"
+ + ", filePath=" + dagFilePath);
+ }
+ outputStream = recoveryDirFS.append(dagFilePath, bufferSize);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Opening DAG recovery file in create mode"
+ + ", filePath=" + dagFilePath);
+ }
+ outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize);
}
+ outputStreamMap.put(dagID, outputStream);
}
FSDataOutputStream outputStream = outputStreamMap.get(dagID);
- if (outputStream == null) {
- return;
- }
- outputStream.write(event.getHistoryEvent().getEventType().ordinal());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing recovery event to output stream"
+ + ", dagId=" + dagID
+ + ", type="
+ + event.getHistoryEvent().getEventType());
+ }
+ // Record layout: event type ordinal as a 4-byte int (writeInt, previously a single
+ // byte via write()), followed by the event's toProtoStream() payload.
+ ++unflushedEventsCount;
+ outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal());
event.getHistoryEvent().toProtoStream(outputStream);
+ // DAG_SUBMITTED/DAG_FINISHED are synced explicitly by the caller; others use batched flushing.
+ if (!EnumSet.of(HistoryEventType.DAG_SUBMITTED,
+ HistoryEventType.DAG_FINISHED).contains(eventType)) {
+ maybeFlush(outputStream);
+ }
} catch (IOException ioe) {
- // TODO handle failures - treat as fatal or ignore?
+ // FIXME handle failures
LOG.warn("Failed to write to stream", ioe);
}
}
+ /**
+ * Flushes {@code outputStream} (hflush, not hsync) once either batching
+ * threshold is reached: at least {@code maxUnflushedEvents} events written
+ * since the last flush, or at least {@code flushInterval} seconds elapsed on
+ * the app clock since it. A negative value disables the corresponding
+ * threshold. The interval is converted to milliseconds as a long so large
+ * second values cannot overflow int arithmetic.
+ *
+ * @param outputStream the DAG recovery stream that was just written to
+ * @throws IOException if the flush fails
+ */
+ private void maybeFlush(FSDataOutputStream outputStream) throws IOException {
+ long currentTime = appContext.getClock().getTime();
+ boolean shouldFlush = false;
+ if (maxUnflushedEvents >= 0
+ && unflushedEventsCount >= maxUnflushedEvents) {
+ shouldFlush = true;
+ } else if (flushInterval >= 0
+ && ((currentTime - lastFlushTime) >= (flushInterval * 1000L))) {
+ shouldFlush = true;
+ }
+
+ if (!shouldFlush) {
+ return;
+ }
+
+ doFlush(outputStream, currentTime, false);
+ }
+
+ /**
+ * Flushes {@code outputStream} and resets the shared flush bookkeeping.
+ *
+ * @param outputStream the stream to flush
+ * @param currentTime clock time recorded as the new last-flush time
+ * @param sync true to hsync (durable), false to hflush (visible to new readers)
+ * @throws IOException if the flush fails; the counters are then left unchanged
+ */
+ private void doFlush(FSDataOutputStream outputStream,
+ long currentTime, boolean sync) throws IOException {
+ if (sync) {
+ outputStream.hsync();
+ } else {
+ outputStream.hflush();
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Flushing output stream"
+ + ", lastFlushTime=" + lastFlushTime
+ + ", unflushedEventsCount=" + unflushedEventsCount
+ + ", maxUnflushedEvents=" + maxUnflushedEvents
+ + ", currentTime=" + currentTime);
+ }
+
+ unflushedEventsCount = 0;
+ lastFlushTime = currentTime;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java
deleted file mode 100644
index 5b87bc8..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.recovery;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.events.AMLaunchedEvent;
-import org.apache.tez.dag.history.events.AMStartedEvent;
-import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
-import org.apache.tez.dag.history.events.DAGFinishedEvent;
-import org.apache.tez.dag.history.events.DAGInitializedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
-import org.apache.tez.dag.history.events.DAGSubmittedEvent;
-import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
-import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
-import org.apache.tez.dag.history.events.TaskFinishedEvent;
-import org.apache.tez.dag.history.events.TaskStartedEvent;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
-import org.apache.tez.dag.history.events.VertexFinishedEvent;
-import org.apache.tez.dag.history.events.VertexInitializedEvent;
-import org.apache.tez.dag.history.events.VertexStartedEvent;
-import org.apache.tez.dag.recovery.records.RecoveryProtos;
-
-import java.io.IOException;
-
-public class RecoveryParser {
-
- private static final Log LOG = LogFactory.getLog(RecoveryParser.class);
-
- Path recoveryDirectory;
- FileSystem recoveryDirFS;
-
- public RecoveryParser(Path recoveryDirectory, Configuration conf)
- throws IOException {
- this.recoveryDirectory = recoveryDirectory;
- recoveryDirFS = FileSystem.get(recoveryDirectory.toUri(), conf);
-
- }
-
- public void parse() throws IOException {
- RemoteIterator<LocatedFileStatus> locatedFilesStatus =
- recoveryDirFS.listFiles(recoveryDirectory, false);
- while (locatedFilesStatus.hasNext()) {
- LocatedFileStatus fileStatus = locatedFilesStatus.next();
- String fileName = fileStatus.getPath().getName();
- if (fileName.endsWith(TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX)) {
- FSDataInputStream inputStream =
- recoveryDirFS.open(fileStatus.getPath());
- LOG.info("Parsing DAG file " + fileName);
- parseDAGRecoveryFile(inputStream);
- } else if (fileName.endsWith(TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX)) {
- FSDataInputStream inputStream =
- recoveryDirFS.open(fileStatus.getPath());
- LOG.info("Parsing Summary file " + fileName);
- parseSummaryFile(inputStream);
- } else {
- LOG.warn("Encountered unknown file in recovery dir, fileName="
- + fileName);
- continue;
- }
- }
- }
-
- private void parseSummaryFile(FSDataInputStream inputStream)
- throws IOException {
- int counter = 0;
- while (inputStream.available() > 0) {
- RecoveryProtos.SummaryEventProto proto =
- RecoveryProtos.SummaryEventProto.parseDelimitedFrom(inputStream);
- LOG.info("[SUMMARY]"
- + " dagId=" + proto.getDagId()
- + ", timestamp=" + proto.getTimestamp()
- + ", event=" + HistoryEventType.values()[proto.getEventType()]);
- }
- }
-
- private void parseDAGRecoveryFile(FSDataInputStream inputStream)
- throws IOException {
- int counter = 0;
- while (inputStream.available() > 0) {
- int eventTypeOrdinal = inputStream.read();
- if (eventTypeOrdinal < 0 || eventTypeOrdinal >=
- HistoryEventType.values().length) {
- // Corrupt data
- // reached end
- LOG.warn("Corrupt data found when trying to read next event type");
- break;
- }
- HistoryEventType eventType = HistoryEventType.values()[eventTypeOrdinal];
- HistoryEvent event = null;
- switch (eventType) {
- case AM_LAUNCHED:
- event = new AMLaunchedEvent();
- break;
- case AM_STARTED:
- event = new AMStartedEvent();
- break;
- case DAG_SUBMITTED:
- event = new DAGSubmittedEvent();
- break;
- case DAG_INITIALIZED:
- event = new DAGInitializedEvent();
- break;
- case DAG_STARTED:
- event = new DAGStartedEvent();
- break;
- case DAG_FINISHED:
- event = new DAGFinishedEvent();
- break;
- case CONTAINER_LAUNCHED:
- event = new ContainerLaunchedEvent();
- break;
- case VERTEX_INITIALIZED:
- event = new VertexInitializedEvent();
- break;
- case VERTEX_STARTED:
- event = new VertexStartedEvent();
- break;
- case VERTEX_FINISHED:
- event = new VertexFinishedEvent();
- break;
- case TASK_STARTED:
- event = new TaskStartedEvent();
- break;
- case TASK_FINISHED:
- event = new TaskFinishedEvent();
- break;
- case TASK_ATTEMPT_STARTED:
- event = new TaskAttemptStartedEvent();
- break;
- case TASK_ATTEMPT_FINISHED:
- event = new TaskAttemptFinishedEvent();
- break;
- case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
- event = new VertexDataMovementEventsGeneratedEvent();
- break;
- default:
- throw new IOException("Invalid data found, unknown event type "
- + eventType);
-
- }
- ++counter;
- LOG.info("Parsing event from input stream"
- + ", eventType=" + eventType
- + ", eventIndex=" + counter);
- event.fromProtoStream(inputStream);
- LOG.info("Parsed event from input stream"
- + ", eventType=" + eventType
- + ", eventIndex=" + counter
- + ", event=" + event.toString());
- }
- }
-
- public static void main(String argv[]) throws IOException {
- // TODO clean up with better usage and error handling
- Configuration conf = new Configuration();
- String dir = argv[0];
- RecoveryParser parser = new RecoveryParser(new Path(dir), conf);
- parser.parse();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index e21f5df..65f3aaf 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -56,6 +56,9 @@ message DAGStartedProto {
optional int64 start_time = 3;
}
+message DAGCommitStartedProto { // carries only the DAG id
+ optional string dag_id = 1;
+}
message DAGFinishedProto {
optional string dag_id = 1;
optional int64 finish_time = 2;
@@ -69,7 +72,8 @@ message VertexInitializedProto {
optional string vertex_id = 2;
optional int64 init_requested_time = 3;
optional int64 init_time = 4;
- optional int64 num_tasks = 5;
+ optional int32 num_tasks = 5;
+ repeated RootInputLeafOutputProto inputs = 6;
}
message VertexStartedProto {
@@ -79,6 +83,22 @@ message VertexStartedProto {
optional int64 start_time = 4;
}
+message EdgeManagerDescriptorProto { // pairs an edge name with its entity descriptor; repeated in VertexParallelismUpdatedProto
+ optional string edge_name = 1;
+ optional TezEntityDescriptorProto entity_descriptor = 2;
+}
+
+message VertexParallelismUpdatedProto { // updated task count for a vertex plus optional location hint and per-edge descriptors
+ optional string vertex_id = 1;
+ optional int32 num_tasks = 2;
+ optional VertexLocationHintProto vertex_location_hint = 3;
+ repeated EdgeManagerDescriptorProto edge_manager_descriptors = 4;
+}
+
+message VertexCommitStartedProto { // carries only the vertex id
+ optional string vertex_id = 1;
+}
+
message VertexFinishedProto {
optional string vertex_name = 1;
optional string vertex_id = 2;
@@ -100,6 +120,7 @@ message TaskFinishedProto {
optional int32 state = 3;
optional string diagnostics = 4;
optional TezCountersProto counters = 5;
+ optional string successful_task_attempt_id = 6;
}
message TaskAttemptStartedProto {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
index e965bca..0a3b9e0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.api.client;
+import org.apache.tez.dag.api.client.VertexStatus.State;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.dag.VertexState;
import org.junit.Assert;
@@ -32,7 +33,12 @@ public class TestVertexStatusBuilder {
VertexStatusBuilder.getProtoState(state);
VertexStatus.State clientState =
VertexStatus.getState(stateProto);
- Assert.assertEquals(state.name(), clientState.name());
+ if (state.equals(VertexState.RECOVERING)) {
+ Assert.assertEquals(clientState.name(),
+ State.NEW.name());
+ } else {
+ Assert.assertEquals(state.name(), clientState.name());
+ }
}
}
[3/4] TEZ-847. Support basic AM recovery. (hitesh)
Posted by hi...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 7fe07af..9572f6c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -25,6 +25,7 @@ import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -84,6 +85,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
@@ -91,6 +93,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
+import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -193,6 +196,14 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_KILL_REQUEST,
new TerminateTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.NEW,
+ EnumSet.of(TaskAttemptStateInternal.NEW,
+ TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.KILLED,
+ TaskAttemptStateInternal.FAILED,
+ TaskAttemptStateInternal.SUCCEEDED),
+ TaskAttemptEventType.TA_RECOVER, new RecoverTransition())
+
.addTransition(TaskAttemptStateInternal.START_WAIT,
TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_STARTED_REMOTELY, new StartedTransition())
@@ -462,6 +473,8 @@ public class TaskAttemptImpl implements TaskAttempt,
.installTopology();
+ private TaskAttemptState recoveredState = TaskAttemptState.NEW;
+ private boolean recoveryStartEventSeen = false;
@SuppressWarnings("rawtypes")
public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
@@ -782,6 +795,39 @@ public class TaskAttemptImpl implements TaskAttempt,
return isRescheduled;
}
+ @Override
+ public TaskAttemptState restoreFromEvent(HistoryEvent historyEvent) { // Replays one recovery event onto this attempt; returns the attempt state recovered so far.
+ switch (historyEvent.getEventType()) {
+ case TASK_ATTEMPT_STARTED:
+ {
+ TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) historyEvent;
+ this.launchTime = tEvent.getStartTime();
+ recoveryStartEventSeen = true; // lets TASK_ATTEMPT_FINISHED verify start-before-finish ordering
+ recoveredState = TaskAttemptState.RUNNING;
+ return recoveredState;
+ }
+ case TASK_ATTEMPT_FINISHED:
+ {
+ if (!recoveryStartEventSeen) {
+ throw new RuntimeException("Finished Event seen but"
+ + " no Started Event was encountered earlier");
+ }
+ TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) historyEvent;
+ this.finishTime = tEvent.getFinishTime();
+ this.reportedStatus.counters = tEvent.getCounters();
+ this.reportedStatus.progress = 1f; // a finished attempt is reported as fully progressed
+ this.reportedStatus.state = tEvent.getState();
+ this.diagnostics.add(tEvent.getDiagnostics()); // NOTE(review): added unconditionally; may add null if the event carries no diagnostics — confirm
+ this.recoveredState = tEvent.getState();
+ return recoveredState;
+ }
+ default: // only attempt started/finished events are replayed into an attempt
+ throw new RuntimeException("Unexpected event received for restoring"
+ + " state, eventType=" + historyEvent.getEventType());
+
+ }
+ }
+
@SuppressWarnings("unchecked")
private void sendEvent(Event<?> event) {
this.eventHandler.handle(event);
@@ -1366,7 +1412,46 @@ public class TaskAttemptImpl implements TaskAttempt,
}
}
-
+
+ protected static class RecoverTransition implements
+ MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
+ // Handles TA_RECOVER: maps recoveredState (set by restoreFromEvent) onto a final internal state.
+ @Override
+ public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent taskAttemptEvent) {
+ TaskAttemptStateInternal endState = TaskAttemptStateInternal.FAILED; // placeholder; every handled case assigns it
+ switch(taskAttempt.recoveredState) {
+ case NEW:
+ case RUNNING:
+ // FIXME once running containers can be recovered, this
+ // should be handled differently
+ // TODO abort taskattempt
+ taskAttempt.sendEvent(new TaskEventTAUpdate(taskAttempt.attemptId,
+ TaskEventType.T_ATTEMPT_FAILED)); // an attempt that never finished is reported to its Task as failed
+ endState = TaskAttemptStateInternal.FAILED;
+ break;
+ case SUCCEEDED:
+ // Do not inform Task as it already knows about completed attempts
+ endState = TaskAttemptStateInternal.SUCCEEDED;
+ break;
+ case FAILED:
+ // Do not inform Task as it already knows about completed attempts
+ endState = TaskAttemptStateInternal.FAILED;
+ break;
+ case KILLED:
+ // Do not inform Task as it already knows about completed attempts
+ endState = TaskAttemptStateInternal.KILLED;
+ break;
+ default:
+ throw new RuntimeException("Failed to recover from non-handled state"
+ + ", taskAttemptId=" + taskAttempt.getID()
+ + ", state=" + taskAttempt.recoveredState);
+ }
+
+ return endState;
+ }
+
+ }
+
protected static class TerminatedAfterSuccessTransition implements
MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 793c12a..16c063a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -24,6 +24,7 @@ import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -63,7 +64,9 @@ import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
@@ -74,11 +77,15 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.impl.TezEvent;
import com.google.common.annotations.VisibleForTesting;
@@ -130,6 +137,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private static final SingleArcTransition<TaskImpl, TaskEvent>
ADD_TEZ_EVENT_TRANSITION = new AddTezEventTransition();
+ // Recovery related flags
+ boolean recoveryStartEventSeen = false;
+
private static final StateMachineFactory
<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
stateMachineFactory
@@ -142,11 +152,18 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
.addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED,
TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
.addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED,
- TaskEventType.T_TERMINATE,
- new KillNewTransition())
+ TaskEventType.T_TERMINATE,
+ new KillNewTransition())
.addTransition(TaskStateInternal.NEW, TaskStateInternal.NEW,
TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
+ // Recover transition
+ .addTransition(TaskStateInternal.NEW,
+ EnumSet.of(TaskStateInternal.NEW,
+ TaskStateInternal.SCHEDULED,
+ TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED,
+ TaskStateInternal.FAILED, TaskStateInternal.KILLED),
+ TaskEventType.T_RECOVER, new RecoverTransition())
// Transitions from SCHEDULED state
//when the first attempt is launched, the task state is set to RUNNING
@@ -154,7 +171,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
.addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT,
TaskEventType.T_TERMINATE,
- KILL_TRANSITION)
+ KILL_TRANSITION)
.addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED,
TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
.addTransition(TaskStateInternal.SCHEDULED,
@@ -190,7 +207,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
KILL_TRANSITION)
.addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION)
-
+ .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
+ TaskEventType.T_SCHEDULE)
// Transitions from KILL_WAIT state
.addTransition(TaskStateInternal.KILL_WAIT,
@@ -235,6 +253,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskEventType.T_TERMINATE,
TaskEventType.T_ATTEMPT_SUCCEEDED, // Maybe track and reuse later
TaskEventType.T_ATTEMPT_LAUNCHED))
+ .addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
+ TaskEventType.T_SCHEDULE)
// Transitions from FAILED state
.addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
@@ -251,6 +271,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
EnumSet.of(
TaskEventType.T_TERMINATE,
TaskEventType.T_ADD_SPEC_ATTEMPT))
+ .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
+ TaskEventType.T_SCHEDULE)
// create the topology tables
.installTopology();
@@ -292,6 +314,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private int finishedAttempts;//finish are total of success, failed and killed
private final boolean leafVertex;
+ private TaskState recoveredState = TaskState.NEW;
@Override
public TaskState getState() {
@@ -507,6 +530,92 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
return diagnostics;
}
+ private TaskAttempt createRecoveredEvent(TaskAttemptStartedEvent // NOTE(review): despite the name, this creates a TaskAttempt, not an event
+ taskAttemptStartedEvent) { // builds the attempt via createAttempt using the id component of the started event's TaskAttemptID
+ TaskAttempt taskAttempt = createAttempt(
+ taskAttemptStartedEvent.getTaskAttemptID().getId());
+ return taskAttempt;
+ }
+
+ @Override
+ public TaskState restoreFromEvent(HistoryEvent historyEvent) { // Replays one recovery event onto this task and its attempts; returns the task state recovered so far.
+ switch (historyEvent.getEventType()) {
+ case TASK_STARTED:
+ {
+ TaskStartedEvent tEvent = (TaskStartedEvent) historyEvent;
+ recoveryStartEventSeen = true;
+ this.scheduledTime = tEvent.getScheduledTime();
+ if (this.attempts == null
+ || this.attempts.isEmpty()) {
+ this.attempts = new LinkedHashMap<TezTaskAttemptID, TaskAttempt>(); // NOTE(review): presumably swaps in a mutable map so recovered attempts can be put() — confirm
+ }
+ recoveredState = TaskState.SCHEDULED;
+ finishedAttempts = 0;
+ return recoveredState;
+ }
+ case TASK_FINISHED:
+ {
+ if (!recoveryStartEventSeen) {
+ throw new RuntimeException("Finished Event seen but"
+ + " no Started Event was encountered earlier");
+ }
+ TaskFinishedEvent tEvent = (TaskFinishedEvent) historyEvent;
+ recoveredState = tEvent.getState();
+ if (tEvent.getState() == TaskState.SUCCEEDED
+ && tEvent.getSuccessfulAttemptID() != null) {
+ successfulAttempt = tEvent.getSuccessfulAttemptID();
+ }
+ return recoveredState;
+ }
+ case TASK_ATTEMPT_STARTED:
+ {
+ TaskAttemptStartedEvent taskAttemptStartedEvent =
+ (TaskAttemptStartedEvent) historyEvent;
+ TaskAttempt recoveredAttempt = createRecoveredEvent(taskAttemptStartedEvent);
+ recoveredAttempt.restoreFromEvent(taskAttemptStartedEvent);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding restored attempt into known attempts map"
+ + ", taskAttemptId=" + taskAttemptStartedEvent.getTaskAttemptID());
+ }
+ this.attempts.put(taskAttemptStartedEvent.getTaskAttemptID(),
+ recoveredAttempt);
+ ++numberUncompletedAttempts; // balanced by the decrement in TASK_ATTEMPT_FINISHED
+ this.recoveredState = TaskState.RUNNING;
+ return recoveredState;
+ }
+ case TASK_ATTEMPT_FINISHED:
+ {
+ finishedAttempts++;
+ --numberUncompletedAttempts; // NOTE(review): counters change before the lookup below, so they are already updated if it throws
+ if (numberUncompletedAttempts < 0) {
+ throw new RuntimeException("Invalid recovery event for attempt finished"
+ + ", more completions than starts encountered"
+ + ", finishedAttempts=" + finishedAttempts
+ + ", incompleteAttempts=" + numberUncompletedAttempts);
+ }
+ TaskAttemptFinishedEvent taskAttemptFinishedEvent =
+ (TaskAttemptFinishedEvent) historyEvent;
+ TaskAttempt taskAttempt = this.attempts.get(
+ taskAttemptFinishedEvent.getTaskAttemptID());
+ if (taskAttempt == null) {
+ throw new RuntimeException("Could not find task attempt"
+ + " when trying to recover"
+ + ", taskAttemptId=" + taskAttemptFinishedEvent.getTaskAttemptID());
+ }
+ TaskAttemptState taskAttemptState = taskAttempt.restoreFromEvent(
+ taskAttemptFinishedEvent);
+ if (taskAttemptState.equals(TaskAttemptState.SUCCEEDED)) { // only a successful attempt changes the task's recovered state here
+ recoveredState = TaskState.SUCCEEDED;
+ successfulAttempt = taskAttempt.getID();
+ }
+ return recoveredState;
+ }
+ default:
+ throw new RuntimeException("Unexpected event received for restoring"
+ + " state, eventType=" + historyEvent.getEventType());
+ }
+ }
+
@VisibleForTesting
public TaskStateInternal getInternalState() {
readLock.lock();
@@ -851,6 +960,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// is called from within a transition
TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId,
getVertex().getName(), getLaunchTime(), clock.getTime(),
+ successfulAttempt,
TaskState.SUCCEEDED, getCounters());
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(taskId.getVertexID().getDAGId(), finishEvt));
@@ -858,7 +968,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
protected void logJobHistoryTaskFailedEvent(TaskState finalState) {
TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId,
- getVertex().getName(), getLaunchTime(), clock.getTime(),
+ getVertex().getName(), getLaunchTime(), clock.getTime(), null,
finalState, getCounters());
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(taskId.getVertexID().getDAGId(), finishEvt));
@@ -992,6 +1102,120 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
+ private static class RecoverTransition implements
+ MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
+ // Handles T_RECOVER: sends TA_RECOVER to every recovered attempt, then resolves the task state from recoveredState.
+ @Override
+ public TaskStateInternal transition(TaskImpl task, TaskEvent taskEvent) {
+ TaskStateInternal endState = TaskStateInternal.NEW;
+ if (task.attempts != null) {
+ for (TaskAttempt taskAttempt : task.attempts.values()) {
+ task.eventHandler.handle(new TaskAttemptEvent(
+ taskAttempt.getID(), TaskAttemptEventType.TA_RECOVER));
+ }
+ }
+ LOG.info("Trying to recover task"
+ + ", taskId=" + task.getTaskId()
+ + ", recoveredState=" + task.recoveredState);
+ switch(task.recoveredState) {
+ case NEW:
+ // Nothing to do until the vertex schedules this task
+ endState = TaskStateInternal.NEW;
+ break;
+ case SCHEDULED:
+ case RUNNING:
+ case SUCCEEDED:
+ if (task.successfulAttempt != null) {
+ //Found successful attempt
+ //Recover data
+ boolean recoveredData = true;
+ if (task.getVertex().getOutputCommitters() != null
+ && !task.getVertex().getOutputCommitters().isEmpty()) {
+ for (Entry<String, OutputCommitter> entry
+ : task.getVertex().getOutputCommitters().entrySet()) {
+ LOG.info("Recovering data for task from previous DAG attempt"
+ + ", taskId=" + task.getTaskId()
+ + ", output=" + entry.getKey());
+ OutputCommitter committer = entry.getValue();
+ if (!committer.isTaskRecoverySupported()) { // any committer lacking task recovery invalidates the recovered success
+ LOG.info("Task recovery not supported by committer"
+ + ", failing task attempt"
+ + ", taskId=" + task.getTaskId()
+ + ", attemptId=" + task.successfulAttempt
+ + ", output=" + entry.getKey());
+ recoveredData = false;
+ break;
+ }
+ try {
+ committer.recoverTask(task.getTaskId().getId(),
+ task.appContext.getApplicationAttemptId().getAttemptId()-1); // NOTE(review): passes the previous AM attempt number (current - 1) — confirm against recoverTask's contract
+ } catch (Exception e) {
+ LOG.warn("Task recovery failed by committer"
+ + ", taskId=" + task.getTaskId()
+ + ", attemptId=" + task.successfulAttempt
+ + ", output=" + entry.getKey(), e);
+ recoveredData = false;
+ break;
+ }
+ }
+ }
+ if (!recoveredData) {
+ task.successfulAttempt = null; // discard the success and fall through to the retry logic below
+ } else {
+ LOG.info("Recovered a successful attempt"
+ + ", taskAttemptId=" + task.successfulAttempt.toString());
+ task.logJobHistoryTaskFinishedEvent();
+ task.eventHandler.handle(
+ new VertexEventTaskCompleted(task.taskId,
+ getExternalState(TaskStateInternal.SUCCEEDED)));
+ task.eventHandler.handle(
+ new VertexEventTaskAttemptCompleted(
+ task.successfulAttempt, TaskAttemptStateInternal.SUCCEEDED));
+ endState = TaskStateInternal.SUCCEEDED;
+ break;
+ }
+ }
+ // Reached only without a usable successful attempt; the success path above breaks out of the switch.
+ if (endState != TaskStateInternal.SUCCEEDED &&
+ task.attempts.size() >= task.maxAttempts) { // NOTE(review): unlike the loop at the top, attempts is not null-checked here
+ // Exceeded max attempts
+ task.finished(TaskStateInternal.FAILED);
+ endState = TaskStateInternal.FAILED;
+ break;
+ }
+
+ // no successful attempt and all attempts completed
+ // schedule a new one
+ // If any are incomplete, the running attempt will be moved to failed and its
+ // update will trigger a new attempt if possible
+ if (task.attempts.size() == task.finishedAttempts) {
+ task.addAndScheduleAttempt();
+ }
+ endState = TaskStateInternal.RUNNING;
+ break;
+ case KILLED:
+ // Already terminal; no new attempts are scheduled.
+ // Only the vertex needs to be informed.
+ task.eventHandler.handle(
+ new VertexEventTaskCompleted(task.taskId,
+ getExternalState(TaskStateInternal.KILLED)));
+ endState = TaskStateInternal.KILLED;
+ break;
+ case FAILED:
+ // Already terminal; no new attempts are scheduled.
+ // Only the vertex needs to be informed.
+ task.eventHandler.handle(
+ new VertexEventTaskCompleted(task.taskId,
+ getExternalState(TaskStateInternal.FAILED)));
+
+ endState = TaskStateInternal.FAILED;
+ break;
+ }
+
+ return endState;
+ }
+ }
+
private static class KillWaitAttemptCompletedTransition implements
MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index d3e07cb..67f978a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -52,11 +52,9 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.EdgeManagerContext;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeManager;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -94,13 +92,16 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventSourceTaskAttemptCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexRecovered;
import org.apache.tez.dag.app.dag.event.VertexEventSourceVertexStarted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
@@ -110,9 +111,12 @@ import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.dag.records.TezDAGID;
@@ -185,6 +189,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private int numStartedSourceVertices = 0;
private int numInitedSourceVertices = 0;
+ private int numRecoveredSourceVertices = 0;
+
private int distanceFromRoot = 0;
private final List<String> diagnostics = new ArrayList<String>();
@@ -209,6 +215,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
new SourceTaskAttemptCompletedEventTransition();
+ // Vertex state reconstructed by restoreFromEvent() while history is
+ // replayed; StartRecoverTransition switches on it to decide how to resume.
+ private VertexState recoveredState = VertexState.NEW;
+ // TezEvents replayed from VERTEX_DATA_MOVEMENT_EVENTS_GENERATED records,
+ // buffered until a recover transition routes (or rejects) them.
+ private List<TezEvent> recoveredEvents = new ArrayList<TezEvent>();
+
protected static final
StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>
stateMachineFactory
@@ -222,6 +231,33 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
VertexState.INITIALIZING, VertexState.FAILED),
VertexEventType.V_INIT,
new InitTransition())
+ .addTransition
+ (VertexState.NEW,
+ EnumSet.of(VertexState.NEW, VertexState.INITED,
+ VertexState.INITIALIZING, VertexState.RUNNING,
+ VertexState.SUCCEEDED, VertexState.FAILED,
+ VertexState.KILLED, VertexState.ERROR,
+ VertexState.RECOVERING),
+ VertexEventType.V_RECOVER,
+ new StartRecoverTransition())
+ .addTransition
+ (VertexState.RECOVERING,
+ EnumSet.of(VertexState.NEW, VertexState.INITED,
+ VertexState.INITIALIZING, VertexState.RUNNING,
+ VertexState.SUCCEEDED, VertexState.FAILED,
+ VertexState.KILLED, VertexState.ERROR,
+ VertexState.RECOVERING),
+ VertexEventType.V_SOURCE_VERTEX_RECOVERED,
+ new RecoverTransition())
+ .addTransition
+ (VertexState.NEW,
+ EnumSet.of(VertexState.INITED,
+ VertexState.INITIALIZING, VertexState.RUNNING,
+ VertexState.SUCCEEDED, VertexState.FAILED,
+ VertexState.KILLED, VertexState.ERROR,
+ VertexState.RECOVERING),
+ VertexEventType.V_SOURCE_VERTEX_RECOVERED,
+ new RecoverTransition())
.addTransition(VertexState.NEW, VertexState.NEW,
VertexEventType.V_SOURCE_VERTEX_STARTED,
new SourceVertexStartedTransition())
@@ -367,6 +403,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// reruns.
VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
VertexEventType.V_TASK_COMPLETED))
+ .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
+ VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+ new TaskAttemptCompletedEventTransition())
// Transitions from FAILED state
.addTransition(
@@ -491,6 +530,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private VertexTerminationCause terminationCause;
private String logIdentifier;
+ // Set when a VERTEX_COMMIT_STARTED record is replayed and cleared by a later
+ // VERTEX_FINISHED record; StartRecoverTransition fails a vertex recovered as
+ // RUNNING while this is still set.
+ private boolean recoveryCommitInProgress = false;
+ // Source edge managers captured from a replayed VERTEX_PARALLELISM_UPDATED
+ // record (stored by handleParallelismUpdate()).
+ private Map<String,EdgeManagerDescriptor> recoveredSourceEdgeManagers = null;
+
+ // Recovery related flags: restoreFromEvent() uses these to reject history
+ // records replayed out of order (Started before Init, Finished before Started).
+ boolean recoveryInitEventSeen = false;
+ boolean recoveryStartEventSeen = false;
public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
String vertexName, Configuration conf, EventHandler eventHandler,
@@ -515,7 +560,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.vertexLocationHint = vertexLocationHint;
if (LOG.isDebugEnabled()) {
- logLocationHints(this.vertexLocationHint);
+ logLocationHints(this.vertexName, this.vertexLocationHint);
}
this.dagUgi = appContext.getCurrentDAG().getDagUGI();
@@ -800,6 +845,104 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return this.appContext;
}
+  /**
+   * Applies a recovered parallelism change to this vertex.
+   *
+   * <p>Tasks are iterated in the task map's insertion order and every task
+   * beyond the first {@code newParallelism} entries is removed. The map is
+   * never grown here, so a value at or above the current task count leaves
+   * the tasks untouched.
+   *
+   * @param newParallelism number of leading tasks to retain
+   * @param sourceEdgeManagers edge managers recorded with the parallelism
+   *          update; stored in {@code recoveredSourceEdgeManagers}
+   */
+  private void handleParallelismUpdate(int newParallelism,
+      Map<String, EdgeManagerDescriptor> sourceEdgeManagers) {
+    Iterator<Map.Entry<TezTaskID, Task>> iter = this.tasks.entrySet().iterator();
+    int retained = 0;
+    while (iter.hasNext()) {
+      iter.next();
+      // The task map is a LinkedHashMap, so the first newParallelism entries
+      // seen here are the ones that survive.
+      if (retained < newParallelism) {
+        retained++;
+      } else {
+        iter.remove();
+      }
+    }
+    this.recoveredSourceEdgeManagers = sourceEdgeManagers;
+  }
+
+  /**
+   * Replays one history event recorded for this vertex during AM recovery and
+   * returns the vertex state implied by every event replayed so far.
+   *
+   * <p>Events must arrive in the order they were logged: VERTEX_INITIALIZED
+   * before VERTEX_STARTED, VERTEX_STARTED before VERTEX_FINISHED, and
+   * VERTEX_COMMIT_STARTED only while the recovered state is RUNNING.
+   * Data movement events are buffered in {@code recoveredEvents} for later
+   * routing.
+   *
+   * @param historyEvent the history event to apply
+   * @return the recovered vertex state after applying the event
+   * @throws IllegalStateException if the event arrives out of order
+   * @throws IllegalArgumentException if the event type is not one this vertex
+   *           replays
+   */
+  @Override
+  public VertexState restoreFromEvent(HistoryEvent historyEvent) {
+    switch (historyEvent.getEventType()) {
+      case VERTEX_INITIALIZED:
+        recoveryInitEventSeen = true;
+        recoveredState = setupVertex((VertexInitializedEvent) historyEvent);
+        createTasks();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovered state for vertex after Init event"
+              + ", vertex=" + logIdentifier
+              + ", recoveredState=" + recoveredState);
+        }
+        return recoveredState;
+      case VERTEX_STARTED:
+        if (!recoveryInitEventSeen) {
+          throw new IllegalStateException("Started Event seen but"
+              + " no Init Event was encountered earlier");
+        }
+        recoveryStartEventSeen = true;
+        VertexStartedEvent startedEvent = (VertexStartedEvent) historyEvent;
+        startTimeRequested = startedEvent.getStartRequestedTime();
+        startedTime = startedEvent.getStartTime();
+        recoveredState = VertexState.RUNNING;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovered state for vertex after Started event"
+              + ", vertex=" + logIdentifier
+              + ", recoveredState=" + recoveredState);
+        }
+        return recoveredState;
+      case VERTEX_PARALLELISM_UPDATED:
+        VertexParallelismUpdatedEvent updatedEvent =
+            (VertexParallelismUpdatedEvent) historyEvent;
+        // Only overwrite the location hint when the update carried one.
+        if (updatedEvent.getVertexLocationHint() != null) {
+          vertexLocationHint = updatedEvent.getVertexLocationHint();
+        }
+        numTasks = updatedEvent.getNumTasks();
+        handleParallelismUpdate(numTasks, updatedEvent.getSourceEdgeManagers());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovered state for vertex after parallelism updated event"
+              + ", vertex=" + logIdentifier
+              + ", recoveredState=" + recoveredState);
+        }
+        return recoveredState;
+      case VERTEX_COMMIT_STARTED:
+        if (recoveredState != VertexState.RUNNING) {
+          throw new IllegalStateException("Commit Started Event seen but"
+              + " recovered state is not RUNNING"
+              + ", recoveredState=" + recoveredState);
+        }
+        // Cleared again if a VERTEX_FINISHED record follows.
+        recoveryCommitInProgress = true;
+        return recoveredState;
+      case VERTEX_FINISHED:
+        if (!recoveryStartEventSeen) {
+          throw new IllegalStateException("Finished Event seen but"
+              + " no Started Event was encountered earlier");
+        }
+        recoveryCommitInProgress = false;
+        VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent;
+        recoveredState = finishedEvent.getState();
+        // Succeeded vertices are logged with an empty diagnostics string; do
+        // not turn that into a blank diagnostic entry.
+        String finishedDiagnostics = finishedEvent.getDiagnostics();
+        if (finishedDiagnostics != null && !finishedDiagnostics.isEmpty()) {
+          diagnostics.add(finishedDiagnostics);
+        }
+        finishTime = finishedEvent.getFinishTime();
+        // TODO counters ??
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Recovered state for vertex after finished event"
+              + ", vertex=" + logIdentifier
+              + ", recoveredState=" + recoveredState);
+        }
+        return recoveredState;
+      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+        VertexDataMovementEventsGeneratedEvent vEvent =
+            (VertexDataMovementEventsGeneratedEvent) historyEvent;
+        this.recoveredEvents.addAll(vEvent.getTezEvents());
+        return recoveredState;
+      default:
+        throw new IllegalArgumentException("Unexpected event received for restoring"
+            + " state, eventType=" + historyEvent.getEventType());
+    }
+  }
+
// TODO Create InputReadyVertexManager that schedules when there is something
// to read and use that as default instead of ImmediateStart.TEZ-480
@Override
@@ -829,8 +972,40 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
Map<String, EdgeManagerDescriptor> sourceEdgeManagers) {
- writeLock.lock();
+ return setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, false);
+ }
+
+ private boolean setParallelism(int parallelism, VertexLocationHint vertexLocationHint,
+ Map<String, EdgeManagerDescriptor> sourceEdgeManagers,
+ boolean recovering) {
+ if (recovering) {
+ writeLock.lock();
+ try {
+ if (sourceEdgeManagers != null) {
+ for(Map.Entry<String, EdgeManagerDescriptor> entry :
+ sourceEdgeManagers.entrySet()) {
+ LOG.info("Recovering edge manager for source:"
+ + entry.getKey() + " destination: " + getVertexId());
+ Vertex sourceVertex = appContext.getCurrentDAG().getVertex(entry.getKey());
+ Edge edge = sourceVertices.get(sourceVertex);
+ try {
+ edge.setCustomEdgeManager(entry.getValue());
+ } catch (Exception e) {
+ LOG.warn("Failed to initialize edge manager for edge"
+ + ", sourceVertexName=" + sourceVertex.getName()
+ + ", destinationVertexName=" + edge.getDestinationVertexName(),
+ e);
+ return false;
+ }
+ }
+ }
+ return true;
+ } finally {
+ writeLock.unlock();
+ }
+ }
setVertexLocationHint(vertexLocationHint);
+ writeLock.lock();
try {
if (parallelismSet == true) {
LOG.info("Parallelism can only be set dynamically once per vertex");
@@ -928,7 +1103,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
}
}
-
+
+ VertexParallelismUpdatedEvent parallelismUpdatedEvent =
+ new VertexParallelismUpdatedEvent(vertexId, numTasks,
+ vertexLocationHint,
+ sourceEdgeManagers);
+ appContext.getHistoryHandler().handle(new DAGHistoryEvent(getDAGId(),
+ parallelismUpdatedEvent));
+
// stop buffering events
for (Edge edge : sourceVertices.values()) {
edge.stopEventBuffering();
@@ -962,7 +1144,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
try {
this.vertexLocationHint = vertexLocationHint;
if (LOG.isDebugEnabled()) {
- logLocationHints(this.vertexLocationHint);
+ logLocationHints(this.vertexName, this.vertexLocationHint);
}
} finally {
writeLock.unlock();
@@ -1039,7 +1221,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
+ /**
+ * Publishes a VertexInitializedEvent for this vertex to the history handler.
+ * The additional (root) inputs are now included so that, on recovery,
+ * setupVertex() can restore them from the replayed event.
+ */
 void logJobHistoryVertexInitializedEvent() {
 VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, vertexName,
 initTimeRequested, initedTime, numTasks,
- getProcessorName());
+ getProcessorName(), getAdditionalInputs());
 this.appContext.getHistoryHandler().handle(
 new DAGHistoryEvent(getDAGId(), initEvt));
 }
@@ -1055,13 +1237,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
this.setFinishTime();
VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
vertexName, initTimeRequested, initedTime, startTimeRequested,
- startedTime, finishTime, VertexStatus.State.SUCCEEDED, "",
+ startedTime, finishTime, VertexState.SUCCEEDED, "",
getAllCounters());
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGId(), finishEvt));
}
- void logJobHistoryVertexFailedEvent(VertexStatus.State state) {
+ void logJobHistoryVertexFailedEvent(VertexState state) {
VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId,
vertexName, initTimeRequested, initedTime, startTimeRequested,
startedTime, clock.getTime(), state, StringUtils.join(LINE_SEPARATOR,
@@ -1073,7 +1255,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
static VertexState checkVertexForCompletion(final VertexImpl vertex) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Checking for vertex completion"
+ LOG.debug("Checking for vertex completion for "
+ + vertex.logIdentifier
+ + ", numTasks=" + vertex.numTasks
+ ", failedTaskCount=" + vertex.failedTaskCount
+ ", killedTaskCount=" + vertex.killedTaskCount
+ ", successfulTaskCount=" + vertex.succeededTaskCount
@@ -1084,6 +1268,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
//check for vertex failure first
if (vertex.completedTaskCount > vertex.tasks.size()) {
LOG.error("task completion accounting issue: completedTaskCount > nTasks:"
+ + " for vertex " + vertex.logIdentifier
+ + ", numTasks=" + vertex.numTasks
+ ", failedTaskCount=" + vertex.failedTaskCount
+ ", killedTaskCount=" + vertex.killedTaskCount
+ ", successfulTaskCount=" + vertex.succeededTaskCount
@@ -1096,6 +1282,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
LOG.info("Vertex succeeded: " + vertex.logIdentifier);
try {
+ if (vertex.outputCommitters != null) {
+ vertex.appContext.getHistoryHandler().handle(
+ new DAGHistoryEvent(vertex.getDAGId(),
+ new VertexCommitStartedEvent(vertex.vertexId)));
+ }
if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
// commit only once. Dont commit shared outputs
LOG.info("Invoking committer commit for vertex, vertexId="
@@ -1197,20 +1388,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
if (finishTime == 0) setFinishTime();
switch (finalState) {
- case KILLED:
- eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
- finalState, terminationCause));
- logJobHistoryVertexFailedEvent(VertexStatus.State.KILLED);
- break;
case ERROR:
eventHandler.handle(new DAGEvent(getDAGId(),
DAGEventType.INTERNAL_ERROR));
- logJobHistoryVertexFailedEvent(VertexStatus.State.FAILED);
+ logJobHistoryVertexFailedEvent(finalState);
break;
+ case KILLED:
case FAILED:
eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
finalState, terminationCause));
- logJobHistoryVertexFailedEvent(VertexStatus.State.FAILED);
+ logJobHistoryVertexFailedEvent(finalState);
break;
case SUCCEEDED:
eventHandler.handle(new DAGEventVertexCompleted(getVertexId(),
@@ -1227,58 +1414,65 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return finished(finalState, null);
}
- private VertexState initializeVertex() {
+
+ /**
+ * Instantiates, initializes and sets up an OutputCommitter for every
+ * additional output that names an initializer class, storing each one in
+ * outputCommitters. Outputs with no initializer class are skipped.
+ * Any exception is propagated; the callers in this file catch it and fail
+ * the vertex with INIT_FAILURE.
+ * NOTE(review): the guard tests additionalOutputSpecs while the loop walks
+ * additionalOutputs; presumably both are populated together - confirm.
+ */
+ private void initializeCommitters() throws Exception {
 if (!this.additionalOutputSpecs.isEmpty()) {
- try {
- LOG.info("Invoking committer inits for vertex, vertexId=" + logIdentifier);
- for (Entry<String, RootInputLeafOutputDescriptor<OutputDescriptor>> entry:
+ LOG.info("Invoking committer inits for vertex, vertexId=" + logIdentifier);
+ for (Entry<String, RootInputLeafOutputDescriptor<OutputDescriptor>> entry:
 additionalOutputs.entrySet()) {
- final String outputName = entry.getKey();
- final RootInputLeafOutputDescriptor<OutputDescriptor> od = entry.getValue();
- if (od.getInitializerClassName() == null
+ final String outputName = entry.getKey();
+ final RootInputLeafOutputDescriptor<OutputDescriptor> od = entry.getValue();
+ if (od.getInitializerClassName() == null
 || od.getInitializerClassName().isEmpty()) {
- LOG.info("Ignoring committer as none specified for output="
- + outputName
+ LOG.info("Ignoring committer as none specified for output="
+ + outputName
+ + ", vertexId=" + logIdentifier);
+ continue;
+ }
+ LOG.info("Instantiating committer for output=" + outputName
+ + ", vertexId=" + logIdentifier
+ + ", committerClass=" + od.getInitializerClassName());
+
+ // Committer creation, initialize() and setupOutput() run inside
+ // dagUgi.doAs(...), i.e. under the DAG's UGI.
+ dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ OutputCommitter outputCommitter = RuntimeUtils.createClazzInstance(
+ od.getInitializerClassName());
+ OutputCommitterContext outputCommitterContext =
+ new OutputCommitterContextImpl(appContext.getApplicationID(),
+ appContext.getApplicationAttemptId().getAttemptId(),
+ appContext.getCurrentDAG().getName(),
+ vertexName,
+ outputName,
+ od.getDescriptor().getUserPayload(),
+ vertexId.getId());
+
+ LOG.info("Invoking committer init for output=" + outputName
 + ", vertexId=" + logIdentifier);
- continue;
+ outputCommitter.initialize(outputCommitterContext);
+ outputCommitters.put(outputName, outputCommitter);
+ LOG.info("Invoking committer setup for output=" + outputName
+ + ", vertexId=" + logIdentifier);
+ outputCommitter.setupOutput();
+ return null;
 }
- LOG.info("Instantiating committer for output=" + outputName
- + ", vertexId=" + logIdentifier
- + ", committerClass=" + od.getInitializerClassName());
-
- dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws Exception {
- OutputCommitter outputCommitter = RuntimeUtils.createClazzInstance(
- od.getInitializerClassName());
- OutputCommitterContext outputCommitterContext =
- new OutputCommitterContextImpl(appContext.getApplicationID(),
- appContext.getApplicationAttemptId().getAttemptId(),
- appContext.getCurrentDAG().getName(),
- vertexName,
- outputName,
- od.getDescriptor().getUserPayload());
-
- LOG.info("Invoking committer init for output=" + outputName
- + ", vertexId=" + logIdentifier);
- outputCommitter.initialize(outputCommitterContext);
- outputCommitters.put(outputName, outputCommitter);
- LOG.info("Invoking committer setup for output=" + outputName
- + ", vertexId=" + logIdentifier);
- outputCommitter.setupOutput();
- return null;
- }
- });
- }
- } catch (Exception e) {
- LOG.warn("Vertex Committer init failed, vertexId=" + logIdentifier, e);
- addDiagnostic("Vertex init failed : "
- + StringUtils.stringifyException(e));
- trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
- abortVertex(VertexStatus.State.FAILED);
- return finished(VertexState.FAILED);
+ });
 }
 }
+ }
+
+ private VertexState initializeVertex() {
+ try {
+ initializeCommitters();
+ } catch (Exception e) {
+ LOG.warn("Vertex Committer init failed, vertexId=" + logIdentifier, e);
+ addDiagnostic("Vertex init failed : "
+ + StringUtils.stringifyException(e));
+ trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
+ abortVertex(VertexStatus.State.FAILED);
+ return finished(VertexState.FAILED);
+ }
+
// TODO: Metrics
initedTime = clock.getTime();
@@ -1329,130 +1523,645 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
- public static class InitTransition implements
- MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+ private VertexState setupVertex() {
+ return setupVertex(null);
+ }
- @Override
- public VertexState transition(VertexImpl vertex, VertexEvent event) {
- VertexState vertexState = VertexState.NEW;
- vertex.numInitedSourceVertices++;
- if (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() ||
- vertex.numInitedSourceVertices == vertex.sourceVertices.size()) {
- vertexState = handleInitEvent(vertex, event);
- if (vertexState != VertexState.FAILED) {
- if (vertex.targetVertices != null && !vertex.targetVertices.isEmpty()) {
- for (Vertex target : vertex.targetVertices.keySet()) {
- vertex.getEventHandler().handle(new VertexEvent(target.getVertexId(),
- VertexEventType.V_INIT));
- }
- }
+ /**
+ * Prepares this vertex for execution: group inputs, root input initializers,
+ * the VertexManager, and the task count.
+ *
+ * @param event the replayed VertexInitializedEvent when recovering, or null
+ * for a fresh initialization
+ * @return INITED on success, FAILED for an unsupported input combination or
+ * an invalid task count. Failure handling (abortVertex / finished) runs
+ * only for a fresh initialization; during recovery the FAILED state is
+ * just reported back to the caller.
+ */
+ private VertexState setupVertex(VertexInitializedEvent event) {
+
+ if (event == null) {
+ initTimeRequested = clock.getTime();
+ } else {
+ initTimeRequested = event.getInitRequestedTime();
+ initedTime = event.getInitedTime();
+ }
+
+ // VertexManager needs to be setup before attempting to Initialize any
+ // Inputs - since events generated by them will be routed to the
+ // VertexManager for handling.
+
+ if (dagVertexGroups != null && !dagVertexGroups.isEmpty()) {
+ List<GroupInputSpec> groupSpecList = Lists.newLinkedList();
+ for (VertexGroupInfo groupInfo : dagVertexGroups.values()) {
+ if (groupInfo.edgeMergedInputs.containsKey(getName())) {
+ InputDescriptor mergedInput = groupInfo.edgeMergedInputs.get(getName());
+ groupSpecList.add(new GroupInputSpec(groupInfo.groupName,
+ Lists.newLinkedList(groupInfo.groupMembers), mergedInput));
 }
 }
- return vertexState;
+ if (!groupSpecList.isEmpty()) {
+ groupInputSpecList = groupSpecList;
+ }
 }
- private VertexState handleInitEvent(VertexImpl vertex, VertexEvent event) {
- vertex.initTimeRequested = vertex.clock.getTime();
-
- // VertexManager needs to be setup before attempting to Initialize any
- // Inputs - since events generated by them will be routed to the
- // VertexManager for handling.
-
- if (vertex.dagVertexGroups != null && !vertex.dagVertexGroups.isEmpty()) {
- List<GroupInputSpec> groupSpecList = Lists.newLinkedList();
- for (VertexGroupInfo groupInfo : vertex.dagVertexGroups.values()) {
- if (groupInfo.edgeMergedInputs.containsKey(vertex.getName())) {
- InputDescriptor mergedInput = groupInfo.edgeMergedInputs.get(vertex.getName());
- groupSpecList.add(new GroupInputSpec(groupInfo.groupName,
- Lists.newLinkedList(groupInfo.groupMembers), mergedInput));
+ // Check if any inputs need initializers
+ if (event != null) {
+ this.additionalInputs = event.getAdditionalInputs();
+ if (additionalInputs != null) {
+ // FIXME References to descriptor kept in both objects
+ for (InputSpec inputSpec : this.additionalInputSpecs) {
+ if (additionalInputs.containsKey(inputSpec.getSourceVertexName())
+ && additionalInputs.get(inputSpec.getSourceVertexName()).getDescriptor() != null) {
+ inputSpec.setInputDescriptor(
+ additionalInputs.get(inputSpec.getSourceVertexName()).getDescriptor());
 }
 }
- if (!groupSpecList.isEmpty()) {
- vertex.groupInputSpecList = groupSpecList;
- }
 }
-
- // Check if any inputs need initializers
- if (vertex.additionalInputs != null) {
- LOG.info("Root Inputs exist for Vertex: " + vertex.getName() + " : "
- + vertex.additionalInputs);
- for (RootInputLeafOutputDescriptor<InputDescriptor> input : vertex.additionalInputs.values()) {
+ } else {
+ if (additionalInputs != null) {
+ LOG.info("Root Inputs exist for Vertex: " + getName() + " : "
+ + additionalInputs);
+ for (RootInputLeafOutputDescriptor<InputDescriptor> input : additionalInputs.values()) {
 if (input.getInitializerClassName() != null) {
- if (vertex.inputsWithInitializers == null) {
- vertex.inputsWithInitializers = Sets.newHashSet();
+ if (inputsWithInitializers == null) {
+ inputsWithInitializers = Sets.newHashSet();
 }
- vertex.inputsWithInitializers.add(input.getEntityName());
+ inputsWithInitializers.add(input.getEntityName());
 LOG.info("Starting root input initializer for input: "
 + input.getEntityName() + ", with class: ["
 + input.getInitializerClassName() + "]");
 }
 }
 }
+ }
+
+ // NOTE(review): inputsWithInitializers is only populated on the fresh
+ // (event == null) path, so during recovery the RootInputVertexManager
+ // branch below is never selected - confirm this is intended.
+ boolean hasBipartite = false;
+ if (sourceVertices != null) {
+ for (Edge edge : sourceVertices.values()) {
+ if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
+ hasBipartite = true;
+ break;
+ }
+ }
+ }
+
+ if (hasBipartite && inputsWithInitializers != null) {
+ LOG.fatal("A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
+ if (event != null) {
+ return VertexState.FAILED;
+ } else {
+ return finished(VertexState.FAILED);
+ }
+ }
+
+ boolean hasUserVertexManager = vertexPlan.hasVertexManagerPlugin();
+
+ if (hasUserVertexManager) {
+ VertexManagerPluginDescriptor pluginDesc = DagTypeConverters
+ .convertVertexManagerPluginDescriptorFromDAGPlan(vertexPlan
+ .getVertexManagerPlugin());
+ LOG.info("Setting user vertex manager plugin: "
+ + pluginDesc.getClassName() + " on vertex: " + getName());
+ vertexManager = new VertexManager(pluginDesc, this, appContext);
+ } else {
+ if (hasBipartite) {
+ // setup vertex manager
+ // TODO this needs to consider data size and perhaps API.
+ // Currently implicitly BIPARTITE is the only edge type
+ LOG.info("Setting vertexManager to ShuffleVertexManager for "
+ + logIdentifier);
+ vertexManager = new VertexManager(new ShuffleVertexManager(),
+ this, appContext);
+ } else if (inputsWithInitializers != null) {
+ LOG.info("Setting vertexManager to RootInputVertexManager for "
+ + logIdentifier);
+ vertexManager = new VertexManager(new RootInputVertexManager(),
+ this, appContext);
+ } else {
+ // schedule all tasks upon vertex start. Default behavior.
+ LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
+ + logIdentifier);
+ vertexManager = new VertexManager(
+ new ImmediateStartVertexManager(), this, appContext);
+ }
+ }
+
+ vertexManager.initialize();
+
+ // Setup tasks early if possible. If the VertexManager is not being used
+ // to set parallelism, sending events to Tasks is safe (and less confusing
+ // than relying on tasks to be created after TaskEvents are generated).
+ // For VertexManagers setting parallelism, the setParallelism call needs
+ // to be inline.
+ if (event != null) {
+ numTasks = event.getNumTasks();
+ } else {
+ numTasks = getVertexPlan().getTaskConfig().getNumTasks();
+ }
+
+ if (!(numTasks == -1 || numTasks >= 0)) {
+ addDiagnostic("Invalid task count for vertex"
+ + ", numTasks=" + numTasks);
+ trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
+ // Same convention as the hasBipartite check above: during recovery only
+ // report FAILED; abort and finish the vertex only on a fresh init.
+ if (event != null) {
+ return VertexState.FAILED;
+ } else {
+ abortVertex(VertexStatus.State.FAILED);
+ return finished(VertexState.FAILED);
+ }
+ }
+
+ checkTaskLimits();
+ return VertexState.INITED;
+ }
+
+ /**
+ * Handles V_RECOVER for a vertex in NEW state.
+ *
+ * The DAG's desired state is applied first. A terminal desired state
+ * (SUCCEEDED/KILLED/FAILED/ERROR) forces the task counters and task
+ * recovery events to that outcome and is returned directly. Any other
+ * desired state except RUNNING finishes the vertex in ERROR.
+ *
+ * For RUNNING, the vertex resumes from recoveredState, as reconstructed by
+ * restoreFromEvent(). Unless it must wait in RECOVERING, it then notifies
+ * its output vertices via VertexEventSourceVertexRecovered. Finally it
+ * routes the buffered recovered events, or rejects them if the end state
+ * cannot accept them.
+ */
+ public static class StartRecoverTransition implements
+ MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+ @Override
+ public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
+ VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vertexEvent;
+ VertexState desiredState = recoverEvent.getDesiredState();
- boolean hasBipartite = false;
- if (vertex.sourceVertices != null) {
- for (Edge edge : vertex.sourceVertices.values()) {
- if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
- hasBipartite = true;
+ switch (desiredState) {
+ case RUNNING:
+ break;
+ case SUCCEEDED:
+ case KILLED:
+ case FAILED:
+ case ERROR:
+ switch (desiredState) {
+ case SUCCEEDED:
+ vertex.succeededTaskCount = vertex.numTasks;
+ vertex.completedTaskCount = vertex.numTasks;
+ break;
+ case KILLED:
+ vertex.killedTaskCount = vertex.numTasks;
+ break;
+ case FAILED:
+ case ERROR:
+ vertex.failedTaskCount = vertex.numTasks;
+ break;
+ }
+ if (vertex.tasks != null) {
+ TaskState taskState = TaskState.KILLED;
+ switch (desiredState) {
+ case SUCCEEDED:
+ taskState = TaskState.SUCCEEDED;
+ break;
+ case KILLED:
+ taskState = TaskState.KILLED;
+ break;
+ case FAILED:
+ case ERROR:
+ taskState = TaskState.FAILED;
+ break;
+ }
+ for (Task task : vertex.tasks.values()) {
+ vertex.eventHandler.handle(
+ new TaskEventRecoverTask(task.getTaskId(),
+ taskState));
+ }
+ }
+ LOG.info("DAG informed Vertex of its final completed state"
+ + ", vertex=" + vertex.logIdentifier
+ + ", state=" + desiredState);
+ return desiredState;
+ default:
+ LOG.info("Unhandled desired state provided by DAG"
+ + ", vertex=" + vertex.logIdentifier
+ + ", state=" + desiredState);
+ // Stop here: falling through would run normal recovery (V_INIT /
+ // V_START, event routing) on a vertex that was just finished.
+ return vertex.finished(VertexState.ERROR);
+ }
+
+ VertexState endState;
+ switch (vertex.recoveredState) {
+ case NEW:
+ // Trigger init and start as desired state is RUNNING
+ // Drop all root events
+ Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator();
+ while (iterator.hasNext()) {
+ if (iterator.next().getEventType().equals(
+ EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
+ iterator.remove();
+ }
+ }
+ vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+ VertexEventType.V_INIT));
+ vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+ VertexEventType.V_START));
+ endState = VertexState.NEW;
+ break;
+ case INITED:
+ try {
+ vertex.initializeCommitters();
+ } catch (Exception e) {
+ LOG.info("Failed to initialize committers", e);
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.INIT_FAILURE);
+ endState = VertexState.FAILED;
+ break;
+ }
+
+ // Recover tasks
+ if (vertex.tasks != null) {
+ for (Task task : vertex.tasks.values()) {
+ vertex.eventHandler.handle(
+ new TaskEventRecoverTask(task.getTaskId()));
+ }
+ }
+ // Update tasks with their input payloads as needed
+
+ vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+ VertexEventType.V_START));
+ if (vertex.getInputVertices().isEmpty()) {
+ endState = VertexState.INITED;
+ } else {
+ endState = VertexState.RECOVERING;
+ }
+ break;
+ case RUNNING:
+ vertex.tasksNotYetScheduled = false;
+ try {
+ vertex.initializeCommitters();
+ } catch (Exception e) {
+ LOG.info("Failed to initialize committers", e);
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.INIT_FAILURE);
+ endState = VertexState.FAILED;
 break;
 }
+
+ // if commit in progress and desired state is not a succeeded one,
+ // move to failed
+ if (vertex.recoveryCommitInProgress) {
+ LOG.info("Recovered vertex was in the middle of a commit"
+ + ", failing Vertex=" + vertex.logIdentifier);
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.COMMIT_FAILURE);
+ endState = VertexState.FAILED;
+ break;
+ }
+ if (vertex.tasks != null) {
+ // Checked only after the null guard so -ea cannot turn it into an NPE.
+ assert vertex.tasks.size() == vertex.numTasks;
+ for (Task task : vertex.tasks.values()) {
+ vertex.eventHandler.handle(
+ new TaskEventRecoverTask(task.getTaskId()));
+ }
+ vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+ endState = VertexState.RUNNING;
+ } else {
+ endState = VertexState.SUCCEEDED;
+ vertex.finished(endState);
+ }
+ break;
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ vertex.tasksNotYetScheduled = false;
+ // recover tasks
+ if (vertex.tasks != null) {
+ TaskState taskState = TaskState.KILLED;
+ switch (vertex.recoveredState) {
+ case SUCCEEDED:
+ taskState = TaskState.SUCCEEDED;
+ break;
+ case KILLED:
+ taskState = TaskState.KILLED;
+ break;
+ case FAILED:
+ taskState = TaskState.FAILED;
+ break;
+ }
+ for (Task task : vertex.tasks.values()) {
+ vertex.eventHandler.handle(
+ new TaskEventRecoverTask(task.getTaskId(),
+ taskState));
+ }
+ vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+ endState = VertexState.RUNNING;
+ } else {
+ endState = vertex.recoveredState;
+ vertex.finished(endState);
+ }
+ break;
+ default:
+ LOG.warn("Invalid recoveredState found when trying to recover"
+ + " vertex, recoveredState=" + vertex.recoveredState);
+ vertex.finished(VertexState.ERROR);
+ endState = VertexState.ERROR;
+ break;
+ }
+ if (!endState.equals(VertexState.RECOVERING)) {
+ LOG.info("Recovered Vertex State"
+ + ", vertexId=" + vertex.logIdentifier
+ + ", state=" + endState
+ + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices
+ + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices
+ + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices
+ + ", recoveredEvents="
+ + ( vertex.recoveredEvents == null ? "null" : vertex.recoveredEvents.size())
+ + ", tasksIsNull=" + (vertex.tasks == null)
+ + ", numTasks=" + ( vertex.tasks == null ? "null" : vertex.tasks.size()));
+ for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
+ vertex.eventHandler.handle(new VertexEventSourceVertexRecovered(
+ entry.getKey().getVertexId(),
+ vertex.vertexId, endState, null));
 }
 }
-
- if (hasBipartite && vertex.inputsWithInitializers != null) {
- LOG.fatal("A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
- return vertex.finished(VertexState.FAILED);
+ if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED)
+ .contains(endState)) {
+ // Send events downstream
+ vertex.routeRecoveredEvents(endState, vertex.recoveredEvents);
+ vertex.recoveredEvents.clear();
+ } else {
+ // Ensure no recovered events
+ if (!vertex.recoveredEvents.isEmpty()) {
+ throw new RuntimeException("Invalid Vertex state"
+ + ", found non-zero recovered events in invalid state"
+ + ", recoveredState=" + endState
+ + ", recoveredEvents=" + vertex.recoveredEvents.size());
+ }
 }
-
- boolean hasUserVertexManager = vertex.vertexPlan.hasVertexManagerPlugin();
-
- if (hasUserVertexManager) {
- VertexManagerPluginDescriptor pluginDesc = DagTypeConverters
- .convertVertexManagerPluginDescriptorFromDAGPlan(vertex.vertexPlan
- .getVertexManagerPlugin());
- LOG.info("Setting user vertex manager plugin: "
- + pluginDesc.getClassName() + " on vertex: " + vertex.getName());
- vertex.vertexManager = new VertexManager(pluginDesc, vertex, vertex.appContext);
+ return endState;
+ }
+
+ }
+
+  /**
+   * Re-dispatches TezEvents that were replayed from history for this vertex.
+   *
+   * <p>Data movement, composite data movement and input failed events get
+   * their version set to the source task attempt's id. They are then routed
+   * to the target vertex named in their source metadata. Root input data
+   * information events are routed back to this vertex, but only when it
+   * recovered as RUNNING or INITED; otherwise they are dropped.
+   *
+   * @param vertexState the state this vertex recovered into
+   * @param tezEvents the recovered events to route
+   * @throws TezUncheckedException if an event names a vertex that is not a
+   *           target of this vertex
+   */
+  private void routeRecoveredEvents(VertexState vertexState,
+      List<TezEvent> tezEvents) {
+    for (TezEvent tezEvent : tezEvents) {
+      EventMetaData sourceMeta = tezEvent.getSourceInfo();
+      TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
+      if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
+        ((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
+      } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
+        ((CompositeDataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
+      } else if (tezEvent.getEventType() == EventType.INPUT_FAILED_EVENT) {
+        ((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
+      } else if (tezEvent.getEventType() == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) {
+        // Only routed if vertex is still running (or about to start)
+        if (vertexState == VertexState.RUNNING
+            || vertexState == VertexState.INITED) {
+          eventHandler.handle(new VertexEventRouteEvent(
+              this.getVertexId(), Collections.singletonList(tezEvent), true));
+        }
+        continue;
+      }
+
+      Vertex destVertex = getDAG().getVertex(sourceMeta.getEdgeVertexName());
+      Edge destEdge = targetVertices.get(destVertex);
+      if (destEdge == null) {
+        throw new TezUncheckedException("Bad destination vertex: " +
+            sourceMeta.getEdgeVertexName() + " for event vertex: " +
+            getVertexId());
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Routing recovered event"
+            + ", eventType=" + tezEvent.getEventType()
+            + ", sourceInfo=" + sourceMeta
+            + ", destinationVertex=" + destVertex.getName());
+      }
+      eventHandler.handle(new VertexEventRouteEvent(destVertex
+          .getVertexId(), Collections.singletonList(tezEvent), true));
+    }
+  }
+
+ /**
+ * Handles a VertexEventSourceVertexRecovered for this vertex.
+ * Each event records that one more source vertex finished recovering and in
+ * which state: INITED/RUNNING/SUCCEEDED sources count as inited, and
+ * RUNNING/SUCCEEDED sources also count as started and contribute their
+ * completed task attempts. The vertex stays RECOVERING until every source
+ * vertex has reported; it then completes its own recovery from
+ * recoveredState, notifies its output vertices, and either routes its
+ * recovered events or verifies that none are left over.
+ */
+ public static class RecoverTransition implements
+ MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+ @Override
+ public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
+ VertexEventSourceVertexRecovered sourceRecoveredEvent =
+ (VertexEventSourceVertexRecovered) vertexEvent;
+ ++vertex.numRecoveredSourceVertices;
+
+ switch (sourceRecoveredEvent.getSourceVertexState()) {
+ case NEW:
+ // Nothing to do
+ break;
+ case INITED:
+ ++vertex.numInitedSourceVertices;
+ break;
+ case RUNNING:
+ case SUCCEEDED:
+ ++vertex.numInitedSourceVertices;
+ ++vertex.numStartedSourceVertices;
+ if (sourceRecoveredEvent.getCompletedTaskAttempts() != null) {
+ vertex.pendingReportedSrcCompletions.addAll(
+ sourceRecoveredEvent.getCompletedTaskAttempts());
+ }
+ break;
+ case FAILED:
+ case KILLED:
+ case ERROR:
+ // Nothing to do
+ // Recover as if source vertices have not inited/started
+ break;
+ default:
+ LOG.warn("Received invalid SourceVertexRecovered event"
+ + ", sourceVertex=" + sourceRecoveredEvent.getSourceVertexID()
+ + ", sourceVertexState=" + sourceRecoveredEvent.getSourceVertexState());
+ return vertex.finished(VertexState.ERROR);
+ }
+
+ if (vertex.numRecoveredSourceVertices !=
+ vertex.getInputVerticesCount()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting for source vertices to recover"
+ + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices
+ + ", totalSourceVertices=" + vertex.getInputVerticesCount());
+ }
+ return VertexState.RECOVERING;
+ }
+
+
+ // Complete recovery
+ VertexState endState = VertexState.NEW;
+ // NOTE(review): completedTaskAttempts is never populated below, so output
+ // vertices always receive an empty completion list - confirm intended.
+ List<TezTaskAttemptID> completedTaskAttempts = Lists.newLinkedList();
+ switch (vertex.recoveredState) {
+ case NEW:
+ // Drop all root events if not inited properly
+ Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator();
+ while (iterator.hasNext()) {
+ if (iterator.next().getEventType().equals(
+ EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
+ iterator.remove();
+ }
+ }
+ // Trigger init if all sources initialized
+ if (vertex.numInitedSourceVertices == vertex.getInputVerticesCount()) {
+ vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+ VertexEventType.V_INIT));
+ }
+ if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) {
+ vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+ VertexEventType.V_START));
+ }
+ endState = VertexState.NEW;
+ break;
+ case INITED:
+ try {
+ vertex.initializeCommitters();
+ } catch (Exception e) {
+ LOG.info("Failed to initialize committers", e);
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.INIT_FAILURE);
+ endState = VertexState.FAILED;
+ break;
+ }
+ if (!vertex.setParallelism(0,
+ null, vertex.recoveredSourceEdgeManagers, true)) {
+ LOG.info("Failed to recover edge managers");
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.INIT_FAILURE);
+ endState = VertexState.FAILED;
+ break;
+ }
+ // Recover tasks
+ if (vertex.tasks != null) {
+ for (Task task : vertex.tasks.values()) {
+ vertex.eventHandler.handle(
+ new TaskEventRecoverTask(task.getTaskId()));
+ }
+ }
+ if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) {
+ vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+ VertexEventType.V_START));
+ }
+ endState = VertexState.INITED;
+ break;
+ case RUNNING:
+ vertex.tasksNotYetScheduled = false;
+ // if commit in progress and desired state is not a succeeded one,
+ // move to failed
+ if (vertex.recoveryCommitInProgress) {
+ LOG.info("Recovered vertex was in the middle of a commit"
+ + ", failing Vertex=" + vertex.logIdentifier);
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.COMMIT_FAILURE);
+ endState = VertexState.FAILED;
+ break;
+ }
+ try {
+ vertex.initializeCommitters();
+ } catch (Exception e) {
+ LOG.info("Failed to initialize committers", e);
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.INIT_FAILURE);
+ endState = VertexState.FAILED;
+ break;
+ }
+ if (!vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers, true)) {
+ LOG.info("Failed to recover edge managers");
+ vertex.finished(VertexState.FAILED,
+ VertexTerminationCause.INIT_FAILURE);
+ endState = VertexState.FAILED;
+ break;
+ }
+ // tasks may be null (handled by the else branch below), so the
+ // assertion must not dereference it unconditionally.
+ assert vertex.tasks == null || vertex.tasks.size() == vertex.numTasks;
+ if (vertex.tasks != null) {
+ for (Task task : vertex.tasks.values()) {
+ vertex.eventHandler.handle(
+ new TaskEventRecoverTask(task.getTaskId()));
+ }
+ vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+ endState = VertexState.RUNNING;
+ } else {
+ endState = VertexState.SUCCEEDED;
+ vertex.finished(endState);
+ }
+ break;
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ vertex.tasksNotYetScheduled = false;
+ // recover tasks
+ // tasks may be null (handled by the else branch below), so the
+ // assertion must not dereference it unconditionally.
+ assert vertex.tasks == null || vertex.tasks.size() == vertex.numTasks;
+ if (vertex.tasks != null) {
+ TaskState taskState = TaskState.KILLED;
+ switch (vertex.recoveredState) {
+ case SUCCEEDED:
+ taskState = TaskState.SUCCEEDED;
+ break;
+ case KILLED:
+ taskState = TaskState.KILLED;
+ break;
+ case FAILED:
+ taskState = TaskState.FAILED;
+ break;
+ }
+ for (Task task : vertex.tasks.values()) {
+ vertex.eventHandler.handle(
+ new TaskEventRecoverTask(task.getTaskId(),
+ taskState));
+ }
+ // Wait for all tasks to recover and report back
+ vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
+ endState = VertexState.RUNNING;
+ } else {
+ endState = vertex.recoveredState;
+ vertex.finished(endState);
+ }
+ break;
+ default:
+ LOG.warn("Invalid recoveredState found when trying to recover"
+ + " vertex, recoveredState=" + vertex.recoveredState);
+ vertex.finished(VertexState.ERROR);
+ endState = VertexState.ERROR;
+ break;
+ }
+
+ LOG.info("Recovered Vertex State"
+ + ", vertexId=" + vertex.logIdentifier
+ + ", state=" + endState
+ + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices
+ + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices
+ + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices
+ + ", tasksIsNull=" + (vertex.tasks == null)
+ + ", numTasks=" + ( vertex.tasks == null ? 0 : vertex.tasks.size()));
+
+ for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
+ vertex.eventHandler.handle(new VertexEventSourceVertexRecovered(
+ entry.getKey().getVertexId(),
+ vertex.vertexId, endState, completedTaskAttempts));
+ }
+ if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED)
+ .contains(endState)) {
+ // Send events downstream
+ vertex.routeRecoveredEvents(endState, vertex.recoveredEvents);
+ vertex.recoveredEvents.clear();
} else {
- if (hasBipartite) {
- // setup vertex manager
- // TODO this needs to consider data size and perhaps API.
- // Currently implicitly BIPARTITE is the only edge type
- LOG.info("Setting vertexManager to ShuffleVertexManager for "
- + vertex.logIdentifier);
- vertex.vertexManager = new VertexManager(new ShuffleVertexManager(),
- vertex, vertex.appContext);
- } else if (vertex.inputsWithInitializers != null) {
- LOG.info("Setting vertexManager to RootInputVertexManager for "
- + vertex.logIdentifier);
- vertex.vertexManager = new VertexManager(new RootInputVertexManager(),
- vertex, vertex.appContext);
- } else {
- // schedule all tasks upon vertex start. Default behavior.
- LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
- + vertex.logIdentifier);
- vertex.vertexManager = new VertexManager(
- new ImmediateStartVertexManager(), vertex, vertex.appContext);
+ // Ensure no recovered events
+ if (!vertex.recoveredEvents.isEmpty()) {
+ throw new RuntimeException("Invalid Vertex state"
+ + ", found non-zero recovered events in invalid state"
+ + ", recoveredState=" + endState
+ + ", recoveredEvents=" + vertex.recoveredEvents.size());
}
}
-
- vertex.vertexManager.initialize();
-
- // Setup tasks early if possible. If the VertexManager is not being used
- // to set parallelism, sending events to Tasks is safe (and less confusing
- // then relying on tasks to be created after TaskEvents are generated).
- // For VertexManagers setting parallelism, the setParallelism call needs
- // to be inline.
- vertex.numTasks = vertex.getVertexPlan().getTaskConfig().getNumTasks();
- if (!(vertex.numTasks == -1 || vertex.numTasks >= 0)) {
- vertex.addDiagnostic("Invalid task count for vertex"
- + ", numTasks=" + vertex.numTasks);
- vertex.trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
- vertex.abortVertex(VertexStatus.State.FAILED);
- return vertex.finished(VertexState.FAILED);
+ return endState;
+ }
+
+ }
+
+ public static class InitTransition implements
+ MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+ @Override
+ public VertexState transition(VertexImpl vertex, VertexEvent event) {
+ VertexState vertexState = VertexState.NEW;
+ vertex.numInitedSourceVertices++;
+ if (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() ||
+ vertex.numInitedSourceVertices == vertex.sourceVertices.size()) {
+ vertexState = handleInitEvent(vertex, event);
+ if (vertexState != VertexState.FAILED) {
+ if (vertex.targetVertices != null && !vertex.targetVertices.isEmpty()) {
+ for (Vertex target : vertex.targetVertices.keySet()) {
+ vertex.getEventHandler().handle(new VertexEvent(target.getVertexId(),
+ VertexEventType.V_INIT));
+ }
+ }
+ }
}
+ return vertexState;
+ }
- vertex.checkTaskLimits();
+ private VertexState handleInitEvent(VertexImpl vertex, VertexEvent event) {
+ VertexState state = vertex.setupVertex();
+ if (state.equals(VertexState.FAILED)) {
+ return state;
+ }
// Create tasks based on initial configuration, but don't start them yet.
if (vertex.numTasks == -1) {
@@ -1862,7 +2571,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.numSuccessSourceAttemptCompletions++;
if (vertex.getState() == VertexState.RUNNING) {
vertex.vertexManager.onSourceTaskCompleted(completionEvent
- .getTaskAttemptId());
+ .getTaskAttemptId().getTaskID());
} else {
vertex.pendingReportedSrcCompletions.add(completionEvent.getTaskAttemptId());
}
@@ -2034,9 +2743,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
@Override
public void transition(VertexImpl vertex, VertexEvent event) {
VertexEventRouteEvent rEvent = (VertexEventRouteEvent) event;
+ boolean recovered = rEvent.isRecovered();
List<TezEvent> tezEvents = rEvent.getEvents();
if (vertex.getAppContext().isRecoveryEnabled()
+ && !recovered
&& !tezEvents.isEmpty()) {
List<TezEvent> dataMovementEvents =
Lists.newArrayList();
@@ -2061,7 +2772,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
for(TezEvent tezEvent : tezEvents) {
if (LOG.isDebugEnabled()) {
LOG.debug("Vertex: " + vertex.getName() + " routing event: "
- + tezEvent.getEventType());
+ + tezEvent.getEventType()
+ + " Recovered:" + recovered);
}
EventMetaData sourceMeta = tezEvent.getSourceInfo();
switch(tezEvent.getEventType()) {
@@ -2074,7 +2786,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
- } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
+ } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
((CompositeDataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
} else {
((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
@@ -2396,7 +3108,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
return this.vertexManager;
}
- private static void logLocationHints(VertexLocationHint locationHint) {
+ private static void logLocationHints(String vertexName,
+ VertexLocationHint locationHint) {
+ if (locationHint == null) {
+ LOG.debug("No Vertex LocationHint specified for vertex=" + vertexName);
+ return;
+ }
Multiset<String> hosts = HashMultiset.create();
Multiset<String> racks = HashMultiset.create();
int counter = 0;
@@ -2421,18 +3138,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
sb.append(rack).append(", ");
}
}
- LOG.debug("Location: " + counter + " : " + sb.toString());
+ LOG.debug("Vertex: " + vertexName + ", Location: "
+ + counter + " : " + sb.toString());
counter++;
}
- LOG.debug("Host Counts");
+ LOG.debug("Vertex: " + vertexName + ", Host Counts");
for (Multiset.Entry<String> host : hosts.entrySet()) {
- LOG.debug("host: " + host.toString());
+ LOG.debug("Vertex: " + vertexName + ", host: " + host.toString());
}
- LOG.debug("Rack Counts");
+ LOG.debug("Vertex: " + vertexName + ", Rack Counts");
for (Multiset.Entry<String> rack : racks.entrySet()) {
- LOG.debug("rack: " + rack.toString());
+ LOG.debug("Vertex: " + vertexName + ", rack: " + rack.toString());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index e08e7ed..6f75481 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -147,6 +147,11 @@ public class VertexManager {
managedVertex.setVertexLocationHint(locationHint);
}
+ @Override
+ public int getDAGAttemptNumber() {
+ // The DAG attempt number is taken from this AM's application attempt id.
+ final int amAttemptNumber =
+ appContext.getApplicationAttemptId().getAttemptId();
+ return amAttemptNumber;
+ }
+
private void verifyIsRootInput(String inputName) {
Preconditions.checkState(managedVertex.getAdditionalInputs().get(inputName) != null,
"Cannot add events for non-root inputs");
@@ -222,23 +227,24 @@ public class VertexManager {
+ /**
+ * Notifies the plugin that the managed vertex has started.
+ * Source task attempt completions reported before the start are grouped by
+ * source vertex name into lists of task indices and passed to the plugin.
+ *
+ * @param completions source task attempts that completed before the start;
+ * may be null or empty, in which case the plugin gets an empty map
+ */
public void onVertexStarted(List<TezTaskAttemptID> completions) {
Map<String, List<Integer>> pluginCompletionsMap = Maps.newHashMap();
- for (TezTaskAttemptID attemptId : completions) {
- TezTaskID tezTaskId = attemptId.getTaskID();
- Integer taskId = new Integer(tezTaskId.getId());
- String vertexName =
- appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
- List<Integer> taskIdList = pluginCompletionsMap.get(vertexName);
- if (taskIdList == null) {
- taskIdList = Lists.newArrayList();
- pluginCompletionsMap.put(vertexName, taskIdList);
+ if (completions != null && !completions.isEmpty()) {
+ for (TezTaskAttemptID tezTaskAttemptID : completions) {
+ TezTaskID tezTaskId = tezTaskAttemptID.getTaskID();
+ // Integer.valueOf instead of the deprecated boxing constructor.
+ Integer taskId = Integer.valueOf(tezTaskId.getId());
+ String vertexName =
+ appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
+ List<Integer> taskIdList = pluginCompletionsMap.get(vertexName);
+ if (taskIdList == null) {
+ taskIdList = Lists.newArrayList();
+ pluginCompletionsMap.put(vertexName, taskIdList);
+ }
+ taskIdList.add(taskId);
}
- taskIdList.add(taskId);
}
plugin.onVertexStarted(pluginCompletionsMap);
}
- public void onSourceTaskCompleted(TezTaskAttemptID attemptId) {
- TezTaskID tezTaskId = attemptId.getTaskID();
+ public void onSourceTaskCompleted(TezTaskID tezTaskId) {
Integer taskId = new Integer(tezTaskId.getId());
String vertexName =
appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index a71686b..7b2087a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -27,11 +27,14 @@ public enum HistoryEventType {
DAG_FINISHED,
VERTEX_INITIALIZED,
VERTEX_STARTED,
+ VERTEX_PARALLELISM_UPDATED,
VERTEX_FINISHED,
TASK_STARTED,
TASK_FINISHED,
TASK_ATTEMPT_STARTED,
TASK_ATTEMPT_FINISHED,
CONTAINER_LAUNCHED,
- VERTEX_DATA_MOVEMENT_EVENTS_GENERATED
+ VERTEX_DATA_MOVEMENT_EVENTS_GENERATED,
+ DAG_COMMIT_STARTED,
+ VERTEX_COMMIT_STARTED
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
index 4ec0632..690e850 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
@@ -22,16 +22,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.recovery.RecoveryService;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
index 4794a7b..54bc658 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
@@ -98,7 +98,7 @@ public class AMLaunchedEvent implements HistoryEvent {
@Override
public boolean isRecoveryEvent() {
- return true;
+ return false;
}
@Override
@@ -139,4 +139,16 @@ public class AMLaunchedEvent implements HistoryEvent {
fromProto(proto);
}
+ /** @return the application attempt id carried by this event */
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return this.applicationAttemptId;
+ }
+
+ /** @return the launch time carried by this event */
+ public long getLaunchTime() {
+ return this.launchTime;
+ }
+
+ /** @return the application submit time carried by this event */
+ public long getAppSubmitTime() {
+ return this.appSubmitTime;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
index b3cbb5c..e66141b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
@@ -129,4 +129,13 @@ public class AMStartedEvent implements HistoryEvent {
fromProto(proto);
}
+ /** @return the application attempt id carried by this event */
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return this.applicationAttemptId;
+ }
+
+ /** @return the start time carried by this event */
+ public long getStartTime() {
+ return this.startTime;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
index 066f315..471ddd1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
@@ -138,4 +138,16 @@ public class ContainerLaunchedEvent implements HistoryEvent {
+ ", launchTime=" + launchTime;
}
+ /** @return the id of the launched container */
+ public ContainerId getContainerId() {
+ return this.containerId;
+ }
+
+ /** @return the launch time carried by this event */
+ public long getLaunchTime() {
+ return this.launchTime;
+ }
+
+ /** @return the application attempt id carried by this event */
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return this.applicationAttemptId;
+ }
+
}
[4/4] git commit: TEZ-847. Support basic AM recovery. (hitesh)
Posted by hi...@apache.org.
TEZ-847. Support basic AM recovery. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/5b464f27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/5b464f27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/5b464f27
Branch: refs/heads/master
Commit: 5b464f27da273723e422607518c271cd3f040560
Parents: 18290c8
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Mar 5 15:34:53 2014 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Mar 5 15:34:53 2014 -0800
----------------------------------------------------------------------
.../java/org/apache/tez/client/TezSession.java | 2 -
.../apache/tez/dag/api/TezConfiguration.java | 10 +-
.../tez/dag/api/VertexManagerPluginContext.java | 5 +
.../apache/tez/runtime/api/OutputCommitter.java | 18 +
.../tez/runtime/api/OutputCommitterContext.java | 10 +
tez-api/src/main/proto/DAGApiRecords.proto | 15 +-
.../tez/dag/api/client/VertexStatusBuilder.java | 2 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 72 +-
.../org/apache/tez/dag/app/RecoveryParser.java | 613 ++++++++++
.../java/org/apache/tez/dag/app/dag/DAG.java | 3 +
.../java/org/apache/tez/dag/app/dag/Task.java | 3 +
.../org/apache/tez/dag/app/dag/TaskAttempt.java | 6 +-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 5 +
.../org/apache/tez/dag/app/dag/VertexState.java | 1 +
.../tez/dag/app/dag/event/DAGEventType.java | 6 +-
.../dag/app/dag/event/TaskAttemptEventType.java | 3 +
.../dag/app/dag/event/TaskEventRecoverTask.java | 41 +
.../tez/dag/app/dag/event/TaskEventType.java | 6 +-
.../app/dag/event/VertexEventRecoverVertex.java | 36 +
.../app/dag/event/VertexEventRouteEvent.java | 14 +-
.../event/VertexEventSourceVertexRecovered.java | 56 +
.../tez/dag/app/dag/event/VertexEventType.java | 8 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 465 +++++---
.../dag/impl/OutputCommitterContextImpl.java | 10 +-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 87 +-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 234 +++-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 1054 +++++++++++++++---
.../tez/dag/app/dag/impl/VertexManager.java | 30 +-
.../tez/dag/history/HistoryEventType.java | 5 +-
.../apache/tez/dag/history/ats/ATSService.java | 7 -
.../tez/dag/history/events/AMLaunchedEvent.java | 14 +-
.../tez/dag/history/events/AMStartedEvent.java | 9 +
.../history/events/ContainerLaunchedEvent.java | 12 +
.../history/events/DAGCommitStartedEvent.java | 94 ++
.../dag/history/events/DAGFinishedEvent.java | 67 +-
.../dag/history/events/DAGInitializedEvent.java | 7 +
.../tez/dag/history/events/DAGStartedEvent.java | 7 +
.../dag/history/events/DAGSubmittedEvent.java | 11 +
.../events/TaskAttemptFinishedEvent.java | 51 +-
.../history/events/TaskAttemptStartedEvent.java | 17 +-
.../dag/history/events/TaskFinishedEvent.java | 56 +-
.../dag/history/events/TaskStartedEvent.java | 14 +-
.../events/VertexCommitStartedEvent.java | 94 ++
.../VertexDataMovementEventsGeneratedEvent.java | 8 +
.../dag/history/events/VertexFinishedEvent.java | 59 +-
.../history/events/VertexInitializedEvent.java | 78 +-
.../events/VertexParallelismUpdatedEvent.java | 159 +++
.../dag/history/events/VertexStartedEvent.java | 14 +-
.../dag/history/recovery/RecoveryService.java | 171 ++-
.../apache/tez/dag/recovery/RecoveryParser.java | 186 ----
tez-dag/src/main/proto/HistoryEvents.proto | 23 +-
.../dag/api/client/TestVertexStatusBuilder.java | 8 +-
.../TestHistoryEventsProtoConversion.java | 569 ++++++++++
tez-dist/pom.xml | 12 +-
.../mapreduce/committer/MROutputCommitter.java | 34 +-
.../tez/runtime/api/impl/EventMetaData.java | 2 +-
.../apache/tez/runtime/api/impl/InputSpec.java | 4 +
.../vertexmanager/ShuffleVertexManager.java | 4 +-
.../org/apache/tez/test/TestDAGRecovery.java | 135 +++
.../apache/tez/test/dag/MultiAttemptDAG.java | 177 +++
60 files changed, 4259 insertions(+), 664 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index 8f3101e..9e29910 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -116,8 +116,6 @@ public class TezSession {
sessionConfig.getTezConfiguration(), applicationId,
null, sessionName, sessionConfig.getAMConfiguration(),
tezJarResources, sessionCredentials);
- // Set Tez Sessions to not retry on AM crashes
- appContext.setMaxAppAttempts(1);
yarnClient.submitApplication(appContext);
} catch (YarnException e) {
throw new TezException(e);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 142d2d9..29d04ab 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -369,12 +369,20 @@ public class TezConfiguration extends Configuration {
public static final String DAG_RECOVERY_ENABLED =
TEZ_PREFIX + "dag.recovery.enabled";
- public static final boolean DAG_RECOVERY_ENABLED_DEFAULT = false;
+ public static final boolean DAG_RECOVERY_ENABLED_DEFAULT = true;
public static final String DAG_RECOVERY_FILE_IO_BUFFER_SIZE =
TEZ_PREFIX + "dag.recovery.io.buffer.size";
public static final int DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT = 8192;
+ /**
+ * Config key for the maximum number of unflushed recovery events; default 100.
+ * NOTE(review): presumably the recovery log is flushed once this many events
+ * are buffered - confirm against RecoveryService.
+ */
+ public static final String DAG_RECOVERY_MAX_UNFLUSHED_EVENTS =
+ TEZ_PREFIX + "dag.recovery.max.unflushed.events";
+ public static final int DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT = 100;
+
+ /**
+ * Config key for the recovery flush interval (seconds, per the key name);
+ * default 30.
+ * NOTE(review): presumably drives a periodic flush of the recovery log -
+ * confirm against RecoveryService.
+ */
+ public static final String DAG_RECOVERY_FLUSH_INTERVAL_SECS =
+ TEZ_PREFIX + "dag.recovery.flush.interval.secs";
+ public static final int DAG_RECOVERY_FLUSH_INTERVAL_SECS_DEFAULT = 30;
+
public static final String DAG_RECOVERY_DATA_DIR_NAME = "recovery";
public static final String DAG_RECOVERY_SUMMARY_FILE_SUFFIX = ".summary";
public static final String DAG_RECOVERY_RECOVER_FILE_SUFFIX = ".recovery";
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index a281b90..76202f8 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -125,4 +125,9 @@ public interface VertexManagerPluginContext {
* @param locationHint
*/
public void setVertexLocationHint(VertexLocationHint locationHint);
+
+ /**
+ * Get the attempt number of the DAG that the managed vertex belongs to.
+ * NOTE(review): the AM-side implementation returns the YARN application
+ * attempt number, so this presumably changes on each AM retry - confirm
+ * before relying on it across implementations.
+ * @return DAG Attempt number
+ */
+ public int getDAGAttemptNumber();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
index 301d01b..aadbf12 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
@@ -69,4 +69,22 @@ public abstract class OutputCommitter {
public abstract void abortOutput(VertexStatus.State finalState)
throws Exception;
+ /**
+ * Whether the OutputCommitter supports recovery of output from a Task
+ * that completed in a previous DAG attempt.
+ * The default implementation returns true.
+ * @return True if recovery supported
+ */
+ public boolean isTaskRecoverySupported() {
+ return true;
+ }
+
+ /**
+ * Recover task output from a previous DAG attempt.
+ * The default implementation does nothing.
+ * NOTE(review): the default isTaskRecoverySupported() returns true while this
+ * default is a no-op; committers whose output needs explicit recovery should
+ * override this method - confirm the intended default.
+ * @param taskIndex Index of task to be recovered
+ * @param previousDAGAttempt Previous DAG Attempt Number
+ * @throws java.lang.Exception implementations may throw if the task output
+ * cannot be recovered
+ */
+ public void recoverTask(int taskIndex, int previousDAGAttempt) throws Exception {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
index 3132758..b5837e8 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
@@ -18,6 +18,8 @@
package org.apache.tez.runtime.api;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
/**
@@ -63,4 +65,12 @@ public interface OutputCommitterContext {
*/
public byte[] getUserPayload();
+ /**
+ * Get the index, within the DAG, of the vertex this committer context
+ * belongs to.
+ * @return Vertex index
+ */
+ @Unstable
+ @Evolving
+ public int getVertexIndex();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 5faa7f1..c7a317e 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -171,13 +171,16 @@ message ProgressProto {
enum VertexStatusStateProto {
VERTEX_NEW = 0;
VERTEX_INITIALIZING = 1;
VERTEX_INITED = 2;
VERTEX_RUNNING = 3;
VERTEX_SUCCEEDED = 4;
VERTEX_FAILED = 5;
VERTEX_KILLED = 6;
VERTEX_ERROR = 7;
VERTEX_TERMINATING = 8;
+ // Appended rather than inserted: renumbering existing values would break
+ // wire compatibility with clients built against the previous definition.
+ VERTEX_RECOVERING = 9;
}
message VertexStatusProto {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
index 0e693b8..dc24f7f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java
@@ -66,6 +66,8 @@ public class VertexStatusBuilder extends VertexStatus {
return VertexStatusStateProto.VERTEX_NEW;
case INITIALIZING:
return VertexStatusStateProto.VERTEX_INITIALIZING;
+ case RECOVERING:
+ return VertexStatusStateProto.VERTEX_NEW;
case INITED:
return VertexStatusStateProto.VERTEX_INITED;
case RUNNING:
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 1ce07fb..c8185b8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -48,6 +48,8 @@ import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -220,6 +222,8 @@ public class DAGAppMaster extends AbstractService {
private FileSystem recoveryFS;
private int recoveryBufferSize;
+  // Set in serviceInit: true when this attempt's id is >= the maximum number
+  // of attempts read from ApplicationConstants.MAX_APP_ATTEMPTS_ENV (1 when
+  // that variable is unset).
+  protected boolean isLastAMRetry = false;
+
// DAG Counter
private final AtomicInteger dagCounter = new AtomicInteger();
@@ -262,6 +266,14 @@ public class DAGAppMaster extends AbstractService {
@Override
public synchronized void serviceInit(final Configuration conf) throws Exception {
+ int maxAppAttempts = 1;
+ String maxAppAttemptsEnv = System.getenv(
+ ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
+ if (maxAppAttemptsEnv != null) {
+ maxAppAttempts = Integer.valueOf(maxAppAttemptsEnv);
+ }
+ isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
+
this.amConf = conf;
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
@@ -481,6 +493,11 @@ public class DAGAppMaster extends AbstractService {
}
}
+  /**
+   * Makes {@code dag} this AM's current DAG and also hands it to
+   * {@code context.setDAG}.
+   *
+   * @param dag the DAG to install as current
+   */
+  public void setCurrentDAG(DAG dag) {
+    this.currentDAG = dag;
+    context.setDAG(dag);
+  }
+
private class DAGAppMasterEventHandler implements
EventHandler<DAGAppMasterEvent> {
@Override
@@ -539,7 +556,7 @@ public class DAGAppMaster extends AbstractService {
}
/** Create and initialize (but don't start) a single dag. */
- protected DAG createDAG(DAGPlan dagPB, TezDAGID dagId) {
+ DAGImpl createDAG(DAGPlan dagPB, TezDAGID dagId) {
if (dagId == null) {
dagId = TezDAGID.getInstance(appAttemptID.getApplicationId(),
dagCounter.incrementAndGet());
@@ -566,7 +583,7 @@ public class DAGAppMaster extends AbstractService {
TokenCache.setSessionToken(sessionToken, dagCredentials);
// create single dag
- DAG newDag =
+ DAGImpl newDag =
new DAGImpl(dagId, dagConf, dagPB, dispatcher.getEventHandler(),
taskAttemptListener, dagCredentials, clock,
appMasterUgi.getShortUserName(),
@@ -989,6 +1006,7 @@ public class DAGAppMaster extends AbstractService {
return TezSessionStatus.INITIALIZING;
case IDLE:
return TezSessionStatus.READY;
+ case RECOVERING:
case RUNNING:
return TezSessionStatus.RUNNING;
case ERROR:
@@ -1328,6 +1346,31 @@ public class DAGAppMaster extends AbstractService {
}
}
+  /**
+   * Rebuilds the DAG a previous AM attempt left in progress and dispatches a
+   * DAG_RECOVER event for it.
+   *
+   * <p>Does nothing and returns {@code null} unless recovery is enabled and
+   * this is not the first AM attempt. While the recovery data is parsed the
+   * AM state is RECOVERING; afterwards it is RUNNING when a DAG was found and
+   * IDLE otherwise.
+   *
+   * @return the recovered DAG, or {@code null} if there was nothing to recover
+   * @throws IOException if reading the recovery data fails
+   */
+  private DAG recoverDAG() throws IOException {
+    if (!recoveryEnabled) {
+      return null;
+    }
+    int attemptId = appAttemptID.getAttemptId();
+    if (attemptId <= 1) {
+      return null;
+    }
+
+    state = DAGAppMasterState.RECOVERING;
+    RecoveryParser parser =
+        new RecoveryParser(this, recoveryFS, recoveryDataDir, attemptId);
+    DAG dag = parser.parseRecoveryData();
+    if (dag == null) {
+      LOG.info("No DAG to recover");
+      state = DAGAppMasterState.IDLE;
+      return null;
+    }
+
+    LOG.info("Found DAG to recover, dagId=" + dag.getID());
+    _updateLoggers(dag, "");
+    dagEventDispatcher.handle(
+        new DAGEvent(dag.getID(), DAGEventType.DAG_RECOVER));
+    state = DAGAppMasterState.RUNNING;
+    return dag;
+  }
+
+ @SuppressWarnings("unchecked")
@Override
public synchronized void serviceStart() throws Exception {
@@ -1348,18 +1391,18 @@ public class DAGAppMaster extends AbstractService {
this.lastDAGCompletionTime = clock.getTime();
if (!isSession) {
- startDAG();
+ DAG recoveredDAG = null;
+ if (appAttemptID.getAttemptId() != 1) {
+ recoveredDAG = recoverDAG();
+ }
+ if (recoveredDAG == null) {
+ dagCounter.set(0);
+ startDAG();
+ }
} else {
LOG.info("In Session mode. Waiting for DAG over RPC");
this.state = DAGAppMasterState.IDLE;
-
- if (recoveryEnabled) {
- if (this.appAttemptID.getAttemptId() > 0) {
- // Recovery data and copy over into new recovery dir
- this.state = DAGAppMasterState.IDLE;
- // TODO
- }
- }
+ recoverDAG();
this.dagSubmissionTimer = new Timer(true);
this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
@@ -1570,6 +1613,9 @@ public class DAGAppMaster extends AbstractService {
}
appMaster.stop();
+
+
+
}
}
@@ -1672,4 +1718,8 @@ public class DAGAppMaster extends AbstractService {
private void sendEvent(Event<?> event) {
dispatcher.getEventHandler().handle(event);
}
+
+  /**
+   * Overwrites the counter from which new DAG ids are generated, e.g. with
+   * the highest DAG id found in a previous attempt's recovery summary.
+   *
+   * @param value new counter value; the next created DAG gets value + 1
+   */
+  synchronized void setDAGCounter(int value) {
+    dagCounter.set(value);
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
new file mode 100644
index 0000000..9e59849
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
+import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class RecoveryParser {
+
+ private static final Log LOG = LogFactory.getLog(RecoveryParser.class);
+
+ private final DAGAppMaster dagAppMaster;
+ private final FileSystem recoveryFS;
+ private final Path recoveryDataDir;
+ private final Path currentAttemptRecoveryDataDir;
+ private final int recoveryBufferSize;
+ private final int currentAttemptId;
+
+ private static final String dataRecoveredFileFlag = "dataRecovered";
+
+ public RecoveryParser(DAGAppMaster dagAppMaster,
+ FileSystem recoveryFS,
+ Path recoveryDataDir,
+ int currentAttemptId) {
+ this.dagAppMaster = dagAppMaster;
+ this.recoveryFS = recoveryFS;
+ this.recoveryDataDir = recoveryDataDir;
+ this.currentAttemptId = currentAttemptId;
+ this.currentAttemptRecoveryDataDir =
+ getAttemptRecoveryDataDir(recoveryDataDir, currentAttemptId);
+ recoveryBufferSize = dagAppMaster.getConfig().getInt(
+ TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
+ TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
+ }
+
+ private static void parseSummaryFile(FSDataInputStream inputStream)
+ throws IOException {
+ while (inputStream.available() > 0) {
+ RecoveryProtos.SummaryEventProto proto =
+ RecoveryProtos.SummaryEventProto.parseDelimitedFrom(inputStream);
+ LOG.info("[SUMMARY]"
+ + " dagId=" + proto.getDagId()
+ + ", timestamp=" + proto.getTimestamp()
+ + ", event=" + HistoryEventType.values()[proto.getEventType()]);
+ }
+ }
+
+ private static HistoryEvent getNextEvent(FSDataInputStream inputStream)
+ throws IOException {
+ int eventTypeOrdinal = inputStream.readInt();
+ if (eventTypeOrdinal < 0 || eventTypeOrdinal >=
+ HistoryEventType.values().length) {
+ // Corrupt data
+ // reached end
+ throw new IOException("Corrupt data found when trying to read next event type"
+ + ", eventTypeOrdinal=" + eventTypeOrdinal);
+ }
+ HistoryEventType eventType = HistoryEventType.values()[eventTypeOrdinal];
+ HistoryEvent event;
+ switch (eventType) {
+ case AM_LAUNCHED:
+ event = new AMLaunchedEvent();
+ break;
+ case AM_STARTED:
+ event = new AMStartedEvent();
+ break;
+ case DAG_SUBMITTED:
+ event = new DAGSubmittedEvent();
+ break;
+ case DAG_INITIALIZED:
+ event = new DAGInitializedEvent();
+ break;
+ case DAG_STARTED:
+ event = new DAGStartedEvent();
+ break;
+ case DAG_COMMIT_STARTED:
+ event = new DAGCommitStartedEvent();
+ break;
+ case DAG_FINISHED:
+ event = new DAGFinishedEvent();
+ break;
+ case CONTAINER_LAUNCHED:
+ event = new ContainerLaunchedEvent();
+ break;
+ case VERTEX_INITIALIZED:
+ event = new VertexInitializedEvent();
+ break;
+ case VERTEX_STARTED:
+ event = new VertexStartedEvent();
+ break;
+ case VERTEX_PARALLELISM_UPDATED:
+ event = new VertexParallelismUpdatedEvent();
+ break;
+ case VERTEX_COMMIT_STARTED:
+ event = new VertexCommitStartedEvent();
+ break;
+ case VERTEX_FINISHED:
+ event = new VertexFinishedEvent();
+ break;
+ case TASK_STARTED:
+ event = new TaskStartedEvent();
+ break;
+ case TASK_FINISHED:
+ event = new TaskFinishedEvent();
+ break;
+ case TASK_ATTEMPT_STARTED:
+ event = new TaskAttemptStartedEvent();
+ break;
+ case TASK_ATTEMPT_FINISHED:
+ event = new TaskAttemptFinishedEvent();
+ break;
+ case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+ event = new VertexDataMovementEventsGeneratedEvent();
+ break;
+ default:
+ throw new IOException("Invalid data found, unknown event type "
+ + eventType);
+
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Parsing event from input stream"
+ + ", eventType=" + eventType);
+ }
+ event.fromProtoStream(inputStream);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Parsed event from input stream"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ }
+ return event;
+ }
+
+
+
+
+
+ private static void parseDAGRecoveryFile(FSDataInputStream inputStream)
+ throws IOException {
+ while (inputStream.available() > 0) {
+ HistoryEvent historyEvent = getNextEvent(inputStream);
+ LOG.info("Parsed event from recovery stream"
+ + ", eventType=" + historyEvent.getEventType()
+ + ", event=" + historyEvent);
+ }
+ }
+
+ private Path getAttemptRecoveryDataDir(Path recoveryDataDir,
+ int attemptId) {
+ return new Path(recoveryDataDir, Integer.toString(attemptId));
+ }
+
+ public static void main(String argv[]) throws IOException {
+ // TODO clean up with better usage and error handling
+ Configuration conf = new Configuration();
+ String summaryPath = argv[0];
+ List<String> dagPaths = new ArrayList<String>();
+ if (argv.length > 1) {
+ for (int i = 1; i < argv.length; ++i) {
+ dagPaths.add(argv[i]);
+ }
+ }
+ FileSystem fs = FileSystem.get(conf);
+ LOG.info("Parsing Summary file " + summaryPath);
+ parseSummaryFile(fs.open(new Path(summaryPath)));
+ for (String dagPath : dagPaths) {
+ LOG.info("Parsing DAG recovery file " + dagPath);
+ parseDAGRecoveryFile(fs.open(new Path(dagPath)));
+ }
+ }
+
+ private Path getSummaryPath(Path recoveryDataDir) {
+ return new Path(recoveryDataDir,
+ dagAppMaster.getAttemptID().getApplicationId().toString()
+ + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+ }
+
+ private FSDataOutputStream getSummaryOutputStream(Path summaryPath)
+ throws IOException {
+ return recoveryFS.create(summaryPath, true, recoveryBufferSize);
+ }
+
+ private FSDataInputStream getSummaryStream(Path summaryPath)
+ throws IOException {
+ if (!recoveryFS.exists(summaryPath)) {
+ return null;
+ }
+ return recoveryFS.open(summaryPath, recoveryBufferSize);
+ }
+
+ private Path getDAGRecoveryFilePath(Path recoveryDataDir,
+ TezDAGID dagID) {
+ return new Path(recoveryDataDir,
+ dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+ }
+
+ private FSDataInputStream getDAGRecoveryStream(Path recoveryDataDir,
+ TezDAGID dagID)
+ throws IOException {
+ Path dagRecoveryPath = getDAGRecoveryFilePath(recoveryDataDir, dagID);
+ if (!recoveryFS.exists(dagRecoveryPath)) {
+ return null;
+ }
+ return recoveryFS.open(dagRecoveryPath, recoveryBufferSize);
+ }
+
+ private FSDataOutputStream getDAGRecoveryOutputStream(Path recoveryDataDir,
+ TezDAGID dagID)
+ throws IOException {
+ Path dagRecoveryPath = new Path(recoveryDataDir,
+ dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+ return recoveryFS.create(dagRecoveryPath, true, recoveryBufferSize);
+ }
+
+ private TezDAGID getLastInProgressDAG(Map<TezDAGID, Boolean> seenDAGs) {
+ TezDAGID inProgressDAG = null;
+ for (Map.Entry<TezDAGID, Boolean> entry : seenDAGs.entrySet()) {
+ if (!entry.getValue().booleanValue()) {
+ if (inProgressDAG != null) {
+ throw new RuntimeException("Multiple in progress DAGs seen"
+ + ", dagId=" + inProgressDAG
+ + ", dagId=" + entry.getKey());
+ }
+ inProgressDAG = entry.getKey();
+ }
+ }
+ return inProgressDAG;
+ }
+
+ private Path getPreviousAttemptRecoveryDataDir() {
+ int foundPreviousAttempt = -1;
+ for (int i = currentAttemptId - 1; i > 0; --i) {
+ Path attemptPath = getAttemptRecoveryDataDir(recoveryDataDir, i);
+ Path dataRecoveredFile = new Path(attemptPath, dataRecoveredFileFlag);
+ try {
+ if (recoveryFS.exists(dataRecoveredFile)) {
+ foundPreviousAttempt = i;
+ break;
+ }
+ } catch (IOException e) {
+ LOG.warn("Exception when checking previous attempt dir for "
+ + dataRecoveredFile.toString(), e);
+ }
+ }
+ if (foundPreviousAttempt == -1) {
+ LOG.info("Falling back to first attempt as no other recovered attempts"
+ + " found");
+ foundPreviousAttempt = 1;
+ }
+
+ return getAttemptRecoveryDataDir(recoveryDataDir, foundPreviousAttempt);
+ }
+
+
+ public DAG parseRecoveryData() throws IOException {
+ Path previousAttemptRecoveryDataDir = getPreviousAttemptRecoveryDataDir();
+ LOG.info("Using " + previousAttemptRecoveryDataDir.toString()
+ + " for recovering data from previous attempt");
+ if (!recoveryFS.exists(previousAttemptRecoveryDataDir)) {
+ LOG.info("Nothing to recover as previous attempt data does not exist"
+ + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString());
+ return null;
+ }
+
+ Path summaryPath = getSummaryPath(previousAttemptRecoveryDataDir);
+ FSDataInputStream summaryStream = getSummaryStream(
+ summaryPath);
+ if (summaryStream == null) {
+ LOG.info("Nothing to recover as summary file does not exist"
+ + ", previousAttemptDir=" + previousAttemptRecoveryDataDir.toString()
+ + ", summaryPath=" + summaryPath.toString());
+ return null;
+ }
+
+ Path newSummaryPath = getSummaryPath(currentAttemptRecoveryDataDir);
+ FSDataOutputStream newSummaryStream =
+ getSummaryOutputStream(newSummaryPath);
+
+ Map<TezDAGID, Boolean> seenDAGs = new TreeMap<TezDAGID, Boolean>();
+
+ FileStatus summaryFileStatus = recoveryFS.getFileStatus(summaryPath);
+ LOG.info("Parsing summary file"
+ + ", path=" + summaryPath.toString()
+ + ", len=" + summaryFileStatus.getLen()
+ + ", lastModTime=" + summaryFileStatus.getModificationTime());
+
+ int dagCounter = 0;
+ while (summaryStream.available() > 0) {
+ RecoveryProtos.SummaryEventProto proto =
+ RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
+ HistoryEventType eventType =
+ HistoryEventType.values()[proto.getEventType()];
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[RECOVERY SUMMARY]"
+ + " dagId=" + proto.getDagId()
+ + ", timestamp=" + proto.getTimestamp()
+ + ", event=" + eventType);
+ }
+ TezDAGID dagId = TezDAGID.fromString(proto.getDagId());
+ if (dagCounter < dagId.getId()) {
+ dagCounter = dagId.getId();
+ }
+ if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
+ seenDAGs.put(dagId, false);
+ } else if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
+ seenDAGs.put(dagId, true);
+ }
+ proto.writeDelimitedTo(newSummaryStream);
+ }
+ newSummaryStream.hsync();
+ newSummaryStream.close();
+
+ // Set counter for next set of DAGs
+ dagAppMaster.setDAGCounter(dagCounter);
+
+ TezDAGID lastInProgressDAG = getLastInProgressDAG(seenDAGs);
+ if (lastInProgressDAG == null) {
+ LOG.info("Nothing to recover as no uncompleted DAGs found");
+ return null;
+ }
+
+ LOG.info("Trying to recover dag from recovery file"
+ + ", dagId=" + lastInProgressDAG.toString()
+ + ", dataDir=" + previousAttemptRecoveryDataDir
+ + ", intoCurrentDir=" + currentAttemptRecoveryDataDir);
+
+ FSDataInputStream dagRecoveryStream = getDAGRecoveryStream(
+ previousAttemptRecoveryDataDir, lastInProgressDAG);
+ if (dagRecoveryStream == null) {
+ // Could not find data to recover
+ // Error out
+ throw new IOException("Could not find recovery data for last in progress DAG"
+ + ", dagId=" + lastInProgressDAG);
+ }
+
+ DAGImpl recoveredDAG = null;
+
+ LOG.info("Copying DAG data into Current Attempt directory"
+ + ", filePath=" + getDAGRecoveryFilePath(currentAttemptRecoveryDataDir,
+ lastInProgressDAG));
+ FSDataOutputStream newDAGRecoveryStream =
+ getDAGRecoveryOutputStream(currentAttemptRecoveryDataDir, lastInProgressDAG);
+
+ while (dagRecoveryStream.available() > 0) {
+ HistoryEvent event;
+ try {
+ event = getNextEvent(dagRecoveryStream);
+ } catch (IOException ioe) {
+ LOG.warn("Corrupt data found when trying to read next event", ioe);
+ break;
+ }
+ if (event == null) {
+ // reached end of data
+ break;
+ }
+ HistoryEventType eventType = event.getEventType();
+ switch (eventType) {
+ case DAG_SUBMITTED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ recoveredDAG = dagAppMaster.createDAG(((DAGSubmittedEvent) event).getDAGPlan(),
+ lastInProgressDAG);
+ dagAppMaster.setCurrentDAG(recoveredDAG);
+ break;
+ }
+ case DAG_INITIALIZED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ recoveredDAG.restoreFromEvent(event);
+ break;
+ }
+ case DAG_STARTED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ recoveredDAG.restoreFromEvent(event);
+ break;
+ }
+ case DAG_COMMIT_STARTED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ recoveredDAG.restoreFromEvent(event);
+ break;
+ }
+ case DAG_FINISHED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ // If this is seen, nothing to recover
+ assert recoveredDAG != null;
+ recoveredDAG.restoreFromEvent(event);
+ return recoveredDAG;
+ }
+ case CONTAINER_LAUNCHED:
+ {
+ // Nothing to do?
+ break;
+ }
+ case VERTEX_INITIALIZED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ VertexInitializedEvent vEvent = (VertexInitializedEvent) event;
+ Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ v.restoreFromEvent(vEvent);
+ break;
+ }
+ case VERTEX_STARTED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ VertexStartedEvent vEvent = (VertexStartedEvent) event;
+ Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ v.restoreFromEvent(vEvent);
+ break;
+ }
+ case VERTEX_PARALLELISM_UPDATED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ VertexParallelismUpdatedEvent vEvent = (VertexParallelismUpdatedEvent) event;
+ Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ v.restoreFromEvent(vEvent);
+ break;
+ }
+ case VERTEX_COMMIT_STARTED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ VertexCommitStartedEvent vEvent = (VertexCommitStartedEvent) event;
+ Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ v.restoreFromEvent(vEvent);
+ break;
+ }
+ case VERTEX_FINISHED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ VertexFinishedEvent vEvent = (VertexFinishedEvent) event;
+ Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ v.restoreFromEvent(vEvent);
+ break;
+ }
+ case TASK_STARTED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ TaskStartedEvent tEvent = (TaskStartedEvent) event;
+ Task task = recoveredDAG.getVertex(
+ tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
+ task.restoreFromEvent(tEvent);
+ break;
+ }
+ case TASK_FINISHED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ TaskFinishedEvent tEvent = (TaskFinishedEvent) event;
+ Task task = recoveredDAG.getVertex(
+ tEvent.getTaskID().getVertexID()).getTask(tEvent.getTaskID());
+ task.restoreFromEvent(tEvent);
+ break;
+ }
+ case TASK_ATTEMPT_STARTED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ TaskAttemptStartedEvent tEvent = (TaskAttemptStartedEvent) event;
+ Task task =
+ recoveredDAG.getVertex(
+ tEvent.getTaskAttemptID().getTaskID().getVertexID())
+ .getTask(tEvent.getTaskAttemptID().getTaskID());
+ task.restoreFromEvent(tEvent);
+ break;
+ }
+ case TASK_ATTEMPT_FINISHED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ TaskAttemptFinishedEvent tEvent = (TaskAttemptFinishedEvent) event;
+ Task task =
+ recoveredDAG.getVertex(
+ tEvent.getTaskAttemptID().getTaskID().getVertexID())
+ .getTask(tEvent.getTaskAttemptID().getTaskID());
+ task.restoreFromEvent(tEvent);
+ break;
+ }
+ case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+ {
+ LOG.info("Recovering from event"
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ assert recoveredDAG != null;
+ VertexDataMovementEventsGeneratedEvent vEvent =
+ (VertexDataMovementEventsGeneratedEvent) event;
+ Vertex v = recoveredDAG.getVertex(vEvent.getVertexID());
+ v.restoreFromEvent(vEvent);
+ break;
+ }
+ default:
+ throw new RuntimeException("Invalid data found, unknown event type "
+ + eventType);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[DAG RECOVERY]"
+ + " dagId=" + lastInProgressDAG
+ + ", eventType=" + eventType
+ + ", event=" + event.toString());
+ }
+ newDAGRecoveryStream.writeInt(eventType.ordinal());
+ event.toProtoStream(newDAGRecoveryStream);
+ }
+ newDAGRecoveryStream.hsync();
+ newDAGRecoveryStream.close();
+
+ Path dataCopiedFlagPath = new Path(currentAttemptRecoveryDataDir,
+ dataRecoveredFileFlag);
+ LOG.info("Finished copying data from previous attempt into current attempt"
+ + " - setting flag by creating file"
+ + ", path=" + dataCopiedFlagPath.toString());
+ FSDataOutputStream flagFile =
+ recoveryFS.create(dataCopiedFlagPath, true, recoveryBufferSize);
+ flagFile.writeInt(1);
+ flagFile.hsync();
+ flagFile.close();
+
+ return recoveredDAG;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 8117619..45fb50a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -31,6 +31,7 @@ import org.apache.tez.dag.api.client.DAGStatusBuilder;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
@@ -84,4 +85,6 @@ public interface DAG {
Credentials getCredentials();
UserGroupInformation getDagUGI();
+
+  /**
+   * Applies an event read from a previous AM attempt's recovery data to this
+   * DAG.
+   *
+   * @param historyEvent the recovered event
+   * @return a DAGState; NOTE(review): presumably the DAG's state after the
+   *         event is applied -- confirm against the implementation
+   */
+  DAGState restoreFromEvent(HistoryEvent historyEvent);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index 8243b70..ac96681 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -24,6 +24,8 @@ import java.util.Map;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.oldrecords.TaskReport;
import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.runtime.api.impl.TezEvent;
@@ -80,4 +82,5 @@ public interface Task {
public List<String> getDiagnostics();
+  /**
+   * Applies an event read from a previous AM attempt's recovery data to this
+   * task. Recovery passes both task and task-attempt events here.
+   *
+   * @param historyEvent the recovered event
+   * @return a TaskState; NOTE(review): presumably the task's state after the
+   *         event is applied -- confirm against the implementation
+   */
+  TaskState restoreFromEvent(HistoryEvent historyEvent);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index 0cc9163..2af1232 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -27,6 +27,7 @@ import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -36,7 +37,7 @@ import org.apache.tez.dag.records.TezVertexID;
* Read only view of TaskAttempt.
*/
public interface TaskAttempt {
-
+
public static class TaskAttemptStatus {
public TaskAttemptState state;
public DAGCounter localityCounter;
@@ -118,4 +119,7 @@ public interface TaskAttempt {
public Task getTask();
public boolean getIsRescheduled();
+
+  /**
+   * Applies a recovered history event to this task attempt.
+   *
+   * @param event the recovered event
+   * @return a TaskAttemptState; NOTE(review): presumably the attempt's state
+   *         after the event is applied -- confirm against the implementation
+   */
+  TaskAttemptState restoreFromEvent(HistoryEvent event);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 9e7a0a7..807e9b4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -37,6 +37,8 @@ import org.apache.tez.dag.api.client.VertexStatusBuilder;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.impl.Edge;
import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.OutputCommitter;
@@ -114,4 +116,7 @@ public interface Vertex extends Comparable<Vertex> {
// TODO remove this once RootInputVertexManager is fixed to not use
// internal apis
AppContext getAppContext();
+
+  /**
+   * Applies an event read from a previous AM attempt's recovery data to this
+   * vertex.
+   *
+   * @param event the recovered event
+   * @return a VertexState; NOTE(review): presumably the vertex's state after
+   *         the event is applied -- confirm against the implementation
+   */
+  VertexState restoreFromEvent(HistoryEvent event);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
index 5a7af0a..7130c7a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexState.java
@@ -27,4 +27,5 @@ public enum VertexState {
KILLED,
ERROR,
TERMINATING,
+ RECOVERING,
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
index 476c688..741dcfa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
@@ -26,7 +26,7 @@ public enum DAGEventType {
//Producer:Client
DAG_KILL,
- //Producer:MRAppMaster
+ //Producer:AM
DAG_INIT,
DAG_START,
@@ -44,4 +44,8 @@ public enum DAGEventType {
DAG_DIAGNOSTIC_UPDATE,
INTERNAL_ERROR,
DAG_COUNTER_UPDATE,
+
+ // Event to trigger recovery
+ // Producer:AM
+ DAG_RECOVER
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index 018bd3d..db3fd3b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -60,5 +60,8 @@ public enum TaskAttemptEventType {
// Producer: consumer destination vertex
TA_OUTPUT_FAILED,
+
+  // Recovery
+  // NOTE(review): the producer is not visible in this patch chunk; add a
+  // "Producer:" line like the other entries once it is confirmed.
+  TA_RECOVER,
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java
new file mode 100644
index 0000000..e7e59e3
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventRecoverTask.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.records.TezTaskID;
+
+/**
+ * Task event of type {@link TaskEventType#T_RECOVER}, used by AM recovery
+ * (TEZ-847), optionally carrying the state the task should be recovered to.
+ */
+public class TaskEventRecoverTask extends TaskEvent {
+
+  /** Requested post-recovery state; {@code null} when none was given. */
+  private final TaskState desiredState;
+
+  /**
+   * @param taskID the task to recover
+   * @param desiredState requested post-recovery state; may be {@code null}
+   */
+  public TaskEventRecoverTask(TezTaskID taskID, TaskState desiredState) {
+    super(taskID, TaskEventType.T_RECOVER);
+    this.desiredState = desiredState;
+  }
+
+  /** Creates a recover event with no requested state ({@code null}). */
+  public TaskEventRecoverTask(TezTaskID taskID) {
+    this(taskID, null);
+  }
+
+  /** @return the requested post-recovery state, or {@code null} if none */
+  public TaskState getDesiredState() {
+    return desiredState;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
index a0b99a9..4830ae0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventType.java
@@ -40,5 +40,9 @@ public enum TaskEventType {
T_ATTEMPT_OUTPUT_CONSUMABLE,
T_ATTEMPT_FAILED,
T_ATTEMPT_SUCCEEDED,
-  T_ATTEMPT_KILLED
+  T_ATTEMPT_KILLED,
+
+  // Recovery event; carried by TaskEventRecoverTask
+  T_RECOVER
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java
new file mode 100644
index 0000000..34e45fe
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRecoverVertex.java
@@ -0,0 +1,46 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.records.TezVertexID;
+
+/**
+ * Vertex event of type {@link VertexEventType#V_RECOVER}, sent by the DAG
+ * during AM recovery with the state the vertex should be recovered to.
+ */
+public class VertexEventRecoverVertex extends VertexEvent {
+
+  /** State requested for the vertex after recovery. */
+  private final VertexState desiredState;
+
+  /**
+   * @param vertexId the vertex to recover
+   * @param desiredState state requested for the vertex after recovery
+   */
+  public VertexEventRecoverVertex(TezVertexID vertexId, VertexState desiredState) {
+    super(vertexId, VertexEventType.V_RECOVER);
+    this.desiredState = desiredState;
+  }
+
+  /** @return the state requested for the vertex after recovery */
+  public VertexState getDesiredState() {
+    return desiredState;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
index a872ae2..69195db 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
@@ -27,13 +27,25 @@ public class VertexEventRouteEvent extends VertexEvent {
final List<TezEvent> events;
+  // Defaults to false via the two-argument constructor. NOTE(review):
+  // presumably marks events re-routed during AM recovery -- confirm.
+  final boolean recovered;
+
public VertexEventRouteEvent(TezVertexID vertexId, List<TezEvent> events) {
+    this(vertexId, events, false);
+  }
+
+  public VertexEventRouteEvent(TezVertexID vertexId, List<TezEvent> events,
+      boolean recovered) {
super(vertexId, VertexEventType.V_ROUTE_EVENT);
this.events = events;
+    this.recovered = recovered;
}
-
+
public List<TezEvent> getEvents() {
return events;
}
+
+  /** @return the {@code recovered} flag given at construction (false by default). */
+  public boolean isRecovered() {
+    return recovered;
+  }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
new file mode 100644
index 0000000..5e61369
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventSourceVertexRecovered.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezVertexID;
+
+import java.util.List;
+
+/**
+ * Vertex event of type {@link VertexEventType#V_SOURCE_VERTEX_RECOVERED}:
+ * a recovery event (producer: a vertex) telling this vertex that one of its
+ * source vertices has been recovered, and in which state.
+ */
+public class VertexEventSourceVertexRecovered extends VertexEvent {
+
+  /** State the source vertex was recovered to. */
+  private final VertexState sourceVertexState;
+  /** ID of the recovered source vertex. */
+  private final TezVertexID sourceVertexID;
+  /** Completed attempts of the source vertex; stored as passed (not copied). */
+  private final List<TezTaskAttemptID> completedTaskAttempts;
+
+  /**
+   * @param vertexID the vertex this event is delivered to
+   * @param sourceVertexID the recovered source vertex
+   * @param sourceVertexState state the source vertex was recovered to
+   * @param completedTaskAttempts completed task attempts of the source vertex
+   *     (presumably the successful ones -- TODO confirm); kept by reference
+   */
+  public VertexEventSourceVertexRecovered(TezVertexID vertexID,
+      TezVertexID sourceVertexID,
+      VertexState sourceVertexState,
+      List<TezTaskAttemptID> completedTaskAttempts) {
+    super(vertexID, VertexEventType.V_SOURCE_VERTEX_RECOVERED);
+    this.sourceVertexState = sourceVertexState;
+    this.sourceVertexID = sourceVertexID;
+    this.completedTaskAttempts = completedTaskAttempts;
+  }
+
+  public VertexState getSourceVertexState() {
+    return sourceVertexState;
+  }
+
+  public TezVertexID getSourceVertexID() {
+    return sourceVertexID;
+  }
+
+  /** @return the completed-attempts list passed at construction (same instance). */
+  public List<TezTaskAttemptID> getCompletedTaskAttempts() {
+    return completedTaskAttempts;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 0cf14eb..69952d9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -56,5 +56,11 @@ public enum VertexEventType {
//Producer: VertexInputInitializer
V_ROOT_INPUT_INITIALIZED,
V_ROOT_INPUT_FAILED,
-
+
+  // Recover Event, Producer:DAG (carried by VertexEventRecoverVertex)
+  V_RECOVER,
+
+  // Recover Event, Producer:Vertex (carried by
+  // VertexEventSourceVertexRecovered)
+  V_SOURCE_VERTEX_RECOVERED
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 04ed223..432c189 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -90,12 +90,15 @@ import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
@@ -157,6 +160,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private final List<String> diagnostics = new ArrayList<String>();
+  // Recovery related flags, set by restoreFromEvent() when the DAG_INITIALIZED
+  // and DAG_STARTED history events are replayed; used there to reject
+  // out-of-order history.
+  boolean recoveryInitEventSeen = false;
+  boolean recoveryStartEventSeen = false;
+
private static final DiagnosticsUpdateTransition
DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
private static final InternalErrorTransition
@@ -176,6 +183,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
.addTransition(DAGState.NEW, DAGState.NEW,
DAGEventType.DAG_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
+          // DAG_RECOVER: RecoverTransition picks the target state from the
+          // state rebuilt by restoreFromEvent() (recoveredState).
+          .addTransition(DAGState.NEW,
+              EnumSet.of(DAGState.NEW, DAGState.INITED, DAGState.RUNNING,
+                  DAGState.SUCCEEDED, DAGState.FAILED, DAGState.KILLED,
+                  DAGState.ERROR, DAGState.TERMINATING),
+              DAGEventType.DAG_RECOVER,
+              new RecoverTransition())
.addTransition(DAGState.NEW, DAGState.NEW,
DAGEventType.DAG_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition
@@ -342,7 +355,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
Map<String, VertexGroupInfo> vertexGroups = Maps.newHashMap();
Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap();
-
+  // Latest state rebuilt by restoreFromEvent(); consumed by RecoverTransition.
+  private DAGState recoveredState = DAGState.NEW;
+  // Set by a replayed DAG_COMMIT_STARTED and cleared by a replayed DAG_FINISHED.
+  private boolean recoveryCommitInProgress = false;
+
static class VertexGroupInfo {
String groupName;
Set<String> groupMembers;
@@ -466,6 +481,46 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
+  /**
+   * Replays one persisted history event while this DAG is being recovered
+   * and returns the DAG state reconstructed so far (also kept in
+   * recoveredState for RecoverTransition). Expected order: DAG_INITIALIZED,
+   * DAG_STARTED, any DAG_COMMIT_STARTED events, then DAG_FINISHED.
+   *
+   * @throws IllegalStateException if an event arrives before the event it
+   *     depends on
+   * @throws IllegalArgumentException for any other history event type
+   */
@Override
+  public DAGState restoreFromEvent(HistoryEvent historyEvent) {
+    switch (historyEvent.getEventType()) {
+      case DAG_INITIALIZED:
+        // Rebuilds vertices and edges from the plan, reusing the logged
+        // init time.
+        recoveredState = initializeDAG((DAGInitializedEvent) historyEvent);
+        recoveryInitEventSeen = true;
+        return recoveredState;
+      case DAG_STARTED:
+        if (!recoveryInitEventSeen) {
+          throw new IllegalStateException("Started Event seen but"
+              + " no Init Event was encountered earlier");
+        }
+        recoveryStartEventSeen = true;
+        this.startTime = ((DAGStartedEvent) historyEvent).getStartTime();
+        recoveredState = DAGState.RUNNING;
+        return recoveredState;
+      case DAG_COMMIT_STARTED:
+        if (recoveredState != DAGState.RUNNING) {
+          throw new IllegalStateException("Commit Started Event seen but"
+              + " recovered state is not RUNNING"
+              + ", recoveredState=" + recoveredState);
+        }
+        // Commits cannot be resumed: if no DAG_FINISHED follows,
+        // RecoverTransition fails the DAG because this flag is still set.
+        recoveryCommitInProgress = true;
+        return recoveredState;
+      case DAG_FINISHED:
+        // NOTE(review): a DAG that fails inside initializeDAG() (e.g. zero
+        // vertices) appears to log DAG_FINISHED via finished() without ever
+        // logging DAG_STARTED; this check would reject such a history.
+        if (!recoveryStartEventSeen) {
+          throw new IllegalStateException("Finished Event seen but"
+              + " no Start Event was encountered earlier");
+        }
+        recoveryCommitInProgress = false;
+        DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent;
+        this.finishTime = finishedEvent.getFinishTime();
+        recoveredState = finishedEvent.getState();
+        return recoveredState;
+      default:
+        throw new IllegalArgumentException("Unexpected event received for restoring"
+            + " state, eventType=" + historyEvent.getEventType());
+    }
+  }
+
+  @Override
public TezCounters getAllCounters() {
readLock.lock();
@@ -666,6 +721,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
boolean failedWhileCommitting = false;
if (dagSucceeded && !successfulOutputsAlreadyCommitted) {
// commit all shared outputs
+      // Log DAG_COMMIT_STARTED before committing anything: if the AM dies
+      // before DAG_FINISHED is logged, RecoverTransition fails the DAG
+      // because commits are not recoverable.
+      appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
+          new DAGCommitStartedEvent(getID())));
for (VertexGroupInfo groupInfo : vertexGroups.values()) {
if (failedWhileCommitting) {
break;
@@ -820,7 +877,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
void logJobHistoryFinishedEvent() {
this.setFinishTime();
DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
-        finishTime, DAGStatus.State.SUCCEEDED, "", getAllCounters());
+        finishTime, DAGState.SUCCEEDED, "", getAllCounters());
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(dagId, finishEvt));
}
@@ -839,7 +896,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
new DAGHistoryEvent(dagId, startEvt));
}
-  void logJobHistoryUnsuccesfulEvent(DAGStatus.State state) {
+  void logJobHistoryUnsuccesfulEvent(DAGState state) {
DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
clock.getTime(), state,
StringUtils.join(LINE_SEPARATOR, getDiagnostics()),
@@ -947,11 +1004,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
}
-    DAGStatus.State logState = getDAGStatusFromState(finalState);
-    if (logState == DAGStatus.State.SUCCEEDED) {
+    // History now records DAGState directly; restoreFromEvent() reads it
+    // back through DAGFinishedEvent.getState().
+    if (finalState == DAGState.SUCCEEDED) {
logJobHistoryFinishedEvent();
} else {
-      logJobHistoryUnsuccesfulEvent(logState);
+      logJobHistoryUnsuccesfulEvent(finalState);
}
eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState));
@@ -1047,179 +1103,294 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
return null;
}
-  // TODO Recovery
-  /*
-  @Override
-  public List<AMInfo> getAMInfos() {
-    return amInfos;
+  // Normal (non-recovery) initialization, used by InitTransition;
+  // restoreFromEvent() passes the logged DAGInitializedEvent instead.
+  public DAGState initializeDAG() {
+    return initializeDAG(null);
}
-  */
-
-  private static class InitTransition
-  implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
-    /**
-     * Note that this transition method is called directly (and synchronously)
-     * by MRAppMaster's init() method (i.e., no RPC, no thread-switching;
-     * just plain sequential call within AM context), so we can trigger
-     * modifications in AM state from here (at least, if AM is written that
-     * way; MR version is).
-     */
-    @Override
-    public DAGState transition(DAGImpl dag, DAGEvent event) {
-      // TODO Metrics
-      //dag.metrics.submittedJob(dag);
-      //dag.metrics.preparingJob(dag);
+  // Builds vertex groups, vertices, edges and the DAG scheduler from the
+  // plan and returns INITED. When event is non-null (recovery via
+  // restoreFromEvent()), the logged init time is reused and a zero-vertex
+  // DAG returns FAILED without calling finished().
+  DAGState initializeDAG(DAGInitializedEvent event) {
+    if (event != null) {
+      initTime = event.getInitTime();
+    } else {
+      initTime = clock.getTime();
+    }
-      dag.initTime = dag.clock.getTime();
-      dag.commitAllOutputsOnSuccess = dag.conf.getBoolean(
-          TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
-          TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT);
+    commitAllOutputsOnSuccess = conf.getBoolean(
+        TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT);
+
+    // If we have no vertices, fail the dag
+    numVertices = getJobPlan().getVertexCount();
+    if (numVertices == 0) {
+      addDiagnostic("No vertices for dag");
+      trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
+      if (event != null) {
+        // Recovery: only report FAILED; finished() (history logging and AM
+        // notification) is skipped, presumably because the original attempt
+        // already did it -- TODO confirm.
+        return DAGState.FAILED;
+      }
+      return finished(DAGState.FAILED);
+    }
-      // If we have no vertices, fail the dag
-      dag.numVertices = dag.getJobPlan().getVertexCount();
-      if (dag.numVertices == 0) {
-        dag.addDiagnostic("No vertices for dag");
-        dag.trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
-        return dag.finished(DAGState.FAILED);
+    if (jobPlan.getVertexGroupsCount() > 0) {
+      for (PlanVertexGroupInfo groupInfo : jobPlan.getVertexGroupsList()) {
+        vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
}
-
-      if (dag.jobPlan.getVertexGroupsCount() > 0) {
-        for (PlanVertexGroupInfo groupInfo : dag.jobPlan.getVertexGroupsList()) {
-          dag.vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
-        }
-        for (VertexGroupInfo groupInfo : dag.vertexGroups.values()) {
-          for (String vertexName : groupInfo.groupMembers) {
-            List<VertexGroupInfo> groupList = dag.vertexGroupInfo.get(vertexName);
-            if (groupList == null) {
-              groupList = Lists.newLinkedList();
-              dag.vertexGroupInfo.put(vertexName, groupList);
-            }
-            groupList.add(groupInfo);
+      for (VertexGroupInfo groupInfo : vertexGroups.values()) {
+        for (String vertexName : groupInfo.groupMembers) {
+          List<VertexGroupInfo> groupList = vertexGroupInfo.get(vertexName);
+          if (groupList == null) {
+            groupList = Lists.newLinkedList();
+            vertexGroupInfo.put(vertexName, groupList);
}
+          groupList.add(groupInfo);
}
}
+    }
-      // create the vertices`
-      for (int i=0; i < dag.numVertices; ++i) {
-        String vertexName = dag.getJobPlan().getVertex(i).getName();
-        VertexImpl v = createVertex(dag, vertexName, i);
-        dag.addVertex(v);
-      }
+    // create the vertices
+    for (int i=0; i < numVertices; ++i) {
+      String vertexName = getJobPlan().getVertex(i).getName();
+      VertexImpl v = createVertex(this, vertexName, i);
+      addVertex(v);
+    }
-      createDAGEdges(dag);
-      Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(dag.getJobPlan().getEdgeList());
+    createDAGEdges(this);
+    Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(getJobPlan().getEdgeList());
-      // setup the dag
-      for (Vertex v : dag.vertices.values()) {
-        parseVertexEdges(dag, edgePlans, v);
-      }
+    // setup the dag
+    for (Vertex v : vertices.values()) {
+      parseVertexEdges(this, edgePlans, v);
+    }
-      // Initialize the edges, now that the payload and vertices have been set.
-      for (Edge e : dag.edges.values()) {
-        e.initialize();
-      }
+    // Initialize the edges, now that the payload and vertices have been set.
+    for (Edge e : edges.values()) {
+      e.initialize();
+    }
-      assignDAGScheduler(dag);
-
-      for (Map.Entry<String, VertexGroupInfo> entry : dag.vertexGroups.entrySet()) {
-        String groupName = entry.getKey();
-        VertexGroupInfo groupInfo = entry.getValue();
-        if (!groupInfo.outputs.isEmpty()) {
-          // shared outputs
-          for (String vertexName : groupInfo.groupMembers) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Setting shared outputs for group: " + groupName +
-                  " on vertex: " + vertexName);
-            }
-            Vertex v = dag.getVertex(vertexName);
-            v.addSharedOutputs(groupInfo.outputs);
+    assignDAGScheduler(this);
+
+    for (Map.Entry<String, VertexGroupInfo> entry : vertexGroups.entrySet()) {
+      String groupName = entry.getKey();
+      VertexGroupInfo groupInfo = entry.getValue();
+      if (!groupInfo.outputs.isEmpty()) {
+        // shared outputs
+        for (String vertexName : groupInfo.groupMembers) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Setting shared outputs for group: " + groupName +
+                " on vertex: " + vertexName);
}
+          Vertex v = getVertex(vertexName);
+          v.addSharedOutputs(groupInfo.outputs);
}
}
+    }
+    return DAGState.INITED;
+  }
-      // TODO Metrics
-      //dag.metrics.endPreparingJob(dag);
-      dag.logJobHistoryInitedEvent();
-      return DAGState.INITED;
-
+  // Creates an Edge for every EdgePlan in the DAG plan, keyed by edge id in
+  // dag.edges; endpoints are wired afterwards by parseVertexEdges().
+  private void createDAGEdges(DAGImpl dag) {
+    for (EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) {
+      EdgeProperty edgeProperty = DagTypeConverters
+          .createEdgePropertyMapFromDAGPlan(edgePlan);
+
+      // If CUSTOM without an edge manager, setup a fake edge manager. Avoid
+      // referencing the fake edge manager within the API module.
+      if (edgeProperty.getDataMovementType() == DataMovementType.CUSTOM
+          && edgeProperty.getEdgeManagerDescriptor() == null) {
+        EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(
+            NullEdgeManager.class.getName());
+        EdgeProperty ep = new EdgeProperty(edgeDesc, edgeProperty.getDataSourceType(),
+            edgeProperty.getSchedulingType(), edgeProperty.getEdgeSource(),
+            edgeProperty.getEdgeDestination());
+        edgeProperty = ep;
+      }
+      // edge manager may be also set via API when using custom edge type
+      dag.edges.put(edgePlan.getId(),
+          new Edge(edgeProperty, dag.getEventHandler()));
}
+  }
-    private void createDAGEdges(DAGImpl dag) {
-      for (EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) {
-        EdgeProperty edgeProperty = DagTypeConverters
-            .createEdgePropertyMapFromDAGPlan(edgePlan);
-
-        // If CUSTOM without an edge manager, setup a fake edge manager. Avoid
-        // referencing the fake edge manager within the API module.
-        if (edgeProperty.getDataMovementType() == DataMovementType.CUSTOM
-            && edgeProperty.getEdgeManagerDescriptor() == null) {
-          EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(
-              NullEdgeManager.class.getName());
-          EdgeProperty ep = new EdgeProperty(edgeDesc, edgeProperty.getDataSourceType(),
-              edgeProperty.getSchedulingType(), edgeProperty.getEdgeSource(),
-              edgeProperty.getEdgeDestination());
-          edgeProperty = ep;
-        }
-
-        // edge manager may be also set via API when using custom edge type
-        dag.edges.put(edgePlan.getId(),
-            new Edge(edgeProperty, dag.getEventHandler()));
-      }
+  private static void assignDAGScheduler(DAGImpl dag) {
+    LOG.info("Using Natural order dag scheduler");
+    dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
+  }
+
+  // Creates the VertexImpl for plan vertex index vId. NOTE(review): the
+  // !commitAllOutputsOnSuccess argument presumably tells the vertex to commit
+  // its own outputs -- confirm against the VertexImpl constructor.
+  private static VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
+    TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId);
+
+    VertexPlan vertexPlan = dag.getJobPlan().getVertex(vId);
+    VertexLocationHint vertexLocationHint = DagTypeConverters
+        .convertFromDAGPlan(vertexPlan.getTaskLocationHintList());
+
+    VertexImpl v = new VertexImpl(
+        vertexId, vertexPlan, vertexName, dag.conf,
+        dag.eventHandler, dag.taskAttemptListener,
+        dag.clock, dag.taskHeartbeatHandler,
+        !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,
+        dag.vertexGroups);
+    return v;
+  }
+
+  // hooks up this VertexImpl to input and output EdgeProperties
+  private static void parseVertexEdges(DAGImpl dag, Map<String, EdgePlan> edgePlans, Vertex vertex) {
+    VertexPlan vertexPlan = vertex.getVertexPlan();
+
+    Map<Vertex, Edge> inVertices =
+        new HashMap<Vertex, Edge>();
+
+    Map<Vertex, Edge> outVertices =
+        new HashMap<Vertex, Edge>();
+
+    for(String inEdgeId : vertexPlan.getInEdgeIdList()){
+      EdgePlan edgePlan = edgePlans.get(inEdgeId);
+      Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());
+      Edge edge = dag.edges.get(inEdgeId);
+      edge.setSourceVertex(inVertex);
+      edge.setDestinationVertex(vertex);
+      inVertices.put(inVertex, edge);
}
-    private void assignDAGScheduler(DAGImpl dag) {
-      LOG.info("Using Natural order dag scheduler");
-      dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
+    for(String outEdgeId : vertexPlan.getOutEdgeIdList()){
+      EdgePlan edgePlan = edgePlans.get(outEdgeId);
+      Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());
+      Edge edge = dag.edges.get(outEdgeId);
+      edge.setSourceVertex(vertex);
+      edge.setDestinationVertex(outVertex);
+      outVertices.put(outVertex, edge);
}
-    private VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
-      TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId);
+    vertex.setInputVertices(inVertices);
+    vertex.setOutputVertices(outVertices);
+  }
-      VertexPlan vertexPlan = dag.getJobPlan().getVertex(vId);
-      VertexLocationHint vertexLocationHint = DagTypeConverters
-          .convertFromDAGPlan(vertexPlan.getTaskLocationHintList());
+  // DAG_RECOVER handler for a DAG in NEW state: drives the DAG and its
+  // vertices towards the state rebuilt by restoreFromEvent() (recoveredState).
+  private static class RecoverTransition
+      implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
+
+    @Override
+    public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
+      switch (dag.recoveredState) {
+        case NEW:
+          // send DAG an Init and start events
+          dag.eventHandler.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_INIT));
+          dag.eventHandler.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_START));
+          return DAGState.NEW;
+        case INITED:
+          // DAG inited but not started
+          // This implies vertices need to be sent init event
+          // Root vertices need to be sent start event
+          // The vertices may already have been sent these events but the
+          // DAG start may not have been persisted
+          // NOTE(review): startTime is only restored from DAG_STARTED, so it
+          // stays unset on this path -- confirm it is recorded elsewhere.
+          recoverRootVerticesAsRunning(dag);
+          return DAGState.RUNNING;
+        case RUNNING:
+          // if commit is in progress, DAG should fail as commits are not
+          // recoverable
+          if (dag.recoveryCommitInProgress) {
+            // Fail the DAG as we have not seen a completion
+            dag.trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
+            dag.setFinishTime();
+            // Recover all other data for all vertices. recoveredState is
+            // RUNNING in this branch, so every vertex is recovered as
+            // SUCCEEDED.
+            // NOTE(review): DAGCommitStartedEvent is also logged for
+            // vertex-group commits, which may happen before every vertex has
+            // finished -- confirm SUCCEEDED is right in that case.
+            recoverAllVertices(dag, VertexState.SUCCEEDED);
+            dag.logJobHistoryUnsuccesfulEvent(DAGState.FAILED);
+            dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
+                DAGState.FAILED));
+            return DAGState.FAILED;
+          }
+          recoverRootVerticesAsRunning(dag);
+          return DAGState.RUNNING;
+        case SUCCEEDED:
+        case ERROR:
+        case FAILED:
+        case KILLED:
+          // Completed
+
+          // Recover all other data for all vertices
+          // send recover event to all vertices with a final end state
+          VertexState desiredState = VertexState.SUCCEEDED;
+          if (dag.recoveredState.equals(DAGState.KILLED)) {
+            desiredState = VertexState.KILLED;
+          } else if (EnumSet.of(DAGState.ERROR, DAGState.FAILED).contains(
+              dag.recoveredState)) {
+            desiredState = VertexState.FAILED;
+          }
+          recoverAllVertices(dag, desiredState);
-      VertexImpl v = new VertexImpl(
-          vertexId, vertexPlan, vertexName, dag.conf,
-          dag.eventHandler, dag.taskAttemptListener,
-          dag.clock, dag.taskHeartbeatHandler,
-          !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,
-          dag.vertexGroups);
-      return v;
+
+          // Let us inform AM of completion
+          dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
+              dag.recoveredState));
+
+          LOG.info("Recovered DAG: " + dag.getID() + " finished with state: "
+              + dag.recoveredState);
+          return dag.recoveredState;
+        default:
+          // Error state
+          LOG.warn("Trying to recover DAG, failed to recover"
+              + " from non-handled state, recoveredState=" + dag.recoveredState);
+          dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
+              DAGState.ERROR));
+          return DAGState.ERROR;
+      }
}
-    // hooks up this VertexImpl to input and output EdgeProperties
-    private void parseVertexEdges(DAGImpl dag, Map<String, EdgePlan> edgePlans, Vertex vertex) {
-      VertexPlan vertexPlan = vertex.getVertexPlan();
+
+    // Sends a RUNNING recover event to every root vertex (no input vertices).
+    private static void recoverRootVerticesAsRunning(DAGImpl dag) {
+      for (Vertex v : dag.vertices.values()) {
+        if (v.getInputVerticesCount() == 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Sending Running Recovery event to root vertex "
+                + v.getName());
+          }
+          dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
+              VertexState.RUNNING));
+        }
+      }
+    }
+
+    // Sends a recover event with the given desired state to every vertex.
+    private static void recoverAllVertices(DAGImpl dag, VertexState desiredState) {
+      for (Vertex v : dag.vertices.values()) {
+        dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
+            desiredState));
+      }
+    }
+  }
-      Map<Vertex, Edge> inVertices =
-          new HashMap<Vertex, Edge>();
+  private static class InitTransition
+      implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
-      Map<Vertex, Edge> outVertices =
-          new HashMap<Vertex, Edge>();
+    /**
+     * Initializes the DAG through initializeDAG(). On success it logs the
+     * init history event (logJobHistoryInitedEvent()) and returns INITED;
+     * otherwise it returns whatever initializeDAG() produced (e.g. the
+     * result of finished(FAILED) for a DAG with no vertices) without
+     * logging that event.
+     *
+     * NOTE(review): an earlier version of this comment, inherited from
+     * MapReduce, said the transition runs synchronously from the AM's init()
+     * (no RPC, no thread switch); confirm before relying on that.
+     */
+    @Override
+    public DAGState transition(DAGImpl dag, DAGEvent event) {
+      // TODO Metrics
+      //dag.metrics.submittedJob(dag);
+      //dag.metrics.preparingJob(dag);
-      for(String inEdgeId : vertexPlan.getInEdgeIdList()){
-        EdgePlan edgePlan = edgePlans.get(inEdgeId);
-        Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());
-        Edge edge = dag.edges.get(inEdgeId);
-        edge.setSourceVertex(inVertex);
-        edge.setDestinationVertex(vertex);
-        inVertices.put(inVertex, edge);
+      DAGState state = dag.initializeDAG();
+      if (state != DAGState.INITED) {
+        return state;
}
-      for(String outEdgeId : vertexPlan.getOutEdgeIdList()){
-        EdgePlan edgePlan = edgePlans.get(outEdgeId);
-        Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());
-        Edge edge = dag.edges.get(outEdgeId);
-        edge.setSourceVertex(vertex);
-        edge.setDestinationVertex(outVertex);
-        outVertices.put(outVertex, edge);
-      }
+      // TODO Metrics
+      //dag.metrics.endPreparingJob(dag);
+      dag.logJobHistoryInitedEvent();
+      return DAGState.INITED;
+
-      vertex.setInputVertices(inVertices);
-      vertex.setOutputVertices(outVertices);
}
} // end of InitTransition
@@ -1427,6 +1598,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
for (VertexGroupInfo groupInfo : commitList) {
groupInfo.committed = true;
Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
+        // Mark the commit as started in history before committing this
+        // group's outputs, so a recovering AM with no later DAG_FINISHED
+        // fails the DAG. Note: this logs one event per committed group.
+        appContext.getHistoryHandler().handle(new DAGHistoryEvent(getID(),
+            new DAGCommitStartedEvent(dagId)));
for (String outputName : groupInfo.outputs) {
OutputCommitter committer = v.getOutputCommitters().get(outputName);
LOG.info("Committing output: " + outputName);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
index 070bd75..ff9e401 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OutputCommitterContextImpl.java
@@ -31,13 +31,15 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
private final String vertexName;
private final String outputName;
private final byte[] userPayload;
+  // Vertex index supplied by the caller and returned by getVertexIndex();
+  // presumably the vertex's index in the DAG plan -- confirm at call sites.
+  private final int vertexIdx;
public OutputCommitterContextImpl(ApplicationId applicationId,
int dagAttemptNumber,
String dagName,
String vertexName,
String outputName,
-      @Nullable byte[] userPayload) {
+      @Nullable byte[] userPayload,
+      int vertexIdx) {
checkNotNull(applicationId, "applicationId is null");
checkNotNull(dagName, "dagName is null");
checkNotNull(vertexName, "vertexName is null");
@@ -48,6 +50,7 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
this.vertexName = vertexName;
this.outputName = outputName;
this.userPayload = userPayload;
+    this.vertexIdx = vertexIdx;
}
@Override
@@ -80,4 +83,9 @@ public class OutputCommitterContextImpl implements OutputCommitterContext {
return userPayload;
}
+  /** @return the vertex index passed to the constructor (not validated). */
+  @Override
+  public int getVertexIndex() {
+    return vertexIdx;
+  }
+
}