You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/02 15:42:20 UTC

[2/2] flink git commit: [FLINK-4566] [network runtime] Properly preserve exception causes for ProducerFailedException

[FLINK-4566] [network runtime] Properly preserve exception causes for ProducerFailedException


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f263b991
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f263b991
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f263b991

Branch: refs/heads/release-1.1
Commit: f263b99173389ca60442614d449a1e9ce1b524c3
Parents: e5b4f46
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Sep 2 11:45:25 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 2 17:41:41 2016 +0200

----------------------------------------------------------------------
 .../partition/ProducerFailedException.java       | 19 +++++--------------
 .../partition/ProducerFailedExceptionTest.java   | 12 ++++++------
 2 files changed, 11 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f263b991/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
index 2b2acab..934234d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.runtime.util.SerializedThrowable;
 
 /**
  * Network-stack level Exception to notify remote receiver about a failed
@@ -29,23 +29,14 @@ public class ProducerFailedException extends CancelTaskException {
 
 	private static final long serialVersionUID = -1555492656299526395L;
 
-	private final String causeAsString;
-
 	/**
 	 * The cause of the producer failure.
 	 *
-	 * Note: The cause will be stringified, because it might be an instance of
-	 * a user level Exception, which can not be deserialized by the remote
-	 * receiver's system class loader.
+	 * <p>The cause will be stored as a {@link SerializedThrowable}, because it might
+	 * be an instance of a user level Exception, which may not be possible to deserialize
+	 * by the remote receiver's system class loader.
 	 */
 	public ProducerFailedException(Throwable cause) {
-		this.causeAsString = cause != null ? ExceptionUtils.stringifyException(cause) : null;
-	}
-
-	/**
-	 * Returns the stringified cause of the producer failure.
-	 */
-	public String getCauseAsString() {
-		return causeAsString;
+		super(new SerializedThrowable(cause));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f263b991/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
index 042c136..ca2de0c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
@@ -19,27 +19,27 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.util.SerializedThrowable;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class ProducerFailedExceptionTest {
 
 	@Test
 	public void testInstanceOfCancelTaskException() throws Exception {
-		ProducerFailedException e = new ProducerFailedException(new Exception());
-		assertTrue(e instanceof CancelTaskException);
+		assertTrue(CancelTaskException.class.isAssignableFrom(ProducerFailedException.class));
 	}
 
 	@Test
-	public void testCauseIsStringified() throws Exception {
+	public void testCauseIsSerialized() throws Exception {
 		// Tests that the cause is wrapped in a SerializedThrowable, because it
 		// might be an instance of a user level Exception, which cannot be
 		// deserialized by the remote receiver's system class loader.
 		ProducerFailedException e = new ProducerFailedException(new Exception());
-		assertNull(e.getCause());
-		assertNotNull(e.getCauseAsString());
+		assertNotNull(e.getCause());
+		assertTrue(e.getCause() instanceof SerializedThrowable);
 	}
 }