You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/02 15:36:09 UTC

[5/8] flink git commit: [FLINK-4566] [network runtime] Properly preserve exception causes for ProducerFailedException

[FLINK-4566] [network runtime] Properly preserve exception causes for ProducerFailedException


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e227b101
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e227b101
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e227b101

Branch: refs/heads/master
Commit: e227b10134e387f3c49804dc0cc4c223c30702e3
Parents: 761d0a0
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Sep 2 11:45:25 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 2 17:32:57 2016 +0200

----------------------------------------------------------------------
 .../partition/ProducerFailedException.java       | 19 +++++--------------
 .../partition/ProducerFailedExceptionTest.java   | 12 ++++++------
 2 files changed, 11 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e227b101/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
index 2b2acab..934234d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.runtime.util.SerializedThrowable;
 
 /**
  * Network-stack level Exception to notify remote receiver about a failed
@@ -29,23 +29,14 @@ public class ProducerFailedException extends CancelTaskException {
 
 	private static final long serialVersionUID = -1555492656299526395L;
 
-	private final String causeAsString;
-
 	/**
 	 * The cause of the producer failure.
 	 *
-	 * Note: The cause will be stringified, because it might be an instance of
-	 * a user level Exception, which can not be deserialized by the remote
-	 * receiver's system class loader.
+	 * <p>The cause will be stored as a {@link SerializedThrowable}, because it might
+	 * be an instance of a user level Exception, which the remote receiver's system
+	 * class loader may not be able to deserialize.
 	 */
 	public ProducerFailedException(Throwable cause) {
-		this.causeAsString = cause != null ? ExceptionUtils.stringifyException(cause) : null;
-	}
-
-	/**
-	 * Returns the stringified cause of the producer failure.
-	 */
-	public String getCauseAsString() {
-		return causeAsString;
+		super(new SerializedThrowable(cause));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e227b101/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
index 042c136..ca2de0c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
@@ -19,27 +19,27 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.util.SerializedThrowable;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class ProducerFailedExceptionTest {
 
 	@Test
 	public void testInstanceOfCancelTaskException() throws Exception {
-		ProducerFailedException e = new ProducerFailedException(new Exception());
-		assertTrue(e instanceof CancelTaskException);
+		assertTrue(CancelTaskException.class.isAssignableFrom(ProducerFailedException.class));
 	}
 
 	@Test
-	public void testCauseIsStringified() throws Exception {
+	public void testCauseIsSerialized() throws Exception {
 		// Tests that the cause is wrapped in a SerializedThrowable, because it
 		// might be an instance of a user level Exception, which can not be
 		// deserialized by the remote receiver's system class loader.
 		ProducerFailedException e = new ProducerFailedException(new Exception());
-		assertNull(e.getCause());
-		assertNotNull(e.getCauseAsString());
+		assertNotNull(e.getCause());
+		assertTrue(e.getCause() instanceof SerializedThrowable);
 	}
 }