You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/07/25 08:58:51 UTC

[flink] branch master updated (1e7732f18a8 -> 4bbf3194dc3)

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 1e7732f18a8 [FLINK-28090][python] Support attachAsDataStream in Python Table API
     new 6080840e9c1 [FLINK-23528][datastream] Let CollectSinkOperator publish results in #close.
     new b589f9f29f5 [FLINK-23528][connectors/kinesis] Use proper mock of ExecutorService in KinesisDataFetcherTest.
     new 16393625ed5 [FLINK-23528][connectors/kinesis] Gracefully shutdown shard consumer to avoid InterruptionExceptions.
     new 7496b68d29f [FLINK-23528][connectors/kinesis] Reenable FlinkKinesisITCase and rewrite stopWithSavepoint.
     new 4bbf3194dc3 [FLINK-23528][connectors/kinesis] Graceful shutdown of Kinesis Consumer in EFO mode

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kinesis/internals/KinesisDataFetcher.java      |   7 +-
 .../publisher/fanout/FanOutShardSubscriber.java    |   3 +-
 .../connectors/kinesis/FlinkKinesisITCase.java     | 187 +++++++++++++++------
 .../kinesis/internals/KinesisDataFetcherTest.java  |   2 +-
 .../fanout/FanOutShardSubscriberTest.java          |  20 +++
 .../testutils/FakeKinesisBehavioursFactory.java    |  25 +--
 .../testutils/TestableKinesisDataFetcher.java      | 108 ++++++++----
 .../api/operators/collect/CollectSinkOperator.java |   4 +-
 8 files changed, 254 insertions(+), 102 deletions(-)


[flink] 04/05: [FLINK-23528][connectors/kinesis] Reenable FlinkKinesisITCase and rewrite stopWithSavepoint.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7496b68d29f6bed83a5ac6dc21716281717adba0
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Sep 7 22:23:15 2021 +0200

    [FLINK-23528][connectors/kinesis] Reenable FlinkKinesisITCase and rewrite stopWithSavepoint.
---
 .../connectors/kinesis/FlinkKinesisITCase.java     | 139 +++++++++++++++------
 1 file changed, 99 insertions(+), 40 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
index f02bb5b066e..e2fe28a7a3d 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
@@ -19,41 +19,53 @@ package org.apache.flink.streaming.connectors.kinesis;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.connectors.kinesis.testutils.KinesaliteContainer;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
 import org.apache.flink.util.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestNameProvider;
 
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.utility.DockerImageName;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.time.Duration;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinTask;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
 
 /** IT cases for using Kinesis consumer/producer based on Kinesalite. */
-@Ignore("See FLINK-23528")
 public class FlinkKinesisITCase extends TestLogger {
-    public static final String TEST_STREAM = "test_stream";
+    private String stream;
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisITCase.class);
 
     @ClassRule
     public static final MiniClusterWithClientResource MINI_CLUSTER =
@@ -66,20 +78,34 @@ public class FlinkKinesisITCase extends TestLogger {
 
     @Rule public TemporaryFolder temp = new TemporaryFolder();
 
+    @Rule public SharedObjects sharedObjects = SharedObjects.create();
+
     private static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();
 
     private KinesisPubsubClient client;
 
     @Before
-    public void setupClient() {
+    public void setupClient() throws Exception {
         client = new KinesisPubsubClient(kinesalite.getContainerProperties());
+        stream = TestNameProvider.getCurrentTestName().replaceAll("\\W", "");
+        client.createTopic(stream, 1, new Properties());
+    }
+
+    @Test
+    public void testStopWithSavepoint() throws Exception {
+        testStopWithSavepoint(false);
+    }
+
+    @Test
+    public void testStopWithSavepointWithDrain() throws Exception {
+        testStopWithSavepoint(true);
     }
 
     /**
      * Tests that pending elements do not cause a deadlock during stop with savepoint (FLINK-17170).
      *
      * <ol>
-     *   <li>The test setups up a stream with 100 records and creates a Flink job that reads them
+     *   <li>The test setups up a stream with 1000 records and creates a Flink job that reads them
      *       with very slowly (using up a large chunk of time of the mailbox).
      *   <li>After ensuring that consumption has started, the job is stopped in a parallel thread.
      *   <li>Without the fix of FLINK-17170, the job now has a high chance to deadlock during
@@ -87,82 +113,115 @@ public class FlinkKinesisITCase extends TestLogger {
      *   <li>With the fix, the job proceeds and we can lift the backpressure.
      * </ol>
      */
-    @Test
-    public void testStopWithSavepoint() throws Exception {
-        client.createTopic(TEST_STREAM, 1, new Properties());
-
+    private void testStopWithSavepoint(boolean drain) throws Exception {
         // add elements to the test stream
         int numElements = 1000;
         client.sendMessage(
-                TEST_STREAM,
+                stream,
                 IntStream.range(0, numElements).mapToObj(String::valueOf).toArray(String[]::new));
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
+        env.enableCheckpointing(100L);
 
         Properties config = kinesalite.getContainerProperties();
         config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
         FlinkKinesisConsumer<String> consumer =
-                new FlinkKinesisConsumer<>(TEST_STREAM, STRING_SCHEMA, config);
+                new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
 
-        DataStream<String> stream = env.addSource(consumer).map(new WaitingMapper());
+        SharedReference<CountDownLatch> savepointTrigger = sharedObjects.add(new CountDownLatch(1));
+        DataStream<String> stream =
+                env.addSource(consumer).map(new WaitingMapper(savepointTrigger));
         // call stop with savepoint in another thread
         ForkJoinTask<Object> stopTask =
                 ForkJoinPool.commonPool()
                         .submit(
                                 () -> {
-                                    WaitingMapper.firstElement.await();
-                                    stopWithSavepoint();
-                                    WaitingMapper.stopped = true;
+                                    savepointTrigger.get().await();
+                                    stopWithSavepoint(drain);
                                     return null;
                                 });
         try {
             List<String> result = stream.executeAndCollect(10000);
-            // stop with savepoint will most likely only return a small subset of the elements
-            // validate that the prefix is as expected
-            assertThat(result).size().isLessThan(numElements);
-            assertThat(result)
-                    .isEqualTo(
-                            IntStream.range(0, numElements)
-                                    .mapToObj(String::valueOf)
-                                    .collect(Collectors.toList())
-                                    .subList(0, result.size()));
+            if (drain) {
+                assertThat(
+                        result,
+                        contains(
+                                IntStream.range(0, numElements)
+                                        .mapToObj(String::valueOf)
+                                        .toArray()));
+            } else {
+                // stop with savepoint will most likely only return a small subset of the elements
+                // validate that the prefix is as expected
+                assertThat(
+                        result,
+                        contains(
+                                IntStream.range(0, result.size())
+                                        .mapToObj(String::valueOf)
+                                        .toArray()));
+            }
         } finally {
-            stopTask.cancel(true);
+            stopTask.get();
         }
     }
 
-    private String stopWithSavepoint() throws Exception {
+    private String stopWithSavepoint(boolean drain) throws Exception {
         JobStatusMessage job =
                 MINI_CLUSTER.getClusterClient().listJobs().get().stream().findFirst().get();
         return MINI_CLUSTER
                 .getClusterClient()
                 .stopWithSavepoint(
                         job.getJobId(),
-                        true,
+                        drain,
                         temp.getRoot().getAbsolutePath(),
                         SavepointFormatType.CANONICAL)
                 .get();
     }
 
-    private static class WaitingMapper implements MapFunction<String, String> {
-        static CountDownLatch firstElement;
-        static volatile boolean stopped;
+    private static class WaitingMapper
+            implements MapFunction<String, String>, CheckpointedFunction {
+        private final SharedReference<CountDownLatch> savepointTrigger;
+        private volatile boolean savepointTriggered;
+        // keep track on when the last checkpoint occurred
+        private transient Deadline checkpointDeadline;
+        private final AtomicInteger numElements = new AtomicInteger();
+
+        WaitingMapper(SharedReference<CountDownLatch> savepointTrigger) {
+            this.savepointTrigger = savepointTrigger;
+            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
+        }
 
-        WaitingMapper() {
-            firstElement = new CountDownLatch(1);
-            stopped = false;
+        private void readObject(ObjectInputStream stream)
+                throws ClassNotFoundException, IOException {
+            stream.defaultReadObject();
+            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
         }
 
         @Override
         public String map(String value) throws Exception {
-            if (firstElement.getCount() > 0) {
-                firstElement.countDown();
-            }
-            if (!stopped) {
-                Thread.sleep(100);
+            numElements.incrementAndGet();
+            if (!savepointTriggered) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                }
+                savepointTriggered = checkpointDeadline.isOverdue();
             }
             return value;
         }
+
+        @Override
+        public void snapshotState(FunctionSnapshotContext context) {
+            // assume that after the first savepoint, this function will only see new checkpoint
+            // when the final savepoint is triggered
+            if (numElements.get() > 0) {
+                this.checkpointDeadline = Deadline.fromNow(Duration.ofSeconds(1));
+                savepointTrigger.get().countDown();
+            }
+            LOG.info("snapshotState {} {}", context.getCheckpointId(), numElements);
+        }
+
+        @Override
+        public void initializeState(FunctionInitializationContext context) {}
     }
 }


[flink] 02/05: [FLINK-23528][connectors/kinesis] Use proper mock of ExecutorService in KinesisDataFetcherTest.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b589f9f29f5ca5ef35695f80d007cb3d3bddf439
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Wed Sep 8 18:22:34 2021 +0200

    [FLINK-23528][connectors/kinesis] Use proper mock of ExecutorService in KinesisDataFetcherTest.
    
    Mockito makes it so much harder to debug tests. Here, we replace Mockito with a small test class that emulates the previously dispersed functionality in a simpler way.
---
 .../kinesis/internals/KinesisDataFetcherTest.java  |   2 +-
 .../testutils/FakeKinesisBehavioursFactory.java    |  25 ++---
 .../testutils/TestableKinesisDataFetcher.java      | 108 ++++++++++++++-------
 3 files changed, 89 insertions(+), 46 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 02b41a81b84..f8d80bbea72 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -89,7 +89,7 @@ public class KinesisDataFetcherTest extends TestLogger {
         assertThat(fetcher.isRunning()).isTrue();
     }
 
-    @Test(timeout = 10000)
+    @Test
     public void testIsRunningFalseAfterShutDown() throws InterruptedException {
         KinesisDataFetcher<String> fetcher =
                 createTestDataFetcherWithNoShards(10, 2, "test-stream");
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index 1e737ac3358..46cc797ff7d 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -41,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.createDummyStreamShardHandle;
@@ -660,17 +661,19 @@ public class FakeKinesisBehavioursFactory {
                             shardIterator);
             List<Record> records = Collections.emptyList();
             try {
-                String data = queue.take();
-                Record record =
-                        new Record()
-                                .withData(
-                                        ByteBuffer.wrap(
-                                                data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
-                                .withPartitionKey(UUID.randomUUID().toString())
-                                .withApproximateArrivalTimestamp(
-                                        new Date(System.currentTimeMillis()))
-                                .withSequenceNumber(String.valueOf(0));
-                records = Collections.singletonList(record);
+                String data = queue.poll(100, TimeUnit.MILLISECONDS);
+                if (data != null) {
+                    Record record =
+                            new Record()
+                                    .withData(
+                                            ByteBuffer.wrap(
+                                                    data.getBytes(ConfigConstants.DEFAULT_CHARSET)))
+                                    .withPartitionKey(UUID.randomUUID().toString())
+                                    .withApproximateArrivalTimestamp(
+                                            new Date(System.currentTimeMillis()))
+                                    .withSequenceNumber(String.valueOf(0));
+                    records = Collections.singletonList(record);
+                }
             } catch (InterruptedException e) {
                 shardIterator = null;
             }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
index b4c69eebb8c..4bdb95eb78a 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
@@ -26,23 +26,19 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 
-import org.mockito.invocation.InvocationOnMock;
-
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 /** Extension of the {@link KinesisDataFetcher} for testing. */
 public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
 
@@ -50,9 +46,6 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
     private final Semaphore discoveryWaiter = new Semaphore(0);
     private final OneShotLatch shutdownWaiter;
 
-    private volatile boolean running;
-    private volatile boolean executorServiceShutdownNowCalled;
-
     public TestableKinesisDataFetcher(
             List<String> fakeStreams,
             SourceFunction.SourceContext<T> sourceContext,
@@ -109,8 +102,6 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
 
         this.runWaiter = new OneShotLatch();
         this.shutdownWaiter = new OneShotLatch();
-
-        this.running = true;
     }
 
     @Override
@@ -130,28 +121,7 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
     @Override
     protected ExecutorService createShardConsumersThreadPool(String subtaskName) {
         // this is just a dummy fetcher, so no need to create a thread pool for shard consumers
-        ExecutorService mockExecutorService = mock(ExecutorService.class);
-        when(mockExecutorService.isTerminated())
-                .thenAnswer((InvocationOnMock invocation) -> !running);
-        when(mockExecutorService.shutdownNow())
-                .thenAnswer(
-                        invocationOnMock -> {
-                            executorServiceShutdownNowCalled = true;
-                            return Collections.emptyList();
-                        });
-        try {
-            when(mockExecutorService.awaitTermination(anyLong(), any()))
-                    .thenAnswer(invocationOnMock -> !running && executorServiceShutdownNowCalled);
-        } catch (InterruptedException e) {
-            // We're just trying to stub the method. Must acknowledge the checked exception.
-        }
-        return mockExecutorService;
-    }
-
-    @Override
-    public void awaitTermination() throws InterruptedException {
-        this.running = false;
-        super.awaitTermination();
+        return new TestExecutorService();
     }
 
     @Override
@@ -174,4 +144,74 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
     public void waitUntilDiscovery(int number) throws InterruptedException {
         discoveryWaiter.acquire(number);
     }
+
+    private static class TestExecutorService implements ExecutorService {
+        boolean terminated = false;
+
+        @Override
+        public void execute(Runnable command) {}
+
+        @Override
+        public void shutdown() {
+            terminated = true;
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            terminated = true;
+            return Collections.emptyList();
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return terminated;
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return terminated;
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) {
+            return terminated;
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            return null;
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            return null;
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            return null;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
+            return null;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(
+                Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
+            return null;
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) {
+            return null;
+        }
+
+        @Override
+        public <T> T invokeAny(
+                Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
+            return null;
+        }
+    }
 }


[flink] 05/05: [FLINK-23528][connectors/kinesis] Graceful shutdown of Kinesis Consumer in EFO mode

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4bbf3194dc3816dcaa6a76ed7adf0fd228b16293
Author: Krzysztof Dziolak <dz...@amazon.com>
AuthorDate: Fri Jul 1 10:08:45 2022 +0100

    [FLINK-23528][connectors/kinesis] Graceful shutdown of Kinesis Consumer in EFO mode
---
 .../kinesis/internals/KinesisDataFetcher.java      |  7 ++-
 .../publisher/fanout/FanOutShardSubscriber.java    |  3 +-
 .../connectors/kinesis/FlinkKinesisITCase.java     | 68 +++++++++++++++-------
 .../fanout/FanOutShardSubscriberTest.java          | 20 +++++++
 4 files changed, 74 insertions(+), 24 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index e12bdaa0b3a..4fcc80a250e 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -819,7 +819,7 @@ public class KinesisDataFetcher<T> {
                 LOG.warn("Encountered exception closing record publisher factory", e);
             }
         } finally {
-            shardConsumersExecutor.shutdown();
+            gracefulShutdownShardConsumers();
 
             cancelFuture.complete(null);
 
@@ -852,6 +852,11 @@ public class KinesisDataFetcher<T> {
         StreamConsumerRegistrarUtil.deregisterStreamConsumers(configProps, streams);
     }
 
+    /** Gracefully stops shardConsumersExecutor without interrupting running threads. */
+    private void gracefulShutdownShardConsumers() {
+        shardConsumersExecutor.shutdown();
+    }
+
     /**
      * Returns a flag indicating if this fetcher is running.
      *
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
index afda248773c..a280a8f6537 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
@@ -361,8 +361,7 @@ public class FanOutShardSubscriber {
                 }
             } else if (subscriptionEvent.isSubscriptionComplete()) {
                 // The subscription is complete, but the shard might not be, so we return incomplete
-                result = false;
-                break;
+                return false;
             } else {
                 handleError(subscriptionEvent.getThrowable());
                 result = false;
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
index e2fe28a7a3d..0db61953b50 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
@@ -39,7 +39,7 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.TestNameProvider;
 
 import org.junit.Before;
-import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -58,6 +58,8 @@ import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFO_CONSUMER_NAME;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RECORD_PUBLISHER_TYPE;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
@@ -67,13 +69,13 @@ public class FlinkKinesisITCase extends TestLogger {
     private String stream;
     private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisITCase.class);
 
-    @ClassRule
-    public static final MiniClusterWithClientResource MINI_CLUSTER =
+    @Rule
+    public final MiniClusterWithClientResource miniCluster =
             new MiniClusterWithClientResource(
                     new MiniClusterResourceConfiguration.Builder().build());
 
-    @ClassRule
-    public static KinesaliteContainer kinesalite =
+    @Rule
+    public KinesaliteContainer kinesalite =
             new KinesaliteContainer(DockerImageName.parse(DockerImageVersions.KINESALITE));
 
     @Rule public TemporaryFolder temp = new TemporaryFolder();
@@ -86,19 +88,31 @@ public class FlinkKinesisITCase extends TestLogger {
 
     @Before
     public void setupClient() throws Exception {
-        client = new KinesisPubsubClient(kinesalite.getContainerProperties());
+        client = new KinesisPubsubClient(getContainerProperties());
         stream = TestNameProvider.getCurrentTestName().replaceAll("\\W", "");
         client.createTopic(stream, 1, new Properties());
     }
 
     @Test
     public void testStopWithSavepoint() throws Exception {
-        testStopWithSavepoint(false);
+        testStopWithSavepoint(false, false);
     }
 
     @Test
     public void testStopWithSavepointWithDrain() throws Exception {
-        testStopWithSavepoint(true);
+        testStopWithSavepoint(true, false);
+    }
+
+    @Test
+    @Ignore("Kinesalite does not support EFO")
+    public void testStopWithSavepointWithEfo() throws Exception {
+        testStopWithSavepoint(false, true);
+    }
+
+    @Test
+    @Ignore("Kinesalite does not support EFO")
+    public void testStopWithSavepointWithDrainAndEfo() throws Exception {
+        testStopWithSavepoint(true, true);
     }
 
     /**
@@ -113,7 +127,7 @@ public class FlinkKinesisITCase extends TestLogger {
      *   <li>With the fix, the job proceeds and we can lift the backpressure.
      * </ol>
      */
-    private void testStopWithSavepoint(boolean drain) throws Exception {
+    private void testStopWithSavepoint(boolean drain, boolean efo) throws Exception {
         // add elements to the test stream
         int numElements = 1000;
         client.sendMessage(
@@ -124,14 +138,10 @@ public class FlinkKinesisITCase extends TestLogger {
         env.setParallelism(1);
         env.enableCheckpointing(100L);
 
-        Properties config = kinesalite.getContainerProperties();
-        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
-        FlinkKinesisConsumer<String> consumer =
-                new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
-
         SharedReference<CountDownLatch> savepointTrigger = sharedObjects.add(new CountDownLatch(1));
-        DataStream<String> stream =
-                env.addSource(consumer).map(new WaitingMapper(savepointTrigger));
+        DataStream<String> outputStream =
+                env.addSource(createKinesisConsumer(efo)).map(new WaitingMapper(savepointTrigger));
+
         // call stop with savepoint in another thread
         ForkJoinTask<Object> stopTask =
                 ForkJoinPool.commonPool()
@@ -142,7 +152,7 @@ public class FlinkKinesisITCase extends TestLogger {
                                     return null;
                                 });
         try {
-            List<String> result = stream.executeAndCollect(10000);
+            List<String> result = outputStream.executeAndCollect(10000);
             if (drain) {
                 assertThat(
                         result,
@@ -165,10 +175,24 @@ public class FlinkKinesisITCase extends TestLogger {
         }
     }
 
+    private FlinkKinesisConsumer<String> createKinesisConsumer(boolean efo) {
+        Properties config = getContainerProperties();
+        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
+        if (efo) {
+            config.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO");
+            config.putIfAbsent(EFO_CONSUMER_NAME, "efo-flink-app");
+        }
+        return new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
+    }
+
+    private Properties getContainerProperties() {
+        return kinesalite.getContainerProperties();
+    }
+
     private String stopWithSavepoint(boolean drain) throws Exception {
         JobStatusMessage job =
-                MINI_CLUSTER.getClusterClient().listJobs().get().stream().findFirst().get();
-        return MINI_CLUSTER
+                miniCluster.getClusterClient().listJobs().get().stream().findFirst().get();
+        return miniCluster
                 .getClusterClient()
                 .stopWithSavepoint(
                         job.getJobId(),
@@ -188,13 +212,15 @@ public class FlinkKinesisITCase extends TestLogger {
 
         WaitingMapper(SharedReference<CountDownLatch> savepointTrigger) {
             this.savepointTrigger = savepointTrigger;
-            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
+            // effectively set a 10 minute timeout on the wait
+            // this is reduced to 1 second once the data starts flowing
+            checkpointDeadline = Deadline.fromNow(Duration.ofMinutes(10));
         }
 
         private void readObject(ObjectInputStream stream)
                 throws ClassNotFoundException, IOException {
             stream.defaultReadObject();
-            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
+            checkpointDeadline = Deadline.fromNow(Duration.ofMinutes(10));
         }
 
         @Override
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
index a41e7026984..9c711230096 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriberTest.java
@@ -31,6 +31,7 @@ import software.amazon.awssdk.services.kinesis.model.StartingPosition;
 import java.time.Duration;
 
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT;
+import static org.junit.Assert.assertFalse;
 
 /** Tests for {@link FanOutShardSubscriber}. */
 public class FanOutShardSubscriberTest {
@@ -120,6 +121,25 @@ public class FanOutShardSubscriberTest {
         subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
     }
 
+    @Test
+    public void testSubscriptionCompletion() throws Exception {
+        FakeKinesisFanOutBehavioursFactory.AbstractSingleShardFanOutKinesisV2 errorKinesisV2 =
+                FakeKinesisFanOutBehavioursFactory.emptyBatchFollowedBySingleRecord();
+
+        FanOutShardSubscriber subscriber =
+                new FanOutShardSubscriber(
+                        "consumerArn",
+                        "shardId",
+                        errorKinesisV2,
+                        DEFAULT_SUBSCRIBE_TO_SHARD_TIMEOUT);
+
+        StartingPosition startingPosition = StartingPosition.builder().build();
+        boolean result =
+                subscriber.subscribeToShardAndConsumeRecords(startingPosition, event -> {});
+
+        assertFalse(result);
+    }
+
     @Test
     public void testTimeoutSubscribingToShard() throws Exception {
         thrown.expect(FanOutShardSubscriber.RecoverableFanOutSubscriberException.class);


[flink] 01/05: [FLINK-23528][datastream] Let CollectSinkOperator publish results in #close.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6080840e9c1f1ded457845a675dbfab4cacad12c
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Sep 7 20:02:42 2021 +0200

    [FLINK-23528][datastream] Let CollectSinkOperator publish results in #close.
    
    DataStream#executeAndCollect expects the CollectSinkOperator to register the accumulator at the end of the application, or it fails with some exception.
    However, a stop-with-savepoint without drain would not trigger CollectSinkOperator#finish and thus skip the registration.
---
 .../flink/streaming/api/operators/collect/CollectSinkOperator.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java
index 8b78bc30fc9..5c84c9b1378 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java
@@ -51,9 +51,9 @@ public class CollectSinkOperator<IN> extends StreamSink<IN> implements OperatorE
     }
 
     @Override
-    public void finish() throws Exception {
+    public void close() throws Exception {
         sinkFunction.accumulateFinalResults();
-        super.finish();
+        super.close();
     }
 
     public CompletableFuture<OperatorID> getOperatorIdFuture() {


[flink] 03/05: [FLINK-23528][connectors/kinesis] Gracefully shutdown shard consumer to avoid InterruptionExceptions.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 16393625ed5e01833ec42e66f533a93c00ac4471
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Sep 7 20:03:36 2021 +0200

    [FLINK-23528][connectors/kinesis] Gracefully shut down shard consumer to avoid InterruptionExceptions.
---
 .../streaming/connectors/kinesis/internals/KinesisDataFetcher.java      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 21b3736c789..e12bdaa0b3a 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -819,7 +819,7 @@ public class KinesisDataFetcher<T> {
                 LOG.warn("Encountered exception closing record publisher factory", e);
             }
         } finally {
-            shardConsumersExecutor.shutdownNow();
+            shardConsumersExecutor.shutdown();
 
             cancelFuture.complete(null);