Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/01 17:16:27 UTC

[GitHub] [flink] dannycranmer commented on a diff in pull request #20133: Kinesis Consumer stop-with-savepoint support including EFO

dannycranmer commented on code in PR #20133:
URL: https://github.com/apache/flink/pull/20133#discussion_r912126766


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java:
##########
@@ -51,9 +51,9 @@ public void handleOperatorEvent(OperatorEvent evt) {
     }
 
     @Override
-    public void finish() throws Exception {
+    public void close() throws Exception {
         sinkFunction.accumulateFinalResults();
-        super.finish();
+        super.close();
     }

Review Comment:
   Why did we need to change this? Does this impact any other connectors?
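   
   For context, the lifecycle distinction at play here (a minimal sketch with hypothetical names, not the actual operator classes in this PR): `finish()` runs only when input ends gracefully, e.g. stop-with-savepoint with `--drain`, whereas `close()` runs on every shutdown path, including cancellation.
   
   ```java
   // Hypothetical operator sketching the finish()/close() lifecycle split;
   // illustrative names only, not the real CollectSinkOperator hierarchy.
   abstract class LifecycleSketchOperator {
   
       // Runs only on a graceful end of input (a bounded source finishing,
       // stop-with-savepoint --drain). Skipped when the job is cancelled.
       public void finish() throws Exception {
           // flush buffers, emit results that only make sense when drained
       }
   
       // Runs on every shutdown path, after finish() whenever finish() ran.
       // Work that must happen regardless of how the job ends belongs here.
       public void close() throws Exception {
           // release resources, publish results needed even without a drain
       }
   }
   ```
   
   Moving the accumulation from `finish()` to `close()` would therefore make it run on non-drain shutdowns as well, which is presumably the motivation being questioned.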



##########
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java:
##########
@@ -149,7 +149,9 @@ public void run() {
                     // we can close this consumer thread once we've reached the end of the
                     // subscribed shard
                     break;
-                } else if (result == CANCELLED) {
+                } else if (result == CANCELLED && isRunning()) {
+                    // cancellation might be signalled as part of a graceful shutdown during
+                    // stop-with-savepoint; we should ignore it when the consumer is not running

Review Comment:
   nit: Since we have changed the `shutdownFetcher` method, I think `running == false` is going to be the new normal shutdown procedure for EFO
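   
   For illustration, a simplified sketch of the loop pattern in question (these names are stand-ins for the `ShardConsumer` internals, not the real code): `CANCELLED` is treated as an error only while the consumer still expects to be running, so a graceful stop-with-savepoint exits the loop quietly.
   
   ```java
   // Simplified sketch; RunResult, running and publishRecordsOnce() are
   // stand-ins for the ShardConsumer internals, not the actual classes.
   class ShardLoopSketch {
       enum RunResult { COMPLETE, INCOMPLETE, CANCELLED }
   
       private volatile boolean running = true; // cleared by shutdownFetcher()
   
       void run() {
           while (running) {
               RunResult result = publishRecordsOnce();
               if (result == RunResult.COMPLETE) {
                   break; // reached the end of the subscribed shard
               } else if (result == RunResult.CANCELLED && running) {
                   // cancelled while still expected to run: abnormal exit
                   throw new RuntimeException("subscription cancelled unexpectedly");
               }
               // CANCELLED with running == false is the graceful
               // stop-with-savepoint path: the loop simply exits
           }
       }
   
       private RunResult publishRecordsOnce() {
           return RunResult.INCOMPLETE; // placeholder for the real publisher
       }
   }
   ```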



##########
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java:
##########
@@ -69,104 +81,174 @@ public class FlinkKinesisITCase extends TestLogger {
 
     @Rule public TemporaryFolder temp = new TemporaryFolder();
 
+    @Rule public SharedObjects sharedObjects = SharedObjects.create();
+
     private static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();
 
     private KinesisPubsubClient client;
 
     @Before
-    public void setupClient() {
-        client = new KinesisPubsubClient(kinesalite.getContainerProperties());
+    public void setupClient() throws Exception {
+        client = new KinesisPubsubClient(getContainerProperties());
+        stream = TestNameProvider.getCurrentTestName().replaceAll("\\W", "");
+        client.createTopic(stream, 1, new Properties());
+    }
+
+    @Test
+    public void testStopWithSavepoint() throws Exception {
+        testStopWithSavepoint(false, false);
+    }
+
+    @Test
+    public void testStopWithSavepointWithDrain() throws Exception {
+        testStopWithSavepoint(true, false);
+    }
+
+    @Test
+    @Ignore("Kinesalite does not support EFO")
+    public void testStopWithSavepointWithEfo() throws Exception {
+        testStopWithSavepoint(false, true);
+    }
+
+    @Test
+    @Ignore("Kinesalite does not support EFO")
+    public void testStopWithSavepointWithDrainAndEfo() throws Exception {
+        testStopWithSavepoint(true, true);
     }
 
     /**
      * Tests that pending elements do not cause a deadlock during stop with savepoint (FLINK-17170).
      *
      * <ol>
-     *   <li>The test setups up a stream with 100 records and creates a Flink job that reads them
+     *   <li>The test sets up a stream with 1000 records and creates a Flink job that reads them
      *       very slowly (using up a large chunk of the mailbox's time).
      *   <li>After ensuring that consumption has started, the job is stopped in a parallel thread.
      *   <li>Without the fix of FLINK-17170, the job now has a high chance to deadlock during
      *       cancel.
      *   <li>With the fix, the job proceeds and we can lift the backpressure.
      * </ol>
      */
-    @Test
-    public void testStopWithSavepoint() throws Exception {
-        client.createTopic(TEST_STREAM, 1, new Properties());
-
+    private void testStopWithSavepoint(boolean drain, boolean efo) throws Exception {
         // add elements to the test stream
         int numElements = 1000;
         client.sendMessage(
-                TEST_STREAM,
+                stream,
                 IntStream.range(0, numElements).mapToObj(String::valueOf).toArray(String[]::new));
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
+        env.enableCheckpointing(100L);
 
-        Properties config = kinesalite.getContainerProperties();
-        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
-        FlinkKinesisConsumer<String> consumer =
-                new FlinkKinesisConsumer<>(TEST_STREAM, STRING_SCHEMA, config);
+        SharedReference<CountDownLatch> savepointTrigger = sharedObjects.add(new CountDownLatch(1));
+        DataStream<String> outputStream =
+                env.addSource(createKinesisConsumer(efo)).map(new WaitingMapper(savepointTrigger));
 
-        DataStream<String> stream = env.addSource(consumer).map(new WaitingMapper());
         // call stop with savepoint in another thread
         ForkJoinTask<Object> stopTask =
                 ForkJoinPool.commonPool()
                         .submit(
                                 () -> {
-                                    WaitingMapper.firstElement.await();
-                                    stopWithSavepoint();
-                                    WaitingMapper.stopped = true;
+                                    savepointTrigger.get().await();
+                                    stopWithSavepoint(drain);
                                     return null;
                                 });
         try {
-            List<String> result = stream.executeAndCollect(10000);
+            List<String> result = outputStream.executeAndCollect(10000);
             // stop with savepoint will most likely only return a small subset of the elements
             // validate that the prefix is as expected
-            assertThat(result, hasSize(lessThan(numElements)));
-            assertThat(
-                    result,
-                    equalTo(
-                            IntStream.range(0, numElements)
-                                    .mapToObj(String::valueOf)
-                                    .collect(Collectors.toList())
-                                    .subList(0, result.size())));
+            if (drain) {
+                assertThat(
+                        result,
+                        contains(
+                                IntStream.range(0, numElements)
+                                        .mapToObj(String::valueOf)
+                                        .toArray()));
+            } else {
+                // stop with savepoint will most likely only return a small subset of the elements
+                // validate that the prefix is as expected
+                assertThat(
+                        result,
+                        contains(
+                                IntStream.range(0, result.size())
+                                        .mapToObj(String::valueOf)
+                                        .toArray()));
+            }
         } finally {
-            stopTask.cancel(true);
+            stopTask.get();
+        }
+    }
+
+    private FlinkKinesisConsumer<String> createKinesisConsumer(boolean efo) {
+        Properties config = getContainerProperties();
+        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
+        if (efo) {
+            config.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO");
+            config.putIfAbsent(EFO_CONSUMER_NAME, "efo-flink-app");
         }
+        return new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
+    }
+
+    private Properties getContainerProperties() {
+        return kinesalite.getContainerProperties();
     }
 
-    private String stopWithSavepoint() throws Exception {
+    private String stopWithSavepoint(boolean drain) throws Exception {
         JobStatusMessage job =
                 MINI_CLUSTER.getClusterClient().listJobs().get().stream().findFirst().get();
         return MINI_CLUSTER
                 .getClusterClient()
                 .stopWithSavepoint(
                         job.getJobId(),
-                        true,
+                        drain,
                         temp.getRoot().getAbsolutePath(),
                         SavepointFormatType.CANONICAL)
                 .get();
     }
 
-    private static class WaitingMapper implements MapFunction<String, String> {
-        static CountDownLatch firstElement;
-        static volatile boolean stopped;
+    private static class WaitingMapper
+            implements MapFunction<String, String>, CheckpointedFunction {
+        private final SharedReference<CountDownLatch> savepointTrigger;
+        private volatile boolean savepointTriggered;
+        // keep track of when the last checkpoint occurred
+        private transient Deadline checkpointDeadline;
+        private final AtomicInteger numElements = new AtomicInteger();
+
+        WaitingMapper(SharedReference<CountDownLatch> savepointTrigger) {
+            this.savepointTrigger = savepointTrigger;
+            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
+        }
 
-        WaitingMapper() {
-            firstElement = new CountDownLatch(1);
-            stopped = false;
+        private void readObject(ObjectInputStream stream)
+                throws ClassNotFoundException, IOException {
+            stream.defaultReadObject();
+            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
         }
 
         @Override
         public String map(String value) throws Exception {
-            if (firstElement.getCount() > 0) {
-                firstElement.countDown();
-            }
-            if (!stopped) {
-                Thread.sleep(100);
+            numElements.incrementAndGet();
+            if (!savepointTriggered) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {

Review Comment:
   Is swallowing the exception the right thing to do? Can you add a comment to explain this?
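   
   For reference, the conventional alternative is to re-assert the interrupt flag rather than swallow the exception, so the enclosing task can still observe cancellation; a sketch (not the PR's code):
   
   ```java
   class ThrottleSketch {
       // Re-assert the interrupt status instead of swallowing it, so that
       // callers further up the stack can still observe cancellation.
       static void throttleOnce() {
           try {
               Thread.sleep(100);
           } catch (InterruptedException e) {
               // during stop-with-savepoint the interrupt typically signals
               // shutdown; restoring the flag keeps that signal visible
               Thread.currentThread().interrupt();
           }
       }
   }
   ```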



##########
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java:
##########
@@ -69,104 +81,174 @@ public class FlinkKinesisITCase extends TestLogger {
 
     @Rule public TemporaryFolder temp = new TemporaryFolder();
 
+    @Rule public SharedObjects sharedObjects = SharedObjects.create();
+
     private static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();
 
     private KinesisPubsubClient client;
 
     @Before
-    public void setupClient() {
-        client = new KinesisPubsubClient(kinesalite.getContainerProperties());
+    public void setupClient() throws Exception {
+        client = new KinesisPubsubClient(getContainerProperties());
+        stream = TestNameProvider.getCurrentTestName().replaceAll("\\W", "");
+        client.createTopic(stream, 1, new Properties());
+    }
+
+    @Test
+    public void testStopWithSavepoint() throws Exception {
+        testStopWithSavepoint(false, false);
+    }
+
+    @Test
+    public void testStopWithSavepointWithDrain() throws Exception {
+        testStopWithSavepoint(true, false);
+    }
+
+    @Test
+    @Ignore("Kinesalite does not support EFO")
+    public void testStopWithSavepointWithEfo() throws Exception {
+        testStopWithSavepoint(false, true);
+    }
+
+    @Test
+    @Ignore("Kinesalite does not support EFO")
+    public void testStopWithSavepointWithDrainAndEfo() throws Exception {
+        testStopWithSavepoint(true, true);
     }
 
     /**
      * Tests that pending elements do not cause a deadlock during stop with savepoint (FLINK-17170).
      *
      * <ol>
-     *   <li>The test setups up a stream with 100 records and creates a Flink job that reads them
+     *   <li>The test sets up a stream with 1000 records and creates a Flink job that reads them
      *       very slowly (using up a large chunk of the mailbox's time).
      *   <li>After ensuring that consumption has started, the job is stopped in a parallel thread.
      *   <li>Without the fix of FLINK-17170, the job now has a high chance to deadlock during
      *       cancel.
      *   <li>With the fix, the job proceeds and we can lift the backpressure.
      * </ol>
      */
-    @Test
-    public void testStopWithSavepoint() throws Exception {
-        client.createTopic(TEST_STREAM, 1, new Properties());
-
+    private void testStopWithSavepoint(boolean drain, boolean efo) throws Exception {
         // add elements to the test stream
         int numElements = 1000;
         client.sendMessage(
-                TEST_STREAM,
+                stream,
                 IntStream.range(0, numElements).mapToObj(String::valueOf).toArray(String[]::new));
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
+        env.enableCheckpointing(100L);
 
-        Properties config = kinesalite.getContainerProperties();
-        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
-        FlinkKinesisConsumer<String> consumer =
-                new FlinkKinesisConsumer<>(TEST_STREAM, STRING_SCHEMA, config);
+        SharedReference<CountDownLatch> savepointTrigger = sharedObjects.add(new CountDownLatch(1));
+        DataStream<String> outputStream =
+                env.addSource(createKinesisConsumer(efo)).map(new WaitingMapper(savepointTrigger));
 
-        DataStream<String> stream = env.addSource(consumer).map(new WaitingMapper());
         // call stop with savepoint in another thread
         ForkJoinTask<Object> stopTask =
                 ForkJoinPool.commonPool()
                         .submit(
                                 () -> {
-                                    WaitingMapper.firstElement.await();
-                                    stopWithSavepoint();
-                                    WaitingMapper.stopped = true;
+                                    savepointTrigger.get().await();
+                                    stopWithSavepoint(drain);
                                     return null;
                                 });
         try {
-            List<String> result = stream.executeAndCollect(10000);
+            List<String> result = outputStream.executeAndCollect(10000);
             // stop with savepoint will most likely only return a small subset of the elements
             // validate that the prefix is as expected
-            assertThat(result, hasSize(lessThan(numElements)));
-            assertThat(
-                    result,
-                    equalTo(
-                            IntStream.range(0, numElements)
-                                    .mapToObj(String::valueOf)
-                                    .collect(Collectors.toList())
-                                    .subList(0, result.size())));
+            if (drain) {
+                assertThat(
+                        result,
+                        contains(
+                                IntStream.range(0, numElements)
+                                        .mapToObj(String::valueOf)
+                                        .toArray()));
+            } else {
+                // stop with savepoint will most likely only return a small subset of the elements
+                // validate that the prefix is as expected
+                assertThat(
+                        result,
+                        contains(
+                                IntStream.range(0, result.size())
+                                        .mapToObj(String::valueOf)
+                                        .toArray()));
+            }
         } finally {
-            stopTask.cancel(true);
+            stopTask.get();
+        }
+    }
+
+    private FlinkKinesisConsumer<String> createKinesisConsumer(boolean efo) {
+        Properties config = getContainerProperties();
+        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
+        if (efo) {
+            config.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO");
+            config.putIfAbsent(EFO_CONSUMER_NAME, "efo-flink-app");
         }
+        return new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
+    }
+
+    private Properties getContainerProperties() {
+        return kinesalite.getContainerProperties();
     }
 
-    private String stopWithSavepoint() throws Exception {
+    private String stopWithSavepoint(boolean drain) throws Exception {
         JobStatusMessage job =
                 MINI_CLUSTER.getClusterClient().listJobs().get().stream().findFirst().get();
         return MINI_CLUSTER
                 .getClusterClient()
                 .stopWithSavepoint(
                         job.getJobId(),
-                        true,
+                        drain,
                         temp.getRoot().getAbsolutePath(),
                         SavepointFormatType.CANONICAL)
                 .get();
     }
 
-    private static class WaitingMapper implements MapFunction<String, String> {
-        static CountDownLatch firstElement;
-        static volatile boolean stopped;
+    private static class WaitingMapper
+            implements MapFunction<String, String>, CheckpointedFunction {
+        private final SharedReference<CountDownLatch> savepointTrigger;
+        private volatile boolean savepointTriggered;
+        // keep track of when the last checkpoint occurred
+        private transient Deadline checkpointDeadline;
+        private final AtomicInteger numElements = new AtomicInteger();
+
+        WaitingMapper(SharedReference<CountDownLatch> savepointTrigger) {
+            this.savepointTrigger = savepointTrigger;
+            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));

Review Comment:
   Why is this deadline so high?
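   
   Presumably the one-day duration is a sentinel meaning "no checkpoint observed yet", rearmed to a realistic window once checkpointing starts; a sketch of that assumed pattern (the ten-second window is illustrative):
   
   ```java
   import java.time.Duration;
   
   import org.apache.flink.api.common.time.Deadline;
   
   // Sketch of the assumed sentinel pattern: far-future deadline until the
   // first checkpoint, then a realistic window after each checkpoint.
   class CheckpointStalenessSketch {
       private Deadline checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));
   
       void onCheckpoint() {
           // a checkpoint completed; expect the next one soon
           checkpointDeadline = Deadline.fromNow(Duration.ofSeconds(10));
       }
   
       boolean checkpointsStalled() {
           // never true before the first checkpoint, thanks to the sentinel
           return checkpointDeadline.isOverdue();
       }
   }
   ```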
   



##########
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java:
##########
@@ -154,7 +155,8 @@ private RecordPublisherRunResult runWithBackoff(
                     fanOutShardSubscriber.subscribeToShardAndConsumeRecords(
                             toSdkV2StartingPosition(nextStartingPosition), eventConsumer);
             attempt = 0;
-        } catch (FanOutSubscriberInterruptedException ex) {
+        } catch (FanOutSubscriberInterruptedException | RejectedExecutionException ex) {
+            // RejectedExecutionException occurs during soft shutdown, e.g. stop with savepoint

Review Comment:
   Did we also test stop without savepoint? 
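   
   For context on the new catch clause: once an executor has been shut down as part of a graceful stop, further submissions fail with `RejectedExecutionException`. A standalone demonstration of that JDK behaviour (not Flink code):
   
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.RejectedExecutionException;
   
   // Submitting to an already-shut-down executor is rejected, which is why
   // a soft shutdown can surface as RejectedExecutionException here.
   public class RejectedOnShutdownDemo {
       public static void main(String[] args) {
           ExecutorService executor = Executors.newSingleThreadExecutor();
           executor.shutdown(); // graceful shutdown, as in stop-with-savepoint
           try {
               executor.submit(() -> System.out.println("subscribe to shard"));
           } catch (RejectedExecutionException ex) {
               // expected once shutdown has begun; can be treated like an
               // interruption / cancellation signal
               System.out.println("rejected during soft shutdown: " + ex);
           }
       }
   }
   ```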


