You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/12/12 02:27:33 UTC
git commit: TEZ-533. Exception thrown by a VertexCommitter kills the AM instead of just the DAG. (hitesh)
Updated Branches:
refs/heads/master 6fcf78ca9 -> 18da4e29c
TEZ-533. Exception thrown by a VertexCommitter kills the AM instead of just the DAG. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/18da4e29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/18da4e29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/18da4e29
Branch: refs/heads/master
Commit: 18da4e29c23a4af5e7c36513b3a0097cde50eb29
Parents: 6fcf78c
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Dec 11 17:27:13 2013 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Dec 11 17:27:13 2013 -0800
----------------------------------------------------------------------
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 22 +++++---
.../tez/dag/app/dag/impl/TestVertexImpl.java | 55 +++++++++++++++++---
2 files changed, 64 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18da4e29/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index f068617..5f89df7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1046,10 +1046,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
try {
if (!vertex.committed.getAndSet(true)) {
// commit only once
+ LOG.info("Invoking committer commit for vertex, vertexId="
+ + vertex.logIdentifier);
vertex.committer.commitVertex();
}
- } catch (IOException e) {
- LOG.error("Failed to do commit on vertex, name=" + vertex.getName(), e);
+ } catch (Exception e) {
+ LOG.error("Failed to do commit on vertex, vertexId="
+ + vertex.logIdentifier, e);
vertex.trySetTerminationCause(VertexTerminationCause.COMMIT_FAILURE);
return vertex.finished(VertexState.FAILED);
}
@@ -1162,11 +1165,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
committer = new MRVertexOutputCommitter();
}
try {
+ LOG.info("Invoking committer init for vertex, vertexId=" + logIdentifier);
committer.init(vertexContext);
+ LOG.info("Invoking committer setup for vertex, vertexId="
+ + logIdentifier);
committer.setupVertex();
- } catch (IOException e) {
- LOG.warn("Vertex init failed", e);
- addDiagnostic("Job init failed : "
+ } catch (Exception e) {
+ LOG.warn("Vertex init failed, vertexId=" + logIdentifier, e);
+ addDiagnostic("Vertex init failed : "
+ StringUtils.stringifyException(e));
trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
abortVertex(VertexStatus.State.FAILED);
@@ -1578,9 +1584,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
private void abortVertex(VertexStatus.State finalState) {
try {
+ LOG.info("Invoking committer abort for vertex, vertexId="
+ + logIdentifier);
committer.abortVertex(finalState);
- } catch (IOException e) {
- LOG.warn("Could not abort vertex, name=" + getName(), e);
+ } catch (Exception e) {
+ LOG.warn("Could not abort vertex, vertexId=" + logIdentifier, e);
}
if (finishTime == 0) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18da4e29/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 0b8fde0..78c7dcc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -141,15 +141,18 @@ public class TestVertexImpl {
public int abortCounter = 0;
private boolean throwError;
private boolean throwErrorOnAbort;
+ private boolean throwRuntimeException;
public CountingVertexOutputCommitter(boolean throwError,
- boolean throwOnAbort) {
+ boolean throwOnAbort,
+ boolean throwRuntimeException) {
this.throwError = throwError;
this.throwErrorOnAbort = throwOnAbort;
+ this.throwRuntimeException = throwRuntimeException;
}
public CountingVertexOutputCommitter() {
- this(false, false);
+ this(false, false, false);
}
@Override
@@ -166,7 +169,11 @@ public class TestVertexImpl {
public void commitVertex() throws IOException {
++commitCounter;
if (throwError) {
- throw new IOException("I can throwz exceptions in commit");
+ if (!throwRuntimeException) {
+ throw new IOException("I can throwz exceptions in commit");
+ } else {
+ throw new RuntimeException("I can throwz exceptions in commit");
+ }
}
}
@@ -174,7 +181,11 @@ public class TestVertexImpl {
public void abortVertex(VertexStatus.State finalState) throws IOException {
++abortCounter;
if (throwErrorOnAbort) {
- throw new IOException("I can throwz exceptions in abort");
+ if (!throwRuntimeException) {
+ throw new IOException("I can throwz exceptions in abort");
+ } else {
+ throw new RuntimeException("I can throwz exceptions in abort");
+ }
}
}
}
@@ -1429,7 +1440,7 @@ public class TestVertexImpl {
VertexImpl v = vertices.get("vertex2");
CountingVertexOutputCommitter committer =
- new CountingVertexOutputCommitter(true, true);
+ new CountingVertexOutputCommitter(true, true, false);
v.setVertexOutputCommitter(committer);
startVertex(v);
@@ -1451,7 +1462,39 @@ public class TestVertexImpl {
Assert.assertEquals(0, committer.initCounter); // already done in init
Assert.assertEquals(0, committer.setupCounter); // already done in init
}
-
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testBadCommitter2() {
+ initAllVertices(VertexState.INITED);
+
+ VertexImpl v = vertices.get("vertex2");
+
+ CountingVertexOutputCommitter committer =
+ new CountingVertexOutputCommitter(true, true, true);
+ v.setVertexOutputCommitter(committer);
+
+ startVertex(v);
+
+ TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+ TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
+
+ dispatcher.getEventHandler().handle(
+ new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+ dispatcher.getEventHandler().handle(
+ new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.FAILED, v.getState());
+ Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, v.getTerminationCause());
+ Assert.assertEquals(1, committer.commitCounter);
+
+ // FIXME need to verify whether abort needs to be called if commit fails
+ Assert.assertEquals(0, committer.abortCounter);
+ Assert.assertEquals(0, committer.initCounter); // already done in init
+ Assert.assertEquals(0, committer.setupCounter); // already done in init
+ }
+
+
@Test//(timeout = 5000)
public void testVertexWithOneToOneSplit() {
// create a diamond shaped dag with 1-1 edges.