You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/02/21 07:23:34 UTC

[flink] branch master updated (7bc7930 -> dddbc2b)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 7bc7930  [FLINK-15652][kubernetes] Change imagePullSecrets option to list ConfigOption
     new 0f71ab3  [FLINK-16019][runtime] fix ContinuousFileReaderOperator error handling
     new dddbc2b  [FLINK-16019][runtime][test] Implement test coverage for FLINK-16019 bug

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/hdfstests/ContinuousFileProcessingTest.java  | 18 ++++++++++++++++++
 .../functions/source/ContinuousFileReaderOperator.java |  9 ++++-----
 2 files changed, 22 insertions(+), 5 deletions(-)


[flink] 01/02: [FLINK-16019][runtime] fix ContinuousFileReaderOperator error handling

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0f71ab3edb9318f2c6cf301be0245087aeb35445
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Feb 12 15:31:56 2020 +0100

    [FLINK-16019][runtime] fix ContinuousFileReaderOperator error handling
---
 .../streaming/api/functions/source/ContinuousFileReaderOperator.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 3b2e2af..ed8ab4a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -376,7 +376,7 @@ class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
 			try {
 				cleanUp();
 			} catch (Exception ex) {
-				e = ExceptionUtils.firstOrSuppressed(ex, e);
+				e = ex;
 			}
 		}
 		{
@@ -442,7 +442,7 @@ class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
 			try {
 				r.run();
 			} catch (Exception e) {
-				firstException = ExceptionUtils.firstOrSuppressed(firstException, e);
+				firstException = ExceptionUtils.firstOrSuppressed(e, firstException);
 			}
 		}
 		currentSplit = null;


[flink] 02/02: [FLINK-16019][runtime][test] Implement test coverage for FLINK-16019 bug

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dddbc2b63caf25571e37978796bd4ce47f5e046a
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Feb 20 14:44:56 2020 +0100

    [FLINK-16019][runtime][test] Implement test coverage for FLINK-16019 bug
---
 .../flink/hdfstests/ContinuousFileProcessingTest.java  | 18 ++++++++++++++++++
 .../functions/source/ContinuousFileReaderOperator.java |  5 ++---
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 1e783f1..a370d7e 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
@@ -150,6 +151,23 @@ public class ContinuousFileProcessingTest {
 		}
 	}
 
+	@Test(expected = ExpectedTestException.class)
+	public void testExceptionHandling() throws Exception {
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI + "/" + UUID.randomUUID() + "/")) {
+			@Override
+			public void close() {
+				throw new ExpectedTestException();
+			}
+		};
+
+		OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> harness = createHarness(format);
+		harness.getExecutionConfig().setAutoWatermarkInterval(10);
+		harness.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		try (OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> tester = harness) {
+			tester.open();
+		}
+	}
+
 	@Test
 	public void testFileReadingOperatorWithIngestionTime() throws Exception {
 		String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index ed8ab4a..0ade509 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -36,7 +36,6 @@ import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
 
@@ -428,7 +427,7 @@ class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
 		cleanUp();
 	}
 
-	private void cleanUp() {
+	private void cleanUp() throws Exception {
 		LOG.debug("cleanup, state={}", state);
 
 		RunnableWithException[] runClose = {
@@ -447,7 +446,7 @@ class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
 		}
 		currentSplit = null;
 		if (firstException != null) {
-			throw new FlinkRuntimeException("Unable to properly cleanup ContinuousFileReaderOperator", firstException);
+			throw firstException;
 		}
 	}