You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/09/24 17:21:12 UTC

[flink] branch release-1.9 updated: [FLINK-14076] Ensure CheckpointException can be deserialized on JobManager

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 9ee42ec  [FLINK-14076] Ensure CheckpointException can be deserialized on JobManager
9ee42ec is described below

commit 9ee42ec6b009f1c4da7265851e23cfaa45f4c83a
Author: Jeff Martin <jm...@palantir.com>
AuthorDate: Sun Sep 22 21:27:12 2019 -0700

    [FLINK-14076] Ensure CheckpointException can be deserialized on JobManager
    
    This closes #9742.
---
 .../cassandra/CassandraSinkBaseTest.java           |  5 +++-
 .../kafka/FlinkKafkaProducer011ITCase.java         |  7 +++--
 .../connectors/kafka/FlinkKafkaProducerITCase.java |  6 ++--
 .../java/org/apache/flink/util/ExceptionUtils.java | 32 ++++++++++++++++++++++
 .../runtime/checkpoint/CheckpointException.java    | 13 +++++++--
 .../sink/TwoPhaseCommitSinkFunctionTest.java       |  3 +-
 .../api/operators/AbstractStreamOperatorTest.java  |  4 +--
 .../apache/flink/streaming/util/ContentDump.java   |  2 +-
 8 files changed, 59 insertions(+), 13 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
index b4406ab..3ce9742 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -37,11 +37,13 @@ import org.junit.Test;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.LinkedList;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable;
 import static org.hamcrest.number.OrderingComparison.greaterThan;
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
@@ -156,7 +158,8 @@ public class CassandraSinkBaseTest {
 
 				Assert.fail();
 			} catch (Exception e) {
-				Assert.assertTrue(e.getCause() instanceof IOException);
+				Optional<IOException> exCause = findSerializedThrowable(e, IOException.class, ClassLoader.getSystemClassLoader());
+				Assert.assertTrue(exCause.isPresent());
 			}
 		}
 	}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 0932d42..b2d0a96 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -49,7 +49,7 @@ import java.util.stream.IntStream;
 import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic;
 import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE;
 import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic.EXACTLY_ONCE;
-import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertThat;
@@ -160,7 +160,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
 		}
 		catch (Exception ex) {
 			// testHarness1 will be fenced off after creating and closing testHarness2
-			if (!findThrowable(ex, ProducerFencedException.class).isPresent()) {
+			if (!findSerializedThrowable(ex, ProducerFencedException.class, ClassLoader.getSystemClassLoader()).isPresent()) {
 				throw ex;
 			}
 		}
@@ -664,7 +664,8 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
 	}
 
 	private boolean isCausedBy(FlinkKafka011ErrorCode expectedErrorCode, Throwable ex) {
-		Optional<FlinkKafka011Exception> cause = findThrowable(ex, FlinkKafka011Exception.class);
+		// Extract the root cause kafka exception (if any) from the serialized throwable.
+		Optional<FlinkKafka011Exception> cause = findSerializedThrowable(ex, FlinkKafka011Exception.class, ClassLoader.getSystemClassLoader());
 		if (cause.isPresent()) {
 			return cause.get().getErrorCode().equals(expectedErrorCode);
 		}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
index 1097fd6..d393819 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
@@ -45,7 +45,7 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertThat;
@@ -156,7 +156,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 		}
 		catch (Exception ex) {
 			// testHarness1 will be fenced off after creating and closing testHarness2
-			if (!findThrowable(ex, ProducerFencedException.class).isPresent()) {
+			if (!findSerializedThrowable(ex, ProducerFencedException.class, ClassLoader.getSystemClassLoader()).isPresent()) {
 				throw ex;
 			}
 		}
@@ -662,7 +662,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 	}
 
 	private boolean isCausedBy(FlinkKafkaErrorCode expectedErrorCode, Throwable ex) {
-		Optional<FlinkKafkaException> cause = findThrowable(ex, FlinkKafkaException.class);
+		Optional<FlinkKafkaException> cause = findSerializedThrowable(ex, FlinkKafkaException.class, ClassLoader.getSystemClassLoader());
 		if (cause.isPresent()) {
 			return cause.get().getErrorCode().equals(expectedErrorCode);
 		}
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 721bf7f..ddd0276 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -315,6 +315,38 @@ public final class ExceptionUtils {
 	}
 
 	/**
+	 * Checks whether a throwable chain contains a specific type of exception and returns it. It deserializes
+	 * any {@link SerializedThrowable} that are found using the provided {@link ClassLoader}.
+	 *
+	 * @param throwable the throwable chain to check.
+	 * @param searchType the type of exception to search for in the chain.
+	 * @param classLoader to use for deserialization.
+	 * @return Optional throwable of the requested type if available, otherwise empty
+	 */
+	public static <T extends Throwable> Optional<T> findSerializedThrowable(Throwable throwable, Class<T> searchType, ClassLoader classLoader) {
+		if (throwable == null || searchType == null) {
+			return Optional.empty();
+		}
+
+		Throwable t = throwable;
+		while (t != null) {
+			if (searchType.isAssignableFrom(t.getClass())) {
+				return Optional.of(searchType.cast(t));
+			} else if (t instanceof SerializedThrowable) {
+				// instanceof guards the cast below; any other throwable type must fall through to getCause().
+				Throwable next = ((SerializedThrowable) t).deserializeError(classLoader);
+				// deserializeError may return the same instance (e.g., null cause); stop then to avoid looping forever.
+				// This is safe: a search for SerializedThrowable itself would already have matched in the first branch.
+				t = (next == t) ? null : next;
+			} else {
+				t = t.getCause();
+			}
+		}
+
+		return Optional.empty();
+	}
+
+	/**
 	 * Checks whether a throwable chain contains a specific type of exception and returns it.
 	 *
 	 * @param throwable the throwable chain to check.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
index c0bc2d1..7c8ab49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedThrowable;
 
 /**
  * Base class for checkpoint related exceptions.
@@ -40,12 +41,20 @@ public class CheckpointException extends Exception {
 	}
 
 	public CheckpointException(CheckpointFailureReason failureReason, Throwable cause) {
-		super(failureReason.message(), cause);
+		// Defensively replace the cause with a SerializedThrowable in case it's a user-defined exception
+		// that doesn't exist on the JobManager's default classpath.
+		super(
+			failureReason.message(),
+			cause == null ? null : new SerializedThrowable(cause));
 		this.checkpointFailureReason = Preconditions.checkNotNull(failureReason);
 	}
 
 	public CheckpointException(String message, CheckpointFailureReason failureReason, Throwable cause) {
-		super(message + " Failure reason: " + failureReason.message(), cause);
+		// Defensively replace the cause with a SerializedThrowable in case it's a user-defined exception
+		// that doesn't exist on the JobManager's default classpath.
+		super(
+			message + " Failure reason: " + failureReason.message(),
+			cause == null ? null : new SerializedThrowable(cause));
 		this.checkpointFailureReason = Preconditions.checkNotNull(failureReason);
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index 2970b87..84c0104 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -47,6 +47,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.ExceptionUtils.findSerializedThrowable;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
@@ -167,7 +168,7 @@ public class TwoPhaseCommitSinkFunctionTest {
 			harness.snapshot(2, 5);
 			fail("something should fail");
 		} catch (Exception ex) {
-			if (!(ex.getCause() instanceof ContentDump.NotWritableException)) {
+			if (!findSerializedThrowable(ex, ContentDump.NotWritableException.class, ClassLoader.getSystemClassLoader()).isPresent()) {
 				throw ex;
 			}
 			// ignore
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 36fb867..f9d9aa5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -558,7 +558,7 @@ public class AbstractStreamOperatorTest {
 					new MemCheckpointStreamFactory(Integer.MAX_VALUE));
 			fail("Exception expected.");
 		} catch (Exception e) {
-			assertEquals(failingException, e.getCause());
+			assertEquals(failingException.getMessage(), e.getCause().getMessage());
 		}
 	}
 
@@ -636,7 +636,7 @@ public class AbstractStreamOperatorTest {
 					new MemCheckpointStreamFactory(Integer.MAX_VALUE));
 			fail("Exception expected.");
 		} catch (Exception e) {
-			assertEquals(failingException, e.getCause());
+			assertEquals(failingException.getMessage(), e.getCause().getMessage());
 		}
 
 		// verify that the context has been closed, the operator snapshot result has been cancelled
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java
index 903b237..5c1568e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/ContentDump.java
@@ -124,7 +124,7 @@ public class ContentDump {
 	/**
 	 * Exception thrown for an attempt to write into read-only {@link ContentDump}.
 	 */
-	public class NotWritableException extends RuntimeException {
+	public static class NotWritableException extends RuntimeException {
 		public NotWritableException(String name) {
 			super(String.format("File [%s] is not writable", name));
 		}