You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/02/21 07:23:36 UTC

[flink] 02/02: [FLINK-16019][runtime][test] Implement test coverage for FLINK-16019 bug

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dddbc2b63caf25571e37978796bd4ce47f5e046a
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Feb 20 14:44:56 2020 +0100

    [FLINK-16019][runtime][test] Implement test coverage for FLINK-16019 bug
---
 .../flink/hdfstests/ContinuousFileProcessingTest.java  | 18 ++++++++++++++++++
 .../functions/source/ContinuousFileReaderOperator.java |  5 ++---
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 1e783f1..a370d7e 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
@@ -150,6 +151,23 @@ public class ContinuousFileProcessingTest {
 		}
 	}
 
+	@Test(expected = ExpectedTestException.class)
+	public void testExceptionHandling() throws Exception {
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI + "/" + UUID.randomUUID() + "/")) {
+			@Override
+			public void close() {
+				throw new ExpectedTestException();
+			}
+		};
+
+		OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> harness = createHarness(format);
+		harness.getExecutionConfig().setAutoWatermarkInterval(10);
+		harness.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		try (OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> tester = harness) {
+			tester.open();
+		}
+	}
+
 	@Test
 	public void testFileReadingOperatorWithIngestionTime() throws Exception {
 		String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index ed8ab4a..0ade509 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -36,7 +36,6 @@ import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
 
@@ -428,7 +427,7 @@ class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
 		cleanUp();
 	}
 
-	private void cleanUp() {
+	private void cleanUp() throws Exception {
 		LOG.debug("cleanup, state={}", state);
 
 		RunnableWithException[] runClose = {
@@ -447,7 +446,7 @@ class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
 		}
 		currentSplit = null;
 		if (firstException != null) {
-			throw new FlinkRuntimeException("Unable to properly cleanup ContinuousFileReaderOperator", firstException);
+			throw firstException;
 		}
 	}