You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/02/21 07:23:36 UTC
[flink] 02/02: [FLINK-16019][runtime][test] Implement test coverage for FLINK-16019 bug
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit dddbc2b63caf25571e37978796bd4ce47f5e046a
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Feb 20 14:44:56 2020 +0100
[FLINK-16019][runtime][test] Implement test coverage for FLINK-16019 bug
---
.../flink/hdfstests/ContinuousFileProcessingTest.java | 18 ++++++++++++++++++
.../functions/source/ContinuousFileReaderOperator.java | 5 ++---
2 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 1e783f1..a370d7e 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
@@ -150,6 +151,23 @@ public class ContinuousFileProcessingTest {
}
}
+ @Test(expected = ExpectedTestException.class)
+ public void testExceptionHandling() throws Exception {
+ TextInputFormat format = new TextInputFormat(new Path(hdfsURI + "/" + UUID.randomUUID() + "/")) {
+ @Override
+ public void close() {
+ throw new ExpectedTestException();
+ }
+ };
+
+ OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> harness = createHarness(format);
+ harness.getExecutionConfig().setAutoWatermarkInterval(10);
+ harness.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ try (OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> tester = harness) {
+ tester.open();
+ }
+ }
+
@Test
public void testFileReadingOperatorWithIngestionTime() throws Exception {
String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index ed8ab4a..0ade509 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -36,7 +36,6 @@ import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;
@@ -428,7 +427,7 @@ class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
cleanUp();
}
- private void cleanUp() {
+ private void cleanUp() throws Exception {
LOG.debug("cleanup, state={}", state);
RunnableWithException[] runClose = {
@@ -447,7 +446,7 @@ class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
}
currentSplit = null;
if (firstException != null) {
- throw new FlinkRuntimeException("Unable to properly cleanup ContinuousFileReaderOperator", firstException);
+ throw firstException;
}
}