You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/06 00:35:17 UTC

[2/4] TEZ-847. Support basic AM recovery. (hitesh)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
new file mode 100644
index 0000000..6d5a769
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGCommitStartedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class DAGCommitStartedEvent implements HistoryEvent {
+  // DAG_COMMIT_STARTED event carrying only a DAG ID; flagged as a recovery event, not a history event.
+  private TezDAGID dagID;
+  /** Creates an empty event; dagID stays null until fromProto or fromProtoStream sets it. */
+  public DAGCommitStartedEvent() {
+  }
+  /** Creates an event for the given DAG ID. */
+  public DAGCommitStartedEvent(TezDAGID dagID) {
+    this.dagID = dagID;
+  }
+  /** Always returns HistoryEventType.DAG_COMMIT_STARTED. */
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.DAG_COMMIT_STARTED;
+  }
+  /** ATS JSON conversion is not implemented for this event; currently returns null. */
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    // TODO: build the ATS JSON representation; until then callers receive null.
+    return null;
+  }
+  /** Returns true: this event reports itself as a recovery event. */
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+  /** Returns false: this event does not report itself as a history event. */
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+  /** Builds the proto form with dagID stored as a string; throws NullPointerException if dagID is null. */
+  public DAGCommitStartedProto toProto() {
+    return DAGCommitStartedProto.newBuilder()
+        .setDagId(dagID.toString())
+        .build();
+  }
+  /** Restores dagID by parsing the DAG ID string stored in the given proto. */
+  public void fromProto(DAGCommitStartedProto proto) {
+    this.dagID = TezDAGID.fromString(proto.getDagId());
+  }
+  /** Writes the proto form of this event to the stream using delimited encoding. */
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+  /** Reads one delimited proto from the stream and loads it through fromProto. */
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    DAGCommitStartedProto proto = DAGCommitStartedProto.parseDelimitedFrom(inputStream);
+    fromProto(proto); // NOTE(review): proto is not null-checked; parseDelimitedFrom may return null at end of stream - confirm
+  }
+  /** Returns the text "dagID=" followed by the DAG ID. */
+  @Override
+  public String toString() {
+    return "dagID=" + dagID;
+  }
+  /** Returns the DAG ID held by this event; null if it was never set. */
+  public TezDAGID getDagID() {
+    return dagID;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index cc9c3ad..38e0702 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -20,7 +20,7 @@ package org.apache.tez.dag.history.events;
 
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
@@ -43,7 +43,7 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
   private TezDAGID dagID;
   private long startTime;
   private long finishTime;
-  private DAGStatus.State state;
+  private DAGState state;
   private String diagnostics;
   private TezCounters tezCounters;
 
@@ -51,7 +51,7 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
   }
 
   public DAGFinishedEvent(TezDAGID dagId, long startTime,
-      long finishTime, DAGStatus.State state,
+      long finishTime, DAGState state,
       String diagnostics, TezCounters counters) {
     this.dagID = dagId;
     this.startTime = startTime;
@@ -111,22 +111,33 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
   }
 
   public DAGFinishedProto toProto() {
-    return DAGFinishedProto.newBuilder()
-        .setDagId(dagID.toString())
+    DAGFinishedProto.Builder builder = DAGFinishedProto.newBuilder();
+
+    builder.setDagId(dagID.toString())
         .setState(state.ordinal())
-        .setDiagnostics(diagnostics)
-        .setFinishTime(finishTime)
-        .setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters))
-        .build();
+        .setFinishTime(finishTime);
+
+    if (diagnostics != null) {
+      builder.setDiagnostics(diagnostics);
+    }
+    if (tezCounters != null) {
+      builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
+    }
+
+    return builder.build();
   }
 
   public void fromProto(DAGFinishedProto proto) {
     this.dagID = TezDAGID.fromString(proto.getDagId());
     this.finishTime = proto.getFinishTime();
-    this.state = DAGStatus.State.values()[proto.getState()];
-    this.diagnostics = proto.getDiagnostics();
-    this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
-        proto.getCounters());
+    this.state = DAGState.values()[proto.getState()];
+    if (proto.hasDiagnostics()) {
+      this.diagnostics = proto.getDiagnostics();
+    }
+    if (proto.hasCounters()) {
+      this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
+          proto.getCounters());
+    }
   }
 
   @Override
@@ -148,9 +159,9 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
         + ", timeTaken=" + (finishTime - startTime)
         + ", status=" + state.name()
         + ", diagnostics=" + diagnostics
-        + ", counters="
-        + tezCounters.toString()
-            .replaceAll("\\n", ", ").replaceAll("\\s+", " ");
+        + ", counters=" + ((tezCounters == null) ? "null" :
+          (tezCounters.toString()
+            .replaceAll("\\n", ", ").replaceAll("\\s+", " ")));
   }
 
   @Override
@@ -159,4 +170,28 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
         HistoryEventType.DAG_FINISHED).writeDelimitedTo(outputStream);
   }
 
+  public long getFinishTime() {
+    return this.finishTime;
+  }
+
+  public DAGState getState() {
+    return state;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public String getDiagnostics() {
+    return diagnostics;
+  }
+
+  public TezCounters getTezCounters() {
+    return tezCounters;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
index 20479e6..9b001b6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
@@ -94,4 +94,11 @@ public class DAGInitializedEvent implements HistoryEvent {
     fromProto(proto);
   }
 
+  public long getInitTime() {
+    return this.initTime;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
index 4574753..a1bcdf2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
@@ -113,4 +113,11 @@ public class DAGStartedEvent implements HistoryEvent {
         + ", startTime=" + startTime;
   }
 
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 3d24105..853bea7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -179,4 +179,15 @@ import java.io.OutputStream;
     return this.dagPlan;
   }
 
+  public TezDAGID getDagID() {
+    return dagID;
+  }
+
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
+  public long getSubmitTime() {
+    return submitTime;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 97f3be3..ecb6818 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -110,22 +110,31 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   }
 
   public TaskAttemptFinishedProto toProto() {
-    return TaskAttemptFinishedProto.newBuilder()
-        .setTaskAttemptId(taskAttemptId.toString())
+    TaskAttemptFinishedProto.Builder builder =
+        TaskAttemptFinishedProto.newBuilder();
+    builder.setTaskAttemptId(taskAttemptId.toString())
         .setState(state.ordinal())
-        .setDiagnostics(diagnostics)
-        .setFinishTime(finishTime)
-        .setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters))
-        .build();
+        .setFinishTime(finishTime);
+    if (diagnostics != null) {
+      builder.setDiagnostics(diagnostics);
+    }
+    if (tezCounters != null) {
+      builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
+    }
+    return builder.build();
   }
 
   public void fromProto(TaskAttemptFinishedProto proto) {
     this.taskAttemptId = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
     this.finishTime = proto.getFinishTime();
     this.state = TaskAttemptState.values()[proto.getState()];
-    this.diagnostics = proto.getDiagnostics();
-    this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
+    if (proto.hasDiagnostics()) {
+      this.diagnostics = proto.getDiagnostics();
+    }
+    if (proto.hasCounters()) {
+      this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
         proto.getCounters());
+    }
   }
 
   @Override
@@ -149,9 +158,29 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
         + ", timeTaken=" + (finishTime - startTime)
         + ", status=" + state.name()
         + ", diagnostics=" + diagnostics
-        + ", counters="
-        + tezCounters.toString()
-            .replaceAll("\\n", ", ").replaceAll("\\s+", " ");
+        + ", counters=" + (tezCounters == null ? "null" :
+          tezCounters.toString()
+            .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
+  }
+
+  public TezTaskAttemptID getTaskAttemptID() {
+    return taskAttemptId;
+  }
+
+  public TezCounters getCounters() {
+    return tezCounters;
+  }
+
+  public String getDiagnostics() {
+    return diagnostics;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public TaskAttemptState getState() {
+    return state;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index 11a1c62..ba91db8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -112,7 +112,7 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
 
   @Override
   public boolean isRecoveryEvent() {
-    return false;
+    return true;
   }
 
   @Override
@@ -158,4 +158,19 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
         + ", completedLogs=" + completedLogsUrl;
   }
 
+  public TezTaskAttemptID getTaskAttemptID() {
+    return this.taskAttemptId;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  public NodeId getNodeId() {
+    return nodeId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
index dac2f9a..74c804c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.history.events;
 
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.oldrecords.TaskState;
@@ -26,6 +27,7 @@ import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.ats.EntityTypes;
 import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskFinishedProto;
 import org.codehaus.jettison.json.JSONArray;
@@ -44,9 +46,11 @@ public class TaskFinishedEvent implements HistoryEvent {
   private long finishTime;
   private TaskState state;
   private TezCounters tezCounters;
+  private TezTaskAttemptID successfulAttemptID;
 
   public TaskFinishedEvent(TezTaskID taskID,
       String vertexName, long startTime, long finishTime,
+      TezTaskAttemptID successfulAttemptID,
       TaskState state, TezCounters counters) {
     this.vertexName = vertexName;
     this.taskID = taskID;
@@ -103,20 +107,31 @@ public class TaskFinishedEvent implements HistoryEvent {
   }
 
   public TaskFinishedProto toProto() {
-    return TaskFinishedProto.newBuilder()
-        .setTaskId(taskID.toString())
+    TaskFinishedProto.Builder builder = TaskFinishedProto.newBuilder();
+    builder.setTaskId(taskID.toString())
         .setState(state.ordinal())
-        .setFinishTime(finishTime)
-        .setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters))
-        .build();
+        .setFinishTime(finishTime);
+    if (tezCounters != null) {
+      builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
+    }
+    if (successfulAttemptID != null) {
+      builder.setSuccessfulTaskAttemptId(successfulAttemptID.toString());
+    }
+    return builder.build();
   }
 
   public void fromProto(TaskFinishedProto proto) {
     this.taskID = TezTaskID.fromString(proto.getTaskId());
     this.finishTime = proto.getFinishTime();
     this.state = TaskState.values()[proto.getState()];
-    this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
-        proto.getCounters());
+    if (proto.hasCounters()) {
+      this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
+          proto.getCounters());
+    }
+    if (proto.hasSuccessfulTaskAttemptId()) {
+      this.successfulAttemptID =
+          TezTaskAttemptID.fromString(proto.getSuccessfulTaskAttemptId());
+    }
   }
 
   @Override
@@ -138,9 +153,30 @@ public class TaskFinishedEvent implements HistoryEvent {
         + ", finishTime=" + finishTime
         + ", timeTaken=" + (finishTime - startTime)
         + ", status=" + state.name()
-        + ", counters="
-        + tezCounters.toString()
-            .replaceAll("\\n", ", ").replaceAll("\\s+", " ");
+        + ", successfulAttemptID=" + (successfulAttemptID == null ? "null" :
+            successfulAttemptID.toString())
+        + ", counters=" + ( tezCounters == null ? "null" :
+          tezCounters.toString()
+            .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
   }
 
+  public TezTaskID getTaskID() {
+    return taskID;
+  }
+
+  public TaskState getState() {
+    return state;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public TezCounters getTezCounters() {
+    return tezCounters;
+  }
+
+  public TezTaskAttemptID getSuccessfulAttemptID() {
+    return successfulAttemptID;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
index 9efa357..c2a380b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
@@ -91,7 +91,7 @@ public class TaskStartedEvent implements HistoryEvent {
 
   @Override
   public boolean isRecoveryEvent() {
-    return false;
+    return true;
   }
 
   @Override
@@ -132,4 +132,16 @@ public class TaskStartedEvent implements HistoryEvent {
         + ", launchTime=" + startTime;
   }
 
+  public TezTaskID getTaskID() {
+    return taskID;
+  }
+
+  public long getScheduledTime() {
+    return scheduledTime;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
new file mode 100644
index 0000000..564f9ed
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexCommitStartedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class VertexCommitStartedEvent implements HistoryEvent {
+  // VERTEX_COMMIT_STARTED event carrying only a vertex ID; flagged as a recovery event, not a history event.
+  private TezVertexID vertexID;
+  /** Creates an empty event; vertexID stays null until fromProto or fromProtoStream sets it. */
+  public VertexCommitStartedEvent() {
+  }
+  /** Creates an event for the given vertex ID. */
+  public VertexCommitStartedEvent(TezVertexID vertexId) {
+    this.vertexID = vertexId;
+  }
+  /** Always returns HistoryEventType.VERTEX_COMMIT_STARTED. */
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.VERTEX_COMMIT_STARTED;
+  }
+  /** ATS JSON conversion is not implemented for this event; currently returns null. */
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    // TODO: build the ATS JSON representation; until then callers receive null.
+    return null;
+  }
+  /** Returns true: this event reports itself as a recovery event. */
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+  /** Returns false: this event does not report itself as a history event. */
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+  /** Builds the proto form with vertexID stored as a string; throws NullPointerException if vertexID is null. */
+  public VertexCommitStartedProto toProto() {
+    return VertexCommitStartedProto.newBuilder()
+        .setVertexId(vertexID.toString())
+        .build();
+  }
+  /** Restores vertexID by parsing the vertex ID string stored in the given proto. */
+  public void fromProto(VertexCommitStartedProto proto) {
+    this.vertexID = TezVertexID.fromString(proto.getVertexId());
+  }
+  /** Writes the proto form of this event to the stream using delimited encoding. */
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+  /** Reads one delimited proto from the stream and loads it through fromProto. */
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexCommitStartedProto proto = VertexCommitStartedProto.parseDelimitedFrom(inputStream);
+    fromProto(proto); // NOTE(review): proto is not null-checked; parseDelimitedFrom may return null at end of stream - confirm
+  }
+  /** Returns the text "vertexId=" followed by the vertex ID. */
+  @Override
+  public String toString() {
+    return "vertexId=" + vertexID;
+  }
+  /** Returns the vertex ID held by this event; null if it was never set. */
+  public TezVertexID getVertexID() {
+    return this.vertexID;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
index 7c2a16f..035c9ca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
@@ -206,4 +206,12 @@ public class VertexDataMovementEventsGeneratedEvent implements HistoryEvent {
 
   }
 
+  public TezVertexID getVertexID() {
+    return this.vertexID;
+  }
+
+  public List<TezEvent> getTezEvents() {
+    return this.events;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 0f2b8a1..2366eb1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -20,7 +20,7 @@ package org.apache.tez.dag.history.events;
 
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.ats.EntityTypes;
@@ -45,13 +45,13 @@ public class VertexFinishedEvent implements HistoryEvent {
   private long startRequestedTime;
   private long startTime;
   private long finishTime;
-  private VertexStatus.State state;
+  private VertexState state;
   private String diagnostics;
   private TezCounters tezCounters;
 
   public VertexFinishedEvent(TezVertexID vertexId,
       String vertexName, long initRequestedTime, long initedTime, long startRequestedTime, long startedTime, long finishTime,
-      VertexStatus.State state, String diagnostics,
+      VertexState state, String diagnostics,
       TezCounters counters) {
     this.vertexName = vertexName;
     this.vertexID = vertexId;
@@ -111,24 +111,32 @@ public class VertexFinishedEvent implements HistoryEvent {
   }
 
   public VertexFinishedProto toProto() {
-    return VertexFinishedProto.newBuilder()
-        .setVertexName(vertexName)
+    VertexFinishedProto.Builder builder = VertexFinishedProto.newBuilder();
+    builder.setVertexName(vertexName)
         .setVertexId(vertexID.toString())
         .setState(state.ordinal())
-        .setDiagnostics(diagnostics)
-        .setFinishTime(finishTime)
-        .setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters))
-        .build();
+        .setFinishTime(finishTime);
+    if (diagnostics != null) {
+      builder.setDiagnostics(diagnostics);
+    }
+    if (tezCounters != null) {
+      builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
+    }
+    return builder.build();
   }
 
   public void fromProto(VertexFinishedProto proto) {
     this.vertexName = proto.getVertexName();
     this.vertexID = TezVertexID.fromString(proto.getVertexId());
     this.finishTime = proto.getFinishTime();
-    this.state = VertexStatus.State.values()[proto.getState()];
-    this.diagnostics = proto.getDiagnostics();
-    this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
-        proto.getCounters());
+    this.state = VertexState.values()[proto.getState()];
+    if (proto.hasDiagnostics())  {
+      this.diagnostics = proto.getDiagnostics();
+    }
+    if (proto.hasCounters()) {
+      this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
+          proto.getCounters());
+    }
   }
 
   @Override
@@ -154,9 +162,28 @@ public class VertexFinishedEvent implements HistoryEvent {
         + ", timeTaken=" + (finishTime - startTime)
         + ", status=" + state.name()
         + ", diagnostics=" + diagnostics
-        + ", counters="
-        + tezCounters.toString()
-            .replaceAll("\\n", ", ").replaceAll("\\s+", " ");
+        + ", counters=" + ( tezCounters == null ? "null" :
+          tezCounters.toString()
+            .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
   }
 
+  public TezVertexID getVertexID() {
+    return this.vertexID;
+  }
+
+  public VertexState getState() {
+    return this.state;
+  }
+
+  public long getFinishTime() {
+    return this.finishTime;
+  }
+
+  public String getDiagnostics() {
+    return diagnostics;
+  }
+
+  public TezCounters getTezCounters() {
+    return tezCounters;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
index 21c7587..e9e4a8c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
@@ -18,12 +18,19 @@
 
 package org.apache.tez.dag.history.events;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
+import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.ats.EntityTypes;
 import org.apache.tez.dag.history.utils.ATSConstants;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexInitializedProto;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -31,28 +38,35 @@ import org.codehaus.jettison.json.JSONObject;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 public class VertexInitializedEvent implements HistoryEvent {
 
+  private static final Log LOG = LogFactory.getLog(VertexInitializedEvent.class);
+
   private TezVertexID vertexID;
   private String vertexName;
   private long initRequestedTime;
   private long initedTime;
-  private long numTasks;
+  private int numTasks;
   private String processorName;
+  private Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> additionalInputs;
 
   public VertexInitializedEvent() {
   }
 
   public VertexInitializedEvent(TezVertexID vertexId,
       String vertexName, long initRequestedTime, long initedTime,
-      long numTasks, String processorName) {
+      int numTasks, String processorName,
+      Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> additionalInputs) {
     this.vertexName = vertexName;
     this.vertexID = vertexId;
     this.initRequestedTime = initRequestedTime;
     this.initedTime = initedTime;
     this.numTasks = numTasks;
     this.processorName = processorName;
+    this.additionalInputs = additionalInputs;
   }
 
   @Override
@@ -107,8 +121,23 @@ public class VertexInitializedEvent implements HistoryEvent {
   }
 
   public RecoveryProtos.VertexInitializedProto toProto() {
-    return RecoveryProtos.VertexInitializedProto.newBuilder()
-        .setVertexId(vertexID.toString())
+    VertexInitializedProto.Builder builder = VertexInitializedProto.newBuilder();
+    if (additionalInputs != null
+      && !additionalInputs.isEmpty()) {
+      for (RootInputLeafOutputDescriptor<InputDescriptor> input :
+        additionalInputs.values()) {
+        RootInputLeafOutputProto.Builder inputBuilder
+            = RootInputLeafOutputProto.newBuilder();
+        inputBuilder.setName(input.getEntityName());
+        if (input.getInitializerClassName() != null) {
+          inputBuilder.setInitializerClassName(input.getInitializerClassName());
+        }
+        inputBuilder.setEntityDescriptor(
+            DagTypeConverters.convertToDAGPlan(input.getDescriptor()));
+        builder.addInputs(inputBuilder.build());
+      }
+    }
+    return builder.setVertexId(vertexID.toString())
         .setVertexName(vertexName)
         .setInitRequestedTime(initRequestedTime)
         .setInitTime(initedTime)
@@ -122,6 +151,20 @@ public class VertexInitializedEvent implements HistoryEvent {
     this.initRequestedTime = proto.getInitRequestedTime();
     this.initedTime = proto.getInitTime();
     this.numTasks = proto.getNumTasks();
+    if (proto.getInputsCount() > 0) {
+      this.additionalInputs =
+          new LinkedHashMap<String, RootInputLeafOutputDescriptor<InputDescriptor>>();
+      for (RootInputLeafOutputProto inputProto : proto.getInputsList()) {
+        RootInputLeafOutputDescriptor<InputDescriptor> input =
+            new RootInputLeafOutputDescriptor<InputDescriptor>(
+                inputProto.getName(),
+                DagTypeConverters.convertInputDescriptorFromDAGPlan(
+                    inputProto.getEntityDescriptor()),
+                inputProto.hasInitializerClassName() ?
+                    inputProto.getInitializerClassName() : null);
+        additionalInputs.put(input.getEntityName(), input);
+      }
+    }
   }
 
   @Override
@@ -143,7 +186,32 @@ public class VertexInitializedEvent implements HistoryEvent {
         + ", initRequestedTime=" + initRequestedTime
         + ", initedTime=" + initedTime
         + ", numTasks=" + numTasks
-        + ", processorName=" + processorName;
+        + ", processorName=" + processorName
+        + ", additionalInputsCount="
+        + (additionalInputs != null ? additionalInputs.size() : 0);
+  }
+
+  public TezVertexID getVertexID() {
+    return this.vertexID;
+  }
+
+  public long getInitRequestedTime() {
+    return initRequestedTime;
+  }
+
+  public long getInitedTime() {
+    return initedTime;
+  }
+
+  public int getNumTasks() {
+    return numTasks;
   }
 
+  public Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> getAdditionalInputs() {
+    return additionalInputs;
+  }
+
+  public String getProcessorName() {
+    return processorName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
new file mode 100644
index 0000000..43cc787
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.EdgeManagerDescriptorProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexParallelismUpdatedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Recovery-only event recording an update to a vertex's parallelism: the new
+ * task count plus, when present, a location hint and a map from edge name to
+ * edge manager descriptor.
+ *
+ * <p>The event is serialized as a length-delimited
+ * {@link VertexParallelismUpdatedProto}. It has no ATS representation, so
+ * {@link #convertToATSJSON()} always throws.</p>
+ */
+public class VertexParallelismUpdatedEvent implements HistoryEvent {
+
+  private TezVertexID vertexID;
+  private int numTasks;
+  // Optional; toProto() serializes it only when non-null.
+  private VertexLocationHint vertexLocationHint;
+  // Optional, keyed by edge name; toProto() serializes it only when non-null.
+  private Map<String, EdgeManagerDescriptor> sourceEdgeManagers;
+
+  /**
+   * Creates an empty event whose fields are filled in later by
+   * {@link #fromProto} or {@link #fromProtoStream}.
+   */
+  public VertexParallelismUpdatedEvent() {
+  }
+
+  /**
+   * @param vertexID vertex whose parallelism changed
+   * @param numTasks task count after the update
+   * @param vertexLocationHint location hint after the update, may be null
+   * @param sourceEdgeManagers edge manager descriptors keyed by edge name,
+   *                           may be null
+   */
+  public VertexParallelismUpdatedEvent(TezVertexID vertexID,
+      int numTasks, VertexLocationHint vertexLocationHint,
+      Map<String, EdgeManagerDescriptor> sourceEdgeManagers) {
+    this.vertexID = vertexID;
+    this.numTasks = numTasks;
+    this.vertexLocationHint = vertexLocationHint;
+    this.sourceEdgeManagers = sourceEdgeManagers;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.VERTEX_PARALLELISM_UPDATED;
+  }
+
+  /**
+   * Not supported, because this event is written to the recovery log only.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    throw new UnsupportedOperationException("VertexParallelismUpdatedEvent"
+        + " not a History event");
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+
+  /**
+   * Builds the protobuf form of this event. The location hint and the edge
+   * manager descriptors are emitted only when the corresponding field is
+   * non-null.
+   */
+  public VertexParallelismUpdatedProto toProto() {
+    VertexParallelismUpdatedProto.Builder builder =
+        VertexParallelismUpdatedProto.newBuilder();
+    builder.setVertexId(vertexID.toString())
+        .setNumTasks(numTasks);
+    if (vertexLocationHint != null) {
+      builder.setVertexLocationHint(DagTypeConverters.convertVertexLocationHintToProto(
+            this.vertexLocationHint));
+    }
+    if (sourceEdgeManagers != null) {
+      for (Entry<String, EdgeManagerDescriptor> entry :
+          sourceEdgeManagers.entrySet()) {
+        EdgeManagerDescriptorProto.Builder edgeMgrBuilder =
+            EdgeManagerDescriptorProto.newBuilder();
+        edgeMgrBuilder.setEdgeName(entry.getKey());
+        edgeMgrBuilder.setEntityDescriptor(
+            DagTypeConverters.convertToDAGPlan(entry.getValue()));
+        builder.addEdgeManagerDescriptors(edgeMgrBuilder.build());
+      }
+    }
+    return builder.build();
+  }
+
+  /**
+   * Populates this event from its protobuf form. Fields whose optional
+   * proto counterpart is absent are left unchanged.
+   */
+  public void fromProto(VertexParallelismUpdatedProto proto) {
+    this.vertexID = TezVertexID.fromString(proto.getVertexId());
+    this.numTasks = proto.getNumTasks();
+    if (proto.hasVertexLocationHint()) {
+      this.vertexLocationHint = DagTypeConverters.convertVertexLocationHintFromProto(
+          proto.getVertexLocationHint());
+    }
+    if (proto.getEdgeManagerDescriptorsCount() > 0) {
+      this.sourceEdgeManagers = new HashMap<String, EdgeManagerDescriptor>(
+          proto.getEdgeManagerDescriptorsCount());
+      for (EdgeManagerDescriptorProto edgeManagerProto :
+        proto.getEdgeManagerDescriptorsList()) {
+        EdgeManagerDescriptor edgeManagerDescriptor =
+            DagTypeConverters.convertEdgeManagerDescriptorFromDAGPlan(
+                edgeManagerProto.getEntityDescriptor());
+        sourceEdgeManagers.put(edgeManagerProto.getEdgeName(),
+            edgeManagerDescriptor);
+      }
+    }
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Reads one length-delimited message from {@code inputStream} and populates
+   * this event from it.
+   *
+   * @throws IOException if the stream holds no further message or the
+   *                     message cannot be read
+   */
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexParallelismUpdatedProto proto =
+        VertexParallelismUpdatedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      // parseDelimitedFrom returns null at end of stream instead of throwing.
+      throw new IOException("No data found in stream");
+    }
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    return "vertexId=" + vertexID
+        + ", numTasks=" + numTasks
+        + ", vertexLocationHint=" +
+        (vertexLocationHint == null? "null" : vertexLocationHint)
+        + ", edgeManagersCount=" +
+        (sourceEdgeManagers == null? "null" : sourceEdgeManagers.size());
+  }
+
+  public TezVertexID getVertexID() {
+    return this.vertexID;
+  }
+
+  public int getNumTasks() {
+    return numTasks;
+  }
+
+  /** @return the location hint, or null if none was set */
+  public VertexLocationHint getVertexLocationHint() {
+    return vertexLocationHint;
+  }
+
+  /** @return edge manager descriptors keyed by edge name, or null if none were set */
+  public Map<String, EdgeManagerDescriptor> getSourceEdgeManagers() {
+    return sourceEdgeManagers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
index 6bb383d..e6023f1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
@@ -88,7 +88,7 @@ public class VertexStartedEvent implements HistoryEvent {
 
   @Override
   public boolean isRecoveryEvent() {
-    return false;
+    return true;
   }
 
   @Override
@@ -128,4 +128,16 @@ public class VertexStartedEvent implements HistoryEvent {
         + ", startedTime=" + startTime;
   }
 
+  public TezVertexID getVertexID() {
+    return this.vertexID;
+  }
+
+  public long getStartRequestedTime() {
+    return startRequestedTime;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 807ad81..5986657 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -34,9 +34,11 @@ import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,6 +51,7 @@ public class RecoveryService extends AbstractService {
   private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
       new LinkedBlockingQueue<DAGHistoryEvent>();
   private Set<TezDAGID> completedDAGs = new HashSet<TezDAGID>();
+  private Set<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
 
   private Thread eventHandlingThread;
   private AtomicBoolean stopped = new AtomicBoolean(false);
@@ -60,9 +63,12 @@ public class RecoveryService extends AbstractService {
   Path recoveryPath;
   Map<TezDAGID, FSDataOutputStream> outputStreamMap = new
       HashMap<TezDAGID, FSDataOutputStream>();
-  // FSDataOutputStream metaInfoStream;
   private int bufferSize;
   private FSDataOutputStream summaryStream;
+  private int unflushedEventsCount = 0;
+  private long lastFlushTime = -1;
+  private int maxUnflushedEvents;
+  private int flushInterval;
 
   public RecoveryService(AppContext appContext) {
     super(RecoveryService.class.getName());
@@ -76,11 +82,17 @@ public class RecoveryService extends AbstractService {
     recoveryDirFS = FileSystem.get(recoveryPath.toUri(), conf);
     bufferSize = conf.getInt(TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
         TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
+
+    flushInterval = conf.getInt(TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS,
+        TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS_DEFAULT);
+    maxUnflushedEvents = conf.getInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS,
+        TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT);
   }
 
   @Override
   public void serviceStart() {
     LOG.info("Starting RecoveryService");
+    lastFlushTime = appContext.getClock().getTime();
     eventHandlingThread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -128,18 +140,21 @@ public class RecoveryService extends AbstractService {
     if (eventHandlingThread != null) {
       eventHandlingThread.interrupt();
     }
+
     if (summaryStream != null) {
       try {
-        summaryStream.flush();
+        LOG.info("Closing Summary Stream");
+        summaryStream.hsync();
         summaryStream.close();
       } catch (IOException ioe) {
         LOG.warn("Error when closing summary stream", ioe);
       }
     }
-    for (FSDataOutputStream outputStream : outputStreamMap.values()) {
+    for (Entry<TezDAGID, FSDataOutputStream> entry : outputStreamMap.entrySet()) {
       try {
-        outputStream.flush();
-        outputStream.close();
+        LOG.info("Closing Output Stream for DAG " + entry.getKey());
+        entry.getValue().hsync();
+        entry.getValue().close();
       } catch (IOException ioe) {
         LOG.warn("Error when closing output stream", ioe);
       }
@@ -152,38 +167,44 @@ public class RecoveryService extends AbstractService {
           + event.getHistoryEvent().getEventType());
       return;
     }
+    HistoryEventType eventType = event.getHistoryEvent().getEventType();
     if (!started.get()) {
+      LOG.warn("Adding event of type " + eventType
+          + " to queue as service not started");
       eventQueue.add(event);
       return;
     }
-    HistoryEventType eventType = event.getHistoryEvent().getEventType();
+
     if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
       || eventType.equals(HistoryEventType.DAG_FINISHED)) {
       // handle submissions and completion immediately
       synchronized (lock) {
         try {
           handleEvent(event);
-          summaryStream.flush();
+          summaryStream.hsync();
           if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
-            outputStreamMap.get(event.getDagID()).flush();
+            if (outputStreamMap.containsKey(event.getDagID())) {
+              doFlush(outputStreamMap.get(event.getDagID()),
+                  appContext.getClock().getTime(), true);
+            }
           } else if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
             completedDAGs.add(event.getDagID());
             if (outputStreamMap.containsKey(event.getDagID())) {
               try {
-                outputStreamMap.get(event.getDagID()).flush();
+                doFlush(outputStreamMap.get(event.getDagID()),
+                    appContext.getClock().getTime(), true);
                 outputStreamMap.get(event.getDagID()).close();
                 outputStreamMap.remove(event.getDagID());
               } catch (IOException ioe) {
                 LOG.warn("Error when trying to flush/close recovery file for"
                     + " dag, dagId=" + event.getDagID());
+                // FIXME handle error ?
               }
-            } else {
-              // TODO this is an error
             }
           }
         } catch (Exception e) {
-            // TODO handle failures - treat as fatal or ignore?
-            LOG.warn("Error handling recovery event", e);
+          // FIXME handle failures
+          LOG.warn("Error handling recovery event", e);
         }
       }
       LOG.info("DAG completed"
@@ -191,6 +212,9 @@ public class RecoveryService extends AbstractService {
           + ", queueSize=" + eventQueue.size());
     } else {
       // All other events just get queued
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Queueing Recovery event of type " + eventType.name());
+      }
       eventQueue.add(event);
     }
   }
@@ -206,27 +230,42 @@ public class RecoveryService extends AbstractService {
       // AM event
       // anything to be done?
       // TODO
+      LOG.info("Skipping Recovery Event as DAG is null"
+          + ", eventType=" + event.getHistoryEvent().getEventType());
       return;
     }
 
     TezDAGID dagID = event.getDagID();
-    if (completedDAGs.contains(dagID)) {
-      // Skip events for completed DAGs
+    if (completedDAGs.contains(dagID)
+        || skippedDAGs.contains(dagID)) {
+      // Skip events for completed and skipped DAGs
       // no need to recover completed DAGs
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping Recovery Event as either completed or skipped"
+            + ", dagId=" + dagID
+            + ", completed=" + completedDAGs.contains(dagID)
+            + ", skipped=" + skippedDAGs.contains(dagID)
+            + ", eventType=" + event.getHistoryEvent().getEventType());
+      }
       return;
     }
 
     try {
 
-      if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
-          || eventType.equals(HistoryEventType.DAG_FINISHED)) {
-        if (summaryStream == null) {
-          Path summaryPath = new Path(recoveryPath,
-              appContext.getApplicationID()
-              + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+      if (summaryStream == null) {
+        Path summaryPath = new Path(recoveryPath,
+            appContext.getApplicationID()
+                + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+        if (!recoveryDirFS.exists(summaryPath)) {
           summaryStream = recoveryDirFS.create(summaryPath, false,
               bufferSize);
+        } else {
+          summaryStream = recoveryDirFS.append(summaryPath, bufferSize);
         }
+      }
+
+      if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
+          || eventType.equals(HistoryEventType.DAG_FINISHED)) {
         if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
           DAGSubmittedEvent dagSubmittedEvent =
               (DAGSubmittedEvent) event.getHistoryEvent();
@@ -235,33 +274,97 @@ public class RecoveryService extends AbstractService {
               && dagName.startsWith(
               TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
             // Skip recording pre-warm DAG events
+            skippedDAGs.add(dagID);
             return;
           }
-          Path dagFilePath = new Path(recoveryPath,
-              dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
-          FSDataOutputStream outputStream =
-              recoveryDirFS.create(dagFilePath, false, bufferSize);
-          outputStreamMap.put(dagID, outputStream);
         }
+        SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Writing recovery event to summary stream"
+              + ", dagId=" + dagID
+              + ", type="
+              + event.getHistoryEvent().getEventType());
+        }
+        summaryEvent.toSummaryProtoStream(summaryStream);
+      }
 
-        if (outputStreamMap.containsKey(dagID)) {
-          SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
-          summaryEvent.toSummaryProtoStream(summaryStream);
+      if (!outputStreamMap.containsKey(dagID)) {
+        Path dagFilePath = new Path(recoveryPath,
+            dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+        FSDataOutputStream outputStream;
+        if (recoveryDirFS.exists(dagFilePath)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Opening DAG recovery file in append mode"
+                + ", filePath=" + dagFilePath);
+          }
+          outputStream = recoveryDirFS.append(dagFilePath, bufferSize);
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Opening DAG recovery file in create mode"
+                + ", filePath=" + dagFilePath);
+          }
+          outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize);
         }
+        outputStreamMap.put(dagID, outputStream);
       }
 
       FSDataOutputStream outputStream = outputStreamMap.get(dagID);
-      if (outputStream == null) {
-        return;
-      }
 
-      outputStream.write(event.getHistoryEvent().getEventType().ordinal());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Writing recovery event to output stream"
+            + ", dagId=" + dagID
+            + ", type="
+            + event.getHistoryEvent().getEventType());
+      }
+      ++unflushedEventsCount;
+      outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal());
       event.getHistoryEvent().toProtoStream(outputStream);
+      if (!EnumSet.of(HistoryEventType.DAG_SUBMITTED,
+          HistoryEventType.DAG_FINISHED).contains(eventType)) {
+        maybeFlush(outputStream);
+      }
     } catch (IOException ioe) {
-      // TODO handle failures - treat as fatal or ignore?
+      // FIXME handle failures
       LOG.warn("Failed to write to stream", ioe);
     }
 
   }
 
+  /**
+   * Flushes {@code outputStream} via hflush once the unflushed-event count reaches
+   * maxUnflushedEvents or, when flushInterval is non-negative, once flushInterval
+   * seconds have elapsed since the last flush; otherwise does nothing.
+   */
+  private void maybeFlush(FSDataOutputStream outputStream) throws IOException {
+    long currentTime = appContext.getClock().getTime();
+    // Multiply as long: an int product overflows for intervals over ~24.8 days.
+    long flushIntervalMillis = flushInterval * 1000L;
+    boolean countReached = unflushedEventsCount >= maxUnflushedEvents;
+    boolean intervalElapsed = flushInterval >= 0
+        && (currentTime - lastFlushTime) >= flushIntervalMillis;
+    if (countReached || intervalElapsed) {
+      doFlush(outputStream, currentTime, false);
+    }
+  }
+
+  private void doFlush(FSDataOutputStream outputStream,
+      long currentTime, boolean sync) throws IOException { // flush, then reset flush bookkeeping
+    if (sync) { // caller asked for hsync; otherwise only hflush is issued
+      outputStream.hsync();
+    } else {
+      outputStream.hflush();
+    }
+    // Logged before the reset below, so the message shows the pre-flush count and timestamp.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Flushing output stream"
+          + ", lastTimeSinceFLush=" + lastFlushTime
+          + ", unflushedEventsCount=" + unflushedEventsCount
+          + ", maxUnflushedEvents=" + maxUnflushedEvents
+          + ", currentTime=" + currentTime);
+    }
+    // NOTE(review): both fields are service-wide, so one stream's flush resets them for all.
+    unflushedEventsCount = 0;
+    lastFlushTime = currentTime;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java
deleted file mode 100644
index 5b87bc8..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.recovery;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.events.AMLaunchedEvent;
-import org.apache.tez.dag.history.events.AMStartedEvent;
-import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
-import org.apache.tez.dag.history.events.DAGFinishedEvent;
-import org.apache.tez.dag.history.events.DAGInitializedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
-import org.apache.tez.dag.history.events.DAGSubmittedEvent;
-import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
-import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
-import org.apache.tez.dag.history.events.TaskFinishedEvent;
-import org.apache.tez.dag.history.events.TaskStartedEvent;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
-import org.apache.tez.dag.history.events.VertexFinishedEvent;
-import org.apache.tez.dag.history.events.VertexInitializedEvent;
-import org.apache.tez.dag.history.events.VertexStartedEvent;
-import org.apache.tez.dag.recovery.records.RecoveryProtos;
-
-import java.io.IOException;
-
-public class RecoveryParser {
-
-  private static final Log LOG = LogFactory.getLog(RecoveryParser.class);
-
-  Path recoveryDirectory;
-  FileSystem recoveryDirFS;
-
-  public RecoveryParser(Path recoveryDirectory, Configuration conf)
-      throws IOException {
-    this.recoveryDirectory = recoveryDirectory;
-    recoveryDirFS = FileSystem.get(recoveryDirectory.toUri(), conf);
-
-  }
-
-  public void parse() throws IOException {
-    RemoteIterator<LocatedFileStatus> locatedFilesStatus =
-        recoveryDirFS.listFiles(recoveryDirectory, false);
-    while (locatedFilesStatus.hasNext()) {
-      LocatedFileStatus fileStatus = locatedFilesStatus.next();
-      String fileName = fileStatus.getPath().getName();
-      if (fileName.endsWith(TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX)) {
-        FSDataInputStream inputStream =
-            recoveryDirFS.open(fileStatus.getPath());
-        LOG.info("Parsing DAG file " + fileName);
-        parseDAGRecoveryFile(inputStream);
-      } else if (fileName.endsWith(TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX)) {
-        FSDataInputStream inputStream =
-            recoveryDirFS.open(fileStatus.getPath());
-        LOG.info("Parsing Summary file " + fileName);
-        parseSummaryFile(inputStream);
-      } else {
-        LOG.warn("Encountered unknown file in recovery dir, fileName="
-            + fileName);
-        continue;
-      }
-    }
-  }
-
-  private void parseSummaryFile(FSDataInputStream inputStream)
-      throws IOException {
-    int counter = 0;
-    while (inputStream.available() > 0) {
-      RecoveryProtos.SummaryEventProto proto =
-          RecoveryProtos.SummaryEventProto.parseDelimitedFrom(inputStream);
-      LOG.info("[SUMMARY]"
-          + " dagId=" + proto.getDagId()
-          + ", timestamp=" + proto.getTimestamp()
-          + ", event=" + HistoryEventType.values()[proto.getEventType()]);
-    }
-  }
-
-  private void parseDAGRecoveryFile(FSDataInputStream inputStream)
-      throws IOException {
-    int counter = 0;
-    while (inputStream.available() > 0) {
-      int eventTypeOrdinal = inputStream.read();
-      if (eventTypeOrdinal < 0 || eventTypeOrdinal >=
-          HistoryEventType.values().length) {
-        // Corrupt data
-        // reached end
-        LOG.warn("Corrupt data found when trying to read next event type");
-        break;
-      }
-      HistoryEventType eventType = HistoryEventType.values()[eventTypeOrdinal];
-      HistoryEvent event = null;
-      switch (eventType) {
-        case AM_LAUNCHED:
-          event = new AMLaunchedEvent();
-          break;
-        case AM_STARTED:
-          event = new AMStartedEvent();
-          break;
-        case DAG_SUBMITTED:
-          event = new DAGSubmittedEvent();
-          break;
-        case DAG_INITIALIZED:
-          event = new DAGInitializedEvent();
-          break;
-        case DAG_STARTED:
-          event = new DAGStartedEvent();
-          break;
-        case DAG_FINISHED:
-          event = new DAGFinishedEvent();
-          break;
-        case CONTAINER_LAUNCHED:
-          event = new ContainerLaunchedEvent();
-          break;
-        case VERTEX_INITIALIZED:
-          event = new VertexInitializedEvent();
-          break;
-        case VERTEX_STARTED:
-          event = new VertexStartedEvent();
-          break;
-        case VERTEX_FINISHED:
-          event = new VertexFinishedEvent();
-          break;
-        case TASK_STARTED:
-          event = new TaskStartedEvent();
-          break;
-        case TASK_FINISHED:
-          event = new TaskFinishedEvent();
-          break;
-        case TASK_ATTEMPT_STARTED:
-          event = new TaskAttemptStartedEvent();
-          break;
-        case TASK_ATTEMPT_FINISHED:
-          event = new TaskAttemptFinishedEvent();
-          break;
-        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
-          event = new VertexDataMovementEventsGeneratedEvent();
-          break;
-        default:
-          throw new IOException("Invalid data found, unknown event type "
-              + eventType);
-
-      }
-      ++counter;
-      LOG.info("Parsing event from input stream"
-          + ", eventType=" + eventType
-          + ", eventIndex=" + counter);
-      event.fromProtoStream(inputStream);
-      LOG.info("Parsed event from input stream"
-          + ", eventType=" + eventType
-          + ", eventIndex=" + counter
-          + ", event=" + event.toString());
-    }
-  }
-
-  public static void main(String argv[]) throws IOException {
-    // TODO clean up with better usage and error handling
-    Configuration conf = new Configuration();
-    String dir = argv[0];
-    RecoveryParser parser = new RecoveryParser(new Path(dir), conf);
-    parser.parse();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index e21f5df..65f3aaf 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -56,6 +56,9 @@ message DAGStartedProto {
   optional int64 start_time = 3;
 }
 
+message DAGCommitStartedProto {
+  optional string dag_id = 1;
+}
 message DAGFinishedProto {
   optional string dag_id = 1;
   optional int64 finish_time = 2;
@@ -69,7 +72,8 @@ message VertexInitializedProto {
   optional string vertex_id = 2;
   optional int64 init_requested_time = 3;
   optional int64 init_time = 4;
-  optional int64 num_tasks = 5;
+  optional int32 num_tasks = 5;
+  repeated RootInputLeafOutputProto inputs = 6;
 }
 
 message VertexStartedProto {
@@ -79,6 +83,22 @@ message VertexStartedProto {
   optional int64 start_time = 4;
 }
 
+message EdgeManagerDescriptorProto {
+  optional string edge_name = 1;
+  optional TezEntityDescriptorProto entity_descriptor = 2;
+}
+
+message VertexParallelismUpdatedProto {
+  optional string vertex_id = 1;
+  optional int32 num_tasks = 2;
+  optional VertexLocationHintProto vertex_location_hint = 3;
+  repeated EdgeManagerDescriptorProto edge_manager_descriptors = 4;
+}
+
+message VertexCommitStartedProto {
+  optional string vertex_id = 1;
+}
+
 message VertexFinishedProto {
   optional string vertex_name = 1;
   optional string vertex_id = 2;
@@ -100,6 +120,7 @@ message TaskFinishedProto {
   optional int32 state = 3;
   optional string diagnostics = 4;
   optional TezCountersProto counters = 5;
+  optional string successful_task_attempt_id = 6;
 }
 
 message TaskAttemptStartedProto {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
index e965bca..0a3b9e0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.api.client;
 
+import org.apache.tez.dag.api.client.VertexStatus.State;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.junit.Assert;
@@ -32,7 +33,12 @@ public class TestVertexStatusBuilder {
           VertexStatusBuilder.getProtoState(state);
       VertexStatus.State clientState =
           VertexStatus.getState(stateProto);
-      Assert.assertEquals(state.name(), clientState.name());
+      if (state.equals(VertexState.RECOVERING)) {
+        Assert.assertEquals(clientState.name(),
+            State.NEW.name());
+      } else {
+        Assert.assertEquals(state.name(), clientState.name());
+      }
     }
   }