You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/12/21 10:32:43 UTC

[flink-connector-aws] branch main updated: [FLINK-30224][Connectors/Kinesis] Added an IT test for slow FlinKinesisConsumer's run which caused NPE in close (#41)

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ee0fe3  [FLINK-30224][Connectors/Kinesis] Added an IT test for slow FlinKinesisConsumer's run which caused NPE in close (#41)
9ee0fe3 is described below

commit 9ee0fe32d1a9e6d62e514824ab553956fe88ee9d
Author: Astamur Kirillin <as...@gmail.com>
AuthorDate: Wed Dec 21 10:32:38 2022 +0000

    [FLINK-30224][Connectors/Kinesis] Added an IT test for slow FlinKinesisConsumer's run which caused NPE in close (#41)
    
    Co-authored-by: Astamur Kirillin <as...@amazon.com>
---
 .../connectors/kinesis/FlinkKinesisITCase.java     | 99 +++++++++++++++++++++-
 1 file changed, 97 insertions(+), 2 deletions(-)

diff --git a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
index 7f7676a..6066d82 100644
--- a/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
+++ b/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kinesis;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
@@ -29,7 +30,10 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
+import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.testutils.junit.SharedObjects;
@@ -38,6 +42,7 @@ import org.apache.flink.util.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.TestNameProvider;
 
+import org.hamcrest.MatcherAssert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -52,9 +57,12 @@ import java.io.ObjectInputStream;
 import java.time.Duration;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -63,6 +71,8 @@ import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfi
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertTrue;
 
 /** IT cases for using Kinesis consumer/producer based on Kinesalite. */
 public class FlinkKinesisITCase extends TestLogger {
@@ -115,6 +125,40 @@ public class FlinkKinesisITCase extends TestLogger {
         testStopWithSavepoint(true, true);
     }
 
+    /**
+     * Tests stop with savepoint while a slow {@link
+     * FlinkKinesisConsumer#run(SourceFunction.SourceContext)} method has not finished yet and
+     * therefore has not set a {@link KinesisDataFetcher} yet.
+     */
+    @Test
+    public void testStopWithSavepointWithSlowConsumer() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(100L);
+
+        SharedReference<CyclicBarrier> savepointTrigger = sharedObjects.add(new CyclicBarrier(2));
+        env.addSource(createSlowKinesisConsumer(savepointTrigger)).addSink(new DiscardingSink<>());
+
+        ForkJoinTask<String> stopTask =
+                ForkJoinPool.commonPool()
+                        .submit(
+                                () -> {
+                                    // Wait until run method is reached in the consumer
+                                    savepointTrigger.get().await();
+                                    String result = stopWithSavepoint(savepointTrigger);
+                                    log.info("StopWithSavepoint result: {}", result);
+                                    return result;
+                                });
+
+        env.execute();
+        String result = stopTask.get(1, TimeUnit.MINUTES);
+
+        MatcherAssert.assertThat("Savepoint result shouldn't be null", result, notNullValue());
+        assertTrue(
+                "Unexpected savepoint file's name format",
+                result.matches("^file:.*/savepoint-.*$"));
+    }
+
     /**
      * Tests that pending elements do not cause a deadlock during stop with savepoint (FLINK-17170).
      *
@@ -183,21 +227,42 @@ public class FlinkKinesisITCase extends TestLogger {
         return new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
     }
 
+    private FlinkKinesisConsumer<String> createSlowKinesisConsumer(
+            SharedReference<CyclicBarrier> savepointTrigger) {
+        Properties config = getContainerProperties();
+        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
+        return new SlowFlinkKinesisConsumer<>(stream, STRING_SCHEMA, config, savepointTrigger);
+    }
+
     private Properties getContainerProperties() {
         return kinesalite.getContainerProperties();
     }
 
     private String stopWithSavepoint(boolean drain) throws Exception {
+        return callStopWithSavepoint(drain).get();
+    }
+
+    private String stopWithSavepoint(SharedReference<CyclicBarrier> savepointTrigger)
+            throws Exception {
+        CompletableFuture<String> resultFuture = callStopWithSavepoint(false);
+        // Release barrier in consumer after stop was called
+        savepointTrigger.get().await();
+
+        return resultFuture.get();
+    }
+
+    private CompletableFuture<String> callStopWithSavepoint(boolean drain) throws Exception {
         JobStatusMessage job =
                 miniCluster.getClusterClient().listJobs().get().stream().findFirst().get();
+
+        log.info("Calling stopWithSavepoint: {}", job.getJobId());
         return miniCluster
                 .getClusterClient()
                 .stopWithSavepoint(
                         job.getJobId(),
                         drain,
                         temp.getRoot().getAbsolutePath(),
-                        SavepointFormatType.CANONICAL)
-                .get();
+                        SavepointFormatType.CANONICAL);
     }
 
     private static class WaitingMapper
@@ -248,4 +313,34 @@ public class FlinkKinesisITCase extends TestLogger {
         @Override
         public void initializeState(FunctionInitializationContext context) {}
     }
+
+    /**
+     * A {@link FlinkKinesisConsumer} whose {@code run()} method is deliberately slowed down: its
+     * progress is gated by a {@link CyclicBarrier} supplied by the test.
+     */
+    private static class SlowFlinkKinesisConsumer<T> extends FlinkKinesisConsumer<T> {
+        private final SharedReference<CyclicBarrier> savepointTrigger;
+
+        public SlowFlinkKinesisConsumer(
+                String stream,
+                DeserializationSchema<T> deserializer,
+                Properties configProps,
+                SharedReference<CyclicBarrier> savepointTrigger) {
+            super(stream, deserializer, configProps);
+            this.savepointTrigger = savepointTrigger;
+        }
+
+        @Override
+        public void run(SourceContext<T> sourceContext) throws Exception {
+            // Wait until it's allowed to do a stop-with-savepoint operation
+            // (a job and a task are both in RUNNING state)
+            savepointTrigger.get().await();
+
+            // Start a new waiting cycle and imitate a slow run operation until stop is called
+            savepointTrigger.get().reset();
+            savepointTrigger.get().await();
+
+            super.run(sourceContext);
+        }
+    }
 }