You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/09/20 15:50:50 UTC
[flink] branch release-1.15 updated: [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new eb65655f8ce [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)
eb65655f8ce is described below
commit eb65655f8ce39627a6bd28c8bdddd0c92db44d2e
Author: harker2015 <hu...@163.com>
AuthorDate: Tue Sep 20 17:31:44 2022 +0200
[FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)
* [FLINK-29324] Fix NPE for Kinesis connector when closing
* [FLINK-29324] Add unit test case
---
.../flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java | 7 ++++++-
.../streaming/connectors/kinesis/FlinkKinesisConsumerTest.java | 8 ++++++++
2 files changed, 14 insertions(+), 1 deletion(-)
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index b0a729fb11e..488a1f54e85 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -418,7 +418,12 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
public void close() throws Exception {
cancel();
// safe-guard when the fetcher has been interrupted, make sure to not leak resources
- fetcher.awaitTermination();
+ // application might be stopped before connector subtask has been started
+ // so we must check if the fetcher is actually created
+ KinesisDataFetcher fetcher = this.fetcher;
+ if (fetcher != null) {
+ fetcher.awaitTermination();
+ }
this.fetcher = null;
super.close();
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index d48e04ed4e6..6ad94f823e8 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -1194,6 +1194,14 @@ public class FlinkKinesisConsumerTest extends TestLogger {
testHarness.close();
}
+ @Test
+ public void testCloseConnectorBeforeSubtaskStart() throws Exception {
+ Properties config = TestUtils.getStandardProperties();
+ FlinkKinesisConsumer<String> consumer =
+ new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
+ consumer.close();
+ }
+
private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> queue, int count)
throws Exception {
Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));