You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/08/25 07:57:07 UTC

[flink] 02/02: [FLINK-19040][task] Close SourceReader in SourceOperator

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 89ad9ca93cfd25231d38db4b28a141452bc00f82
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Aug 24 17:57:49 2020 +0200

    [FLINK-19040][task] Close SourceReader in SourceOperator
---
 .../java/org/apache/flink/streaming/api/operators/SourceOperator.java    | 1 +
 .../org/apache/flink/streaming/api/operators/SourceOperatorTest.java     | 1 +
 2 files changed, 2 insertions(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 74f9360..33e95f2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -162,6 +162,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 
 	@Override
 	public void close() throws Exception {
+		sourceReader.close();
 		eventTimeLogic.stopPeriodicWatermarkEmits();
 		super.close();
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index a2ef758..44a5d44 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -101,6 +101,7 @@ public class SourceOperatorTest {
 		finally {
 			operator.close();
 		}
+		assertTrue(mockSourceReader.isClosed());
 	}
 
 	@Test