You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/08 15:28:24 UTC

[02/50] [abbrv] flink git commit: [FLINK-4567] [runtime] Enhance SerializedThrowable to properly mimic Exception causes

[FLINK-4567] [runtime] Enhance SerializedThrowable to properly mimic Exception causes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/761d0a02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/761d0a02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/761d0a02

Branch: refs/heads/flip-6
Commit: 761d0a02505c7eaef7a566f978145b187c89cbf8
Parents: c251efc
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Sep 2 11:38:53 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 2 17:32:57 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/util/SerializedThrowable.java | 88 ++++++++++----------
 .../runtime/util/SerializedThrowableTest.java   | 40 ++++++++-
 2 files changed, 83 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/761d0a02/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
index a7739ef..4dea59c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
@@ -21,18 +21,19 @@ package org.apache.flink.runtime.util;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
 
-import java.io.IOException;
 import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.io.Serializable;
 import java.lang.ref.WeakReference;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * Utility class for dealing with user-defined Throwable types that are serialized (for
  * example during RPC/Actor communication), but cannot be resolved with the default
  * class loader.
- * <p>
- * This exception mimics the original exception with respect to message and stack trace,
+ * 
+ * <p>This exception mimics the original exception with respect to message and stack trace,
  * and contains the original exception in serialized form. The original exception
  * can be re-obtained by supplying the appropriate class loader.
  */
@@ -49,10 +50,6 @@ public class SerializedThrowable extends Exception implements Serializable {
 	/** The original stack trace, to be printed */
 	private final String fullStingifiedStackTrace;
 
-	/** A guaranteed serializable placeholder exception that will be used as
-	 * cause and to capture the original stack trace */
-	private final Exception placeholder;
-	
 	/** The original exception, not transported via serialization, 
 	 * because the class may not be part of the system class loader.
 	 * In addition, we make sure our cached references to not prevent
@@ -66,33 +63,43 @@ public class SerializedThrowable extends Exception implements Serializable {
 	 * @param exception The exception to serialize.
 	 */
 	public SerializedThrowable(Throwable exception) {
+		this(exception, new HashSet<Throwable>());
+	}
+
+	private SerializedThrowable(Throwable exception, Set<Throwable> alreadySeen) {
 		super(getMessageOrError(exception));
 
 		if (!(exception instanceof SerializedThrowable)) {
-			this.cachedException = new WeakReference<Throwable>(exception);
-			
-			this.originalErrorClassName = exception.getClass().getName();
-			this.fullStingifiedStackTrace = ExceptionUtils.stringifyException(exception);
-			this.placeholder = new Exception(
-					"Serialized representation of " + originalErrorClassName + ": " + getMessage());
-			this.placeholder.setStackTrace(exception.getStackTrace());
-			initCause(this.placeholder);
-			
+			// serialize and memoize the original exception
 			byte[] serialized;
 			try {
 				serialized = InstantiationUtil.serializeObject(exception);
 			}
 			catch (Throwable t) {
-				// could not serialize exception. send the stringified version instead
-				try {
-					serialized = InstantiationUtil.serializeObject(placeholder);
-				}
-				catch (IOException e) {
-					// this should really never happen, as we only serialize a a standard exception
-					throw new RuntimeException(e.getMessage(), e);
-				}
+				serialized = null;
 			}
 			this.serializedException = serialized;
+			this.cachedException = new WeakReference<Throwable>(exception);
+
+			// record the original exception's properties (name, stack prints)
+			this.originalErrorClassName = exception.getClass().getName();
+			this.fullStingifiedStackTrace = ExceptionUtils.stringifyException(exception);
+
+			// mimic the original exception's stack trace
+			setStackTrace(exception.getStackTrace());
+
+			// mimic the original exception's cause
+			if (exception.getCause() == null) {
+				initCause(null);
+			}
+			else {
+				// exception causes may be cyclic, so we truncate the cycle when we find it
+				if (alreadySeen.add(exception)) {
+					// we are not in a cycle, yet
+					initCause(new SerializedThrowable(exception.getCause(), alreadySeen));
+				}
+			}
+
 		}
 		else {
 			// copy from that serialized throwable
@@ -100,39 +107,37 @@ public class SerializedThrowable extends Exception implements Serializable {
 			this.serializedException = other.serializedException;
 			this.originalErrorClassName = other.originalErrorClassName;
 			this.fullStingifiedStackTrace = other.fullStingifiedStackTrace;
-			this.placeholder = other.placeholder;
 			this.cachedException = other.cachedException;
 		}
 	}
 
 	public Throwable deserializeError(ClassLoader classloader) {
+		if (serializedException == null) {
+			// failed to serialize the original exception
+			// return this SerializedThrowable as a stand in
+			return this;
+		}
+
 		Throwable cached = cachedException == null ? null : cachedException.get();
 		if (cached == null) {
 			try {
 				cached = InstantiationUtil.deserializeObject(serializedException, classloader);
 				cachedException = new WeakReference<Throwable>(cached);
 			}
-			catch (Exception e) {
-				return placeholder;
+			catch (Throwable t) {
+				// something went wrong
+				// return this SerializedThrowable as a stand in
+				return this;
 			}
 		}
 		return cached;
 	}
-	
-	public String getStrigifiedStackTrace() {
-		return fullStingifiedStackTrace;
-	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Override the behavior of Throwable
 	// ------------------------------------------------------------------------
 
 	@Override
-	public Throwable getCause() {
-		return placeholder;
-	}
-
-	@Override
 	public void printStackTrace(PrintStream s) {
 		s.print(fullStingifiedStackTrace);
 		s.flush();
@@ -150,15 +155,10 @@ public class SerializedThrowable extends Exception implements Serializable {
 		return (message != null) ? (originalErrorClassName + ": " + message) : originalErrorClassName;
 	}
 
-	@Override
-	public StackTraceElement[] getStackTrace() {
-		return placeholder.getStackTrace();
-	}
-
 	// ------------------------------------------------------------------------
 	//  Static utilities
 	// ------------------------------------------------------------------------
-	
+
 	public static Throwable get(Throwable serThrowable, ClassLoader loader) {
 		if (serThrowable instanceof SerializedThrowable) {
 			return ((SerializedThrowable)serThrowable).deserializeError(loader);
@@ -166,7 +166,7 @@ public class SerializedThrowable extends Exception implements Serializable {
 			return serThrowable;
 		}
 	}
-	
+
 	private static String getMessageOrError(Throwable error) {
 		try {
 			return error.getMessage();

http://git-wip-us.apache.org/repos/asf/flink/blob/761d0a02/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
index 50efd52..4d57892 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
@@ -131,10 +131,48 @@ public class SerializedThrowableTest {
 			// deserialize the proper exception
 			Throwable deserialized = copy.deserializeError(loader); 
 			assertEquals(clazz, deserialized.getClass());
+
+			// deserialization with the wrong classloader does not lead to a failure
+			Throwable wronglyDeserialized = copy.deserializeError(getClass().getClassLoader());
+			assertEquals(ExceptionUtils.stringifyException(userException),
+					ExceptionUtils.stringifyException(wronglyDeserialized));
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
-	} 
+	}
+
+	@Test
+	public void testCauseChaining() {
+		Exception cause2 = new Exception("level2");
+		Exception cause1 = new Exception("level1", cause2);
+		Exception root = new Exception("level0", cause1);
+
+		SerializedThrowable st = new SerializedThrowable(root);
+
+		assertEquals("level0", st.getMessage());
+
+		assertNotNull(st.getCause());
+		assertEquals("level1", st.getCause().getMessage());
+
+		assertNotNull(st.getCause().getCause());
+		assertEquals("level2", st.getCause().getCause().getMessage());
+	}
+
+	@Test
+	public void testCyclicCauseChaining() {
+		Exception cause3 = new Exception("level3");
+		Exception cause2 = new Exception("level2", cause3);
+		Exception cause1 = new Exception("level1", cause2);
+		Exception root = new Exception("level0", cause1);
+
+		// introduce a cyclic reference
+		cause3.initCause(cause1);
+
+		SerializedThrowable st = new SerializedThrowable(root);
+
+		assertArrayEquals(root.getStackTrace(), st.getStackTrace());
+		assertEquals(ExceptionUtils.stringifyException(root), ExceptionUtils.stringifyException(st));
+	}
 }