You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2015/09/16 03:13:02 UTC

tez git commit: TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM. (hitesh)

Repository: tez
Updated Branches:
  refs/heads/branch-0.7 f96d12ae5 -> d382117a7


TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM. (hitesh)

(cherry picked from commit 28cd991b86c0e216e80f9246d8c0bddaa5b0f97c)

Conflicts:
	CHANGES.txt
	tez-api/src/main/java/org/apache/tez/client/TezClient.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d382117a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d382117a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d382117a

Branch: refs/heads/branch-0.7
Commit: d382117a793c75ae6bad9b92efbe063c1269c45c
Parents: f96d12a
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Aug 6 11:08:30 2015 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Sep 15 17:01:46 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../java/org/apache/tez/client/TezClient.java   |   5 +-
 .../java/org/apache/tez/common/RPCUtil.java     | 173 ++++++++++++++++++
 .../tez/dag/api/client/DAGClientImpl.java       |   4 +
 .../dag/api/client/rpc/DAGClientRPCImpl.java    |  25 +--
 .../java/org/apache/tez/common/TestRPCUtil.java | 181 +++++++++++++++++++
 .../org/apache/tez/test/TestAMRecovery.java     |   3 +-
 7 files changed, 377 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d382117a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6adb38e..3e6df02 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM.
   TEZ-2825. Report progress in terms of completed tasks to reduce load on AM for Tez UI
   TEZ-2812. Tez UI: Update task & attempt tables while in progress.
   TEZ-2786. Tez UI: Update vertex, task & attempt details page while in progress.

http://git-wip-us.apache.org/repos/asf/tez/blob/d382117a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 0710cdd..da31141 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -25,7 +25,6 @@ import java.util.Map;
 
 import javax.annotation.Nullable;
 
-import org.apache.tez.common.JavaOptsChecker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -43,7 +42,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.common.counters.Limits;
+import org.apache.tez.common.JavaOptsChecker;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.RPCUtil;
 import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.DAG;
@@ -501,7 +502,7 @@ public class TezClient {
         dagId = response.getDagId();
       }
     } catch (ServiceException e) {
-      throw new TezException(e);
+      RPCUtil.unwrapAndThrowException(e);
     }
     LOG.info("Submitted dag to TezSession"
         + ", sessionName=" + clientName

http://git-wip-us.apache.org/repos/asf/tez/blob/d382117a/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java b/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java
new file mode 100644
index 0000000..caeb822
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java
@@ -0,0 +1,173 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.common;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.tez.dag.api.DAGNotRunningException;
+import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.TezException;
+
+import com.google.protobuf.ServiceException;
+
+public class RPCUtil {
+
+  /**
+   * Returns an instance of {@link TezException}
+   */
+  public static TezException getRemoteException(Throwable t) {
+    return new TezException(t);
+  }
+
+  /**
+   * Returns an instance of {@link TezException}
+   */
+  public static TezException getRemoteException(String message) {
+    return new TezException(message);
+  }
+
+  private static <T extends Throwable> T instantiateException(
+      Class<? extends T> cls, RemoteException re) throws RemoteException {
+    try {
+      Constructor<? extends T> cn = cls.getConstructor(String.class);
+      cn.setAccessible(true);
+      T ex = cn.newInstance(re.getMessage());
+      ex.initCause(re);
+      return ex;
+      // RemoteException contains useful information as against the
+      // java.lang.reflect exceptions.
+    } catch (NoSuchMethodException e) {
+      throw re;
+    } catch (IllegalArgumentException e) {
+      throw re;
+    } catch (SecurityException e) {
+      throw re;
+    } catch (InstantiationException e) {
+      throw re;
+    } catch (IllegalAccessException e) {
+      throw re;
+    } catch (InvocationTargetException e) {
+      throw re;
+    }
+  }
+
+  private static <T extends TezException> T instantiateTezException(
+      Class<? extends T> cls, RemoteException re) throws RemoteException {
+    return instantiateException(cls, re);
+  }
+
+  private static <T extends IOException> T instantiateIOException(
+      Class<? extends T> cls, RemoteException re) throws RemoteException {
+    return instantiateException(cls, re);
+  }
+
+  private static <T extends RuntimeException> T instantiateRuntimeException(
+      Class<? extends T> cls, RemoteException re) throws RemoteException {
+    return instantiateException(cls, re);
+  }
+
+  private static <T extends SessionNotRunning> T instantiateSessionNotRunningException(
+      Class<? extends T> cls, RemoteException re) throws RemoteException {
+    return instantiateException(cls, re);
+  }
+
+
+  /**
+   * Utility method that unwraps and returns appropriate exceptions.
+   *
+   * @param se
+   *          ServiceException
+   * @return An instance of the actual exception, which will be a subclass of
+   *         {@link TezException} or {@link IOException}
+   */
+  public static Void unwrapAndThrowException(ServiceException se)
+      throws IOException, TezException {
+
+    Throwable cause = se.getCause();
+    if (cause == null) {
+      // SE generated by the RPC layer itself.
+      throw new IOException(se);
+    } else {
+      if (cause instanceof RemoteException) {
+        RemoteException re = (RemoteException) cause;
+        Class<?> realClass = null;
+        try {
+          realClass = Class.forName(re.getClassName());
+        } catch (ClassNotFoundException cnf) {
+          // Assume this to be a new exception type added to Tez. This isn't
+          // absolutely correct since the RPC layer could add an exception as
+          // well.
+          throw instantiateTezException(TezException.class, re);
+        }
+
+        if (SessionNotRunning.class.isAssignableFrom(realClass)) {
+          throw instantiateTezException(
+              realClass.asSubclass(SessionNotRunning.class), re);
+        } else if (DAGNotRunningException.class.isAssignableFrom(realClass)) {
+            throw instantiateTezException(
+                realClass.asSubclass(DAGNotRunningException.class), re);
+        } else if (TezException.class.isAssignableFrom(realClass)) {
+          throw instantiateTezException(
+              realClass.asSubclass(TezException.class), re);
+        } else if (IOException.class.isAssignableFrom(realClass)) {
+          throw instantiateIOException(realClass.asSubclass(IOException.class),
+              re);
+        } else if (RuntimeException.class.isAssignableFrom(realClass)) {
+          throw instantiateRuntimeException(
+              realClass.asSubclass(RuntimeException.class), re);
+        } else {
+          throw re;
+        }
+        // RemoteException contains useful information as against the
+        // java.lang.reflect exceptions.
+
+      } else if (cause instanceof IOException) {
+        // RPC Client exception.
+        throw (IOException) cause;
+      } else if (cause instanceof RuntimeException) {
+        // RPC RuntimeException
+        throw (RuntimeException) cause;
+      } else {
+        // Should not be generated.
+        throw new IOException(se);
+      }
+    }
+  }
+
+  /**
+   * Utility method that unwraps and returns appropriate exceptions.
+   *
+   * @param se
+   *          ServiceException
+   * @return An instance of the actual exception, which will be a subclass of
+   *         {@link TezException} or {@link IOException}
+   */
+  public static Void unwrapAndThrowNonIOException(ServiceException se)
+      throws TezException {
+    try {
+      return unwrapAndThrowException(se);
+    } catch (IOException ioe) {
+      throw new TezException(ioe);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/d382117a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index 4e2ff40..47c8a8e 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -353,6 +353,8 @@ public class DAGClientImpl extends DAGClient {
       dagCompleted = true;
     } catch (TezException e) {
       // can be either due to a n/w issue of due to AM completed.
+    } catch (IOException e) {
+      // can be either due to a n/w issue of due to AM completed.
     }
 
     if (dagStatus == null && !dagCompleted) {
@@ -371,6 +373,8 @@ public class DAGClientImpl extends DAGClient {
       dagCompleted = true;
     } catch (TezException e) {
       // can be either due to a n/w issue of due to AM completed.
+    } catch (IOException e) {
+      // can be either due to a n/w issue of due to AM completed.
     }
 
     if (vertexStatus == null && !dagCompleted) {

http://git-wip-us.apache.org/repos/asf/tez/blob/d382117a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 223c0ab..240289c 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -23,11 +23,11 @@ import java.util.Set;
 
 import javax.annotation.Nullable;
 
+import org.apache.tez.common.RPCUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -52,6 +52,7 @@ import com.google.protobuf.ServiceException;
 
 @Private
 public class DAGClientRPCImpl extends DAGClient {
+
   private static final Logger LOG = LoggerFactory.getLogger(DAGClientRPCImpl.class);
 
   private static final String DAG_NOT_RUNNING_CLASS_NAME =
@@ -96,6 +97,9 @@ public class DAGClientRPCImpl extends DAGClient {
       } catch (TezException e) {
         resetProxy(e); // create proxy again
         throw e;
+      } catch (IOException e) {
+        resetProxy(e); // create proxy again
+        throw e;
       }
     }
 
@@ -113,6 +117,9 @@ public class DAGClientRPCImpl extends DAGClient {
       } catch (TezException e) {
         resetProxy(e); // create proxy again
         throw e;
+      } catch (IOException e) {
+        resetProxy(e); // create proxy again
+        throw e;
       }
     }
 
@@ -176,22 +183,15 @@ public class DAGClientRPCImpl extends DAGClient {
         proxy.getDAGStatus(null,
           requestProtoBuilder.build()).getDagStatus(), DagStatusSource.AM);
     } catch (ServiceException e) {
-      final Throwable cause = e.getCause();
-      if (cause instanceof RemoteException) {
-        RemoteException remoteException = (RemoteException) cause;
-        if (DAG_NOT_RUNNING_CLASS_NAME.equals(remoteException.getClassName())) {
-          throw new DAGNotRunningException(remoteException.getMessage());
-        }
-      }
-
-      // TEZ-151 retrieve wrapped TezException
+      RPCUtil.unwrapAndThrowException(e);
+      // Should not reach here
       throw new TezException(e);
     }
   }
 
   VertexStatus getVertexStatusViaAM(String vertexName,
       Set<StatusGetOpts> statusOptions)
-      throws TezException {
+      throws TezException, IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("GetVertexStatus via AM for app: " + appId + " dag: " + dagId
           + " vertex: " + vertexName);
@@ -211,7 +211,8 @@ public class DAGClientRPCImpl extends DAGClient {
         proxy.getVertexStatus(null,
           requestProtoBuilder.build()).getVertexStatus());
     } catch (ServiceException e) {
-      // TEZ-151 retrieve wrapped TezException
+      RPCUtil.unwrapAndThrowException(e);
+      // Should not reach here
       throw new TezException(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/d382117a/tez-api/src/test/java/org/apache/tez/common/TestRPCUtil.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/common/TestRPCUtil.java b/tez-api/src/test/java/org/apache/tez/common/TestRPCUtil.java
new file mode 100644
index 0000000..1e63b47
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/common/TestRPCUtil.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.TezException;
+import org.junit.Assert;
+
+import org.apache.hadoop.ipc.RemoteException;
+import org.junit.Test;
+
+import com.google.protobuf.ServiceException;
+
+public class TestRPCUtil {
+
+  @Test (timeout=1000)
+  public void testUnknownExceptionUnwrapping() {
+    Class<? extends Throwable> exception = TezException.class;
+    String className = "UnknownException.class";
+    verifyRemoteExceptionUnwrapping(exception, className);
+  }
+
+  @Test
+  public void testRemoteIOExceptionUnwrapping() {
+    Class<? extends Throwable> exception = IOException.class;
+    verifyRemoteExceptionUnwrapping(exception, exception.getName());
+  }
+
+  @Test
+  public void testRemoteIOExceptionDerivativeUnwrapping() {
+    // Test IOException sub-class
+    Class<? extends Throwable> exception = FileNotFoundException.class;
+    verifyRemoteExceptionUnwrapping(exception, exception.getName());
+  }
+
+  @Test
+  public void testRemoteTezExceptionUnwrapping() {
+    Class<? extends Throwable> exception = TezException.class;
+    verifyRemoteExceptionUnwrapping(exception, exception.getName());
+
+  }
+
+  @Test
+  public void testRemoteTezExceptionDerivativeUnwrapping() {
+    Class<? extends Throwable> exception = SessionNotRunning.class;
+    verifyRemoteExceptionUnwrapping(exception, exception.getName());
+  }
+
+  @Test
+  public void testRemoteRuntimeExceptionUnwrapping() {
+    Class<? extends Throwable> exception = NullPointerException.class;
+    verifyRemoteExceptionUnwrapping(exception, exception.getName());
+  }
+
+  @Test
+  public void testUnexpectedRemoteExceptionUnwrapping() {
+    // Non IOException, TezException thrown by the remote side.
+    Class<? extends Throwable> exception = Exception.class;
+    verifyRemoteExceptionUnwrapping(RemoteException.class, exception.getName());
+  }
+
+  @Test
+  public void testRemoteTezExceptionWithoutStringConstructor() {
+    // Derivatives of TezException should always define a string constructor.
+    Class<? extends Throwable> exception = TezTestExceptionNoConstructor.class;
+    verifyRemoteExceptionUnwrapping(RemoteException.class, exception.getName());
+  }
+
+  @Test
+  public void testRPCServiceExceptionUnwrapping() {
+    String message = "ServiceExceptionMessage";
+    ServiceException se = new ServiceException(message);
+
+    Throwable t = null;
+    try {
+      RPCUtil.unwrapAndThrowException(se);
+    } catch (Throwable thrown) {
+      t = thrown;
+    }
+
+    Assert.assertTrue(IOException.class.isInstance(t));
+    Assert.assertTrue(t.getMessage().contains(message));
+  }
+
+  @Test
+  public void testRPCIOExceptionUnwrapping() {
+    String message = "DirectIOExceptionMessage";
+    IOException ioException = new FileNotFoundException(message);
+    ServiceException se = new ServiceException(ioException);
+
+    Throwable t = null;
+    try {
+      RPCUtil.unwrapAndThrowException(se);
+    } catch (Throwable thrown) {
+      t = thrown;
+    }
+    Assert.assertTrue(FileNotFoundException.class.isInstance(t));
+    Assert.assertTrue(t.getMessage().contains(message));
+  }
+
+  @Test
+  public void testRPCRuntimeExceptionUnwrapping() {
+    String message = "RPCRuntimeExceptionUnwrapping";
+    RuntimeException re = new NullPointerException(message);
+    ServiceException se = new ServiceException(re);
+
+    Throwable t = null;
+    try {
+      RPCUtil.unwrapAndThrowException(se);
+    }  catch (Throwable thrown) {
+      t = thrown;
+    }
+
+    Assert.assertTrue(NullPointerException.class.isInstance(t));
+    Assert.assertTrue(t.getMessage().contains(message));
+  }
+
+  private void verifyRemoteExceptionUnwrapping(
+      Class<? extends Throwable> expectedLocalException,
+      String realExceptionClassName) {
+    verifyRemoteExceptionUnwrapping(expectedLocalException, realExceptionClassName, true);
+  }
+
+  private void verifyRemoteExceptionUnwrapping(
+      Class<? extends Throwable> expectedLocalException,
+      String realExceptionClassName, boolean allowIO) {
+    String message = realExceptionClassName + "Message";
+    RemoteException re = new RemoteException(realExceptionClassName, message);
+    ServiceException se = new ServiceException(re);
+
+    Throwable t = null;
+    try {
+      if (allowIO) {
+        RPCUtil.unwrapAndThrowException(se);
+      } else {
+        RPCUtil.unwrapAndThrowNonIOException(se);
+      }
+    } catch (Throwable thrown) {
+      t = thrown;
+    }
+
+    Assert.assertTrue("Expected exception [" + expectedLocalException
+        + "] but found " + t, expectedLocalException.isInstance(t));
+    Assert.assertTrue(
+        "Expected message [" + message + "] but found " + t.getMessage(), t
+            .getMessage().contains(message));
+  }
+
+
+  @Test (timeout=1000)
+  public void testRemoteNonIOExceptionUnwrapping() {
+    Class<? extends Throwable> exception = TezException.class;
+    verifyRemoteExceptionUnwrapping(exception, IOException.class.getName(), false);
+  }
+
+
+  private static class TezTestExceptionNoConstructor extends
+      Exception {
+    private static final long serialVersionUID = 1L;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/d382117a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index 1d17b23..7d7069e 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -235,6 +235,7 @@ public class TestAMRecovery {
         createDAG("VertexCompletelyFinished_Broadcast", ControlledImmediateStartVertexManager.class,
             DataMovementType.BROADCAST, false);
     TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+
     assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
     assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
@@ -483,7 +484,7 @@ public class TestAMRecovery {
               "application", "dag")
               + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
       if (fs.exists(recoveryFilePath)) {
-        LOG.info("read recovery file:" + recoveryFilePath);
+        LOG.info("Read recovery file:" + recoveryFilePath);
         historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(fs.open(recoveryFilePath)));
       }
     }