Posted to commits@flink.apache.org by go...@apache.org on 2022/08/10 06:05:17 UTC

[flink] 03/04: [hotfix][runtime] Do last attempt without successfully canceling the retry timer to prevent unexpected incomplete element during finish phase in AsyncWaitOperator

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3a2fc5ef34f563c906473cbe4bdd79a9d7eec48e
Author: lincoln lee <li...@gmail.com>
AuthorDate: Tue Aug 9 17:53:04 2022 +0800

    [hotfix][runtime] Do last attempt without successfully canceling the retry timer to prevent unexpected incomplete element during finish phase in AsyncWaitOperator
    
    This is hard to reproduce in runtime tests, but it occasionally occurs in AsyncLookupJoinITCase#testAsyncJoinTemporalTableWithLookupThresholdWithSufficientRetry (FLINK-28849). A separate runtime test should be added for it.
    
    This closes #20482
---
 .../flink/streaming/api/operators/async/AsyncWaitOperator.java    | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index ba3f1c3ad87..0d88943b21e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -359,10 +359,10 @@ public class AsyncWaitOperator<IN, OUT>
             if (inFlightDelayRetryHandlers.size() > 0) {
                 for (RetryableResultHandlerDelegator delegator : inFlightDelayRetryHandlers) {
                     assert delegator.delayedRetryTimer != null;
-                    // cancel retry timer, cancel failure means retry action already being executed
-                    if (delegator.delayedRetryTimer.cancel(true)) {
-                        tryOnce(delegator);
-                    }
+                    // fire an attempt immediately instead of relying on a successful cancel of
+                    // the retry timer, for two reasons: 1. canceling the retry timer is not 100%
+                    // safe 2. repeated retries are already guarded against
+                    tryOnce(delegator);
                 }
                 inFlightDelayRetryHandlers.clear();
             }
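
The reasoning in the new comment can be illustrated with a minimal standalone sketch. It is not Flink code: PendingRetry, its awaiting flag, and finishWithLastAttempt are hypothetical names, and the AtomicBoolean guard is only an assumed stand-in for the "protection for repeated retries" that the commit mentions. The sketch schedules a delayed retry, then simulates the finish phase by canceling the timer, ignoring the result, and firing the attempt unconditionally.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class LastAttemptSketch {

    /** Hypothetical stand-in for a pending delayed retry (not Flink's delegator class). */
    static final class PendingRetry {
        // Assumed guard: true while a retry is still owed for this element.
        final AtomicBoolean awaiting = new AtomicBoolean(true);
        final AtomicInteger attempts = new AtomicInteger();
        volatile ScheduledFuture<?> timer;

        /** Runs the attempt at most once, no matter how many callers race here. */
        void tryOnce() {
            if (awaiting.compareAndSet(true, false)) {
                attempts.incrementAndGet();
            }
        }
    }

    /** Schedules a delayed retry, then simulates the finish phase; returns the attempt count. */
    static int finishWithLastAttempt(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        PendingRetry retry = new PendingRetry();
        retry.timer = scheduler.schedule(retry::tryOnce, delayMillis, TimeUnit.MILLISECONDS);

        // Finish phase: the cancel result is deliberately ignored, because the timer
        // may already be firing. The guard in tryOnce() prevents a double attempt.
        retry.timer.cancel(true);
        retry.tryOnce();

        scheduler.shutdown();
        try {
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return retry.attempts.get();
    }

    public static void main(String[] args) {
        // Delay 0 races the timer against the finish phase; delay 1000 lets cancel win.
        System.out.println(finishWithLastAttempt(0));
        System.out.println(finishWithLastAttempt(1000));
    }
}
```

Under these assumptions the attempt runs exactly once in both cases, whether the timer fired first or the cancel succeeded. Gating the attempt on a successful cancel, as the removed lines did, relies on a failed cancel always meaning the retry is already being executed, which the new comment describes as not 100% safe.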