You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/09/24 17:21:12 UTC
[flink] branch release-1.9 updated: [FLINK-14076] Ensure
CheckpointException can be deserialized on JobManager
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 9ee42ec [FLINK-14076] Ensure CheckpointException can be deserialized on JobManager
9ee42ec is described below
commit 9ee42ec6b009f1c4da7265851e23cfaa45f4c83a
Author: Jeff Martin <jm...@palantir.com>
AuthorDate: Sun Sep 22 21:27:12 2019 -0700
[FLINK-14076] Ensure CheckpointException can be deserialized on JobManager
This closes #9742.
---
.../cassandra/CassandraSinkBaseTest.java | 5 +++-
.../kafka/FlinkKafkaProducer011ITCase.java | 7 +++--
.../connectors/kafka/FlinkKafkaProducerITCase.java | 6 ++--
.../java/org/apache/flink/util/ExceptionUtils.java | 32 ++++++++++++++++++++++
.../runtime/checkpoint/CheckpointException.java | 13 +++++++--
.../sink/TwoPhaseCommitSinkFunctionTest.java | 3 +-
.../api/operators/AbstractStreamOperatorTest.java | 4 +--
.../apache/flink/streaming/util/ContentDump.java | 2 +-
8 files changed, 59 insertions(+), 13 deletions(-)
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
index b4406ab..3ce9742 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -37,11 +37,13 @@ import org.junit.Test;
import java.io.IOException;
import java.time.Duration;
import java.util.LinkedList;
+import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
+import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable;
import static org.hamcrest.number.OrderingComparison.greaterThan;
import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
@@ -156,7 +158,8 @@ public class CassandraSinkBaseTest {
Assert.fail();
} catch (Exception e) {
- Assert.assertTrue(e.getCause() instanceof IOException);
+ Optional<IOException> exCause = findSerializedThrowable(e, IOException.class, ClassLoader.getSystemClassLoader());
+ Assert.assertTrue(exCause.isPresent());
}
}
}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 0932d42..b2d0a96 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -49,7 +49,7 @@ import java.util.stream.IntStream;
import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;
import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE;
import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic.EXACTLY_ONCE;
-import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable;
import static org.apache.flink.util.Preconditions.checkState;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertThat;
@@ -160,7 +160,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
}
catch (Exception ex) {
// testHarness1 will be fenced off after creating and closing testHarness2
- if (!findThrowable(ex, ProducerFencedException.class).isPresent()) {
+ if (!findSerializedThrowable(ex, ProducerFencedException.class, ClassLoader.getSystemClassLoader()).isPresent()) {
throw ex;
}
}
@@ -664,7 +664,8 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
}
private boolean isCausedBy(FlinkKafka011ErrorCode expectedErrorCode, Throwable ex) {
- Optional<FlinkKafka011Exception> cause = findThrowable(ex, FlinkKafka011Exception.class);
+ // Extract the root cause kafka exception (if any) from the serialized throwable.
+ Optional<FlinkKafka011Exception> cause = findSerializedThrowable(ex, FlinkKafka011Exception.class, ClassLoader.getSystemClassLoader());
if (cause.isPresent()) {
return cause.get().getErrorCode().equals(expectedErrorCode);
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
index 1097fd6..d393819 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
@@ -45,7 +45,7 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable;
import static org.apache.flink.util.Preconditions.checkState;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertThat;
@@ -156,7 +156,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
}
catch (Exception ex) {
// testHarness1 will be fenced off after creating and closing testHarness2
- if (!findThrowable(ex, ProducerFencedException.class).isPresent()) {
+ if (!findSerializedThrowable(ex, ProducerFencedException.class, ClassLoader.getSystemClassLoader()).isPresent()) {
throw ex;
}
}
@@ -662,7 +662,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
}
private boolean isCausedBy(FlinkKafkaErrorCode expectedErrorCode, Throwable ex) {
- Optional<FlinkKafkaException> cause = findThrowable(ex, FlinkKafkaException.class);
+ Optional<FlinkKafkaException> cause = findSerializedThrowable(ex, FlinkKafkaException.class, ClassLoader.getSystemClassLoader());
if (cause.isPresent()) {
return cause.get().getErrorCode().equals(expectedErrorCode);
}
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 721bf7f..ddd0276 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -315,6 +315,38 @@ public final class ExceptionUtils {
}
/**
+ * Checks whether a throwable chain contains a specific type of exception and returns it. Every
+ * {@link SerializedThrowable} found in the chain is deserialized using the provided {@link ClassLoader}.
+ *
+ * @param throwable the throwable chain to check.
+ * @param searchType the type of exception to search for in the chain.
+ * @param classLoader to use for deserialization.
+ * @return Optional throwable of the requested type if available, otherwise empty
+ */
+ public static <T extends Throwable> Optional<T> findSerializedThrowable(Throwable throwable, Class<T> searchType, ClassLoader classLoader) {
+ if (throwable == null || searchType == null) {
+ return Optional.empty();
+ }
+
+ Throwable t = throwable;
+ while (t != null) {
+ if (searchType.isAssignableFrom(t.getClass())) {
+ return Optional.of(searchType.cast(t));
+ } else if (t instanceof SerializedThrowable) {
+ Throwable next = ((SerializedThrowable) t).deserializeError(classLoader);
+ // SerializedThrowable#deserializeError returns itself under some conditions (e.g., null cause).
+ // If that happens, exit to avoid looping infinitely. This is ok because if the user was searching
+ // for a SerializedThrowable, we would have returned it in the initial if condition.
+ t = (next == t) ? null : next;
+ } else {
+ t = t.getCause();
+ }
+ }
+
+ return Optional.empty();
+ }
+
+
+ /**
* Checks whether a throwable chain contains a specific type of exception and returns it.
*
* @param throwable the throwable chain to check.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
index c0bc2d1..7c8ab49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedThrowable;
/**
* Base class for checkpoint related exceptions.
@@ -40,12 +41,20 @@ public class CheckpointException extends Exception {
}
public CheckpointException(CheckpointFailureReason failureReason, Throwable cause) {
- super(failureReason.message(), cause);
+ // Wrap a non-null cause in a SerializedThrowable in case it is a user-defined exception that does not
+ // exist on the JobManager's default classpath. Consequently getCause() returns the wrapper, not the cause.
+ super(
+ failureReason.message(),
+ cause == null ? null : new SerializedThrowable(cause));
this.checkpointFailureReason = Preconditions.checkNotNull(failureReason);
}
public CheckpointException(String message, CheckpointFailureReason failureReason, Throwable cause) {
- super(message + " Failure reason: " + failureReason.message(), cause);
+ // Wrap a non-null cause in a SerializedThrowable in case it is a user-defined exception that does not
+ // exist on the JobManager's default classpath. Consequently getCause() returns the wrapper, not the cause.
+ super(
+ message + " Failure reason: " + failureReason.message(),
+ cause == null ? null : new SerializedThrowable(cause));
this.checkpointFailureReason = Preconditions.checkNotNull(failureReason);
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 2970b87..84c0104 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -47,6 +47,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.junit.Assert.assertEquals;
@@ -167,7 +168,7 @@ public class TwoPhaseCommitSinkFunctionTest {
harness.snapshot(2, 5);
fail("something should fail");
} catch (Exception ex) {
- if (!(ex.getCause() instanceof ContentDump.NotWritableException)) {
+ if (!findSerializedThrowable(ex, ContentDump.NotWritableException.class, ClassLoader.getSystemClassLoader()).isPresent()) {
throw ex;
}
// ignore
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 36fb867..f9d9aa5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -558,7 +558,7 @@ public class AbstractStreamOperatorTest {
new MemCheckpointStreamFactory(Integer.MAX_VALUE));
fail("Exception expected.");
} catch (Exception e) {
- assertEquals(failingException, e.getCause());
+ assertEquals(failingException.getMessage(), e.getCause().getMessage());
}
}
@@ -636,7 +636,7 @@ public class AbstractStreamOperatorTest {
new MemCheckpointStreamFactory(Integer.MAX_VALUE));
fail("Exception expected.");
} catch (Exception e) {
- assertEquals(failingException, e.getCause());
+ assertEquals(failingException.getMessage(), e.getCause().getMessage());
}
// verify that the context has been closed, the operator snapshot result has been cancelled
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java
index 903b237..5c1568e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java
@@ -124,7 +124,7 @@ public class ContentDump {
/**
* Exception thrown for an attempt to write into read-only {@link ContentDump}.
*/
- public class NotWritableException extends RuntimeException {
+ public static class NotWritableException extends RuntimeException {
public NotWritableException(String name) {
super(String.format("File [%s] is not writable", name));
}