You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/08/25 07:57:07 UTC
[flink] 02/02: [FLINK-19040][task] Close SourceReader in
SourceOperator
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 89ad9ca93cfd25231d38db4b28a141452bc00f82
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Aug 24 17:57:49 2020 +0200
[FLINK-19040][task] Close SourceReader in SourceOperator
---
.../java/org/apache/flink/streaming/api/operators/SourceOperator.java | 1 +
.../org/apache/flink/streaming/api/operators/SourceOperatorTest.java | 1 +
2 files changed, 2 insertions(+)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 74f9360..33e95f2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -162,6 +162,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
@Override
public void close() throws Exception {
+ sourceReader.close();
eventTimeLogic.stopPeriodicWatermarkEmits();
super.close();
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index a2ef758..44a5d44 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -101,6 +101,7 @@ public class SourceOperatorTest {
finally {
operator.close();
}
+ assertTrue(mockSourceReader.isClosed());
}
@Test