Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/21 13:32:32 UTC

[GitHub] [flink] lincoln-lil opened a new pull request, #21546: [FLINK-30477][datastream] Fix AsyncWaitOperator that not properly blocking retries when timeout occurs

lincoln-lil opened a new pull request, #21546:
URL: https://github.com/apache/flink/pull/21546

   ## What is the purpose of the change
   There is an issue in AsyncWaitOperator: it does not properly block retries when a timeout occurs, as reported on the user mailing list (https://lists.apache.org/thread/n1rqml8h9j8zkhxwc48rdvj7jrw2rjcy). This happens when a retry timer has been registered but has not fired yet, and the user function's timeout is triggered first. In that case the current RetryableResultHandlerDelegator does not handle the timeout properly, which causes further, unexpected retries. This PR aims to fix it.
   
   ## Brief change log
   Override the timeout logic in RetryableResultHandlerDelegator and add a guard for the retryAwaiting flag to prevent unexpected retries.
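A simplified, single-threaded model of the race described above may help. It is a sketch under stated assumptions, not Flink's implementation: only the names `retryAwaiting` and `delayedRetryTimer` come from the PR, while `PendingTimer`, `RetryTimeoutModel` and the fixed event order (attempt fails, timeout fires, stale retry timer fires) are hypothetical. Without the guard, the pending retry still fires after the timeout has already emitted the default value. With the guard, the timeout cancels the retry timer and resets `retryAwaiting` first.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a cancellable retry timer handle. */
final class PendingTimer {
    private final Runnable action;
    private boolean cancelled;

    PendingTimer(Runnable action) {
        this.action = action;
    }

    void cancel() {
        cancelled = true;
    }

    /** Runs the action unless the timer was cancelled first. */
    void fire() {
        if (!cancelled) {
            action.run();
        }
    }
}

/** Hypothetical model of one in-flight record with a delayed retry and a timeout. */
final class RetryTimeoutModel {
    private final boolean guardRetryOnTimeout; // true models the behaviour after the fix
    final AtomicBoolean completed = new AtomicBoolean(false);
    final AtomicBoolean retryAwaiting = new AtomicBoolean(false);
    final AtomicInteger attempts = new AtomicInteger();
    PendingTimer delayedRetryTimer;
    Integer output;

    RetryTimeoutModel(boolean guardRetryOnTimeout) {
        this.guardRetryOnTimeout = guardRetryOnTimeout;
    }

    /** Stands in for invoking the user's async function once. */
    void invoke() {
        attempts.incrementAndGet();
    }

    /** The async attempt failed: register at most one delayed retry. */
    void onAttemptFailed() {
        if (!completed.get() && retryAwaiting.compareAndSet(false, true)) {
            delayedRetryTimer = new PendingTimer(() -> {
                retryAwaiting.set(false);
                invoke();
            });
        }
    }

    /** The timeout fired while the retry timer was still pending. */
    void onTimeout() {
        if (completed.get()) {
            return;
        }
        if (guardRetryOnTimeout) {
            // Cancel the pending retry and clear the flag before completing.
            if (delayedRetryTimer != null) {
                delayedRetryTimer.cancel();
            }
            retryAwaiting.set(false);
        }
        // Like a timeout() that emits a default value (-1) instead of failing.
        if (completed.compareAndSet(false, true)) {
            output = -1;
        }
    }

    /** Replays: attempt 1 fails, the timeout fires, then the retry timer fires. */
    static RetryTimeoutModel replay(boolean guardRetryOnTimeout) {
        RetryTimeoutModel model = new RetryTimeoutModel(guardRetryOnTimeout);
        model.invoke();
        model.onAttemptFailed();
        model.onTimeout();
        model.delayedRetryTimer.fire();
        return model;
    }

    public static void main(String[] args) {
        for (boolean guarded : new boolean[] {false, true}) {
            RetryTimeoutModel m = replay(guarded);
            System.out.println((guarded ? "guarded" : "unguarded")
                    + ": attempts=" + m.attempts.get() + ", output=" + m.output);
        }
    }
}
```

In this model, `main` prints `unguarded: attempts=2, output=-1` followed by `guarded: attempts=1, output=-1`. The emitted value is the same in both cases, but only the unguarded variant makes an extra call after the timeout.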
   
   ## Verifying this change
   Added test cases to the existing AsyncWaitOperatorTest.
   
   ## Does this pull request potentially affect one of the following parts:
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
     - Does this pull request introduce a new feature? (no)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #21546: [FLINK-30477][datastream] Fix AsyncWaitOperator that not properly blocking retries when timeout occurs

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21546:
URL: https://github.com/apache/flink/pull/21546#issuecomment-1361323566

   ## CI report:
   
   * c4eb5fa8a11c1f747b207ece6caf71e94bfdb13c UNKNOWN
   
   Bot commands: the @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build




[GitHub] [flink] lincoln-lil commented on a diff in pull request #21546: [FLINK-30477][datastream] Fix AsyncWaitOperator that not properly blocking retries when timeout occurs

Posted by "lincoln-lil (via GitHub)" <gi...@apache.org>.
lincoln-lil commented on code in PR #21546:
URL: https://github.com/apache/flink/pull/21546#discussion_r1090472706


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java:
##########
@@ -1260,6 +1263,79 @@ private void testProcessingTimeWithRetry(
         }
     }
 
+    /**
+     * Test the AsyncWaitOperator with an always-timeout async function under unordered mode and
+     * processing time.
+     */
+    @Test
+    public void testProcessingTimeWithTimeoutFunctionUnorderedWithRetry() throws Exception {
+        testProcessingTimeAlwaysTimeoutFunctionWithRetry(AsyncDataStream.OutputMode.UNORDERED);
+    }
+
+    /**
+     * Test the AsyncWaitOperator with an always-timeout async function under ordered mode and
+     * processing time.
+     */
+    @Test
+    public void testProcessingTimeWithTimeoutFunctionOrderedWithRetry() throws Exception {
+        testProcessingTimeAlwaysTimeoutFunctionWithRetry(AsyncDataStream.OutputMode.ORDERED);
+    }
+
+    private void testProcessingTimeAlwaysTimeoutFunctionWithRetry(AsyncDataStream.OutputMode mode)
+            throws Exception {
+
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+
+        AsyncRetryStrategy exceptionRetryStrategy =
+                new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(5, 100L)
+                        .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
+                        .build();
+        AlwaysTimeoutWithDefaultValueAsyncFunction asyncFunction =
+                new AlwaysTimeoutWithDefaultValueAsyncFunction();
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                builder.setupOutputForSingletonOperatorChain(
+                                new AsyncWaitOperatorFactory<>(
+                                        asyncFunction, TIMEOUT, 10, mode, exceptionRetryStrategy))
+                        .build()) {
+
+            final long initialTime = 0L;
+            final Queue<Object> expectedOutput = new ArrayDeque<>();
+
+            testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
+            testHarness.processElement(new StreamRecord<>(2, initialTime + 2));
+
+            expectedOutput.add(new StreamRecord<>(-1, initialTime + 1));
+            expectedOutput.add(new StreamRecord<>(-1, initialTime + 2));
+
+            Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
+            while (testHarness.getOutput().size() < expectedOutput.size()
+                    && deadline.hasTimeLeft()) {
+                testHarness.processAll();
+                //noinspection BusyWait
+                Thread.sleep(100);
+            }
+
+            if (mode == AsyncDataStream.OutputMode.ORDERED) {
+                TestHarnessUtil.assertOutputEquals(
+                        "ORDERED Output was not correct.", expectedOutput, testHarness.getOutput());
+            } else {
+                TestHarnessUtil.assertOutputEqualsSorted(
+                        "UNORDERED Output was not correct.",
+                        expectedOutput,
+                        testHarness.getOutput(),
+                        new StreamRecordComparator());
+            }
+
+            // verify the elements' try count
+            assertTrue(asyncFunction.getTryCount(1) == 2);

Review Comment:
   Makes sense.





[GitHub] [flink] gaoyunhaii commented on a diff in pull request #21546: [FLINK-30477][datastream] Fix AsyncWaitOperator that not properly blocking retries when timeout occurs

Posted by "gaoyunhaii (via GitHub)" <gi...@apache.org>.
gaoyunhaii commented on code in PR #21546:
URL: https://github.com/apache/flink/pull/21546#discussion_r1090468404


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -442,36 +456,50 @@ public RetryableResultHandlerDelegator(
             this.processingTimeService = processingTimeService;
         }
 
-        public void registerTimeout(long timeout) {
-            resultHandler.registerTimeout(processingTimeService, timeout);
+        private void registerTimeout(long timeout) {
+            resultHandler.timeoutTimer =
+                    registerTimer(processingTimeService, timeout, t -> timerTriggered());
+        }
+
+        private void cancelRetryTimer() {
+            if (delayedRetryTimer != null) {
+                delayedRetryTimer.cancel(true);

Review Comment:
   We could pass `false` here to avoid unexpectedly interrupting the timer thread, which might cause failures.
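The distinction behind this suggestion is the `mayInterruptIfRunning` argument of `java.util.concurrent.Future.cancel`. With `true`, a task that is already running has its thread interrupted. With `false`, a task that has not started yet is still prevented from running, but a running one is left alone. The sketch below uses a plain single-threaded `ScheduledExecutorService` as a stand-in for the timer thread (an assumption: it is not Flink's `ProcessingTimeService`), and the class name `CancelInterruptDemo` is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical demo of Future.cancel(true) versus cancel(false) on a running timer task. */
final class CancelInterruptDemo {

    /** Returns true if a timer task that is already running gets interrupted by cancel(flag). */
    static boolean runningTaskInterrupted(boolean mayInterruptIfRunning) {
        ScheduledExecutorService timerThread = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            ScheduledFuture<?> timer = timerThread.schedule(() -> {
                started.countDown();
                try {
                    release.await(); // the callback is still busy when cancel() is called
                } catch (InterruptedException e) {
                    interrupted.set(true);
                    Thread.currentThread().interrupt();
                } finally {
                    finished.countDown();
                }
            }, 0, TimeUnit.MILLISECONDS);

            started.await();
            timer.cancel(mayInterruptIfRunning);
            release.countDown();
            finished.await();
            return interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            timerThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("cancel(true)  interrupts running task: " + runningTaskInterrupted(true));
        System.out.println("cancel(false) interrupts running task: " + runningTaskInterrupted(false));
    }
}
```

Both variants stop a timer that has not fired yet. So for suppressing a pending retry, `cancel(false)` appears sufficient, and it avoids interrupting whatever the timer thread happens to be executing.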



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java:
##########
(Quoted diff context omitted: identical to the `AsyncWaitOperatorTest.java` hunk `@@ -1260,6 +1263,79 @@` quoted in lincoln-lil's reply above, ending at `assertTrue(asyncFunction.getTryCount(1) == 2);`.)

Review Comment:
   Here we rely on TIMEOUT and the `sleep(500)` in the testing async function. Depending on the load of the testing machine, the timers may drift, which could cause the try count to take other values.
   
   For now, we could relax the check to `<= 2` and see whether the 500 ms slack is large enough for the other side. If there are failures, we may need to switch to more deterministic logic.
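One way to get the more deterministic logic mentioned above is to drive the timers from a manually advanced clock instead of wall-clock sleeps, so that the try count depends only on the chosen durations. The sketch below is hypothetical. `ManualClock` and `RetryUnderManualClock` are illustrative names. The 500 ms call duration echoes the `sleep(500)` mentioned in the review, and the 100 ms retry delay echoes `FixedDelayRetryStrategyBuilder(5, 100L)` in the test. The 1000 ms timeout is an assumed value, since the thread does not show the `TIMEOUT` constant.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/** Hypothetical clock whose timers fire only when the test advances time. */
final class ManualClock {
    private static final class Timer {
        final long due;
        final long seq;
        final Runnable action;

        Timer(long due, long seq, Runnable action) {
            this.due = due;
            this.seq = seq;
            this.action = action;
        }
    }

    // Order by due time; timers due at the same instant fire in registration order.
    private final PriorityQueue<Timer> timers = new PriorityQueue<>(
            Comparator.<Timer>comparingLong(t -> t.due).thenComparingLong(t -> t.seq));
    private long now;
    private long nextSeq;

    void schedule(long delayMs, Runnable action) {
        timers.add(new Timer(now + delayMs, nextSeq++, action));
    }

    /** Moves time forward to {@code target}, firing every timer due at or before it. */
    void advanceTo(long target) {
        while (!timers.isEmpty() && timers.peek().due <= target) {
            Timer timer = timers.poll();
            now = timer.due;
            timer.action.run();
        }
        now = Math.max(now, target);
    }
}

/** Hypothetical always-failing async call with fixed-delay retries, cut off by a timeout. */
final class RetryUnderManualClock {
    final ManualClock clock = new ManualClock();
    private final long callMs;
    private final long retryDelayMs;
    private final int maxAttempts;
    int attempts;
    boolean timedOut;

    RetryUnderManualClock(long timeoutMs, long callMs, long retryDelayMs, int maxAttempts) {
        this.callMs = callMs;
        this.retryDelayMs = retryDelayMs;
        this.maxAttempts = maxAttempts;
        clock.schedule(timeoutMs, () -> timedOut = true);
        invoke();
    }

    private void invoke() {
        attempts++;
        clock.schedule(callMs, this::onFailure); // the call fails after callMs
    }

    private void onFailure() {
        if (!timedOut && attempts < maxAttempts) {
            // No retry is started once the timeout has fired.
            clock.schedule(retryDelayMs, () -> {
                if (!timedOut) {
                    invoke();
                }
            });
        }
    }

    public static void main(String[] args) {
        RetryUnderManualClock run = new RetryUnderManualClock(1000, 500, 100, 5);
        run.clock.advanceTo(5_000);
        System.out.println("attempts=" + run.attempts);
    }
}
```

With these assumed values, the model yields exactly 2 attempts on every run, and a slower call (for example 950 ms) yields exactly 1. That is the kind of variation wall-clock timers can introduce, which is why the check is relaxed to `<= 2`.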





[GitHub] [flink] lincoln-lil commented on pull request #21546: [FLINK-30477][datastream] Fix AsyncWaitOperator that not properly blocking retries when timeout occurs

Posted by "lincoln-lil (via GitHub)" <gi...@apache.org>.
lincoln-lil commented on PR #21546:
URL: https://github.com/apache/flink/pull/21546#issuecomment-1408415268

   @gaoyunhaii Thanks for reviewing this! I've updated this PR according to your comments.




[GitHub] [flink] gaoyunhaii commented on a diff in pull request #21546: [FLINK-30477][datastream] Fix AsyncWaitOperator that not properly blocking retries when timeout occurs

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #21546:
URL: https://github.com/apache/flink/pull/21546#discussion_r1066547072


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -443,33 +445,53 @@ public RetryableResultHandlerDelegator(
         }
 
         public void registerTimeout(long timeout) {
-            resultHandler.registerTimeout(processingTimeService, timeout);
+            // must overwrite the registerTimeout here to control the callback logic
+            registerTimeout(processingTimeService, timeout, resultHandler);
+        }
+
+        private void registerTimeout(

Review Comment:
   Could this be extracted into a utility method?
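A possible shape for such a utility is sketched below against hypothetical stand-in interfaces, not Flink's actual `ProcessingTimeService`. It converts a relative timeout into an absolute timestamp based on the service's current processing time, registers the callback, and returns the handle so the caller can keep it, as the diff does with `resultHandler.timeoutTimer`. `TimerService`, `TimerCallback`, `TimerUtils.registerRelativeTimer` and the test double `FrozenTimerService` are all illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/** Hypothetical stand-in for a processing-time timer service. */
interface TimerService {
    long getCurrentProcessingTime();

    Future<?> registerTimer(long timestamp, TimerCallback callback);
}

/** Hypothetical timer callback that receives the firing timestamp. */
@FunctionalInterface
interface TimerCallback {
    void onProcessingTime(long timestamp) throws Exception;
}

/** Hypothetical utility holder. */
final class TimerUtils {
    private TimerUtils() {}

    /** Registers {@code callback} to fire {@code delayMs} after the service's current time. */
    static Future<?> registerRelativeTimer(TimerService service, long delayMs, TimerCallback callback) {
        final long timestamp = service.getCurrentProcessingTime() + delayMs;
        return service.registerTimer(timestamp, callback);
    }
}

/** Hypothetical test double: time is frozen and timers only fire when fireAll() is called. */
final class FrozenTimerService implements TimerService {
    private final long now;
    final List<Long> registeredTimestamps = new ArrayList<>();
    private final List<FutureTask<Void>> pending = new ArrayList<>();

    FrozenTimerService(long now) {
        this.now = now;
    }

    @Override
    public long getCurrentProcessingTime() {
        return now;
    }

    @Override
    public Future<?> registerTimer(long timestamp, TimerCallback callback) {
        registeredTimestamps.add(timestamp);
        FutureTask<Void> task = new FutureTask<Void>(() -> {
            callback.onProcessingTime(timestamp);
            return null;
        });
        pending.add(task);
        return task;
    }

    void fireAll() {
        pending.forEach(FutureTask::run);
    }
}
```

Keeping the timestamp arithmetic in one place means the regular timeout path and an overridden one, such as the retry delegator's, cannot compute deadlines differently.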



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -443,33 +445,53 @@ public RetryableResultHandlerDelegator(
         }
 
         public void registerTimeout(long timeout) {
-            resultHandler.registerTimeout(processingTimeService, timeout);
+            // must overwrite the registerTimeout here to control the callback logic
+            registerTimeout(processingTimeService, timeout, resultHandler);
+        }
+
+        private void registerTimeout(
+                ProcessingTimeService processingTimeService,
+                long timeout,
+                ResultHandler resultHandler) {
+            final long timeoutTimestamp =
+                    timeout + processingTimeService.getCurrentProcessingTime();
+
+            resultHandler.timeoutTimer =
+                    processingTimeService.registerTimer(
+                            timeoutTimestamp, timestamp -> timerTriggered());
+        }
+
+        /** Rewrite the timeout process to deal with retry state. */
+        private void timerTriggered() throws Exception {
+            if (!resultHandler.completed.get()) {
+                // cancel delayed retry timer first
+                if (delayedRetryTimer != null) {
+                    delayedRetryTimer.cancel(true);
+                }
+                // force reset retryAwaiting to prevent the handler to trigger retry unnecessarily
+                retryAwaiting.set(false);
+
+                userFunction.timeout(resultHandler.inputRecord.getValue(), this);
+            }
         }
 
         @Override
         public void complete(Collection<OUT> results) {
             Preconditions.checkNotNull(
                     results, "Results must not be null, use empty collection to emit nothing");
             if (!retryDisabledOnFinish.get() && resultHandler.inputRecord.isRecord()) {
-                // ignore repeated call(s)
-                if (!retryAwaiting.compareAndSet(false, true)) {
-                    return;
-                }
-
                 processRetryInMailBox(results, null);
             } else {
+                if (delayedRetryTimer != null) {

Review Comment:
   Should we also cancel the timer in `completeExceptionally`?
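Below is a sketch of the symmetric cleanup being suggested. Both terminal paths go through one helper that cancels a pending retry timer, so a retry registered earlier cannot fire after the record has already been completed, whether normally or exceptionally. It is hypothetical and deliberately narrower than `RetryableResultHandlerDelegator`: it only models the branches where no further retry will be attempted (like the `else` branch of `complete` in the diff) and uses a plain `ScheduledExecutorService` as the timer. `TerminalPathHandler` and `scheduleRetry` are illustrative names. It passes `false` to `cancel`, in line with the review comment elsewhere in this thread about not interrupting the timer thread.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical handler: completion on either path cancels a pending retry timer. */
final class TerminalPathHandler<OUT> {
    private final ScheduledExecutorService timerService;
    private final AtomicBoolean completed = new AtomicBoolean(false);
    volatile ScheduledFuture<?> delayedRetryTimer;
    volatile Collection<OUT> results;
    volatile Throwable error;

    TerminalPathHandler(ScheduledExecutorService timerService) {
        this.timerService = timerService;
    }

    void scheduleRetry(Runnable retry, long delayMs) {
        delayedRetryTimer = timerService.schedule(retry, delayMs, TimeUnit.MILLISECONDS);
    }

    /** Shared by both terminal paths so neither leaves a retry behind. */
    private void cancelRetryTimer() {
        ScheduledFuture<?> timer = delayedRetryTimer;
        if (timer != null) {
            timer.cancel(false); // do not interrupt the timer thread
        }
    }

    void complete(Collection<OUT> out) {
        if (completed.compareAndSet(false, true)) {
            cancelRetryTimer();
            results = out;
        }
    }

    void completeExceptionally(Throwable t) {
        if (completed.compareAndSet(false, true)) {
            cancelRetryTimer();
            error = t;
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        try {
            TerminalPathHandler<String> handler = new TerminalPathHandler<>(exec);
            handler.scheduleRetry(() -> System.out.println("stale retry ran"), 10_000L);
            handler.completeExceptionally(new RuntimeException("boom"));
            handler.complete(List.of("ignored")); // second completion is a no-op
            System.out.println("retry cancelled: " + handler.delayedRetryTimer.isCancelled());
        } finally {
            exec.shutdownNow();
        }
    }
}
```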





[GitHub] [flink] lincoln-lil commented on pull request #21546: [FLINK-30477][datastream] Fix AsyncWaitOperator that not properly blocking retries when timeout occurs

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on PR #21546:
URL: https://github.com/apache/flink/pull/21546#issuecomment-1362266272

   @flinkbot run azure




[GitHub] [flink] lincoln-lil merged pull request #21546: [FLINK-30477][datastream] Fix AsyncWaitOperator that not properly blocking retries when timeout occurs

Posted by "lincoln-lil (via GitHub)" <gi...@apache.org>.
lincoln-lil merged PR #21546:
URL: https://github.com/apache/flink/pull/21546




[GitHub] [flink] lincoln-lil commented on a diff in pull request #21546: [FLINK-30477][datastream] Fix AsyncWaitOperator that not properly blocking retries when timeout occurs

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #21546:
URL: https://github.com/apache/flink/pull/21546#discussion_r1067076982


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
(Quoted diff context omitted: identical to the first `AsyncWaitOperator.java` hunk quoted in gaoyunhaii's review (discussion_r1066547072) above, ending at `private void registerTimeout(`.)

Review Comment:
   Makes sense.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
(Quoted diff context omitted: identical to the second `AsyncWaitOperator.java` hunk quoted in gaoyunhaii's review (discussion_r1066547072) above, ending at `if (delayedRetryTimer != null) {`.)

Review Comment:
   Yes, I'll add it.


