You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/11/25 15:02:26 UTC

[6/9] tez git commit: TEZ-2581. Umbrella for Tez Recovery Redesign (zjffdu)

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index e17a4d4..95ff0cd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -64,7 +64,10 @@ public class HistoryEventHandler extends CompositeService {
     addService(historyLoggingService);
 
     if (recoveryEnabled) {
-      recoveryService = new RecoveryService(context);
+      String recoveryServiceClass = conf.get(TezConfiguration.TEZ_AM_RECOVERY_SERVICE_CLASS,
+          TezConfiguration.TEZ_AM_RECOVERY_SERVICE_CLASS_DEFAULT);
+      recoveryService = ReflectionUtils.createClazzInstance(recoveryServiceClass,
+          new Class[]{AppContext.class}, new Object[] {context});
       addService(recoveryService);
     }
     super.serviceInit(conf);

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index d791d9e..4e56e9f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -29,7 +29,7 @@ public enum HistoryEventType {
   DAG_KILL_REQUEST,
   VERTEX_INITIALIZED,
   VERTEX_STARTED,
-  VERTEX_PARALLELISM_UPDATED,
+  VERTEX_CONFIGURE_DONE,
   VERTEX_FINISHED,
   TASK_STARTED,
   TASK_FINISHED,
@@ -37,7 +37,6 @@ public enum HistoryEventType {
   TASK_ATTEMPT_FINISHED,
   CONTAINER_LAUNCHED,
   CONTAINER_STOPPED,
-  VERTEX_DATA_MOVEMENT_EVENTS_GENERATED,
   DAG_COMMIT_STARTED,
   VERTEX_COMMIT_STARTED,
   VERTEX_GROUP_COMMIT_STARTED,

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/RecoveryConverters.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/RecoveryConverters.java b/tez-dag/src/main/java/org/apache/tez/dag/history/RecoveryConverters.java
new file mode 100644
index 0000000..bab713d
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/RecoveryConverters.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.history;
+
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+
+public class RecoveryConverters {
+
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 7d83db2..21b8719 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -34,10 +34,13 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.utils.TezEventUtils;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.TezEventProto;
+import org.apache.tez.runtime.api.impl.TezEvent;
 
 public class TaskAttemptFinishedEvent implements HistoryEvent {
 
@@ -55,6 +58,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   private TezCounters tezCounters;
   private TaskAttemptTerminationCause error;
   private List<DataEventDependencyInfo> dataEvents;
+  private List<TezEvent> taGeneratedEvents;
   
   public TaskAttemptFinishedEvent(TezTaskAttemptID taId,
       String vertexName,
@@ -63,7 +67,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
       TaskAttemptState state,
       TaskAttemptTerminationCause error,
       String diagnostics, TezCounters counters, 
-      List<DataEventDependencyInfo> dataEvents, 
+      List<DataEventDependencyInfo> dataEvents,
+      List<TezEvent> taGeneratedEvents,
       long creationTime, 
       TezTaskAttemptID creationCausalTA, 
       long allocationTime) {
@@ -79,6 +84,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
     this.tezCounters = counters;
     this.error = error;
     this.dataEvents = dataEvents;
+    this.taGeneratedEvents = taGeneratedEvents;
   }
 
   public TaskAttemptFinishedEvent() {
@@ -103,7 +109,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
     return dataEvents;
   }
   
-  public TaskAttemptFinishedProto toProto() {
+  public TaskAttemptFinishedProto toProto() throws IOException {
     TaskAttemptFinishedProto.Builder builder =
         TaskAttemptFinishedProto.newBuilder();
     builder.setTaskAttemptId(taskAttemptId.toString())
@@ -129,10 +135,15 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
         builder.addDataEvents(DataEventDependencyInfo.toProto(info));
       }
     }
+    if (taGeneratedEvents != null && !taGeneratedEvents.isEmpty()) {
+      for (TezEvent event : taGeneratedEvents) {
+        builder.addTaGeneratedEvents(TezEventUtils.toProto(event));
+      }
+    }
     return builder.build();
   }
 
-  public void fromProto(TaskAttemptFinishedProto proto) {
+  public void fromProto(TaskAttemptFinishedProto proto) throws IOException {
     this.taskAttemptId = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
     this.state = TaskAttemptState.values()[proto.getState()];
     this.creationTime = proto.getCreationTime();
@@ -158,6 +169,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
         this.dataEvents.add(DataEventDependencyInfo.fromProto(protoEvent));
       }
     }
+    if (proto.getTaGeneratedEventsCount() > 0) {
+      this.taGeneratedEvents = Lists.newArrayListWithCapacity(proto.getTaGeneratedEventsCount());
+      for (TezEventProto eventProto : proto.getTaGeneratedEventsList()) {
+        this.taGeneratedEvents.add(TezEventUtils.fromProto(eventProto));
+      }
+    }
   }
 
   @Override
@@ -236,4 +253,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
     return creationCausalTA;
   }
 
+  public List<TezEvent> getTAGeneratedEvents() {
+    return taGeneratedEvents;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java
new file mode 100644
index 0000000..4ad1c63
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexConfigurationDoneEvent.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.history.events;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.EdgeManagerDescriptorProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.RootInputSpecUpdateProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexConfigurationDoneProto;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+
+import com.google.common.collect.Maps;
+
+public class VertexConfigurationDoneEvent implements HistoryEvent {
+
+  private TezVertexID vertexID;
+  private long reconfigureDoneTime;
+  private int numTasks;
+  private VertexLocationHint vertexLocationHint;
+  private Map<String, EdgeProperty> sourceEdgeProperties;
+  private Map<String, InputSpecUpdate> rootInputSpecUpdates;
+  private boolean setParallelismCalledFlag;
+
+  public VertexConfigurationDoneEvent() {  
+  }
+
+  public VertexConfigurationDoneEvent(TezVertexID vertexID,
+      long reconfigureDoneTime, int numTasks,
+      VertexLocationHint vertexLocationHint,
+      Map<String, EdgeProperty> sourceEdgeProperties,
+      Map<String, InputSpecUpdate> rootInputSpecUpdates,
+      boolean setParallelismCalledFlag) {
+    super();
+    this.vertexID = vertexID;
+    this.numTasks = numTasks;
+    this.vertexLocationHint = vertexLocationHint;
+    this.sourceEdgeProperties = sourceEdgeProperties;
+    this.rootInputSpecUpdates = rootInputSpecUpdates;
+    this.setParallelismCalledFlag = setParallelismCalledFlag;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.VERTEX_CONFIGURE_DONE;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return true;
+  }
+
+  public VertexConfigurationDoneProto toProto() {
+    VertexConfigurationDoneProto.Builder builder =
+        VertexConfigurationDoneProto.newBuilder();
+    builder.setVertexId(vertexID.toString())
+        .setReconfigureDoneTime(reconfigureDoneTime)
+        .setSetParallelismCalledFlag(setParallelismCalledFlag)
+        .setNumTasks(numTasks);
+
+    if (vertexLocationHint != null) {
+      builder.setVertexLocationHint(DagTypeConverters.convertVertexLocationHintToProto(
+            this.vertexLocationHint));
+    }
+    if (sourceEdgeProperties != null) {
+      for (Entry<String, EdgeProperty> entry :
+        sourceEdgeProperties.entrySet()) {
+        EdgeManagerDescriptorProto.Builder edgeMgrBuilder =
+            EdgeManagerDescriptorProto.newBuilder();
+        edgeMgrBuilder.setEdgeName(entry.getKey());
+        edgeMgrBuilder.setEdgeProperty(DagTypeConverters.convertToProto(entry.getValue()));
+        builder.addEdgeManagerDescriptors(edgeMgrBuilder.build());
+      }
+    }
+    if (rootInputSpecUpdates != null) {
+      for (Entry<String, InputSpecUpdate> entry : rootInputSpecUpdates.entrySet()) {
+        RootInputSpecUpdateProto.Builder rootInputSpecUpdateBuilder = RootInputSpecUpdateProto
+            .newBuilder();
+        rootInputSpecUpdateBuilder.setInputName(entry.getKey());
+        rootInputSpecUpdateBuilder.setForAllWorkUnits(entry.getValue().isForAllWorkUnits());
+        rootInputSpecUpdateBuilder.addAllNumPhysicalInputs(entry.getValue()
+            .getAllNumPhysicalInputs());
+        builder.addRootInputSpecUpdates(rootInputSpecUpdateBuilder.build());
+      }
+    }
+    return builder.build();
+  }
+
+  public void fromProto(VertexConfigurationDoneProto proto) {
+    this.vertexID = TezVertexID.fromString(proto.getVertexId());
+    this.reconfigureDoneTime = proto.getReconfigureDoneTime();
+    this.setParallelismCalledFlag = proto.getSetParallelismCalledFlag();
+    this.numTasks = proto.getNumTasks();
+    if (proto.hasVertexLocationHint()) {
+      this.vertexLocationHint = DagTypeConverters.convertVertexLocationHintFromProto(
+          proto.getVertexLocationHint());
+    }
+    if (proto.getEdgeManagerDescriptorsCount() > 0) {
+      this.sourceEdgeProperties = new HashMap<String, EdgeProperty>(
+          proto.getEdgeManagerDescriptorsCount());
+      for (EdgeManagerDescriptorProto edgeManagerProto :
+        proto.getEdgeManagerDescriptorsList()) {
+        EdgeProperty edgeProperty =
+            DagTypeConverters.convertFromProto(
+                edgeManagerProto.getEdgeProperty());
+        sourceEdgeProperties.put(edgeManagerProto.getEdgeName(),
+            edgeProperty);
+      }
+    }
+    if (proto.getRootInputSpecUpdatesCount() > 0) {
+      this.rootInputSpecUpdates = Maps.newHashMap();
+      for (RootInputSpecUpdateProto rootInputSpecUpdateProto : proto.getRootInputSpecUpdatesList()) {
+        InputSpecUpdate specUpdate;
+        if (rootInputSpecUpdateProto.getForAllWorkUnits()) {
+          specUpdate = InputSpecUpdate
+              .createAllTaskInputSpecUpdate(rootInputSpecUpdateProto.getNumPhysicalInputs(0));
+        } else {
+          specUpdate = InputSpecUpdate
+              .createPerTaskInputSpecUpdate(rootInputSpecUpdateProto.getNumPhysicalInputsList());
+        }
+        this.rootInputSpecUpdates.put(rootInputSpecUpdateProto.getInputName(), specUpdate);
+      }
+    }
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexConfigurationDoneProto proto = VertexConfigurationDoneProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    return "vertexId=" + vertexID
+        + ", reconfigureDoneTime=" + reconfigureDoneTime
+        + ", numTasks=" + numTasks
+        + ", vertexLocationHint=" +
+        (vertexLocationHint == null? "null" : vertexLocationHint)
+        + ", edgeManagersCount=" +
+        (sourceEdgeProperties == null? "null" : sourceEdgeProperties.size())
+        + ", rootInputSpecUpdateCount="
+        + (rootInputSpecUpdates == null ? "null" : rootInputSpecUpdates.size())
+        + ", setParallelismCalledFlag=" + setParallelismCalledFlag;
+  }
+
+  public TezVertexID getVertexID() {
+    return this.vertexID;
+  }
+
+  public int getNumTasks() {
+    return numTasks;
+  }
+
+  public VertexLocationHint getVertexLocationHint() {
+    return vertexLocationHint;
+  }
+
+  public Map<String, EdgeProperty> getSourceEdgeProperties() {
+    return sourceEdgeProperties;
+  }
+
+  public Map<String, InputSpecUpdate> getRootInputSpecUpdates() {
+    return rootInputSpecUpdates;
+  }
+
+  public long getReconfigureDoneTime() {
+    return reconfigureDoneTime;
+  }
+
+  public boolean isSetParallelismCalled() {
+    return setParallelismCalledFlag;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
index 8947b5f..ec8f3e1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
@@ -21,28 +21,35 @@ package org.apache.tez.dag.history.events;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Collection;
 
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitFinishedProto;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
 public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEvent {
 
   private TezDAGID dagID;
   private String vertexGroupName;
+  private Collection<TezVertexID> vertexIds;
   private long commitFinishTime;
 
   public VertexGroupCommitFinishedEvent() {
   }
 
   public VertexGroupCommitFinishedEvent(TezDAGID dagID,
-      String vertexGroupName, long commitFinishTime) {
+      String vertexGroupName, Collection<TezVertexID> vertexIds, long commitFinishTime) {
     this.dagID = dagID;
     this.vertexGroupName = vertexGroupName;
+    this.vertexIds = vertexIds;
     this.commitFinishTime = commitFinishTime;
   }
 
@@ -62,15 +69,28 @@ public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEven
   }
 
   public VertexGroupCommitFinishedProto toProto() {
+    Collection<String> vertexIdsStr = Collections2.transform(vertexIds, new Function<TezVertexID, String>(){
+      @Override
+      public String apply(TezVertexID vertexId) {
+        return vertexId.toString();
+      }
+    });
     return VertexGroupCommitFinishedProto.newBuilder()
         .setDagId(dagID.toString())
         .setVertexGroupName(vertexGroupName)
+        .addAllVertexIds(vertexIdsStr)
         .build();
   }
 
   public void fromProto(VertexGroupCommitFinishedProto proto) {
     this.dagID = TezDAGID.fromString(proto.getDagId());
     this.vertexGroupName = proto.getVertexGroupName();
+    this.vertexIds = Collections2.transform(proto.getVertexIdsList(), new Function<String, TezVertexID>() {
+      @Override
+      public TezVertexID apply(String input) {
+        return TezVertexID.fromString(input);
+      }
+    });
   }
 
   @Override
@@ -124,4 +144,8 @@ public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEven
     return dagID;
   }
 
+  public Collection<TezVertexID> getVertexIds() {
+    return vertexIds;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
index c388957..3de355c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
@@ -21,28 +21,35 @@ package org.apache.tez.dag.history.events;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Collection;
 
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.SummaryEvent;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitStartedProto;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
 public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent {
 
   private TezDAGID dagID;
   private String vertexGroupName;
+  private Collection<TezVertexID> vertexIds;
   private long commitStartTime;
 
   public VertexGroupCommitStartedEvent() {
   }
 
   public VertexGroupCommitStartedEvent(TezDAGID dagID,
-      String vertexGroupName, long commitStartTime) {
+      String vertexGroupName, Collection<TezVertexID> vertexIds, long commitStartTime) {
     this.dagID = dagID;
     this.vertexGroupName = vertexGroupName;
+    this.vertexIds = vertexIds;
     this.commitStartTime = commitStartTime;
   }
 
@@ -62,15 +69,28 @@ public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent
   }
 
   public VertexGroupCommitStartedProto toProto() {
+    Collection<String> vertexIdsStr = Collections2.transform(vertexIds, new Function<TezVertexID, String>(){
+      @Override
+      public String apply(TezVertexID vertexId) {
+        return vertexId.toString();
+      }
+    });
     return VertexGroupCommitStartedProto.newBuilder()
         .setDagId(dagID.toString())
         .setVertexGroupName(vertexGroupName)
+        .addAllVertexIds(vertexIdsStr)
         .build();
   }
 
   public void fromProto(VertexGroupCommitStartedProto proto) {
     this.dagID = TezDAGID.fromString(proto.getDagId());
     this.vertexGroupName = proto.getVertexGroupName();
+    this.vertexIds = Collections2.transform(proto.getVertexIdsList(), new Function<String, TezVertexID>() {
+      @Override
+      public TezVertexID apply(String input) {
+        return TezVertexID.fromString(input);
+      }
+    });
   }
 
   @Override
@@ -124,4 +144,7 @@ public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent
     return dagID;
   }
 
+  public Collection<TezVertexID> getVertexIds() {
+    return vertexIds;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
index 01e0d3c..052908b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.tez.dag.api.DagTypeConverters;
@@ -31,9 +32,14 @@ import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.utils.TezEventUtils;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.TezEventProto;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexInitializedProto;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+import com.google.common.collect.Lists;
 
 public class VertexInitializedEvent implements HistoryEvent {
 
@@ -44,6 +50,7 @@ public class VertexInitializedEvent implements HistoryEvent {
   private int numTasks;
   private String processorName;
   private Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs;
+  private List<TezEvent> initGeneratedEvents;
 
   public VertexInitializedEvent() {
   }
@@ -51,7 +58,8 @@ public class VertexInitializedEvent implements HistoryEvent {
   public VertexInitializedEvent(TezVertexID vertexId,
       String vertexName, long initRequestedTime, long initedTime,
       int numTasks, String processorName,
-      Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs) {
+      Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs,
+      List<TezEvent> initGeneratedEvents) {
     this.vertexName = vertexName;
     this.vertexID = vertexId;
     this.initRequestedTime = initRequestedTime;
@@ -59,6 +67,7 @@ public class VertexInitializedEvent implements HistoryEvent {
     this.numTasks = numTasks;
     this.processorName = processorName;
     this.additionalInputs = additionalInputs;
+    this.initGeneratedEvents = initGeneratedEvents;
   }
 
   @Override
@@ -76,7 +85,7 @@ public class VertexInitializedEvent implements HistoryEvent {
     return true;
   }
 
-  public RecoveryProtos.VertexInitializedProto toProto() {
+  public RecoveryProtos.VertexInitializedProto toProto() throws IOException {
     VertexInitializedProto.Builder builder = VertexInitializedProto.newBuilder();
     if (additionalInputs != null
       && !additionalInputs.isEmpty()) {
@@ -94,6 +103,11 @@ public class VertexInitializedEvent implements HistoryEvent {
         builder.addInputs(inputBuilder.build());
       }
     }
+    if (initGeneratedEvents != null && !initGeneratedEvents.isEmpty()) {
+      for (TezEvent event : initGeneratedEvents) {
+        builder.addInitGeneratedEvents(TezEventUtils.toProto(event));
+      }
+    }
     return builder.setVertexId(vertexID.toString())
         .setVertexName(vertexName)
         .setInitRequestedTime(initRequestedTime)
@@ -102,7 +116,7 @@ public class VertexInitializedEvent implements HistoryEvent {
         .build();
   }
 
-  public void fromProto(RecoveryProtos.VertexInitializedProto proto) {
+  public void fromProto(RecoveryProtos.VertexInitializedProto proto) throws IOException {
     this.vertexID = TezVertexID.fromString(proto.getVertexId());
     this.vertexName = proto.getVertexName();
     this.initRequestedTime = proto.getInitRequestedTime();
@@ -123,6 +137,14 @@ public class VertexInitializedEvent implements HistoryEvent {
         additionalInputs.put(input.getName(), input);
       }
     }
+    int eventCount = proto.getInitGeneratedEventsCount();
+    if (eventCount > 0) {
+      this.initGeneratedEvents = Lists.newArrayListWithCapacity(eventCount);
+    }
+    for (TezEventProto eventProto :
+        proto.getInitGeneratedEventsList()) {
+      this.initGeneratedEvents.add(TezEventUtils.fromProto(eventProto));
+    }
   }
 
   @Override
@@ -149,7 +171,9 @@ public class VertexInitializedEvent implements HistoryEvent {
         + ", numTasks=" + numTasks
         + ", processorName=" + processorName
         + ", additionalInputsCount="
-        + (additionalInputs != null ? additionalInputs.size() : 0);
+        + (additionalInputs != null ? additionalInputs.size() : 0)
+        + ", initGeneratedEventsCount="
+        + (initGeneratedEvents != null ? initGeneratedEvents.size() : 0);
   }
 
   public TezVertexID getVertexID() {
@@ -181,4 +205,7 @@ public class VertexInitializedEvent implements HistoryEvent {
     return vertexName;
   }
 
+  public List<TezEvent> getInitGeneratedEvents() {
+    return initGeneratedEvents;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
deleted file mode 100644
index 456e2a5..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.events;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.EdgeManagerDescriptorProto;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.RootInputSpecUpdateProto;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexParallelismUpdatedProto;
-import org.apache.tez.runtime.api.InputSpecUpdate;
-
-import com.google.common.collect.Maps;
-
-public class VertexParallelismUpdatedEvent implements HistoryEvent {
-
-  private TezVertexID vertexID;
-  private int numTasks;
-  private int oldNumTasks;
-  private VertexLocationHint vertexLocationHint;
-  private Map<String, EdgeProperty> sourceEdgeProperties;
-  private Map<String, InputSpecUpdate> rootInputSpecUpdates;
-  private long updateTime;
-
-  public VertexParallelismUpdatedEvent() {
-  }
-
-  public VertexParallelismUpdatedEvent(TezVertexID vertexID,
-      int numTasks, VertexLocationHint vertexLocationHint,
-      Map<String, EdgeProperty> sourceEdgeProperties,
-      Map<String, InputSpecUpdate> rootInputSpecUpdates, int oldNumTasks) {
-    this.vertexID = vertexID;
-    this.numTasks = numTasks;
-    this.vertexLocationHint = vertexLocationHint;
-    this.sourceEdgeProperties = sourceEdgeProperties;
-    this.rootInputSpecUpdates = rootInputSpecUpdates;
-    this.updateTime = System.currentTimeMillis();
-    this.oldNumTasks = oldNumTasks;
-  }
-
-  @Override
-  public HistoryEventType getEventType() {
-    return HistoryEventType.VERTEX_PARALLELISM_UPDATED;
-  }
-
-  @Override
-  public boolean isRecoveryEvent() {
-    return true;
-  }
-
-  @Override
-  public boolean isHistoryEvent() {
-    return true;
-  }
-
-  public VertexParallelismUpdatedProto toProto() {
-    VertexParallelismUpdatedProto.Builder builder =
-        VertexParallelismUpdatedProto.newBuilder();
-    builder.setVertexId(vertexID.toString())
-        .setNumTasks(numTasks);
-    if (vertexLocationHint != null) {
-      builder.setVertexLocationHint(DagTypeConverters.convertVertexLocationHintToProto(
-            this.vertexLocationHint));
-    }
-    if (sourceEdgeProperties != null) {
-      for (Entry<String, EdgeProperty> entry :
-        sourceEdgeProperties.entrySet()) {
-        EdgeManagerDescriptorProto.Builder edgeMgrBuilder =
-            EdgeManagerDescriptorProto.newBuilder();
-        edgeMgrBuilder.setEdgeName(entry.getKey());
-        edgeMgrBuilder.setEdgeProperty(DagTypeConverters.convertToProto(entry.getValue()));
-        builder.addEdgeManagerDescriptors(edgeMgrBuilder.build());
-      }
-    }
-    if (rootInputSpecUpdates != null) {
-      for (Entry<String, InputSpecUpdate> entry : rootInputSpecUpdates.entrySet()) {
-        RootInputSpecUpdateProto.Builder rootInputSpecUpdateBuilder = RootInputSpecUpdateProto
-            .newBuilder();
-        rootInputSpecUpdateBuilder.setInputName(entry.getKey());
-        rootInputSpecUpdateBuilder.setForAllWorkUnits(entry.getValue().isForAllWorkUnits());
-        rootInputSpecUpdateBuilder.addAllNumPhysicalInputs(entry.getValue()
-            .getAllNumPhysicalInputs());
-        builder.addRootInputSpecUpdates(rootInputSpecUpdateBuilder.build());
-      }
-    }
-    return builder.build();
-  }
-
-  public void fromProto(VertexParallelismUpdatedProto proto) {
-    this.vertexID = TezVertexID.fromString(proto.getVertexId());
-    this.numTasks = proto.getNumTasks();
-    if (proto.hasVertexLocationHint()) {
-      this.vertexLocationHint = DagTypeConverters.convertVertexLocationHintFromProto(
-          proto.getVertexLocationHint());
-    }
-    if (proto.getEdgeManagerDescriptorsCount() > 0) {
-      this.sourceEdgeProperties = new HashMap<String, EdgeProperty>(
-          proto.getEdgeManagerDescriptorsCount());
-      for (EdgeManagerDescriptorProto edgeManagerProto :
-        proto.getEdgeManagerDescriptorsList()) {
-        EdgeProperty edgeProperty =
-            DagTypeConverters.convertFromProto(
-                edgeManagerProto.getEdgeProperty());
-        sourceEdgeProperties.put(edgeManagerProto.getEdgeName(),
-            edgeProperty);
-      }
-    }
-    if (proto.getRootInputSpecUpdatesCount() > 0) {
-      this.rootInputSpecUpdates = Maps.newHashMap();
-      for (RootInputSpecUpdateProto rootInputSpecUpdateProto : proto.getRootInputSpecUpdatesList()) {
-        InputSpecUpdate specUpdate;
-        if (rootInputSpecUpdateProto.getForAllWorkUnits()) {
-          specUpdate = InputSpecUpdate
-              .createAllTaskInputSpecUpdate(rootInputSpecUpdateProto.getNumPhysicalInputs(0));
-        } else {
-          specUpdate = InputSpecUpdate
-              .createPerTaskInputSpecUpdate(rootInputSpecUpdateProto.getNumPhysicalInputsList());
-        }
-        this.rootInputSpecUpdates.put(rootInputSpecUpdateProto.getInputName(), specUpdate);
-      }
-    }
-  }
-
-  @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
-  }
-
-  @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    VertexParallelismUpdatedProto proto = VertexParallelismUpdatedProto.parseDelimitedFrom(inputStream);
-    if (proto == null) {
-      throw new IOException("No data found in stream");
-    }
-    fromProto(proto);
-  }
-
-  @Override
-  public String toString() {
-    return "vertexId=" + vertexID
-        + ", numTasks=" + numTasks
-        + ", vertexLocationHint=" +
-        (vertexLocationHint == null? "null" : vertexLocationHint)
-        + ", edgeManagersCount=" +
-        (sourceEdgeProperties == null? "null" : sourceEdgeProperties.size()
-        + ", rootInputSpecUpdateCount="
-        + (rootInputSpecUpdates == null ? "null" : rootInputSpecUpdates.size()));
-  }
-
-  public TezVertexID getVertexID() {
-    return this.vertexID;
-  }
-
-  public int getNumTasks() {
-    return numTasks;
-  }
-
-  public VertexLocationHint getVertexLocationHint() {
-    return vertexLocationHint;
-  }
-
-  public Map<String, EdgeProperty> getSourceEdgeProperties() {
-    return sourceEdgeProperties;
-  }
-  
-  public Map<String, InputSpecUpdate> getRootInputSpecUpdates() {
-    return rootInputSpecUpdates;
-  }
-
-  public long getUpdateTime() {
-    return updateTime;
-  }
-
-  public int getOldNumTasks() {
-    return oldNumTasks;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
deleted file mode 100644
index 6f44f33..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexRecoverableEventsGeneratedEvent.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.events;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.EnumSet;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.tez.common.ProtoConverters;
-import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.dag.recovery.records.RecoveryProtos;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.TezDataMovementEventProto;
-import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexDataMovementEventsGeneratedProto;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.events.InputInitializerEvent;
-import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.EventType;
-import org.apache.tez.runtime.api.impl.TezEvent;
-
-import com.google.common.collect.Lists;
-
-// TODO PreCommit - rename this to VertexRecoverableEventGeneratedEvent
-public class VertexRecoverableEventsGeneratedEvent implements HistoryEvent {
-
-  private static final Logger LOG = LoggerFactory.getLogger(
-      VertexRecoverableEventsGeneratedEvent.class);
-  private List<TezEvent> events;
-  private TezVertexID vertexID;
-
-  public VertexRecoverableEventsGeneratedEvent(TezVertexID vertexID,
-                                               List<TezEvent> events) {
-    this.vertexID = vertexID;
-    this.events = Lists.newArrayListWithCapacity(events.size());
-    for (TezEvent event : events) {
-      if (EnumSet.of(EventType.DATA_MOVEMENT_EVENT,
-          EventType.COMPOSITE_DATA_MOVEMENT_EVENT,
-          EventType.ROOT_INPUT_DATA_INFORMATION_EVENT,
-          EventType.ROOT_INPUT_INITIALIZER_EVENT)
-              .contains(event.getEventType())) {
-        this.events.add(event);
-      }
-    }
-    if (events.isEmpty()) {
-      throw new RuntimeException("Invalid creation of VertexDataMovementEventsGeneratedEvent"
-        + ", no data movement/information events provided");
-    }
-  }
-
-  public VertexRecoverableEventsGeneratedEvent() {
-  }
-
-  @Override
-  public HistoryEventType getEventType() {
-    return HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED;
-  }
-
-  @Override
-  public boolean isRecoveryEvent() {
-    return true;
-  }
-
-  @Override
-  public boolean isHistoryEvent() {
-    return false;
-  }
-
-  static RecoveryProtos.EventMetaDataProto convertEventMetaDataToProto(
-      EventMetaData eventMetaData) {
-    RecoveryProtos.EventMetaDataProto.Builder builder =
-        RecoveryProtos.EventMetaDataProto.newBuilder()
-        .setProducerConsumerType(eventMetaData.getEventGenerator().ordinal())
-        .setEdgeVertexName(eventMetaData.getEdgeVertexName())
-        .setTaskVertexName(eventMetaData.getTaskVertexName());
-    if (eventMetaData.getTaskAttemptID() != null) {
-        builder.setTaskAttemptId(eventMetaData.getTaskAttemptID().toString());
-    }
-    return builder.build();
-  }
-
-  static EventMetaData convertEventMetaDataFromProto(
-      RecoveryProtos.EventMetaDataProto proto) {
-    TezTaskAttemptID attemptID = null;
-    if (proto.hasTaskAttemptId()) {
-      attemptID = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
-    }
-    return new EventMetaData(
-        EventMetaData.EventProducerConsumerType.values()[proto.getProducerConsumerType()],
-        proto.getTaskVertexName(),
-        proto.getEdgeVertexName(),
-        attemptID);
-  }
-
-  public VertexDataMovementEventsGeneratedProto toProto() {
-    List<TezDataMovementEventProto> tezEventProtos = null;
-    if (events != null) {
-      tezEventProtos = Lists.newArrayListWithCapacity(events.size());
-      for (TezEvent event : events) {
-        TezDataMovementEventProto.Builder evtBuilder =
-            TezDataMovementEventProto.newBuilder();
-        if (event.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)) {
-          evtBuilder.setCompositeDataMovementEvent(
-              ProtoConverters.convertCompositeDataMovementEventToProto(
-                  (CompositeDataMovementEvent) event.getEvent()));
-        } else if (event.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)) {
-          evtBuilder.setDataMovementEvent(
-              ProtoConverters.convertDataMovementEventToProto(
-                  (DataMovementEvent) event.getEvent()));
-        } else if (event.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
-          evtBuilder.setRootInputDataInformationEvent(
-              ProtoConverters.convertRootInputDataInformationEventToProto(
-                  (InputDataInformationEvent) event.getEvent()));
-        } else if (event.getEventType().equals(EventType.ROOT_INPUT_INITIALIZER_EVENT)) {
-          evtBuilder.setInputInitializerEvent(ProtoConverters
-              .convertRootInputInitializerEventToProto((InputInitializerEvent) event.getEvent()));
-        }
-        if (event.getSourceInfo() != null) {
-          evtBuilder.setSourceInfo(convertEventMetaDataToProto(event.getSourceInfo()));
-        }
-        if (event.getDestinationInfo() != null) {
-          evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo()));
-        }
-        evtBuilder.setEventTime(event.getEventReceivedTime());
-        tezEventProtos.add(evtBuilder.build());
-      }
-    }
-    return VertexDataMovementEventsGeneratedProto.newBuilder()
-        .setVertexId(vertexID.toString())
-        .addAllTezDataMovementEvent(tezEventProtos)
-        .build();
-  }
-
-  public void fromProto(VertexDataMovementEventsGeneratedProto proto) {
-    this.vertexID = TezVertexID.fromString(proto.getVertexId());
-    int eventCount = proto.getTezDataMovementEventCount();
-    if (eventCount > 0) {
-      this.events = Lists.newArrayListWithCapacity(eventCount);
-    }
-    for (TezDataMovementEventProto eventProto :
-        proto.getTezDataMovementEventList()) {
-      Event evt = null;
-      if (eventProto.hasCompositeDataMovementEvent()) {
-        evt = ProtoConverters.convertCompositeDataMovementEventFromProto(
-            eventProto.getCompositeDataMovementEvent());
-      } else if (eventProto.hasDataMovementEvent()) {
-        evt = ProtoConverters.convertDataMovementEventFromProto(
-            eventProto.getDataMovementEvent());
-      } else if (eventProto.hasRootInputDataInformationEvent()) {
-        evt = ProtoConverters.convertRootInputDataInformationEventFromProto(
-            eventProto.getRootInputDataInformationEvent());
-      } else if (eventProto.hasInputInitializerEvent()) {
-        evt = ProtoConverters.convertRootInputInitializerEventFromProto(
-            eventProto.getInputInitializerEvent());
-      }
-      EventMetaData sourceInfo = null;
-      EventMetaData destinationInfo = null;
-      if (eventProto.hasSourceInfo()) {
-        sourceInfo = convertEventMetaDataFromProto(eventProto.getSourceInfo());
-      }
-      if (eventProto.hasDestinationInfo()) {
-        destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo());
-      }
-      TezEvent tezEvent = new TezEvent(evt, sourceInfo, eventProto.getEventTime());
-      tezEvent.setDestinationInfo(destinationInfo);
-      this.events.add(tezEvent);
-    }
-  }
-
-  @Override
-  public void toProtoStream(OutputStream outputStream) throws IOException {
-    toProto().writeDelimitedTo(outputStream);
-  }
-
-  @Override
-  public void fromProtoStream(InputStream inputStream) throws IOException {
-    VertexDataMovementEventsGeneratedProto proto =
-        VertexDataMovementEventsGeneratedProto.parseDelimitedFrom(inputStream);
-    if (proto == null) {
-      throw new IOException("No data found in stream");
-    }
-    fromProto(proto);
-  }
-
-  @Override
-  public String toString() {
-    return "vertexId=" + vertexID.toString()
-        + ", eventCount=" + (events != null ? events.size() : "null");
-
-  }
-
-  public TezVertexID getVertexID() {
-    return this.vertexID;
-  }
-
-  public List<TezEvent> getTezEvents() {
-    return this.events;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index bf63045..c4e7e5b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -42,7 +42,7 @@ import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
-import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.history.logging.EntityTypes;
 import org.apache.tez.dag.history.utils.DAGUtils;
@@ -108,13 +108,12 @@ public class HistoryEventJsonConversion {
       case TASK_ATTEMPT_FINISHED:
         jsonObject = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
         break;
-      case VERTEX_PARALLELISM_UPDATED:
-        jsonObject = convertVertexParallelismUpdatedEvent((VertexParallelismUpdatedEvent) historyEvent);
+      case VERTEX_CONFIGURE_DONE:
+        jsonObject = convertVertexReconfigureDoneEvent((VertexConfigurationDoneEvent) historyEvent);
         break;
       case DAG_RECOVERED:
         jsonObject = convertDAGRecoveredEvent((DAGRecoveredEvent) historyEvent);
         break;
-      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
       case VERTEX_COMMIT_STARTED:
       case VERTEX_GROUP_COMMIT_STARTED:
       case VERTEX_GROUP_COMMIT_FINISHED:
@@ -662,7 +661,6 @@ public class HistoryEventJsonConversion {
     JSONObject otherInfo = new JSONObject();
     otherInfo.put(ATSConstants.START_TIME, event.getStartTime());
     otherInfo.put(ATSConstants.SCHEDULED_TIME, event.getScheduledTime());
-
     jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
 
     return jsonObject;
@@ -705,6 +703,41 @@ public class HistoryEventJsonConversion {
     return jsonObject;
   }
 
+  private static JSONObject convertVertexReconfigureDoneEvent(VertexConfigurationDoneEvent event) throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
+    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
+
+    // Events
+    JSONArray events = new JSONArray();
+    JSONObject updateEvent = new JSONObject();
+    updateEvent.put(ATSConstants.TIMESTAMP, event.getReconfigureDoneTime());
+    updateEvent.put(ATSConstants.EVENT_TYPE,
+        HistoryEventType.VERTEX_CONFIGURE_DONE.name());
+
+    JSONObject eventInfo = new JSONObject();
+    eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
+    if (event.getSourceEdgeProperties() != null && !event.getSourceEdgeProperties().isEmpty()) {
+      JSONObject updatedEdgeManagers = new JSONObject();
+      for (Entry<String, EdgeProperty> entry :
+          event.getSourceEdgeProperties().entrySet()) {
+        updatedEdgeManagers.put(entry.getKey(),
+            new JSONObject(DAGUtils.convertEdgeProperty(entry.getValue())));
+      }
+      eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers);
+    }
+    updateEvent.put(ATSConstants.EVENT_INFO, eventInfo);
+    events.put(updateEvent);
+    jsonObject.put(ATSConstants.EVENTS, events);
+
+    // Other info
+    JSONObject otherInfo = new JSONObject();
+    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
+
+    // TODO add more on all other updated information
+    return jsonObject;
+  }
+
   private static JSONObject convertVertexInitializedEvent(VertexInitializedEvent event) throws JSONException {
     JSONObject jsonObject = new JSONObject();
     jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
@@ -773,42 +806,4 @@ public class HistoryEventJsonConversion {
     return jsonObject;
   }
 
-  private static JSONObject convertVertexParallelismUpdatedEvent(
-      VertexParallelismUpdatedEvent event) throws JSONException {
-    JSONObject jsonObject = new JSONObject();
-    jsonObject.put(ATSConstants.ENTITY, event.getVertexID().toString());
-    jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
-
-    // Events
-    JSONArray events = new JSONArray();
-    JSONObject updateEvent = new JSONObject();
-    updateEvent.put(ATSConstants.TIMESTAMP, event.getUpdateTime());
-    updateEvent.put(ATSConstants.EVENT_TYPE,
-        HistoryEventType.VERTEX_PARALLELISM_UPDATED.name());
-
-    JSONObject eventInfo = new JSONObject();
-    eventInfo.put(ATSConstants.OLD_NUM_TASKS, event.getOldNumTasks());
-    eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
-    if (event.getSourceEdgeProperties() != null && !event.getSourceEdgeProperties().isEmpty()) {
-      JSONObject updatedEdgeManagers = new JSONObject();
-      for (Entry<String, EdgeProperty> entry :
-          event.getSourceEdgeProperties().entrySet()) {
-        updatedEdgeManagers.put(entry.getKey(),
-            new JSONObject(DAGUtils.convertEdgeProperty(entry.getValue())));
-      }
-      eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers);
-    }
-    updateEvent.put(ATSConstants.EVENT_INFO, eventInfo);
-    events.put(updateEvent);
-    jsonObject.put(ATSConstants.EVENTS, events);
-
-    // Other info
-    JSONObject otherInfo = new JSONObject();
-    otherInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
-    jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
-    // TODO add more on all other updated information
-    return jsonObject;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 585050d..fed4f3d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -50,7 +50,7 @@ import com.google.common.annotations.VisibleForTesting;
 public class RecoveryService extends AbstractService {
 
   private static final Logger LOG = LoggerFactory.getLogger(RecoveryService.class);
-  private final AppContext appContext;
+  protected final AppContext appContext;
 
   public static final String RECOVERY_FATAL_OCCURRED_DIR =
       "RecoveryFatalErrorOccurred";
@@ -73,7 +73,7 @@ public class RecoveryService extends AbstractService {
   private Set<TezDAGID> completedDAGs = new HashSet<TezDAGID>();
   private Set<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
 
-  private Thread eventHandlingThread;
+  public Thread eventHandlingThread;
   private AtomicBoolean stopped = new AtomicBoolean(false);
   private AtomicBoolean started = new AtomicBoolean(false);
   private int eventCounter = 0;
@@ -374,7 +374,7 @@ public class RecoveryService extends AbstractService {
     }
   }
 
-  private void handleSummaryEvent(TezDAGID dagID,
+  protected void handleSummaryEvent(TezDAGID dagID,
       HistoryEventType eventType,
       SummaryEvent summaryEvent) throws IOException {
     if (LOG.isDebugEnabled()) {
@@ -506,4 +506,8 @@ public class RecoveryService extends AbstractService {
       Thread.yield();
     }
   }
+
+  public void setStopped(boolean stopped) {
+    this.stopped.set(stopped);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/java/org/apache/tez/dag/history/utils/TezEventUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/TezEventUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/TezEventUtils.java
new file mode 100644
index 0000000..cc89b9f
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/TezEventUtils.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.history.utils;
+
+import java.io.IOException;
+import org.apache.tez.common.ProtoConverters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.TezEventProto;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+public class TezEventUtils {
+
+  public static TezEventProto toProto(TezEvent event) throws IOException {
+    TezEventProto.Builder evtBuilder =
+        TezEventProto.newBuilder();
+    if (event.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)) {
+      evtBuilder.setCompositeDataMovementEvent(
+          ProtoConverters.convertCompositeDataMovementEventToProto(
+              (CompositeDataMovementEvent) event.getEvent()));
+    } else if (event.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)) {
+      evtBuilder.setDataMovementEvent(
+          ProtoConverters.convertDataMovementEventToProto(
+              (DataMovementEvent) event.getEvent()));
+    } else if (event.getEventType().equals(EventType.ROOT_INPUT_INITIALIZER_EVENT)) {
+      evtBuilder.setInputInitializerEvent(ProtoConverters
+          .convertRootInputInitializerEventToProto((InputInitializerEvent) event.getEvent()));
+    } else if (event.getEventType().equals(EventType.VERTEX_MANAGER_EVENT)) {
+      evtBuilder.setVmEvent(ProtoConverters
+          .convertVertexManagerEventToProto((VertexManagerEvent)event.getEvent()));
+    } else if (event.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
+      evtBuilder.setRootInputDataInformationEvent(
+          ProtoConverters.convertRootInputDataInformationEventToProto(
+              (InputDataInformationEvent) event.getEvent()));
+    } else {
+      throw new IOException("Unsupported TezEvent type:" + event.getEventType());
+    }
+
+    if (event.getSourceInfo() != null) {
+      evtBuilder.setSourceInfo(convertEventMetaDataToProto(event.getSourceInfo()));
+    }
+    if (event.getDestinationInfo() != null) {
+      evtBuilder.setDestinationInfo(convertEventMetaDataToProto(event.getDestinationInfo()));
+    }
+    evtBuilder.setEventTime(event.getEventReceivedTime());
+    return evtBuilder.build();
+  }
+
+  public static TezEvent fromProto(TezEventProto eventProto) throws IOException {
+    Event evt = null;
+    if (eventProto.hasCompositeDataMovementEvent()) {
+      evt = ProtoConverters.convertCompositeDataMovementEventFromProto(
+          eventProto.getCompositeDataMovementEvent());
+    } else if (eventProto.hasDataMovementEvent()) {
+      evt = ProtoConverters.convertDataMovementEventFromProto(
+          eventProto.getDataMovementEvent());
+    } else if (eventProto.hasInputInitializerEvent()) {
+      evt = ProtoConverters.convertRootInputInitializerEventFromProto(
+          eventProto.getInputInitializerEvent());
+    } else if (eventProto.hasVmEvent()) {
+      evt = ProtoConverters.convertVertexManagerEventFromProto(
+          eventProto.getVmEvent());
+    } else if (eventProto.hasRootInputDataInformationEvent()) {
+      evt = ProtoConverters.convertRootInputDataInformationEventFromProto(
+          eventProto.getRootInputDataInformationEvent());
+    } else {
+      throw new IOException("Unsupported TezEvent type");
+    }
+
+    EventMetaData sourceInfo = null;
+    EventMetaData destinationInfo = null;
+    if (eventProto.hasSourceInfo()) {
+      sourceInfo = convertEventMetaDataFromProto(eventProto.getSourceInfo());
+    }
+    if (eventProto.hasDestinationInfo()) {
+      destinationInfo = convertEventMetaDataFromProto(eventProto.getDestinationInfo());
+    }
+    TezEvent tezEvent = new TezEvent(evt, sourceInfo, eventProto.getEventTime());
+    tezEvent.setDestinationInfo(destinationInfo);
+    return tezEvent;
+  }
+  
+  public static RecoveryProtos.EventMetaDataProto convertEventMetaDataToProto(
+      EventMetaData eventMetaData) {
+    RecoveryProtos.EventMetaDataProto.Builder builder =
+        RecoveryProtos.EventMetaDataProto.newBuilder()
+        .setProducerConsumerType(eventMetaData.getEventGenerator().ordinal())
+        .setEdgeVertexName(eventMetaData.getEdgeVertexName())
+        .setTaskVertexName(eventMetaData.getTaskVertexName());
+    if (eventMetaData.getTaskAttemptID() != null) {
+        builder.setTaskAttemptId(eventMetaData.getTaskAttemptID().toString());
+    }
+    return builder.build();
+  }
+
+  public static EventMetaData convertEventMetaDataFromProto(
+      RecoveryProtos.EventMetaDataProto proto) {
+    TezTaskAttemptID attemptID = null;
+    if (proto.hasTaskAttemptId()) {
+      attemptID = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
+    }
+    return new EventMetaData(
+        EventMetaData.EventProducerConsumerType.values()[proto.getProducerConsumerType()],
+        proto.getTaskVertexName(),
+        proto.getEdgeVertexName(),
+        attemptID);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 5392d8a..b9e4507 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -90,6 +90,8 @@ message VertexInitializedProto {
   optional int64 init_time = 4;
   optional int32 num_tasks = 5;
   repeated RootInputLeafOutputProto inputs = 6;
+  repeated TezEventProto init_generated_events = 7; 
+  optional bool isReconfigurePlanned = 8;
 }
 
 message VertexStartedProto {
@@ -99,6 +101,16 @@ message VertexStartedProto {
   optional int64 start_time = 4;
 }
 
+message VertexConfigurationDoneProto {
+  optional string vertex_id = 1;
+  optional int64 reconfigure_done_time = 2;
+  optional int32 num_tasks = 3;
+  optional VertexLocationHintProto vertex_location_hint = 4;
+  repeated EdgeManagerDescriptorProto edge_manager_descriptors = 5;
+  repeated RootInputSpecUpdateProto root_input_spec_updates = 6;
+  optional bool setParallelismCalled_flag = 7;
+}
+
 message EdgeManagerDescriptorProto {
   optional string edge_name = 1;
   optional PlanEdgeProperty edge_property = 2;
@@ -110,14 +122,6 @@ message RootInputSpecUpdateProto {
   repeated int32 num_physical_inputs = 3;
 }
 
-message VertexParallelismUpdatedProto {
-  optional string vertex_id = 1;
-  optional int32 num_tasks = 2;
-  optional VertexLocationHintProto vertex_location_hint = 3;
-  repeated EdgeManagerDescriptorProto edge_manager_descriptors = 4;
-  repeated RootInputSpecUpdateProto root_input_spec_updates = 5;
-}
-
 message VertexCommitStartedProto {
   optional string vertex_id = 1;
 }
@@ -129,11 +133,13 @@ message VertexCommitFinishedProto {
 message VertexGroupCommitStartedProto {
   optional string dag_id = 1;
   optional string vertex_group_name = 2;
+  repeated string vertex_ids = 3;
 }
 
 message VertexGroupCommitFinishedProto {
   optional string dag_id = 1;
   optional string vertex_group_name = 2;
+  repeated string vertex_ids = 3;
 }
 
 message VertexFinishedProto {
@@ -183,6 +189,7 @@ message TaskAttemptFinishedProto {
   optional TezCountersProto counters = 9;
   optional string error_enum = 10;
   repeated DataEventDependencyInfoProto data_events = 11;
+  repeated TezEventProto ta_generated_events = 12;
 }
 
 message EventMetaDataProto {
@@ -192,7 +199,7 @@ message EventMetaDataProto {
   optional string task_attempt_id = 4;
 }
 
-message TezDataMovementEventProto {
+message TezEventProto {
   optional EventMetaDataProto source_info = 1;
   optional EventMetaDataProto destination_info = 2;
   optional DataMovementEventProto data_movement_event = 3;
@@ -200,11 +207,7 @@ message TezDataMovementEventProto {
   optional RootInputDataInformationEventProto root_input_data_information_event = 5;
   optional RootInputInitializerEventProto input_initializer_event = 6;
   optional int64 event_time = 7;
-}
-
-message VertexDataMovementEventsGeneratedProto {
-  optional string vertex_id = 1;
-  repeated TezDataMovementEventProto tez_data_movement_event = 2;
+  optional VertexManagerEventProto vm_event = 8;
 }
 
 message SummaryEventProto {

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
index 521ed50..0159372 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
@@ -33,12 +33,7 @@ public class TestVertexStatusBuilder {
           VertexStatusBuilder.getProtoState(state);
       VertexStatus.State clientState =
           VertexStatus.getState(stateProto);
-      if (state.equals(VertexState.RECOVERING)) {
-        Assert.assertEquals(clientState.name(),
-            State.NEW.name());
-      } else {
-        Assert.assertEquals(state.name(), clientState.name());
-      }
+      Assert.assertEquals(state.name(), clientState.name());
     }
   }