You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/07/25 08:58:05 UTC

[flink] 05/05: [FLINK-23528][connectors/kinesis]Graceful shutdown of Kinesis Consumer in EFO mode

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7cd5fe93035c5d875d10d9f3cb1d84d4ed045dc0
Author: Krzysztof Dziolak <dz...@amazon.com>
AuthorDate: Fri Jul 1 10:08:45 2022 +0100

    [FLINK-23528][connectors/kinesis]Graceful shutdown of Kinesis Consumer in EFO mode
---
 .../kinesis/internals/KinesisDataFetcher.java      |  7 ++-
 .../publisher/fanout/FanOutShardSubscriber.java    |  3 +-
 .../connectors/kinesis/FlinkKinesisITCase.java     | 68 +++++++++++++++-------
 .../fanout/FanOutShardSubscriberTest.java          | 20 +++++++
 4 files changed, 74 insertions(+), 24 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index e12bdaa0b3a..4fcc80a250e 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -819,7 +819,7 @@ public class KinesisDataFetcher<T> {
                 LOG.warn("Encountered exception closing record publisher factory", e);
             }
         } finally {
-            shardConsumersExecutor.shutdown();
+            gracefulShutdownShardConsumers();
 
             cancelFuture.complete(null);
 
@@ -852,6 +852,11 @@ public class KinesisDataFetcher<T> {
         StreamConsumerRegistrarUtil.deregisterStreamConsumers(configProps, streams);
     }
 
+    /** Gracefully stops shardConsumersExecutor without interrupting running threads. */
+    private void gracefulShutdownShardConsumers() {
+        shardConsumersExecutor.shutdown();
+    }
+
     /**
      * Returns a flag indicating if this fetcher is running.
      *
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
index afda248773c..a280a8f6537 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
@@ -361,8 +361,7 @@ public class FanOutShardSubscriber {
                 }
             } else if (subscriptionEvent.isSubscriptionComplete()) {
                 // The subscription is complete, but the shard might not be, so we return incomplete
-                result = false;
-                break;
+                return false;
             } else {
                 handleError(subscriptionEvent.getThrowable());
                 result = false;
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
index 705be463475..ee8066a61aa 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
@@ -39,7 +39,7 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.TestNameProvider;
 
 import org.junit.Before;
-import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -58,6 +58,8 @@ import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_CONSUMER_NAME;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
@@ -67,13 +69,13 @@ public class FlinkKinesisITCase extends TestLogger {
     private String stream;
     private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisITCase.class);
 
-    @ClassRule
-    public static final MiniClusterWithClientResource MINI_CLUSTER =
+    @Rule
+    public final MiniClusterWithClientResource miniCluster =
             new MiniClusterWithClientResource(
                     new MiniClusterResourceConfiguration.Builder().build());
 
-    @ClassRule
-    public static KinesaliteContainer kinesalite =
+    @Rule
+    public KinesaliteContainer kinesalite =
             new KinesaliteContainer(DockerImageName.parse(DockerImageVersions.KINESALITE));
 
     @Rule public TemporaryFolder temp = new TemporaryFolder();
@@ -86,19 +88,31 @@ public class FlinkKinesisITCase extends TestLogger {
 
     @Before
     public void setupClient() throws Exception {
-        client = new KinesisPubsubClient(kinesalite.getContainerProperties());
+        client = new KinesisPubsubClient(getContainerProperties());
         stream = TestNameProvider.getCurrentTestName().replaceAll("\\W", "");
         client.createTopic(stream, 1, new Properties());
     }
 
     @Test
     public void testStopWithSavepoint() throws Exception {
-        testStopWithSavepoint(false);
+        testStopWithSavepoint(false, false);
     }
 
     @Test
     public void testStopWithSavepointWithDrain() throws Exception {
-        testStopWithSavepoint(true);
+        testStopWithSavepoint(true, false);
+    }
+
+    @Test
+    @Ignore("Kinesalite does not support EFO")
+    public void testStopWithSavepointWithEfo() throws Exception {
+        testStopWithSavepoint(false, true);
+    }
+
+    @Test
+    @Ignore("Kinesalite does not support EFO")
+    public void testStopWithSavepointWithDrainAndEfo() throws Exception {
+        testStopWithSavepoint(true, true);
     }
 
     /**
@@ -113,7 +127,7 @@ public class FlinkKinesisITCase extends TestLogger {
      *   <li>With the fix, the job proceeds and we can lift the backpressure.
      * </ol>
      */
-    private void testStopWithSavepoint(boolean drain) throws Exception {
+    private void testStopWithSavepoint(boolean drain, boolean efo) throws Exception {
         // add elements to the test stream
         int numElements = 1000;
         client.sendMessage(
@@ -124,14 +138,10 @@ public class FlinkKinesisITCase extends TestLogger {
         env.setParallelism(1);
         env.enableCheckpointing(100L);
 
-        Properties config = kinesalite.getContainerProperties();
-        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
-        FlinkKinesisConsumer<String> consumer =
-                new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
-
         SharedReference<CountDownLatch> savepointTrigger = sharedObjects.add(new CountDownLatch(1));
-        DataStream<String> stream =
-                env.addSource(consumer).map(new WaitingMapper(savepointTrigger));
+        DataStream<String> outputStream =
+                env.addSource(createKinesisConsumer(efo)).map(new WaitingMapper(savepointTrigger));
+
         // call stop with savepoint in another thread
         ForkJoinTask<Object> stopTask =
                 ForkJoinPool.commonPool()
@@ -142,7 +152,7 @@ public class FlinkKinesisITCase extends TestLogger {
                                     return null;
                                 });
         try {
-            List<String> result = stream.executeAndCollect(10000);
+            List<String> result = outputStream.executeAndCollect(10000);
             // stop with savepoint will most likely only return a small subset of the elements
             // validate that the prefix is as expected
             if (drain) {
@@ -167,10 +177,24 @@ public class FlinkKinesisITCase extends TestLogger {
         }
     }
 
+    private FlinkKinesisConsumer<String> createKinesisConsumer(boolean efo) {
+        Properties config = getContainerProperties();
+        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
+        if (efo) {
+            config.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO");
+            config.putIfAbsent(EFO_CONSUMER_NAME, "efo-flink-app");
+        }
+        return new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
+    }
+
+    private Properties getContainerProperties() {
+        return kinesalite.getContainerProperties();
+    }
+
     private String stopWithSavepoint(boolean drain) throws Exception {
         JobStatusMessage job =
-                MINI_CLUSTER.getClusterClient().listJobs().get().stream().findFirst().get();
-        return MINI_CLUSTER
+                miniCluster.getClusterClient().listJobs().get().stream().findFirst().get();
+        return miniCluster
                 .getClusterClient()
                 .stopWithSavepoint(
                         job.getJobId(),
@@ -190,13 +214,15 @@ public class FlinkKinesisITCase extends TestLogger {
 
         WaitingMapper(SharedReference<CountDownLatch> savepointTrigger) {
             this.savepointTrigger = savepointTrigger;
-            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
+            // effectively set a 10 minute timeout on the wait
+            // this is reduced to 1 second once the data starts flowing
+            checkpointDeadline = Deadline.fromNow(Duration.ofMinutes(10));
         }
 
         private void readObject(ObjectInputStream stream)
                 throws ClassNotFoundException, IOException {
             stream.defaultReadObject();
-            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
+            checkpointDeadline = Deadline.fromNow(Duration.ofMinutes(10));
         }
 
         @Override
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
index a41e7026984..9c711230096 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
@@ -31,6 +31,7 @@ import software.amazon.awssdk.services.kinesis.model.StartingPosition;
 import java.time.Duration;
 
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT;
+import static org.junit.Assert.assertFalse;
 
 /** Tests for {@link FanOutShardSubscriber}. */
 public class FanOutShardSubscriberTest {
@@ -120,6 +121,25 @@ public class FanOutShardSubscriberTest {
         subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
     }
 
+    @Test
+    public void testSubscriptionCompletion() throws Exception {
+        FakeKinesisFanOutBehavioursFactory.AbstractSingleShardFanOutKinesisV2 errorKinesisV2 =
+                FakeKinesisFanOutBehavioursFactory.emptyBatchFollowedBySingleRecord();
+
+        FanOutShardSubscriber subscriber =
+                new FanOutShardSubscriber(
+                        "consumerArn",
+                        "shardId",
+                        errorKinesisV2,
+                        DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
+
+        StartingPosition startingPosition = StartingPosition.builder().build();
+        boolean result =
+                subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
+
+        assertFalse(result);
+    }
+
     @Test
     public void testTimeoutSubscribingToShard() throws Exception {
         thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class);