You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/06/21 00:36:01 UTC
[23/23] git commit: TEZ-1159. Fix handling of corrupt or empty files
in recovery data. (hitesh)
TEZ-1159. Fix handling of corrupt or empty files in recovery data.
(hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/43480dac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/43480dac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/43480dac
Branch: refs/heads/branch-0.4.1-incubating
Commit: 43480dac903515b8a28da31f1616c235f6158def
Parents: ba92893
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Jun 20 15:19:44 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Jun 20 15:34:46 2014 -0700
----------------------------------------------------------------------
.../org/apache/tez/dag/app/RecoveryParser.java | 57 ++++++++++++++------
.../tez/dag/history/events/AMLaunchedEvent.java | 3 ++
.../tez/dag/history/events/AMStartedEvent.java | 3 ++
.../history/events/ContainerLaunchedEvent.java | 3 ++
.../history/events/DAGCommitStartedEvent.java | 3 ++
.../dag/history/events/DAGFinishedEvent.java | 3 ++
.../dag/history/events/DAGInitializedEvent.java | 3 ++
.../tez/dag/history/events/DAGStartedEvent.java | 3 ++
.../dag/history/events/DAGSubmittedEvent.java | 3 ++
.../events/TaskAttemptFinishedEvent.java | 3 ++
.../history/events/TaskAttemptStartedEvent.java | 3 ++
.../dag/history/events/TaskFinishedEvent.java | 3 ++
.../dag/history/events/TaskStartedEvent.java | 3 ++
.../events/VertexCommitStartedEvent.java | 3 ++
.../VertexDataMovementEventsGeneratedEvent.java | 3 ++
.../dag/history/events/VertexFinishedEvent.java | 3 ++
.../events/VertexGroupCommitFinishedEvent.java | 3 ++
.../events/VertexGroupCommitStartedEvent.java | 3 ++
.../history/events/VertexInitializedEvent.java | 3 ++
.../events/VertexParallelismUpdatedEvent.java | 3 ++
.../dag/history/events/VertexStartedEvent.java | 3 ++
21 files changed, 102 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 093069c..c5ad340 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.app;
+import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -109,9 +110,13 @@ public class RecoveryParser {
private static void parseSummaryFile(FSDataInputStream inputStream)
throws IOException {
- while (inputStream.available() > 0) {
+ while (true) {
RecoveryProtos.SummaryEventProto proto =
RecoveryProtos.SummaryEventProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ LOG.info("Reached end of summary stream");
+ break;
+ }
LOG.info("[SUMMARY]"
+ " dagId=" + proto.getDagId()
+ ", timestamp=" + proto.getTimestamp()
@@ -121,7 +126,12 @@ public class RecoveryParser {
private static HistoryEvent getNextEvent(FSDataInputStream inputStream)
throws IOException {
- int eventTypeOrdinal = inputStream.readInt();
+ int eventTypeOrdinal = -1;
+ try {
+ eventTypeOrdinal = inputStream.readInt();
+ } catch (EOFException eof) {
+ return null;
+ }
if (eventTypeOrdinal < 0 || eventTypeOrdinal >=
HistoryEventType.values().length) {
// Corrupt data
@@ -201,7 +211,11 @@ public class RecoveryParser {
LOG.debug("Parsing event from input stream"
+ ", eventType=" + eventType);
}
- event.fromProtoStream(inputStream);
+ try {
+ event.fromProtoStream(inputStream);
+ } catch (EOFException eof) {
+ return null;
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Parsed event from input stream"
+ ", eventType=" + eventType
@@ -216,8 +230,12 @@ public class RecoveryParser {
private static void parseDAGRecoveryFile(FSDataInputStream inputStream)
throws IOException {
- while (inputStream.available() > 0) {
+ while (true) {
HistoryEvent historyEvent = getNextEvent(inputStream);
+ if (historyEvent == null) {
+ LOG.info("Reached end of stream");
+ break;
+ }
LOG.info("Parsed event from recovery stream"
+ ", eventType=" + historyEvent.getEventType()
+ ", event=" + historyEvent);
@@ -392,12 +410,6 @@ public class RecoveryParser {
void handleSummaryEvent(SummaryEventProto proto) throws IOException {
HistoryEventType eventType =
HistoryEventType.values()[proto.getEventType()];
- if (LOG.isDebugEnabled()) {
- LOG.debug("[RECOVERY SUMMARY]"
- + " dagId=" + proto.getDagId()
- + ", timestamp=" + proto.getTimestamp()
- + ", event=" + eventType);
- }
switch (eventType) {
case DAG_SUBMITTED:
completed = false;
@@ -534,9 +546,18 @@ public class RecoveryParser {
int dagCounter = 0;
Map<TezDAGID, DAGSummaryData> dagSummaryDataMap =
new HashMap<TezDAGID, DAGSummaryData>();
- while (summaryStream.available() > 0) {
- RecoveryProtos.SummaryEventProto proto =
- RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
+ while (true) {
+ RecoveryProtos.SummaryEventProto proto;
+ try {
+ proto = RecoveryProtos.SummaryEventProto.parseDelimitedFrom(summaryStream);
+ if (proto == null) {
+ LOG.info("Reached end of summary stream");
+ break;
+ }
+ } catch (EOFException eof) {
+ LOG.info("Reached end of summary stream");
+ break;
+ }
HistoryEventType eventType =
HistoryEventType.values()[proto.getEventType()];
if (LOG.isDebugEnabled()) {
@@ -612,17 +633,23 @@ public class RecoveryParser {
getDAGRecoveryOutputStream(currentAttemptRecoveryDataDir, lastInProgressDAG);
boolean skipAllOtherEvents = false;
- while (dagRecoveryStream.available() > 0) {
+ while (true) {
HistoryEvent event;
try {
event = getNextEvent(dagRecoveryStream);
+ if (event == null) {
+ LOG.info("Reached end of dag recovery stream");
+ break;
+ }
+ } catch (EOFException eof) {
+ LOG.info("Reached end of dag recovery stream");
+ break;
} catch (IOException ioe) {
LOG.warn("Corrupt data found when trying to read next event", ioe);
break;
}
if (event == null || skipAllOtherEvents) {
// reached end of data
- event = null;
break;
}
HistoryEventType eventType = event.getEventType();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
index 54bc658..558f91b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
@@ -136,6 +136,9 @@ public class AMLaunchedEvent implements HistoryEvent {
@Override
public void fromProtoStream(InputStream inputStream) throws IOException {
AMLaunchedProto proto = AMLaunchedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
index e66141b..167b85b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
@@ -126,6 +126,9 @@ public class AMStartedEvent implements HistoryEvent {
@Override
public void fromProtoStream(InputStream inputStream) throws IOException {
AMStartedProto proto = AMStartedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
index 471ddd1..3046f30 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
@@ -129,6 +129,9 @@ public class ContainerLaunchedEvent implements HistoryEvent {
public void fromProtoStream(InputStream inputStream) throws IOException {
ContainerLaunchedProto proto =
ContainerLaunchedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
index 627751a..590e1fe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
@@ -84,6 +84,9 @@ public class DAGCommitStartedEvent implements HistoryEvent, SummaryEvent {
@Override
public void fromProtoStream(InputStream inputStream) throws IOException {
DAGCommitStartedProto proto = DAGCommitStartedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index 14381b3..4b7cb73 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -152,6 +152,9 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
@Override
public void fromProtoStream(InputStream inputStream) throws IOException {
DAGFinishedProto proto = DAGFinishedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
index 9b001b6..479515a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
@@ -91,6 +91,9 @@ public class DAGInitializedEvent implements HistoryEvent {
public void fromProtoStream(InputStream inputStream) throws IOException {
RecoveryProtos.DAGInitializedProto proto =
RecoveryProtos.DAGInitializedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
index a1bcdf2..7d094fe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
@@ -104,6 +104,9 @@ public class DAGStartedEvent implements HistoryEvent {
@Override
public void fromProtoStream(InputStream inputStream) throws IOException {
DAGStartedProto proto = DAGStartedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 18f2205..ff03a4e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -167,6 +167,9 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
@Override
public void fromProtoStream(InputStream inputStream) throws IOException {
DAGSubmittedProto proto = DAGSubmittedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index ecb6818..933f289 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -146,6 +146,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
public void fromProtoStream(InputStream inputStream) throws IOException {
TaskAttemptFinishedProto proto =
TaskAttemptFinishedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index ba91db8..06f9e38 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -144,6 +144,9 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
@Override
public void fromProtoStream(InputStream inputStream) throws IOException {
TaskAttemptStartedProto proto = TaskAttemptStartedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
index 713ecd8..eeb8128 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
@@ -141,6 +141,9 @@ public class TaskFinishedEvent implements HistoryEvent {
@Override
public void fromProtoStream(InputStream inputStream) throws IOException {
TaskFinishedProto proto = TaskFinishedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
index c2a380b..2b073fc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
@@ -121,6 +121,9 @@ public class TaskStartedEvent implements HistoryEvent {
@Override
public void fromProtoStream(InputStream inputStream) throws IOException {
TaskStartedProto proto = TaskStartedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
index 387bff1..a6dd844 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
@@ -86,6 +86,9 @@ public class VertexCommitStartedEvent implements HistoryEvent, SummaryEvent {
@Override
public void fromProtoStream(InputStream inputStream) throws IOException {
VertexCommitStartedProto proto = VertexCommitStartedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
index 035c9ca..68dcc43 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
@@ -196,6 +196,9 @@ public class VertexDataMovementEventsGeneratedEvent implements HistoryEvent {
public void fromProtoStream(InputStream inputStream) throws IOException {
VertexDataMovementEventsGeneratedProto proto =
VertexDataMovementEventsGeneratedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 8321a38..3f530c2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -156,6 +156,9 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
@Override
public void fromProtoStream(InputStream inputStream) throws IOException {
VertexFinishedProto proto = VertexFinishedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
index 99a5288..2a385b7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
@@ -89,6 +89,9 @@ public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEven
@Override
public void fromProtoStream(InputStream inputStream) throws IOException {
VertexGroupCommitFinishedProto proto = VertexGroupCommitFinishedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
index 04d6276..938f4a2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
@@ -89,6 +89,9 @@ public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent
@Override
public void fromProtoStream(InputStream inputStream) throws IOException {
VertexGroupCommitStartedProto proto = VertexGroupCommitStartedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
index e9e4a8c..d2185f8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
@@ -176,6 +176,9 @@ public class VertexInitializedEvent implements HistoryEvent {
public void fromProtoStream(InputStream inputStream) throws IOException {
RecoveryProtos.VertexInitializedProto proto =
RecoveryProtos.VertexInitializedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
index 43cc787..6250cb3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
@@ -128,6 +128,9 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent {
@Override
public void fromProtoStream(InputStream inputStream) throws IOException {
VertexParallelismUpdatedProto proto = VertexParallelismUpdatedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/43480dac/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
index e6023f1..139f90d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
@@ -118,6 +118,9 @@ public class VertexStartedEvent implements HistoryEvent {
@Override
public void fromProtoStream(InputStream inputStream) throws IOException {
VertexStartedProto proto = VertexStartedProto.parseDelimitedFrom(inputStream);
+ if (proto == null) {
+ throw new IOException("No data found in stream");
+ }
fromProto(proto);
}