You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/02 15:42:19 UTC
[1/2] flink git commit: [FLINK-4567] [runtime] Enhance
SerializedThrowable to properly mimic Exception causes
Repository: flink
Updated Branches:
refs/heads/release-1.1 df72667b3 -> f263b9917
[FLINK-4567] [runtime] Enhance SerializedThrowable to properly mimic Exception causes
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5b4f462
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5b4f462
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5b4f462
Branch: refs/heads/release-1.1
Commit: e5b4f4621c04f6f9f60e3ac3d92880d95b0aca9e
Parents: df72667
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Sep 2 11:38:53 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 2 17:41:33 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/util/SerializedThrowable.java | 88 ++++++++++----------
.../runtime/util/SerializedThrowableTest.java | 40 ++++++++-
2 files changed, 83 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e5b4f462/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
index a7739ef..4dea59c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
@@ -21,18 +21,19 @@ package org.apache.flink.runtime.util;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;
-import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.lang.ref.WeakReference;
+import java.util.HashSet;
+import java.util.Set;
/**
* Utility class for dealing with user-defined Throwable types that are serialized (for
* example during RPC/Actor communication), but cannot be resolved with the default
* class loader.
- * <p>
- * This exception mimics the original exception with respect to message and stack trace,
+ *
+ * <p>This exception mimics the original exception with respect to message and stack trace,
* and contains the original exception in serialized form. The original exception
* can be re-obtained by supplying the appropriate class loader.
*/
@@ -49,10 +50,6 @@ public class SerializedThrowable extends Exception implements Serializable {
/** The original stack trace, to be printed */
private final String fullStingifiedStackTrace;
- /** A guaranteed serializable placeholder exception that will be used as
- * cause and to capture the original stack trace */
- private final Exception placeholder;
-
/** The original exception, not transported via serialization,
* because the class may not be part of the system class loader.
* In addition, we make sure our cached references to not prevent
@@ -66,33 +63,43 @@ public class SerializedThrowable extends Exception implements Serializable {
* @param exception The exception to serialize.
*/
public SerializedThrowable(Throwable exception) {
+ this(exception, new HashSet<Throwable>());
+ }
+
+ private SerializedThrowable(Throwable exception, Set<Throwable> alreadySeen) {
super(getMessageOrError(exception));
if (!(exception instanceof SerializedThrowable)) {
- this.cachedException = new WeakReference<Throwable>(exception);
-
- this.originalErrorClassName = exception.getClass().getName();
- this.fullStingifiedStackTrace = ExceptionUtils.stringifyException(exception);
- this.placeholder = new Exception(
- "Serialized representation of " + originalErrorClassName + ": " + getMessage());
- this.placeholder.setStackTrace(exception.getStackTrace());
- initCause(this.placeholder);
-
+ // serialize and memoize the original exception
byte[] serialized;
try {
serialized = InstantiationUtil.serializeObject(exception);
}
catch (Throwable t) {
- // could not serialize exception. send the stringified version instead
- try {
- serialized = InstantiationUtil.serializeObject(placeholder);
- }
- catch (IOException e) {
- // this should really never happen, as we only serialize a a standard exception
- throw new RuntimeException(e.getMessage(), e);
- }
+ serialized = null;
}
this.serializedException = serialized;
+ this.cachedException = new WeakReference<Throwable>(exception);
+
+ // record the original exception's properties (name, stack prints)
+ this.originalErrorClassName = exception.getClass().getName();
+ this.fullStingifiedStackTrace = ExceptionUtils.stringifyException(exception);
+
+ // mimic the original exception's stack trace
+ setStackTrace(exception.getStackTrace());
+
+ // mimic the original exception's cause
+ if (exception.getCause() == null) {
+ initCause(null);
+ }
+ else {
+ // exception causes may be cyclic, so we truncate the cycle when we find it
+ if (alreadySeen.add(exception)) {
+ // we are not in a cycle, yet
+ initCause(new SerializedThrowable(exception.getCause(), alreadySeen));
+ }
+ }
+
}
else {
// copy from that serialized throwable
@@ -100,39 +107,37 @@ public class SerializedThrowable extends Exception implements Serializable {
this.serializedException = other.serializedException;
this.originalErrorClassName = other.originalErrorClassName;
this.fullStingifiedStackTrace = other.fullStingifiedStackTrace;
- this.placeholder = other.placeholder;
this.cachedException = other.cachedException;
}
}
public Throwable deserializeError(ClassLoader classloader) {
+ if (serializedException == null) {
+ // failed to serialize the original exception
+ // return this SerializedThrowable as a stand in
+ return this;
+ }
+
Throwable cached = cachedException == null ? null : cachedException.get();
if (cached == null) {
try {
cached = InstantiationUtil.deserializeObject(serializedException, classloader);
cachedException = new WeakReference<Throwable>(cached);
}
- catch (Exception e) {
- return placeholder;
+ catch (Throwable t) {
+ // something went wrong
+ // return this SerializedThrowable as a stand in
+ return this;
}
}
return cached;
}
-
- public String getStrigifiedStackTrace() {
- return fullStingifiedStackTrace;
- }
-
+
// ------------------------------------------------------------------------
// Override the behavior of Throwable
// ------------------------------------------------------------------------
@Override
- public Throwable getCause() {
- return placeholder;
- }
-
- @Override
public void printStackTrace(PrintStream s) {
s.print(fullStingifiedStackTrace);
s.flush();
@@ -150,15 +155,10 @@ public class SerializedThrowable extends Exception implements Serializable {
return (message != null) ? (originalErrorClassName + ": " + message) : originalErrorClassName;
}
- @Override
- public StackTraceElement[] getStackTrace() {
- return placeholder.getStackTrace();
- }
-
// ------------------------------------------------------------------------
// Static utilities
// ------------------------------------------------------------------------
-
+
public static Throwable get(Throwable serThrowable, ClassLoader loader) {
if (serThrowable instanceof SerializedThrowable) {
return ((SerializedThrowable)serThrowable).deserializeError(loader);
@@ -166,7 +166,7 @@ public class SerializedThrowable extends Exception implements Serializable {
return serThrowable;
}
}
-
+
private static String getMessageOrError(Throwable error) {
try {
return error.getMessage();
http://git-wip-us.apache.org/repos/asf/flink/blob/e5b4f462/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
index 50efd52..4d57892 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
@@ -131,10 +131,48 @@ public class SerializedThrowableTest {
// deserialize the proper exception
Throwable deserialized = copy.deserializeError(loader);
assertEquals(clazz, deserialized.getClass());
+
+ // deserialization with the wrong classloader does not lead to a failure
+ Throwable wronglyDeserialized = copy.deserializeError(getClass().getClassLoader());
+ assertEquals(ExceptionUtils.stringifyException(userException),
+ ExceptionUtils.stringifyException(wronglyDeserialized));
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- }
+ }
+
+ @Test
+ public void testCauseChaining() {
+ Exception cause2 = new Exception("level2");
+ Exception cause1 = new Exception("level1", cause2);
+ Exception root = new Exception("level0", cause1);
+
+ SerializedThrowable st = new SerializedThrowable(root);
+
+ assertEquals("level0", st.getMessage());
+
+ assertNotNull(st.getCause());
+ assertEquals("level1", st.getCause().getMessage());
+
+ assertNotNull(st.getCause().getCause());
+ assertEquals("level2", st.getCause().getCause().getMessage());
+ }
+
+ @Test
+ public void testCyclicCauseChaining() {
+ Exception cause3 = new Exception("level3");
+ Exception cause2 = new Exception("level2", cause3);
+ Exception cause1 = new Exception("level1", cause2);
+ Exception root = new Exception("level0", cause1);
+
+ // introduce a cyclic reference
+ cause3.initCause(cause1);
+
+ SerializedThrowable st = new SerializedThrowable(root);
+
+ assertArrayEquals(root.getStackTrace(), st.getStackTrace());
+ assertEquals(ExceptionUtils.stringifyException(root), ExceptionUtils.stringifyException(st));
+ }
}
[2/2] flink git commit: [FLINK-4566] [network runtime] Properly
preserve exception causes for ProducerFailedException
Posted by se...@apache.org.
[FLINK-4566] [network runtime] Properly preserve exception causes for ProducerFailedException
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f263b991
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f263b991
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f263b991
Branch: refs/heads/release-1.1
Commit: f263b99173389ca60442614d449a1e9ce1b524c3
Parents: e5b4f46
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Sep 2 11:45:25 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 2 17:41:41 2016 +0200
----------------------------------------------------------------------
.../partition/ProducerFailedException.java | 19 +++++--------------
.../partition/ProducerFailedExceptionTest.java | 12 ++++++------
2 files changed, 11 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f263b991/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
index 2b2acab..934234d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.runtime.util.SerializedThrowable;
/**
* Network-stack level Exception to notify remote receiver about a failed
@@ -29,23 +29,14 @@ public class ProducerFailedException extends CancelTaskException {
private static final long serialVersionUID = -1555492656299526395L;
- private final String causeAsString;
-
/**
* The cause of the producer failure.
*
- * Note: The cause will be stringified, because it might be an instance of
- * a user level Exception, which can not be deserialized by the remote
- * receiver's system class loader.
+ * <p>The cause will be stored as a {@link SerializedThrowable}, because it might
+ * be an instance of a user level Exception, which may not be possible to deserialize
+ * by the remote receiver's system class loader.
*/
public ProducerFailedException(Throwable cause) {
- this.causeAsString = cause != null ? ExceptionUtils.stringifyException(cause) : null;
- }
-
- /**
- * Returns the stringified cause of the producer failure.
- */
- public String getCauseAsString() {
- return causeAsString;
+ super(new SerializedThrowable(cause));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f263b991/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
index 042c136..ca2de0c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
@@ -19,27 +19,27 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.util.SerializedThrowable;
+
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class ProducerFailedExceptionTest {
@Test
public void testInstanceOfCancelTaskException() throws Exception {
- ProducerFailedException e = new ProducerFailedException(new Exception());
- assertTrue(e instanceof CancelTaskException);
+ assertTrue(CancelTaskException.class.isAssignableFrom(ProducerFailedException.class));
}
@Test
- public void testCauseIsStringified() throws Exception {
+ public void testCauseIsSerialized() throws Exception {
// Tests that the cause is stringified, because it might be an instance
// of a user level Exception, which can not be deserialized by the
// remote receiver's system class loader.
ProducerFailedException e = new ProducerFailedException(new Exception());
- assertNull(e.getCause());
- assertNotNull(e.getCauseAsString());
+ assertNotNull(e.getCause());
+ assertTrue(e.getCause() instanceof SerializedThrowable);
}
}