You are viewing a plain text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this plain-text rendering.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/04 13:54:37 UTC

[GitHub] [flink] dzikosc commented on a diff in pull request #20133: Kinesis Consumer stop-with-savepoint support including EFO

dzikosc commented on code in PR #20133:
URL: https://github.com/apache/flink/pull/20133#discussion_r913028922


##########
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java:
##########
@@ -69,104 +81,174 @@ public class FlinkKinesisITCase extends TestLogger {
 
     @Rule public TemporaryFolder temp = new TemporaryFolder();
 
+    @Rule public SharedObjects sharedObjects = SharedObjects.create();
+
     private static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();
 
     private KinesisPubsubClient client;
 
     @Before
-    public void setupClient() {
-        client = new KinesisPubsubClient(kinesalite.getContainerProperties());
+    public void setupClient() throws Exception {
+        client = new KinesisPubsubClient(getContainerProperties());
+        stream = TestNameProvider.getCurrentTestName().replaceAll("\\W", "");
+        client.createTopic(stream, 1, new Properties());
+    }
+
+    @Test
+    public void testStopWithSavepoint() throws Exception {
+        testStopWithSavepoint(false, false);
+    }
+
+    @Test
+    public void testStopWithSavepointWithDrain() throws Exception {
+        testStopWithSavepoint(true, false);
+    }
+
+    @Test
+    @Ignore("Kinesalite does not support EFO")
+    public void testStopWithSavepointWithEfo() throws Exception {
+        testStopWithSavepoint(false, true);
+    }
+
+    @Test
+    @Ignore("Kinesalite does not support EFO")
+    public void testStopWithSavepointWithDrainAndEfo() throws Exception {
+        testStopWithSavepoint(true, true);
     }
 
     /**
      * Tests that pending elements do not cause a deadlock during stop with savepoint (FLINK-17170).
      *
      * <ol>
-     *   <li>The test setups up a stream with 100 records and creates a Flink job that reads them
+     *   <li>The test setups up a stream with 1000 records and creates a Flink job that reads them
      *       with very slowly (using up a large chunk of time of the mailbox).
      *   <li>After ensuring that consumption has started, the job is stopped in a parallel thread.
      *   <li>Without the fix of FLINK-17170, the job now has a high chance to deadlock during
      *       cancel.
      *   <li>With the fix, the job proceeds and we can lift the backpressure.
      * </ol>
      */
-    @Test
-    public void testStopWithSavepoint() throws Exception {
-        client.createTopic(TEST_STREAM, 1, new Properties());
-
+    private void testStopWithSavepoint(boolean drain, boolean efo) throws Exception {
         // add elements to the test stream
         int numElements = 1000;
         client.sendMessage(
-                TEST_STREAM,
+                stream,
                 IntStream.range(0, numElements).mapToObj(String::valueOf).toArray(String[]::new));
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
+        env.enableCheckpointing(100L);
 
-        Properties config = kinesalite.getContainerProperties();
-        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
-        FlinkKinesisConsumer<String> consumer =
-                new FlinkKinesisConsumer<>(TEST_STREAM, STRING_SCHEMA, config);
+        SharedReference<CountDownLatch> savepointTrigger = sharedObjects.add(new CountDownLatch(1));
+        DataStream<String> outputStream =
+                env.addSource(createKinesisConsumer(efo)).map(new WaitingMapper(savepointTrigger));
 
-        DataStream<String> stream = env.addSource(consumer).map(new WaitingMapper());
         // call stop with savepoint in another thread
         ForkJoinTask<Object> stopTask =
                 ForkJoinPool.commonPool()
                         .submit(
                                 () -> {
-                                    WaitingMapper.firstElement.await();
-                                    stopWithSavepoint();
-                                    WaitingMapper.stopped = true;
+                                    savepointTrigger.get().await();
+                                    stopWithSavepoint(drain);
                                     return null;
                                 });
         try {
-            List<String> result = stream.executeAndCollect(10000);
+            List<String> result = outputStream.executeAndCollect(10000);
             // stop with savepoint will most likely only return a small subset of the elements
             // validate that the prefix is as expected
-            assertThat(result, hasSize(lessThan(numElements)));
-            assertThat(
-                    result,
-                    equalTo(
-                            IntStream.range(0, numElements)
-                                    .mapToObj(String::valueOf)
-                                    .collect(Collectors.toList())
-                                    .subList(0, result.size())));
+            if (drain) {
+                assertThat(
+                        result,
+                        contains(
+                                IntStream.range(0, numElements)
+                                        .mapToObj(String::valueOf)
+                                        .toArray()));
+            } else {
+                // stop with savepoint will most likely only return a small subset of the elements
+                // validate that the prefix is as expected
+                assertThat(
+                        result,
+                        contains(
+                                IntStream.range(0, result.size())
+                                        .mapToObj(String::valueOf)
+                                        .toArray()));
+            }
         } finally {
-            stopTask.cancel(true);
+            stopTask.get();
+        }
+    }
+
+    private FlinkKinesisConsumer<String> createKinesisConsumer(boolean efo) {
+        Properties config = getContainerProperties();
+        config.setProperty(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON.name());
+        if (efo) {
+            config.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO");
+            config.putIfAbsent(EFO_CONSUMER_NAME, "efo-flink-app");
         }
+        return new FlinkKinesisConsumer<>(stream, STRING_SCHEMA, config);
+    }
+
+    private Properties getContainerProperties() {
+        return kinesalite.getContainerProperties();
     }
 
-    private String stopWithSavepoint() throws Exception {
+    private String stopWithSavepoint(boolean drain) throws Exception {
         JobStatusMessage job =
                 MINI_CLUSTER.getClusterClient().listJobs().get().stream().findFirst().get();
         return MINI_CLUSTER
                 .getClusterClient()
                 .stopWithSavepoint(
                         job.getJobId(),
-                        true,
+                        drain,
                         temp.getRoot().getAbsolutePath(),
                         SavepointFormatType.CANONICAL)
                 .get();
     }
 
-    private static class WaitingMapper implements MapFunction<String, String> {
-        static CountDownLatch firstElement;
-        static volatile boolean stopped;
+    private static class WaitingMapper
+            implements MapFunction<String, String>, CheckpointedFunction {
+        private final SharedReference<CountDownLatch> savepointTrigger;
+        private volatile boolean savepointTriggered;
+        // keep track on when the last checkpoint occurred
+        private transient Deadline checkpointDeadline;
+        private final AtomicInteger numElements = new AtomicInteger();
+
+        WaitingMapper(SharedReference<CountDownLatch> savepointTrigger) {
+            this.savepointTrigger = savepointTrigger;
+            checkpointDeadline = Deadline.fromNow(Duration.ofDays(1));

Review Comment:
   Left a comment explaining some of the context.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org