Posted to commits@flink.apache.org by da...@apache.org on 2022/07/25 08:58:04 UTC

[flink] 04/05: [FLINK-23528][connectors/kinesis] Reenable FlinkKinesisITCase and rewrite stopWithSavepoint.

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8fb2c3a4cf3a37b87403e462f675bb5d614a4b51
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Sep 7 22:23:15 2021 +0200

    [FLINK-23528][connectors/kinesis] Reenable FlinkKinesisITCase and rewrite stopWithSavepoint.
---
 .../connectors/kinesis/FlinkKinesisITCase.java     | 141 +++++++++++++++------
 .../kinesis/internals/KinesisDataFetcherTest.java  |  11 +-
 2 files changed, 105 insertions(+), 47 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
index 6a394860450..705be463475 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
@@ -19,44 +19,53 @@ package org.apache.flink.streaming.connectors.kinesis;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
 import org.apache.flink.util.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestNameProvider;
 
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.utility.DockerImageName;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.time.Duration;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinTask;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.lessThan;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
 
 /** IT cases for using Kinesis consumer/producer based on Kinesalite. */
-@Ignore("See FLINK-23528")
 public class FlinkKinesisITCase extends TestLogger {
-    public static final String TEST_STREAM = "test_stream";
+    private String stream;
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisITCase.class);
 
     @ClassRule
     public static final MiniClusterWithClientResource MINI_CLUSTER =
@@ -69,20 +78,34 @@ public class FlinkKinesisITCase extends TestLogger {
 
     @Rule public TemporaryFolder temp = new TemporaryFolder();
 
+    @Rule public SharedObjects sharedObjects = SharedObjects.create();
+
     private static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();
 
     private KinesisPubsubClient client;
 
     @Before
-    public void setupClient() {
+    public void setupClient() throws Exception {
         client = new KinesisPubsubClient(kinesalite.getContainerProperties());
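+        // derive a unique stream name from the test name so tests do not interfere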
+        stream = TestNameProvider.getCurrentTestName().replaceAll("\\W", "");
+        client.createTopic(stream, 1, new Properties());
+    }
+
+    @Test
+    public void testStopWithSavepoint() throws Exception {
+        testStopWithSavepoint(false);
+    }
+
+    @Test
+    public void testStopWithSavepointWithDrain() throws Exception {
+        testStopWithSavepoint(true);
     }
 
     /**
      * Tests that pending elements do not cause a deadlock during stop with savepoint (FLINK-17170).
      *
      * <ol>
-     *   <li>The test sets up a stream with 100 records and creates a Flink job that reads them
+     *   <li>The test sets up a stream with 1000 records and creates a Flink job that reads them
      *       very slowly (using up a large chunk of the mailbox's time).
      *   <li>After ensuring that consumption has started, the job is stopped in a parallel thread.
      *   <li>Without the fix of FLINK-17170, the job now has a high chance to deadlock during
@@ -90,83 +113,117 @@ public class FlinkKinesisITCase extends TestLogger {
      *   <li>With the fix, the job proceeds and we can lift the backpressure.
      * </ol>
      */
-    @Test
-    public void testStopWithSavepoint() throws Exception {
-        client.createTopic(TEST_STREAM, 1, new Properties());
-
+    private void testStopWithSavepoint(boolean drain) throws Exception {
         // add elements to the test stream
         int numElements = 1000;
         client.sendMessage(
-                TEST_STREAM,
+                stream,
                 IntStream.range(0, numElements).mapToObj(String::valueOf).toArray(String[]::new));
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
+        env.enableCheckpointing(100L);
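+        // checkpoint often so that WaitingMapper sees a snapshot soon after consumption starts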
 
         Properties config = kinesalite.getContainerProperties();
         config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
         FlinkKinesisConsumer<String> consumer =
-                new FlinkKinesisConsumer<>(TEST_STREAM, STRING_SCHEMA, config);
+                new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
 
-        DataStream<String> stream = env.addSource(consumer).map(new WaitingMapper());
+        SharedReference<CountDownLatch> savepointTrigger = sharedObjects.add(new CountDownLatch(1));
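+        // the latch is shared between this test thread and the mapper inside the MiniCluster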
+        DataStream<String> stream =
+                env.addSource(consumer).map(new WaitingMapper(savepointTrigger));
         // call stop with savepoint in another thread
         ForkJoinTask<Object> stopTask =
                 ForkJoinPool.commonPool()
                         .submit(
                                 () -> {
-                                    WaitingMapper.firstElement.await();
-                                    stopWithSavepoint();
-                                    WaitingMapper.stopped = true;
+                                    savepointTrigger.get().await();
+                                    stopWithSavepoint(drain);
                                     return null;
                                 });
         try {
             List<String> result = stream.executeAndCollect(10000);
             // stop with savepoint will most likely only return a small subset of the elements
             // validate that the prefix is as expected
-            assertThat(result, hasSize(lessThan(numElements)));
-            assertThat(
-                    result,
-                    equalTo(
-                            IntStream.range(0, numElements)
-                                    .mapToObj(String::valueOf)
-                                    .collect(Collectors.toList())
-                                    .subList(0, result.size())));
+            if (drain) {
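+                // draining flushes all pending elements before stopping,
+                // so the full sequence is expected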
+                assertThat(
+                        result,
+                        contains(
+                                IntStream.range(0, numElements)
+                                        .mapToObj(String::valueOf)
+                                        .toArray()));
+            } else {
+                // stop with savepoint will most likely only return a small subset of the elements
+                // validate that the prefix is as expected
+                assertThat(
+                        result,
+                        contains(
+                                IntStream.range(0, result.size())
+                                        .mapToObj(String::valueOf)
+                                        .toArray()));
+            }
         } finally {
-            stopTask.cancel(true);
+            stopTask.get();
         }
     }
 
-    private String stopWithSavepoint() throws Exception {
+    private String stopWithSavepoint(boolean drain) throws Exception {
         JobStatusMessage job =
                 MINI_CLUSTER.getClusterClient().listJobs().get().stream().findFirst().get();
         return MINI_CLUSTER
                 .getClusterClient()
                 .stopWithSavepoint(
                         job.getJobId(),
-                        true,
+                        drain,
                         temp.getRoot().getAbsolutePath(),
                         SavepointFormatType.CANONICAL)
                 .get();
     }
 
-    private static class WaitingMapper implements MapFunction<String, String> {
-        static CountDownLatch firstElement;
-        static volatile boolean stopped;
+    private static class WaitingMapper
+            implements MapFunction<String, String>, CheckpointedFunction {
+        private final SharedReference<CountDownLatch> savepointTrigger;
+        private volatile boolean savepointTriggered;
+        // keep track of when the last checkpoint occurred
+        private transient Deadline checkpointDeadline;
+        private final AtomicInteger numElements = new AtomicInteger();
+
+        WaitingMapper(SharedReference<CountDownLatch> savepointTrigger) {
+            this.savepointTrigger = savepointTrigger;
+            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
+        }
 
-        WaitingMapper() {
-            firstElement = new CountDownLatch(1);
-            stopped = false;
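+        // re-initialize the transient deadline when the mapper is deserialized on a task manager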
+        private void readObject(ObjectInputStream stream)
+                throws ClassNotFoundException, IOException {
+            stream.defaultReadObject();
+            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
         }
 
         @Override
         public String map(String value) throws Exception {
-            if (firstElement.getCount() > 0) {
-                firstElement.countDown();
-            }
-            if (!stopped) {
-                Thread.sleep(100);
+            numElements.incrementAndGet();
+            if (!savepointTriggered) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
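+                    // ignored: the stop condition is re-checked via the deadline below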
+                }
+                savepointTriggered = checkpointDeadline.isOverdue();
             }
             return value;
         }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) {
+            // assume that after the first savepoint, this function will only see a new
+            // checkpoint when the final savepoint is triggered
+            if (numElements.get() > 0) {
+                this.checkpointDeadline = Deadline.fromNow(Duration.ofSeconds(1));
+                savepointTrigger.get().countDown();
+            }
+            LOG.info("snapshotState {} {}", context.getCheckpointId(), numElements);
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) {}
     }
 }
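
The heart of the rewrite is that WaitingMapper no longer coordinates through static
fields: the test thread and the operator instances running inside the MiniCluster now
share a single CountDownLatch via the SharedObjects JUnit rule. A minimal sketch of
that pattern, using only the calls visible in the diff above (class, test, and job
body are illustrative placeholders):

    import org.apache.flink.testutils.junit.SharedObjects;
    import org.apache.flink.testutils.junit.SharedReference;

    import org.junit.Rule;
    import org.junit.Test;

    import java.util.concurrent.CountDownLatch;

    public class SharedObjectsSketch {
        // The rule hands out serializable handles instead of the objects themselves,
        // so a function deserialized on a task manager resolves the same instance.
        @Rule public final SharedObjects sharedObjects = SharedObjects.create();

        @Test
        public void coordinatesTestAndOperator() throws Exception {
            SharedReference<CountDownLatch> trigger = sharedObjects.add(new CountDownLatch(1));
            // ... submit a job whose operator eventually calls trigger.get().countDown() ...
            trigger.get().await(); // the test thread blocks until the operator signals
        }
    }
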
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 8be2bb41097..9f6d8e3f27f 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -17,11 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
-import com.amazonaws.services.kinesis.model.HashKeyRange;
-import com.amazonaws.services.kinesis.model.SequenceNumberRange;
-import com.amazonaws.services.kinesis.model.Shard;
-import org.apache.commons.lang3.mutable.MutableBoolean;
-import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.core.testutils.CheckedThread;
@@ -49,6 +44,12 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDa
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcherForShardConsumerException;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
+
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableLong;
 import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;