You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/02 15:42:20 UTC
[2/2] flink git commit: [FLINK-4566] [network runtime] Properly preserve exception causes for ProducerFailedException
[FLINK-4566] [network runtime] Properly preserve exception causes for ProducerFailedException
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f263b991
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f263b991
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f263b991
Branch: refs/heads/release-1.1
Commit: f263b99173389ca60442614d449a1e9ce1b524c3
Parents: e5b4f46
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Sep 2 11:45:25 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 2 17:41:41 2016 +0200
----------------------------------------------------------------------
.../partition/ProducerFailedException.java | 19 +++++--------------
.../partition/ProducerFailedExceptionTest.java | 12 ++++++------
2 files changed, 11 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f263b991/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
index 2b2acab..934234d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.runtime.util.SerializedThrowable;
/**
* Network-stack level Exception to notify remote receiver about a failed
@@ -29,23 +29,14 @@ public class ProducerFailedException extends CancelTaskException {
private static final long serialVersionUID = -1555492656299526395L;
- private final String causeAsString;
-
/**
* The cause of the producer failure.
*
- * Note: The cause will be stringified, because it might be an instance of
- * a user level Exception, which can not be deserialized by the remote
- * receiver's system class loader.
+ * <p>The cause will be stored as a {@link SerializedThrowable}, because it might
+ * be an instance of a user level Exception, which may not be possible to deserialize
+ * by the remote receiver's system class loader.
*/
public ProducerFailedException(Throwable cause) {
- this.causeAsString = cause != null ? ExceptionUtils.stringifyException(cause) : null;
- }
-
- /**
- * Returns the stringified cause of the producer failure.
- */
- public String getCauseAsString() {
- return causeAsString;
+ super(new SerializedThrowable(cause));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f263b991/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
index 042c136..ca2de0c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
@@ -19,27 +19,27 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.util.SerializedThrowable;
+
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class ProducerFailedExceptionTest {
@Test
public void testInstanceOfCancelTaskException() throws Exception {
- ProducerFailedException e = new ProducerFailedException(new Exception());
- assertTrue(e instanceof CancelTaskException);
+ assertTrue(CancelTaskException.class.isAssignableFrom(ProducerFailedException.class));
}
@Test
- public void testCauseIsStringified() throws Exception {
+ public void testCauseIsSerialized() throws Exception {
// Tests that the cause is wrapped in a SerializedThrowable, because it might
// be an instance of a user level Exception, which can not be deserialized by
// the remote receiver's system class loader.
ProducerFailedException e = new ProducerFailedException(new Exception());
- assertNull(e.getCause());
- assertNotNull(e.getCauseAsString());
+ assertNotNull(e.getCause());
+ assertTrue(e.getCause() instanceof SerializedThrowable);
}
}