You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/06/21 00:36:01 UTC

[23/23] git commit: TEZ-1159. Fix handling of corrupt or empty files in recovery data. (hitesh)

TEZ-1159. Fix handling of corrupt or empty files in recovery data.
(hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/43480dac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/43480dac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/43480dac

Branch: refs/heads/branch-0.4.1-incubating
Commit: 43480dac903515b8a28da31f1616c235f6158def
Parents: ba92893
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Jun 20 15:19:44 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Jun 20 15:34:46 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/RecoveryParser.java  | 57 ++++++++++++++------
 .../tez/dag/history/events/AMLaunchedEvent.java |  3 ++
 .../tez/dag/history/events/AMStartedEvent.java  |  3 ++
 .../history/events/ContainerLaunchedEvent.java  |  3 ++
 .../history/events/DAGCommitStartedEvent.java   |  3 ++
 .../dag/history/events/DAGFinishedEvent.java    |  3 ++
 .../dag/history/events/DAGInitializedEvent.java |  3 ++
 .../tez/dag/history/events/DAGStartedEvent.java |  3 ++
 .../dag/history/events/DAGSubmittedEvent.java   |  3 ++
 .../events/TaskAttemptFinishedEvent.java        |  3 ++
 .../history/events/TaskAttemptStartedEvent.java |  3 ++
 .../dag/history/events/TaskFinishedEvent.java   |  3 ++
 .../dag/history/events/TaskStartedEvent.java    |  3 ++
 .../events/VertexCommitStartedEvent.java        |  3 ++
 .../VertexDataMovementEventsGeneratedEvent.java |  3 ++
 .../dag/history/events/VertexFinishedEvent.java |  3 ++
 .../events/VertexGroupCommitFinishedEvent.java  |  3 ++
 .../events/VertexGroupCommitStartedEvent.java   |  3 ++
 .../history/events/VertexInitializedEvent.java  |  3 ++
 .../events/VertexParallelismUpdatedEvent.java   |  3 ++
 .../dag/history/events/VertexStartedEvent.java  |  3 ++
 21 files changed, 102 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 093069c..c5ad340 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.app;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -109,9 +110,13 @@ public class RecoveryParser {
 
   private static void parseSummaryFile(FSDataInputStream inputStream)
       throws IOException {
-    while (inputStream.available() > 0) {
+    while (true) {
       RecoveryProtos.SummaryEventProto proto =
           RecoveryProtos.SummaryEventProto.parseDelimitedFrom(inputStream);
+      if (proto == null) {
+        LOG.info("Reached end of summary stream");
+        break;
+      }
       LOG.info("[SUMMARY]"
           + " dagId=" + proto.getDagId()
           + ", timestamp=" + proto.getTimestamp()
@@ -121,7 +126,12 @@ public class RecoveryParser {
 
   private static HistoryEvent getNextEvent(FSDataInputStream inputStream)
       throws IOException {
-    int eventTypeOrdinal = inputStream.readInt();
+    int eventTypeOrdinal = -1;
+    try {
+      eventTypeOrdinal = inputStream.readInt();
+    } catch (EOFException eof) {
+      return null;
+    }
     if (eventTypeOrdinal < 0 || eventTypeOrdinal >=
         HistoryEventType.values().length) {
       // Corrupt data
@@ -201,7 +211,11 @@ public class RecoveryParser {
       LOG.debug("Parsing event from input stream"
           + ", eventType=" + eventType);
     }
-    event.fromProtoStream(inputStream);
+    try {
+      event.fromProtoStream(inputStream);
+    } catch (EOFException eof) {
+      return null;
+    }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Parsed event from input stream"
           + ", eventType=" + eventType
@@ -216,8 +230,12 @@ public class RecoveryParser {
 
   private static void parseDAGRecoveryFile(FSDataInputStream inputStream)
       throws IOException {
-    while (inputStream.available() > 0) {
+    while (true) {
       HistoryEvent historyEvent = getNextEvent(inputStream);
+      if (historyEvent == null) {
+        LOG.info("Reached end of stream");
+        break;
+      }
       LOG.info("Parsed event from recovery stream"
           + ", eventType=" + historyEvent.getEventType()
           + ", event=" + historyEvent);
@@ -392,12 +410,6 @@ public class RecoveryParser {
     void handleSummaryEvent(SummaryEventProto proto) throws IOException {
       HistoryEventType eventType =
           HistoryEventType.values()[proto.getEventType()];
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("[RECOVERY SUMMARY]"
-            + " dagId=" + proto.getDagId()
-            + ", timestamp=" + proto.getTimestamp()
-            + ", event=" + eventType);
-      }
       switch (eventType) {
         case DAG_SUBMITTED:
           completed = false;
@@ -534,9 +546,18 @@ public class RecoveryParser {
     int dagCounter = 0;
     Map<TezDAGID, DAGSummaryData> dagSummaryDataMap =
         new HashMap<TezDAGID, DAGSummaryData>();
-    while (summaryStream.available() > 0) {
-      RecoveryProtos.SummaryEventProto proto =
-          RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
+    while (true) {
+      RecoveryProtos.SummaryEventProto proto;
+      try {
+        proto = RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
+        if (proto == null) {
+          LOG.info("Reached end of summary stream");
+          break;
+        }
+      } catch (EOFException eof) {
+        LOG.info("Reached end of summary stream");
+        break;
+      }
       HistoryEventType eventType =
           HistoryEventType.values()[proto.getEventType()];
       if (LOG.isDebugEnabled()) {
@@ -612,17 +633,23 @@ public class RecoveryParser {
         getDAGRecoveryOutputStream(currentAttemptRecoveryDataDir, lastInProgressDAG);
 
     boolean skipAllOtherEvents = false;
-    while (dagRecoveryStream.available() > 0) {
+    while (true) {
       HistoryEvent event;
       try {
         event = getNextEvent(dagRecoveryStream);
+        if (event == null) {
+          LOG.info("Reached end of dag recovery stream");
+          break;
+        }
+      } catch (EOFException eof) {
+        LOG.info("Reached end of dag recovery stream");
+        break;
       } catch (IOException ioe) {
         LOG.warn("Corrupt data found when trying to read next event", ioe);
         break;
       }
       if (event == null || skipAllOtherEvents) {
         // reached end of data
-        event = null;
         break;
       }
       HistoryEventType eventType = event.getEventType();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
index 54bc658..558f91b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
@@ -136,6 +136,9 @@ public class AMLaunchedEvent implements HistoryEvent {
   @Override
   public void fromProtoStream(InputStream inputStream) throws IOException {
     AMLaunchedProto proto = AMLaunchedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
index e66141b..167b85b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
@@ -126,6 +126,9 @@ public class AMStartedEvent implements HistoryEvent {
   @Override
   public void fromProtoStream(InputStream inputStream) throws IOException {
     AMStartedProto proto = AMStartedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
index 471ddd1..3046f30 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
@@ -129,6 +129,9 @@ public class ContainerLaunchedEvent implements HistoryEvent {
   public void fromProtoStream(InputStream inputStream) throws IOException {
     ContainerLaunchedProto proto =
         ContainerLaunchedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
index 627751a..590e1fe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
@@ -84,6 +84,9 @@ public class DAGCommitStartedEvent implements HistoryEvent, SummaryEvent {
   @Override
   public void fromProtoStream(InputStream inputStream) throws IOException {
     DAGCommitStartedProto proto = DAGCommitStartedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index 14381b3..4b7cb73 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -152,6 +152,9 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
   @Override
   public void fromProtoStream(InputStream inputStream) throws IOException {
     DAGFinishedProto proto = DAGFinishedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
index 9b001b6..479515a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
@@ -91,6 +91,9 @@ public class DAGInitializedEvent implements HistoryEvent {
   public void fromProtoStream(InputStream inputStream) throws IOException {
     RecoveryProtos.DAGInitializedProto proto =
         RecoveryProtos.DAGInitializedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
index a1bcdf2..7d094fe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
@@ -104,6 +104,9 @@ public class DAGStartedEvent implements HistoryEvent {
   @Override
   public void fromProtoStream(InputStream inputStream) throws IOException {
     DAGStartedProto proto = DAGStartedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 18f2205..ff03a4e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -167,6 +167,9 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
   @Override
   public void fromProtoStream(InputStream inputStream) throws IOException {
     DAGSubmittedProto proto = DAGSubmittedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index ecb6818..933f289 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -146,6 +146,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
   public void fromProtoStream(InputStream inputStream) throws IOException {
     TaskAttemptFinishedProto proto =
         TaskAttemptFinishedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index ba91db8..06f9e38 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -144,6 +144,9 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
   @Override
   public void fromProtoStream(InputStream inputStream) throws IOException {
     TaskAttemptStartedProto proto = TaskAttemptStartedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
index 713ecd8..eeb8128 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
@@ -141,6 +141,9 @@ public class TaskFinishedEvent implements HistoryEvent {
   @Override
   public void fromProtoStream(InputStream inputStream) throws IOException {
     TaskFinishedProto proto = TaskFinishedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
index c2a380b..2b073fc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
@@ -121,6 +121,9 @@ public class TaskStartedEvent implements HistoryEvent {
   @Override
   public void fromProtoStream(InputStream inputStream) throws IOException {
     TaskStartedProto proto = TaskStartedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
index 387bff1..a6dd844 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
@@ -86,6 +86,9 @@ public class VertexCommitStartedEvent implements HistoryEvent, SummaryEvent {
   @Override
   public void fromProtoStream(InputStream inputStream) throws IOException {
     VertexCommitStartedProto proto = VertexCommitStartedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
index 035c9ca..68dcc43 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
@@ -196,6 +196,9 @@ public class VertexDataMovementEventsGeneratedEvent implements HistoryEvent {
   public void fromProtoStream(InputStream inputStream) throws IOException {
     VertexDataMovementEventsGeneratedProto proto =
         VertexDataMovementEventsGeneratedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 8321a38..3f530c2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -156,6 +156,9 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
   @Override
   public void fromProtoStream(InputStream inputStream) throws IOException {
     VertexFinishedProto proto = VertexFinishedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
index 99a5288..2a385b7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
@@ -89,6 +89,9 @@ public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEven
   @Override
   public void fromProtoStream(InputStream inputStream) throws IOException {
     VertexGroupCommitFinishedProto proto = VertexGroupCommitFinishedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
index 04d6276..938f4a2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
@@ -89,6 +89,9 @@ public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent
   @Override
   public void fromProtoStream(InputStream inputStream) throws IOException {
     VertexGroupCommitStartedProto proto = VertexGroupCommitStartedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
index e9e4a8c..d2185f8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
@@ -176,6 +176,9 @@ public class VertexInitializedEvent implements HistoryEvent {
   public void fromProtoStream(InputStream inputStream) throws IOException {
     RecoveryProtos.VertexInitializedProto proto =
         RecoveryProtos.VertexInitializedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
index 43cc787..6250cb3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
@@ -128,6 +128,9 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent {
   @Override
   public void fromProtoStream(InputStream inputStream) throws IOException {
     VertexParallelismUpdatedProto proto = VertexParallelismUpdatedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
index e6023f1..139f90d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
@@ -118,6 +118,9 @@ public class VertexStartedEvent implements HistoryEvent {
   @Override
   public void fromProtoStream(InputStream inputStream) throws IOException {
     VertexStartedProto proto = VertexStartedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
     fromProto(proto);
   }