You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/12/12 02:27:33 UTC

git commit: TEZ-533. Exception thrown by a VertexCommitter kills the AM instead of just the DAG. (hitesh)

Updated Branches:
  refs/heads/master 6fcf78ca9 -> 18da4e29c


TEZ-533. Exception thrown by a VertexCommitter kills the AM instead of just the DAG. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/18da4e29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/18da4e29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/18da4e29

Branch: refs/heads/master
Commit: 18da4e29c23a4af5e7c36513b3a0097cde50eb29
Parents: 6fcf78c
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Dec 11 17:27:13 2013 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Dec 11 17:27:13 2013 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 22 +++++---
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 55 +++++++++++++++++---
 2 files changed, 64 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18da4e29/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index f068617..5f89df7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1046,10 +1046,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         try {
           if (!vertex.committed.getAndSet(true)) {
             // commit only once
+            LOG.info("Invoking committer commit for vertex, vertexId="
+                + vertex.logIdentifier);
             vertex.committer.commitVertex();
           }
-        } catch (IOException e) {
-          LOG.error("Failed to do commit on vertex, name=" + vertex.getName(), e);
+        } catch (Exception e) {
+          LOG.error("Failed to do commit on vertex, vertexId="
+              + vertex.logIdentifier, e);
           vertex.trySetTerminationCause(VertexTerminationCause.COMMIT_FAILURE);
           return vertex.finished(VertexState.FAILED);
         }
@@ -1162,11 +1165,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       committer = new MRVertexOutputCommitter();
     }
     try {
+      LOG.info("Invoking committer init for vertex, vertexId=" + logIdentifier);
       committer.init(vertexContext);
+      LOG.info("Invoking committer setup for vertex, vertexId="
+          + logIdentifier);
       committer.setupVertex();
-    } catch (IOException e) {
-      LOG.warn("Vertex init failed", e);
-      addDiagnostic("Job init failed : "
+    } catch (Exception e) {
+      LOG.warn("Vertex init failed, vertexId=" + logIdentifier, e);
+      addDiagnostic("Vertex init failed : "
           + StringUtils.stringifyException(e));
       trySetTerminationCause(VertexTerminationCause.INIT_FAILURE);
       abortVertex(VertexStatus.State.FAILED);
@@ -1578,9 +1584,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   private void abortVertex(VertexStatus.State finalState) {
     try {
+      LOG.info("Invoking committer abort for vertex, vertexId="
+          + logIdentifier);
       committer.abortVertex(finalState);
-    } catch (IOException e) {
-      LOG.warn("Could not abort vertex, name=" + getName(), e);
+    } catch (Exception e) {
+      LOG.warn("Could not abort vertex, vertexId=" + logIdentifier, e);
     }
 
     if (finishTime == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18da4e29/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 0b8fde0..78c7dcc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -141,15 +141,18 @@ public class TestVertexImpl {
     public int abortCounter = 0;
     private boolean throwError;
     private boolean throwErrorOnAbort;
+    private boolean throwRuntimeException;
 
     public CountingVertexOutputCommitter(boolean throwError,
-        boolean throwOnAbort) {
+        boolean throwOnAbort,
+        boolean throwRuntimeException) {
       this.throwError = throwError;
       this.throwErrorOnAbort = throwOnAbort;
+      this.throwRuntimeException = throwRuntimeException;
     }
 
     public CountingVertexOutputCommitter() {
-      this(false, false);
+      this(false, false, false);
     }
 
     @Override
@@ -166,7 +169,11 @@ public class TestVertexImpl {
     public void commitVertex() throws IOException {
       ++commitCounter;
       if (throwError) {
-        throw new IOException("I can throwz exceptions in commit");
+        if (!throwRuntimeException) {
+          throw new IOException("I can throwz exceptions in commit");
+        } else {
+          throw new RuntimeException("I can throwz exceptions in commit");
+        }
       }
     }
 
@@ -174,7 +181,11 @@ public class TestVertexImpl {
     public void abortVertex(VertexStatus.State finalState) throws IOException {
       ++abortCounter;
       if (throwErrorOnAbort) {
-        throw new IOException("I can throwz exceptions in abort");
+        if (!throwRuntimeException) {
+          throw new IOException("I can throwz exceptions in abort");
+        } else {
+          throw new RuntimeException("I can throwz exceptions in abort");
+        }
       }
     }
   }
@@ -1429,7 +1440,7 @@ public class TestVertexImpl {
     VertexImpl v = vertices.get("vertex2");
 
     CountingVertexOutputCommitter committer =
-        new CountingVertexOutputCommitter(true, true);
+        new CountingVertexOutputCommitter(true, true, false);
     v.setVertexOutputCommitter(committer);
 
     startVertex(v);
@@ -1451,7 +1462,39 @@ public class TestVertexImpl {
     Assert.assertEquals(0, committer.initCounter); // already done in init
     Assert.assertEquals(0, committer.setupCounter); // already done in init
   }
-  
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testBadCommitter2() {
+    initAllVertices(VertexState.INITED);
+
+    VertexImpl v = vertices.get("vertex2");
+
+    CountingVertexOutputCommitter committer =
+      new CountingVertexOutputCommitter(true, true, true);
+    v.setVertexOutputCommitter(committer);
+
+    startVertex(v);
+
+    TezTaskID t1 = TezTaskID.getInstance(v.getVertexId(), 0);
+    TezTaskID t2 = TezTaskID.getInstance(v.getVertexId(), 1);
+
+    dispatcher.getEventHandler().handle(
+      new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.getEventHandler().handle(
+      new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.FAILED, v.getState());
+    Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, v.getTerminationCause());
+    Assert.assertEquals(1, committer.commitCounter);
+
+    // FIXME need to verify whether abort needs to be called if commit fails
+    Assert.assertEquals(0, committer.abortCounter);
+    Assert.assertEquals(0, committer.initCounter); // already done in init
+    Assert.assertEquals(0, committer.setupCounter); // already done in init
+  }
+
+
   @Test//(timeout = 5000)
   public void testVertexWithOneToOneSplit() {
     // create a diamond shaped dag with 1-1 edges.