You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/09/20 15:50:50 UTC

[flink] branch release-1.15 updated: [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new eb65655f8ce [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)
eb65655f8ce is described below

commit eb65655f8ce39627a6bd28c8bdddd0c92db44d2e
Author: harker2015 <hu...@163.com>
AuthorDate: Tue Sep 20 17:31:44 2022 +0200

    [FLINK-29324][Connectors/Kinesis] Fix NPE for Kinesis connector when closing (#20853)
    
    * [FLINK-29324] Fix NPE for Kinesis connector when closing
    
    * [FLINK-29324] Add unit test case
---
 .../flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java  | 7 ++++++-
 .../streaming/connectors/kinesis/FlinkKinesisConsumerTest.java    | 8 ++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
index b0a729fb11e..488a1f54e85 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
@@ -418,7 +418,12 @@ public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T>
     public void close() throws Exception {
         cancel();
         // safe-guard when the fetcher has been interrupted, make sure to not leak resources
-        fetcher.awaitTermination();
+        // application might be stopped before connector subtask has been started
+        // so we must check if the fetcher is actually created
+        KinesisDataFetcher fetcher = this.fetcher;
+        if (fetcher != null) {
+            fetcher.awaitTermination();
+        }
         this.fetcher = null;
         super.close();
     }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index d48e04ed4e6..6ad94f823e8 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -1194,6 +1194,14 @@ public class FlinkKinesisConsumerTest extends TestLogger {
         testHarness.close();
     }
 
+    @Test
+    public void testCloseConnectorBeforeSubtaskStart() throws Exception {
+        Properties config = TestUtils.getStandardProperties();
+        FlinkKinesisConsumer<String> consumer =
+                new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);
+        consumer.close();
+    }
+
     private void awaitRecordCount(ConcurrentLinkedQueue<? extends Object> queue, int count)
             throws Exception {
         Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));