Posted to commits@flink.apache.org by da...@apache.org on 2022/07/25 08:58:00 UTC

[flink] branch release-1.15 updated (dbf1d62ee4b -> 7cd5fe93035)

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a change to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


    from dbf1d62ee4b [FLINK-28544][python][e2e] Clean up python environment after e2e tests
     new ac6007116a1 [FLINK-23528][datastream] Let CollectSinkOperator publish results in #close.
     new f2e265f8192 [FLINK-23528][connectors/kinesis] Use proper mock of ExecutorService in KinesisDataFetcherTest.
     new 803dcad14e5 [FLINK-23528][connectors/kinesis] Gracefully shutdown shard consumer to avoid InterruptionExceptions.
     new 8fb2c3a4cf3 [FLINK-23528][connectors/kinesis] Reenable FlinkKinesisITCase and rewrite stopWithSavepoint.
     new 7cd5fe93035 [FLINK-23528][connectors/kinesis] Graceful shutdown of Kinesis Consumer in EFO mode

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kinesis/internals/KinesisDataFetcher.java      |   7 +-
 .../publisher/fanout/FanOutShardSubscriber.java    |   3 +-
 .../connectors/kinesis/FlinkKinesisITCase.java     | 189 +++++++++++++++------
 .../kinesis/internals/KinesisDataFetcherTest.java  |   2 +-
 .../fanout/FanOutShardSubscriberTest.java          |  20 +++
 .../testutils/FakeKinesisBehavioursFactory.java    |  25 +--
 .../testutils/TestableKinesisDataFetcher.java      | 108 ++++++++----
 .../api/operators/collect/CollectSinkOperator.java |   4 +-
 8 files changed, 254 insertions(+), 104 deletions(-)


[flink] 03/05: [FLINK-23528][connectors/kinesis] Gracefully shutdown shard consumer to avoid InterruptionExceptions.

Posted by da...@apache.org.

dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 803dcad14e5f7276b7f5926250ab9d31825e0a78
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Sep 7 20:03:36 2021 +0200

    [FLINK-23528][connectors/kinesis] Gracefully shutdown shard consumer to avoid InterruptionExceptions.
---
 .../streaming/connectors/kinesis/internals/KinesisDataFetcher.java      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 21b3736c789..e12bdaa0b3a 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -819,7 +819,7 @@ public class KinesisDataFetcher<T> {
                 LOG.warn("Encountered exception closing record publisher factory", e);
             }
         } finally {
-            shardConsumersExecutor.shutdownNow();
+            shardConsumersExecutor.shutdown();
 
             cancelFuture.complete(null);
 

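[Editor's note] The hunk above replaces shardConsumersExecutor.shutdownNow() with shutdown(), so in-flight shard consumers finish instead of being interrupted. As a rough illustration of that pattern (not Flink code; the helper name and timeout are invented for this sketch), a graceful shutdown usually pairs shutdown() with awaitTermination(), falling back to shutdownNow() only when the deadline passes:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownSketch {

    // Graceful shutdown: stop accepting new tasks, let queued/running tasks
    // finish, and interrupt only if they exceed the timeout.
    static boolean shutdownGracefully(ExecutorService executor, long timeoutMs) {
        executor.shutdown(); // no new tasks; running tasks keep going
        try {
            if (executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                return true; // everything drained cleanly
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdownNow(); // last resort: interrupt stragglers
        return false;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> { /* a short-lived task */ });
        System.out.println("clean shutdown: " + shutdownGracefully(pool, 1000));
    }
}
```

The difference matters here because shutdownNow() interrupts consumer threads mid-read, which is the source of the InterruptionExceptions the commit title mentions.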

[flink] 02/05: [FLINK-23528][connectors/kinesis] Use proper mock of ExecutorService in KinesisDataFetcherTest.

Posted by da...@apache.org.

dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f2e265f81927d9871d408dec5d21ed6ebddd7910
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Wed Sep 8 18:22:34 2021 +0200

    [FLINK-23528][connectors/kinesis] Use proper mock of ExecutorService in KinesisDataFetcherTest.
    
    Mockito makes tests much harder to debug. This change replaces the Mockito stubs with a small test class that provides the previously scattered stubbed behavior in one simpler place.
---
 .../kinesis/internals/KinesisDataFetcherTest.java  |  13 ++-
 .../testutils/FakeKinesisBehavioursFactory.java    |  25 ++---
 .../testutils/TestableKinesisDataFetcher.java      | 108 ++++++++++++++-------
 3 files changed, 94 insertions(+), 52 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index c2d6dde7d96..8be2bb41097 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -17,6 +17,11 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.core.testutils.CheckedThread;
@@ -44,12 +49,6 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDa
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcherForShardConsumerException;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
-
-import com.amazonaws.services.kinesis.model.HashKeyRange;
-import com.amazonaws.services.kinesis.model.SequenceNumberRange;
-import com.amazonaws.services.kinesis.model.Shard;
-import org.apache.commons.lang3.mutable.MutableBoolean;
-import org.apache.commons.lang3.mutable.MutableLong;
 import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
@@ -93,7 +92,7 @@ public class KinesisDataFetcherTest extends TestLogger {
         assertTrue(fetcher.isRunning());
     }
 
-    @Test(timeout = 10000)
+    @Test
     public void testIsRunningFalseAfterShutDown() throws InterruptedException {
         KinesisDataFetcher<String> fetcher =
                 createTestDataFetcherWithNoShards(10, 2, "test-stream");
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 1e737ac3358..46cc797ff7d 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -41,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.createDummyStreamShardHandle;
@@ -660,17 +661,19 @@ public class FakeKinesisBehavioursFactory {
                             shardIterator);
             List<Record> records = Collections.emptyList();
             try {
-                String data = queue.take();
-                Record record =
-                        new Record()
-                                .withData(
-                                        ByteBuffer.wrap(
-                                                data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
-                                .withPartitionKey(UUID.randomUUID().toString())
-                                .withApproximateArrivalTimestamp(
-                                        new Date(System.currentTimeMillis()))
-                                .withSequenceNumber(String.valueOf(0));
-                records = Collections.singletonList(record);
+                String data = queue.poll(100, TimeUnit.MILLISECONDS);
+                if (data != null) {
+                    Record record =
+                            new Record()
+                                    .withData(
+                                            ByteBuffer.wrap(
+                                                    data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
+                                    .withPartitionKey(UUID.randomUUID().toString())
+                                    .withApproximateArrivalTimestamp(
+                                            new Date(System.currentTimeMillis()))
+                                    .withSequenceNumber(String.valueOf(0));
+                    records = Collections.singletonList(record);
+                }
             } catch (InterruptedException e) {
                 shardIterator = null;
             }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index b4c69eebb8c..4bdb95eb78a 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -26,23 +26,19 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 
-import org.mockito.invocation.InvocationOnMock;
-
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 /** Extension of the {@link KinesisDataFetcher} for testing. */
 public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
 
@@ -50,9 +46,6 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
     private final Semaphore discoveryWaiter = new Semaphore(0);
     private final OneShotLatch shutdownWaiter;
 
-    private volatile boolean running;
-    private volatile boolean executorServiceShutdownNowCalled;
-
     public TestableKinesisDataFetcher(
             List<String> fakeStreams,
             SourceFunction.SourceContext<T> sourceContext,
@@ -109,8 +102,6 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
 
         this.runWaiter = new OneShotLatch();
         this.shutdownWaiter = new OneShotLatch();
-
-        this.running = true;
     }
 
     @Override
@@ -130,28 +121,7 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
     @Override
     protected ExecutorService createShardConsumersThreadPool(String subtaskName) {
         // this is just a dummy fetcher, so no need to create a thread pool for shard consumers
-        ExecutorService mockExecutorService = mock(ExecutorService.class);
-        when(mockExecutorService.isTerminated())
-                .thenAnswer((InvocationOnMock invocation) -> !running);
-        when(mockExecutorService.shutdownNow())
-                .thenAnswer(
-                        invocationOnMock -> {
-                            executorServiceShutdownNowCalled = true;
-                            return Collections.emptyList();
-                        });
-        try {
-            when(mockExecutorService.awaitTermination(anyLong(), any()))
-                    .thenAnswer(invocationOnMock -> !running && executorServiceShutdownNowCalled);
-        } catch (InterruptedException e) {
-            // We're just trying to stub the method. Must acknowledge the checked exception.
-        }
-        return mockExecutorService;
-    }
-
-    @Override
-    public void awaitTermination() throws InterruptedException {
-        this.running = false;
-        super.awaitTermination();
+        return new TestExecutorService();
     }
 
     @Override
@@ -174,4 +144,74 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
     public void waitUntilDiscovery(int number) throws InterruptedException {
         discoveryWaiter.acquire(number);
     }
+
+    private static class TestExecutorService implements ExecutorService {
+        boolean terminated = false;
+
+        @Override
+        public void execute(Runnable command) {}
+
+        @Override
+        public void shutdown() {
+            terminated = true;
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            terminated = true;
+            return Collections.emptyList();
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return terminated;
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return terminated;
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) {
+            return terminated;
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            return null;
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            return null;
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            return null;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
+            return null;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(
+                Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
+            return null;
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
+            return null;
+        }
+
+        @Override
+        public <T> T invokeAny(
+                Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
+            return null;
+        }
+    }
 }

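[Editor's note] Besides replacing the Mockito mock with the hand-rolled TestExecutorService above, this commit changes the fake Kinesis source from a blocking queue.take() to queue.poll(100, TimeUnit.MILLISECONDS). A minimal sketch of why that matters (not Flink code; the class and method names are invented): with poll the consuming thread regains control every 100 ms even when no data arrives, so it can observe shutdown without being interrupted.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollingFakeSketch {

    // Returns the next record, or null if nothing arrived within 100 ms.
    // Unlike take(), this never blocks indefinitely, so callers can check
    // a shutdown flag between polls instead of relying on interruption.
    static String nextRecordOrNull(BlockingQueue<String> queue) {
        try {
            return queue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("record-1");
        System.out.println(nextRecordOrNull(queue)); // record-1
        System.out.println(nextRecordOrNull(queue)); // null: queue empty, returns after ~100 ms
    }
}
```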

[flink] 04/05: [FLINK-23528][connectors/kinesis] Reenable FlinkKinesisITCase and rewrite stopWithSavepoint.

Posted by da...@apache.org.

dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8fb2c3a4cf3a37b87403e462f675bb5d614a4b51
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Sep 7 22:23:15 2021 +0200

    [FLINK-23528][connectors/kinesis] Reenable FlinkKinesisITCase and rewrite stopWithSavepoint.
---
 .../connectors/kinesis/FlinkKinesisITCase.java     | 141 +++++++++++++++------
 .../kinesis/internals/KinesisDataFetcherTest.java  |  11 +-
 2 files changed, 105 insertions(+), 47 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
index 6a394860450..705be463475 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
@@ -19,44 +19,53 @@ package org.apache.flink.streaming.connectors.kinesis;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
 import org.apache.flink.util.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestNameProvider;
 
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.utility.DockerImageName;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.time.Duration;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinTask;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.lessThan;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
 
 /** IT cases for using Kinesis consumer/producer based on Kinesalite. */
-@Ignore("See FLINK-23528")
 public class FlinkKinesisITCase extends TestLogger {
-    public static final String TEST_STREAM = "test_stream";
+    private String stream;
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisITCase.class);
 
     @ClassRule
     public static final MiniClusterWithClientResource MINI_CLUSTER =
@@ -69,20 +78,34 @@ public class FlinkKinesisITCase extends TestLogger {
 
     @Rule public TemporaryFolder temp = new TemporaryFolder();
 
+    @Rule public SharedObjects sharedObjects = SharedObjects.create();
+
     private static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();
 
     private KinesisPubsubClient client;
 
     @Before
-    public void setupClient() {
+    public void setupClient() throws Exception {
         client = new KinesisPubsubClient(kinesalite.getContainerProperties());
+        stream = TestNameProvider.getCurrentTestName().replaceAll("\\W", "");
+        client.createTopic(stream, 1, new Properties());
+    }
+
+    @Test
+    public void testStopWithSavepoint() throws Exception {
+        testStopWithSavepoint(false);
+    }
+
+    @Test
+    public void testStopWithSavepointWithDrain() throws Exception {
+        testStopWithSavepoint(true);
     }
 
     /**
      * Tests that pending elements do not cause a deadlock during stop with savepoint (FLINK-17170).
      *
      * <ol>
-     *   <li>The test setups up a stream with 100 records and creates a Flink job that reads them
+     *   <li>The test sets up a stream with 1000 records and creates a Flink job that reads them
 *       very slowly (using up a large chunk of time of the mailbox).
      *   <li>After ensuring that consumption has started, the job is stopped in a parallel thread.
      *   <li>Without the fix of FLINK-17170, the job now has a high chance to deadlock during
@@ -90,83 +113,117 @@ public class FlinkKinesisITCase extends TestLogger {
      *   <li>With the fix, the job proceeds and we can lift the backpressure.
      * </ol>
      */
-    @Test
-    public void testStopWithSavepoint() throws Exception {
-        client.createTopic(TEST_STREAM, 1, new Properties());
-
+    private void testStopWithSavepoint(boolean drain) throws Exception {
         // add elements to the test stream
         int numElements = 1000;
         client.sendMessage(
-                TEST_STREAM,
+                stream,
                 IntStream.range(0, numElements).mapToObj(String::valueOf).toArray(String[]::new));
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
+        env.enableCheckpointing(100L);
 
         Properties config = kinesalite.getContainerProperties();
         config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
         FlinkKinesisConsumer<String> consumer =
-                new FlinkKinesisConsumer<>(TEST_STREAM, STRING_SCHEMA, config);
+                new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
 
-        DataStream<String> stream = env.addSource(consumer).map(new WaitingMapper());
+        SharedReference<CountDownLatch> savepointTrigger = sharedObjects.add(new CountDownLatch(1));
+        DataStream<String> stream =
+                env.addSource(consumer).map(new WaitingMapper(savepointTrigger));
         // call stop with savepoint in another thread
         ForkJoinTask<Object> stopTask =
                 ForkJoinPool.commonPool()
                         .submit(
                                 () -> {
-                                    WaitingMapper.firstElement.await();
-                                    stopWithSavepoint();
-                                    WaitingMapper.stopped = true;
+                                    savepointTrigger.get().await();
+                                    stopWithSavepoint(drain);
                                     return null;
                                 });
         try {
             List<String> result = stream.executeAndCollect(10000);
             // stop with savepoint will most likely only return a small subset of the elements
             // validate that the prefix is as expected
-            assertThat(result, hasSize(lessThan(numElements)));
-            assertThat(
-                    result,
-                    equalTo(
-                            IntStream.range(0, numElements)
-                                    .mapToObj(String::valueOf)
-                                    .collect(Collectors.toList())
-                                    .subList(0, result.size())));
+            if (drain) {
+                assertThat(
+                        result,
+                        contains(
+                                IntStream.range(0, numElements)
+                                        .mapToObj(String::valueOf)
+                                        .toArray()));
+            } else {
+                // stop with savepoint will most likely only return a small subset of the elements
+                // validate that the prefix is as expected
+                assertThat(
+                        result,
+                        contains(
+                                IntStream.range(0, result.size())
+                                        .mapToObj(String::valueOf)
+                                        .toArray()));
+            }
         } finally {
-            stopTask.cancel(true);
+            stopTask.get();
         }
     }
 
-    private String stopWithSavepoint() throws Exception {
+    private String stopWithSavepoint(boolean drain) throws Exception {
         JobStatusMessage job =
                 MINI_CLUSTER.getClusterClient().listJobs().get().stream().findFirst().get();
         return MINI_CLUSTER
                 .getClusterClient()
                 .stopWithSavepoint(
                         job.getJobId(),
-                        true,
+                        drain,
                         temp.getRoot().getAbsolutePath(),
                         SavepointFormatType.CANONICAL)
                 .get();
     }
 
-    private static class WaitingMapper implements MapFunction<String, String> {
-        static CountDownLatch firstElement;
-        static volatile boolean stopped;
+    private static class WaitingMapper
+            implements MapFunction<String, String>, CheckpointedFunction {
+        private final SharedReference<CountDownLatch> savepointTrigger;
+        private volatile boolean savepointTriggered;
+        // keep track on when the last checkpoint occurred
+        private transient Deadline checkpointDeadline;
+        private final AtomicInteger numElements = new AtomicInteger();
+
+        WaitingMapper(SharedReference<CountDownLatch> savepointTrigger) {
+            this.savepointTrigger = savepointTrigger;
+            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
+        }
 
-        WaitingMapper() {
-            firstElement = new CountDownLatch(1);
-            stopped = false;
+        private void readObject(ObjectInputStream stream)
+                throws ClassNotFoundException, IOException {
+            stream.defaultReadObject();
+            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
         }
 
         @Override
         public String map(String value) throws Exception {
-            if (firstElement.getCount() > 0) {
-                firstElement.countDown();
-            }
-            if (!stopped) {
-                Thread.sleep(100);
+            numElements.incrementAndGet();
+            if (!savepointTriggered) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                }
+                savepointTriggered = checkpointDeadline.isOverdue();
             }
             return value;
         }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) {
+            // assume that after the first savepoint, this function will only see new checkpoint
+            // when the final savepoint is triggered
+            if (numElements.get() > 0) {
+                this.checkpointDeadline = Deadline.fromNow(Duration.ofSeconds(1));
+                savepointTrigger.get().countDown();
+            }
+            LOG.info("snapshotState {} {}", context.getCheckpointId(), numElements);
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) {}
     }
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 8be2bb41097..9f6d8e3f27f 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -17,11 +17,6 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
-import com.amazonaws.services.kinesis.model.HashKeyRange;
-import com.amazonaws.services.kinesis.model.SequenceNumberRange;
-import com.amazonaws.services.kinesis.model.Shard;
-import org.apache.commons.lang3.mutable.MutableBoolean;
-import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.core.testutils.CheckedThread;
@@ -49,6 +44,12 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDa
 import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcherForShardConsumerException;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
+
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableLong;
 import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;

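[Editor's note] The rewritten test above asserts two different things depending on the drain flag: with drain=true every input element must be returned, while with drain=false only an ordered prefix of the input is expected. A small sketch of that prefix check (not Flink code; names are invented, and the actual test uses Hamcrest's contains matcher):

```java
import java.util.Arrays;
import java.util.List;

public class PrefixCheckSketch {

    // With drain=true, stop-with-savepoint flushes all pending elements, so the
    // result must equal the full input. With drain=false, the job stops early
    // and only a prefix of the input is expected.
    static boolean isPrefix(List<String> result, List<String> input) {
        if (result.size() > input.size()) {
            return false;
        }
        return result.equals(input.subList(0, result.size()));
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("0", "1", "2", "3");
        System.out.println(isPrefix(Arrays.asList("0", "1"), input)); // true: ordered prefix
        System.out.println(isPrefix(Arrays.asList("1", "2"), input)); // false: wrong start
    }
}
```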

[flink] 05/05: [FLINK-23528][connectors/kinesis] Graceful shutdown of Kinesis Consumer in EFO mode

Posted by da...@apache.org.

dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7cd5fe93035c5d875d10d9f3cb1d84d4ed045dc0
Author: Krzysztof Dziolak <dz...@amazon.com>
AuthorDate: Fri Jul 1 10:08:45 2022 +0100

    [FLINK-23528][connectors/kinesis] Graceful shutdown of Kinesis Consumer in EFO mode
---
 .../kinesis/internals/KinesisDataFetcher.java      |  7 ++-
 .../publisher/fanout/FanOutShardSubscriber.java    |  3 +-
 .../connectors/kinesis/FlinkKinesisITCase.java     | 68 +++++++++++++++-------
 .../fanout/FanOutShardSubscriberTest.java          | 20 +++++++
 4 files changed, 74 insertions(+), 24 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index e12bdaa0b3a..4fcc80a250e 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -819,7 +819,7 @@ public class KinesisDataFetcher<T> {
                 LOG.warn("Encountered exception closing record publisher factory", e);
             }
         } finally {
-            shardConsumersExecutor.shutdown();
+            gracefulShutdownShardConsumers();
 
             cancelFuture.complete(null);
 
@@ -852,6 +852,11 @@ public class KinesisDataFetcher<T> {
         StreamConsumerRegistrarUtil.deregisterStreamConsumers(configProps, streams);
     }
 
+    /** Gracefully stops shardConsumersExecutor without interrupting running threads. */
+    private void gracefulShutdownShardConsumers() {
+        shardConsumersExecutor.shutdown();
+    }
+
     /**
      * Returns a flag indicating if this fetcher is running.
      *
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
index afda248773c..a280a8f6537 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
@@ -361,8 +361,7 @@ public class FanOutShardSubscriber {
                 }
             } else if (subscriptionEvent.isSubscriptionComplete()) {
                 // The subscription is complete, but the shard might not be, so we return incomplete
-                result = false;
-                break;
+                return false;
             } else {
                 handleError(subscriptionEvent.getThrowable());
                 result = false;
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
index 705be463475..ee8066a61aa 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
@@ -39,7 +39,7 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.TestNameProvider;
 
 import org.junit.Before;
-import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -58,6 +58,8 @@ import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_CONSUMER_NAME;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
@@ -67,13 +69,13 @@ public class FlinkKinesisITCase extends TestLogger {
     private String stream;
     private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisITCase.class);
 
-    @ClassRule
-    public static final MiniClusterWithClientResource MINI_CLUSTER =
+    @Rule
+    public final MiniClusterWithClientResource miniCluster =
             new MiniClusterWithClientResource(
                     new MiniClusterResourceConfiguration.Builder().build());
 
-    @ClassRule
-    public static KinesaliteContainer kinesalite =
+    @Rule
+    public KinesaliteContainer kinesalite =
             new KinesaliteContainer(DockerImageName.parse(DockerImageVersions.KINESALITE));
 
     @Rule public TemporaryFolder temp = new TemporaryFolder();
@@ -86,19 +88,31 @@ public class FlinkKinesisITCase extends TestLogger {
 
     @Before
     public void setupClient() throws Exception {
-        client = new KinesisPubsubClient(kinesalite.getContainerProperties());
+        client = new KinesisPubsubClient(getContainerProperties());
         stream = TestNameProvider.getCurrentTestName().replaceAll("\\W", "");
         client.createTopic(stream, 1, new Properties());
     }
 
     @Test
     public void testStopWithSavepoint() throws Exception {
-        testStopWithSavepoint(false);
+        testStopWithSavepoint(false, false);
     }
 
     @Test
     public void testStopWithSavepointWithDrain() throws Exception {
-        testStopWithSavepoint(true);
+        testStopWithSavepoint(true, false);
+    }
+
+    @Test
+    @Ignore("Kinesalite does not support EFO")
+    public void testStopWithSavepointWithEfo() throws Exception {
+        testStopWithSavepoint(false, true);
+    }
+
+    @Test
+    @Ignore("Kinesalite does not support EFO")
+    public void testStopWithSavepointWithDrainAndEfo() throws Exception {
+        testStopWithSavepoint(true, true);
     }
 
     /**
@@ -113,7 +127,7 @@ public class FlinkKinesisITCase extends TestLogger {
      *   <li>With the fix, the job proceeds and we can lift the backpressure.
      * </ol>
      */
-    private void testStopWithSavepoint(boolean drain) throws Exception {
+    private void testStopWithSavepoint(boolean drain, boolean efo) throws Exception {
         // add elements to the test stream
         int numElements = 1000;
         client.sendMessage(
@@ -124,14 +138,10 @@ public class FlinkKinesisITCase extends TestLogger {
         env.setParallelism(1);
         env.enableCheckpointing(100L);
 
-        Properties config = kinesalite.getContainerProperties();
-        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
-        FlinkKinesisConsumer<String> consumer =
-                new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
-
         SharedReference<CountDownLatch> savepointTrigger = sharedObjects.add(new CountDownLatch(1));
-        DataStream<String> stream =
-                env.addSource(consumer).map(new WaitingMapper(savepointTrigger));
+        DataStream<String> outputStream =
+                env.addSource(createKinesisConsumer(efo)).map(new WaitingMapper(savepointTrigger));
+
         // call stop with savepoint in another thread
         ForkJoinTask<Object> stopTask =
                 ForkJoinPool.commonPool()
@@ -142,7 +152,7 @@ public class FlinkKinesisITCase extends TestLogger {
                                     return null;
                                 });
         try {
-            List<String> result = stream.executeAndCollect(10000);
+            List<String> result = outputStream.executeAndCollect(10000);
             // stop with savepoint will most likely only return a small subset of the elements
             // validate that the prefix is as expected
             if (drain) {
@@ -167,10 +177,24 @@ public class FlinkKinesisITCase extends TestLogger {
         }
     }
 
+    private FlinkKinesisConsumer<String> createKinesisConsumer(boolean efo) {
+        Properties config = getContainerProperties();
+        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
+        if (efo) {
+            config.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO");
+            config.putIfAbsent(EFO_CONSUMER_NAME, "efo-flink-app");
+        }
+        return new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
+    }
+
+    private Properties getContainerProperties() {
+        return kinesalite.getContainerProperties();
+    }
+
     private String stopWithSavepoint(boolean drain) throws Exception {
         JobStatusMessage job =
-                MINI_CLUSTER.getClusterClient().listJobs().get().stream().findFirst().get();
-        return MINI_CLUSTER
+                miniCluster.getClusterClient().listJobs().get().stream().findFirst().get();
+        return miniCluster
                 .getClusterClient()
                 .stopWithSavepoint(
                         job.getJobId(),
@@ -190,13 +214,15 @@ public class FlinkKinesisITCase extends TestLogger {
 
         WaitingMapper(SharedReference<CountDownLatch> savepointTrigger) {
             this.savepointTrigger = savepointTrigger;
-            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
+            // effectively set a 10 minute timeout on the wait
+            // this is reduced to 1 second once the data starts flowing
+            checkpointDeadline = Deadline.fromNow(Duration.ofMinutes(10));
         }
 
         private void readObject(ObjectInputStream stream)
                 throws ClassNotFoundException, IOException {
             stream.defaultReadObject();
-            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
+            checkpointDeadline = Deadline.fromNow(Duration.ofMinutes(10));
         }
 
         @Override
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
index a41e7026984..9c711230096 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
@@ -31,6 +31,7 @@ import software.amazon.awssdk.services.kinesis.model.StartingPosition;
 import java.time.Duration;
 
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT;
+import static org.junit.Assert.assertFalse;
 
 /** Tests for {@link FanOutShardSubscriber}. */
 public class FanOutShardSubscriberTest {
@@ -120,6 +121,25 @@ public class FanOutShardSubscriberTest {
         subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
     }
 
+    @Test
+    public void testSubscriptionCompletion() throws Exception {
+        FakeKinesisFanOutBehavioursFactory.AbstractSingleShardFanOutKinesisV2 errorKinesisV2 =
+                FakeKinesisFanOutBehavioursFactory.emptyBatchFollowedBySingleRecord();
+
+        FanOutShardSubscriber subscriber =
+                new FanOutShardSubscriber(
+                        "consumerArn",
+                        "shardId",
+                        errorKinesisV2,
+                        DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
+
+        StartingPosition startingPosition = StartingPosition.builder().build();
+        boolean result =
+                subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
+
+        assertFalse(result);
+    }
+
     @Test
     public void testTimeoutSubscribingToShard() throws Exception {
         thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class);


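The gracefulShutdownShardConsumers change above hinges on the difference between ExecutorService.shutdown() and shutdownNow(): the former lets in-flight shard consumer tasks run to completion, while the latter interrupts them and provokes the InterruptedExceptions this commit avoids. A minimal sketch of that difference (plain JDK code, not Flink's actual fetcher; names are illustrative):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownSketch {
    // Runs one "shard consumer" task, then shuts the pool down either
    // gracefully (shutdown) or with interruption (shutdownNow).
    static String runTask(boolean interrupting) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        StringBuilder outcome = new StringBuilder();
        CountDownLatch started = new CountDownLatch(1);
        pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(200); // simulate an in-flight shard read
                outcome.append("completed");
            } catch (InterruptedException e) {
                outcome.append("interrupted"); // what shutdownNow provokes
            }
        });
        started.await(); // ensure the task is running before shutting down
        if (interrupting) {
            pool.shutdownNow(); // interrupts the sleeping task
        } else {
            pool.shutdown(); // lets the running task finish cleanly
        }
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return outcome.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTask(false)); // completed
        System.out.println(runTask(true));  // interrupted
    }
}
```

Note that shutdown() alone does not wait for termination; the fetcher's cancellation path still needs the tasks to observe the running flag and exit on their own.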
[flink] 01/05: [FLINK-23528][datastream] Let CollectSinkOperator publish results in #close.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac6007116a199bcade7cb258552fca7d9efa1d01
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Sep 7 20:02:42 2021 +0200

    [FLINK-23528][datastream] Let CollectSinkOperator publish results in #close.
    
    DataStream#executeAndCollect expects the CollectSinkOperator to register the accumulator at the end of the application; otherwise it fails with an exception.
    However, a stop-with-savepoint without drain would not trigger CollectSinkOperator#finish and thus skip the registration.
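    The lifecycle assumption behind this fix can be sketched as follows (a
    stand-in model, not Flink's actual operator classes; the names are
    illustrative): finish() runs only when input ends or the job is stopped
    with --drain, while close() runs on every shutdown path, so moving the
    accumulation there also covers stop-with-savepoint without drain.

```java
import java.util.ArrayList;
import java.util.List;

public class CollectLifecycleSketch {
    // Hypothetical stand-in for the collect sink operator.
    static class SketchSinkOperator {
        final List<String> events = new ArrayList<>();

        void finish() { events.add("finish"); }

        // After this commit, final results are accumulated in close(),
        // which runs on every shutdown path, drained or not.
        void close() {
            events.add("accumulateFinalResults");
            events.add("close");
        }
    }

    // Simulates the two shutdown paths: stop-with-savepoint --drain calls
    // finish() then close(); without drain, finish() is skipped entirely.
    static List<String> shutdown(boolean drain) {
        SketchSinkOperator op = new SketchSinkOperator();
        if (drain) {
            op.finish();
        }
        op.close();
        return op.events;
    }

    public static void main(String[] args) {
        System.out.println(shutdown(false)); // results still accumulated
        System.out.println(shutdown(true));
    }
}
```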
---
 .../flink/streaming/api/operators/collect/CollectSinkOperator.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java
index 8b78bc30fc9..5c84c9b1378 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java
@@ -51,9 +51,9 @@ public class CollectSinkOperator<IN> extends StreamSink<IN> implements OperatorE
     }
 
     @Override
-    public void finish() throws Exception {
+    public void close() throws Exception {
         sinkFunction.accumulateFinalResults();
-        super.finish();
+        super.close();
     }
 
     public CompletableFuture<OperatorID> getOperatorIdFuture() {