You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2015/08/06 20:08:43 UTC
tez git commit: TEZ-2663. SessionNotRunning exceptions are wrapped in
a ServiceException from a dying AM. (hitesh)
Repository: tez
Updated Branches:
refs/heads/master edddea808 -> 28cd991b8
TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/28cd991b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/28cd991b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/28cd991b
Branch: refs/heads/master
Commit: 28cd991b86c0e216e80f9246d8c0bddaa5b0f97c
Parents: edddea8
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Aug 6 11:08:30 2015 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Aug 6 11:08:30 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../java/org/apache/tez/client/TezClient.java | 3 +-
.../java/org/apache/tez/common/RPCUtil.java | 173 ++++++++++++++++++
.../tez/dag/api/client/DAGClientImpl.java | 4 +
.../dag/api/client/rpc/DAGClientRPCImpl.java | 25 +--
.../java/org/apache/tez/common/TestRPCUtil.java | 181 +++++++++++++++++++
.../org/apache/tez/test/TestAMRecovery.java | 3 +-
7 files changed, 376 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/28cd991b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d986913..5d3c4f4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,7 @@ INCOMPATIBLE CHANGES
TEZ-2650. Timing details on Vertex state changes
ALL CHANGES:
+ TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM.
TEZ-2630. TezChild receives IP address instead of FQDN.
TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized.
TEZ-2172. FetcherOrderedGrouped using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation.
http://git-wip-us.apache.org/repos/asf/tez/blob/28cd991b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index aad6e76..2590879 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -25,6 +25,7 @@ import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.tez.common.RPCUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -473,7 +474,7 @@ public class TezClient {
dagId = response.getDagId();
}
} catch (ServiceException e) {
- throw new TezException(e);
+ RPCUtil.unwrapAndThrowException(e);
}
LOG.info("Submitted dag to TezSession"
+ ", sessionName=" + clientName
http://git-wip-us.apache.org/repos/asf/tez/blob/28cd991b/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java b/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java
new file mode 100644
index 0000000..caeb822
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java
@@ -0,0 +1,173 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.common;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.tez.dag.api.DAGNotRunningException;
+import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.TezException;
+
+import com.google.protobuf.ServiceException;
+
+public class RPCUtil {
+
+ /**
+ * Returns a new {@link TezException} constructed from the given throwable.
+ */
+ public static TezException getRemoteException(Throwable t) {
+ return new TezException(t);
+ }
+
+ /**
+ * Returns a new {@link TezException} constructed from the given message.
+ */
+ public static TezException getRemoteException(String message) {
+ return new TezException(message);
+ }
+
+ private static <T extends Throwable> T instantiateException(
+ Class<? extends T> cls, RemoteException re) throws RemoteException {
+ try {
+ Constructor<? extends T> cn = cls.getConstructor(String.class);
+ cn.setAccessible(true);
+ T ex = cn.newInstance(re.getMessage());
+ ex.initCause(re);
+ return ex;
+ // On any reflection failure below, rethrow the RemoteException itself:
+ // it is more informative than the java.lang.reflect exceptions.
+ } catch (NoSuchMethodException e) {
+ throw re;
+ } catch (IllegalArgumentException e) {
+ throw re;
+ } catch (SecurityException e) {
+ throw re;
+ } catch (InstantiationException e) {
+ throw re;
+ } catch (IllegalAccessException e) {
+ throw re;
+ } catch (InvocationTargetException e) {
+ throw re;
+ }
+ }
+
+ private static <T extends TezException> T instantiateTezException(
+ Class<? extends T> cls, RemoteException re) throws RemoteException {
+ return instantiateException(cls, re);
+ }
+
+ private static <T extends IOException> T instantiateIOException(
+ Class<? extends T> cls, RemoteException re) throws RemoteException {
+ return instantiateException(cls, re);
+ }
+
+ private static <T extends RuntimeException> T instantiateRuntimeException(
+ Class<? extends T> cls, RemoteException re) throws RemoteException {
+ return instantiateException(cls, re);
+ }
+
+ private static <T extends SessionNotRunning> T instantiateSessionNotRunningException(
+ Class<? extends T> cls, RemoteException re) throws RemoteException {
+ return instantiateException(cls, re);
+ }
+
+
+ /**
+ * Unwraps a ServiceException and rethrows the underlying exception.
+ * Never returns normally; every code path below ends in a throw.
+ * @param se
+ * ServiceException
+ * @throws IOException if se has no cause, or for IOException-typed causes
+ * @throws TezException for TezException-typed or unloadable remote classes
+ */
+ public static Void unwrapAndThrowException(ServiceException se)
+ throws IOException, TezException {
+
+ Throwable cause = se.getCause();
+ if (cause == null) {
+ // No cause attached: the ServiceException came from the RPC layer itself.
+ throw new IOException(se);
+ } else {
+ if (cause instanceof RemoteException) {
+ RemoteException re = (RemoteException) cause;
+ Class<?> realClass = null;
+ try {
+ realClass = Class.forName(re.getClassName());
+ } catch (ClassNotFoundException cnf) {
+ // Assume this to be a new exception type added to Tez. This isn't
+ // absolutely correct since the RPC layer could add an exception as
+ // well.
+ throw instantiateTezException(TezException.class, re);
+ }
+
+ if (SessionNotRunning.class.isAssignableFrom(realClass)) {
+ throw instantiateTezException(
+ realClass.asSubclass(SessionNotRunning.class), re);
+ } else if (DAGNotRunningException.class.isAssignableFrom(realClass)) {
+ throw instantiateTezException(
+ realClass.asSubclass(DAGNotRunningException.class), re);
+ } else if (TezException.class.isAssignableFrom(realClass)) {
+ throw instantiateTezException(
+ realClass.asSubclass(TezException.class), re);
+ } else if (IOException.class.isAssignableFrom(realClass)) {
+ throw instantiateIOException(realClass.asSubclass(IOException.class),
+ re);
+ } else if (RuntimeException.class.isAssignableFrom(realClass)) {
+ throw instantiateRuntimeException(
+ realClass.asSubclass(RuntimeException.class), re);
+ } else {
+ throw re;
+ }
+ // Every branch above throws. When reflection fails, the RemoteException
+ // itself is thrown instead of the java.lang.reflect exception.
+
+ } else if (cause instanceof IOException) {
+ // Local (non-remote) IOException from the RPC client: rethrow as is.
+ throw (IOException) cause;
+ } else if (cause instanceof RuntimeException) {
+ // Local (non-remote) RuntimeException from the RPC layer: rethrow as is.
+ throw (RuntimeException) cause;
+ } else {
+ // Any other cause type is unexpected; surface it as an IOException.
+ throw new IOException(se);
+ }
+ }
+ }
+
+ /**
+ * Same as unwrapAndThrowException, but never throws a checked IOException:
+ * any IOException raised while unwrapping is wrapped in a TezException.
+ * @param se
+ * ServiceException
+ * @throws TezException the unwrapped TezException, or a TezException
+ * wrapping the IOException that would otherwise be thrown
+ */
+ public static Void unwrapAndThrowNonIOException(ServiceException se)
+ throws TezException {
+ try {
+ return unwrapAndThrowException(se);
+ } catch (IOException ioe) {
+ throw new TezException(ioe);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/28cd991b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index 4e2ff40..47c8a8e 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -353,6 +353,8 @@ public class DAGClientImpl extends DAGClient {
dagCompleted = true;
} catch (TezException e) {
// can be either due to a n/w issue of due to AM completed.
+ } catch (IOException e) {
+ // can be either due to a network issue or due to the AM having completed.
}
if (dagStatus == null && !dagCompleted) {
@@ -371,6 +373,8 @@ public class DAGClientImpl extends DAGClient {
dagCompleted = true;
} catch (TezException e) {
// can be either due to a n/w issue of due to AM completed.
+ } catch (IOException e) {
+ // can be either due to a network issue or due to the AM having completed.
}
if (vertexStatus == null && !dagCompleted) {
http://git-wip-us.apache.org/repos/asf/tez/blob/28cd991b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 223c0ab..240289c 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -23,11 +23,11 @@ import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.tez.common.RPCUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -52,6 +52,7 @@ import com.google.protobuf.ServiceException;
@Private
public class DAGClientRPCImpl extends DAGClient {
+
private static final Logger LOG = LoggerFactory.getLogger(DAGClientRPCImpl.class);
private static final String DAG_NOT_RUNNING_CLASS_NAME =
@@ -96,6 +97,9 @@ public class DAGClientRPCImpl extends DAGClient {
} catch (TezException e) {
resetProxy(e); // create proxy again
throw e;
+ } catch (IOException e) {
+ resetProxy(e); // create proxy again
+ throw e;
}
}
@@ -113,6 +117,9 @@ public class DAGClientRPCImpl extends DAGClient {
} catch (TezException e) {
resetProxy(e); // create proxy again
throw e;
+ } catch (IOException e) {
+ resetProxy(e); // create proxy again
+ throw e;
}
}
@@ -176,22 +183,15 @@ public class DAGClientRPCImpl extends DAGClient {
proxy.getDAGStatus(null,
requestProtoBuilder.build()).getDagStatus(), DagStatusSource.AM);
} catch (ServiceException e) {
- final Throwable cause = e.getCause();
- if (cause instanceof RemoteException) {
- RemoteException remoteException = (RemoteException) cause;
- if (DAG_NOT_RUNNING_CLASS_NAME.equals(remoteException.getClassName())) {
- throw new DAGNotRunningException(remoteException.getMessage());
- }
- }
-
- // TEZ-151 retrieve wrapped TezException
+ RPCUtil.unwrapAndThrowException(e);
+ // Should not reach here
throw new TezException(e);
}
}
VertexStatus getVertexStatusViaAM(String vertexName,
Set<StatusGetOpts> statusOptions)
- throws TezException {
+ throws TezException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("GetVertexStatus via AM for app: " + appId + " dag: " + dagId
+ " vertex: " + vertexName);
@@ -211,7 +211,8 @@ public class DAGClientRPCImpl extends DAGClient {
proxy.getVertexStatus(null,
requestProtoBuilder.build()).getVertexStatus());
} catch (ServiceException e) {
- // TEZ-151 retrieve wrapped TezException
+ RPCUtil.unwrapAndThrowException(e);
+ // Should not reach here
throw new TezException(e);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/28cd991b/tez-api/src/test/java/org/apache/tez/common/TestRPCUtil.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/common/TestRPCUtil.java b/tez-api/src/test/java/org/apache/tez/common/TestRPCUtil.java
new file mode 100644
index 0000000..1e63b47
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/common/TestRPCUtil.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.TezException;
+import org.junit.Assert;
+
+import org.apache.hadoop.ipc.RemoteException;
+import org.junit.Test;
+
+import com.google.protobuf.ServiceException;
+
+public class TestRPCUtil {
+
+ @Test (timeout=1000)
+ public void testUnknownExceptionUnwrapping() {
+ Class<? extends Throwable> exception = TezException.class;
+ String className = "UnknownException.class";
+ verifyRemoteExceptionUnwrapping(exception, className);
+ }
+
+ @Test
+ public void testRemoteIOExceptionUnwrapping() {
+ Class<? extends Throwable> exception = IOException.class;
+ verifyRemoteExceptionUnwrapping(exception, exception.getName());
+ }
+
+ @Test
+ public void testRemoteIOExceptionDerivativeUnwrapping() {
+ // Test IOException sub-class
+ Class<? extends Throwable> exception = FileNotFoundException.class;
+ verifyRemoteExceptionUnwrapping(exception, exception.getName());
+ }
+
+ @Test
+ public void testRemoteTezExceptionUnwrapping() {
+ Class<? extends Throwable> exception = TezException.class;
+ verifyRemoteExceptionUnwrapping(exception, exception.getName());
+
+ }
+
+ @Test
+ public void testRemoteTezExceptionDerivativeUnwrapping() {
+ Class<? extends Throwable> exception = SessionNotRunning.class;
+ verifyRemoteExceptionUnwrapping(exception, exception.getName());
+ }
+
+ @Test
+ public void testRemoteRuntimeExceptionUnwrapping() {
+ Class<? extends Throwable> exception = NullPointerException.class;
+ verifyRemoteExceptionUnwrapping(exception, exception.getName());
+ }
+
+ @Test
+ public void testUnexpectedRemoteExceptionUnwrapping() {
+ // Non IOException, TezException thrown by the remote side.
+ Class<? extends Throwable> exception = Exception.class;
+ verifyRemoteExceptionUnwrapping(RemoteException.class, exception.getName());
+ }
+
+ @Test
+ public void testRemoteTezExceptionWithoutStringConstructor() {
+ // NOTE(review): TezTestExceptionNoConstructor extends Exception, not TezException, so this appears to hit the final 'throw re' branch rather than the missing-String-constructor path - confirm intent.
+ Class<? extends Throwable> exception = TezTestExceptionNoConstructor.class;
+ verifyRemoteExceptionUnwrapping(RemoteException.class, exception.getName());
+ }
+
+ @Test
+ public void testRPCServiceExceptionUnwrapping() {
+ String message = "ServiceExceptionMessage";
+ ServiceException se = new ServiceException(message);
+
+ Throwable t = null;
+ try {
+ RPCUtil.unwrapAndThrowException(se);
+ } catch (Throwable thrown) {
+ t = thrown;
+ }
+
+ Assert.assertTrue(IOException.class.isInstance(t));
+ Assert.assertTrue(t.getMessage().contains(message));
+ }
+
+ @Test
+ public void testRPCIOExceptionUnwrapping() {
+ String message = "DirectIOExceptionMessage";
+ IOException ioException = new FileNotFoundException(message);
+ ServiceException se = new ServiceException(ioException);
+
+ Throwable t = null;
+ try {
+ RPCUtil.unwrapAndThrowException(se);
+ } catch (Throwable thrown) {
+ t = thrown;
+ }
+ Assert.assertTrue(FileNotFoundException.class.isInstance(t));
+ Assert.assertTrue(t.getMessage().contains(message));
+ }
+
+ @Test
+ public void testRPCRuntimeExceptionUnwrapping() {
+ String message = "RPCRuntimeExceptionUnwrapping";
+ RuntimeException re = new NullPointerException(message);
+ ServiceException se = new ServiceException(re);
+
+ Throwable t = null;
+ try {
+ RPCUtil.unwrapAndThrowException(se);
+ } catch (Throwable thrown) {
+ t = thrown;
+ }
+
+ Assert.assertTrue(NullPointerException.class.isInstance(t));
+ Assert.assertTrue(t.getMessage().contains(message));
+ }
+
+ private void verifyRemoteExceptionUnwrapping(
+ Class<? extends Throwable> expectedLocalException,
+ String realExceptionClassName) {
+ verifyRemoteExceptionUnwrapping(expectedLocalException, realExceptionClassName, true);
+ }
+
+ private void verifyRemoteExceptionUnwrapping(
+ Class<? extends Throwable> expectedLocalException,
+ String realExceptionClassName, boolean allowIO) {
+ String message = realExceptionClassName + "Message";
+ RemoteException re = new RemoteException(realExceptionClassName, message);
+ ServiceException se = new ServiceException(re);
+
+ Throwable t = null;
+ try {
+ if (allowIO) {
+ RPCUtil.unwrapAndThrowException(se);
+ } else {
+ RPCUtil.unwrapAndThrowNonIOException(se);
+ }
+ } catch (Throwable thrown) {
+ t = thrown;
+ }
+
+ Assert.assertTrue("Expected exception [" + expectedLocalException
+ + "] but found " + t, expectedLocalException.isInstance(t));
+ Assert.assertTrue(
+ "Expected message [" + message + "] but found " + t.getMessage(), t
+ .getMessage().contains(message));
+ }
+
+
+ @Test (timeout=1000)
+ public void testRemoteNonIOExceptionUnwrapping() {
+ Class<? extends Throwable> exception = TezException.class;
+ verifyRemoteExceptionUnwrapping(exception, IOException.class.getName(), false);
+ }
+
+
+ private static class TezTestExceptionNoConstructor extends
+ Exception {
+ private static final long serialVersionUID = 1L;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/28cd991b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index 74efee2..04b0a03 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -235,6 +235,7 @@ public class TestAMRecovery {
createDAG("VertexCompletelyFinished_Broadcast", ControlledImmediateStartVertexManager.class,
DataMovementType.BROADCAST, false);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
@@ -483,7 +484,7 @@ public class TestAMRecovery {
"application", "dag")
+ "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
if (fs.exists(recoveryFilePath)) {
- LOG.info("read recovery file:" + recoveryFilePath);
+ LOG.info("Read recovery file:" + recoveryFilePath);
historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(fs.open(recoveryFilePath)));
}
}