Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/16 02:33:25 UTC

[GitHub] [flink] lincoln-lil opened a new pull request, #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

lincoln-lil opened a new pull request, #19983:
URL: https://github.com/apache/flink/pull/19983

   ## What is the purpose of the change
   This is the implementation of [FLIP-232](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963)
   
   ## Brief change log
   * Add the new interfaces and API proposed in the FLIP to support retry
   * Add a RetryableResultHandlerDelegator to AsyncWaitOperator to support retry; if retry is disabled, behavior stays the same as in previous versions, which did not support retry
   * Add tests to ensure the retry logic is covered in both ordered and unordered mode
   
   ## Verifying this change
   Covered by AsyncWaitOperatorTest and AsyncDataStreamITCase
   
   ## Does this pull request potentially affect one of the following parts:
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] gaoyunhaii commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r921759245


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java:
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+public class AsyncRetryStrategies {
+    public static final NoRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy();
+
+    /** NoRetryStrategy. */
+    public static class NoRetryStrategy implements AsyncRetryStrategy {

Review Comment:
   nit: could be private



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -154,6 +193,20 @@ public void setup(
             default:
                 throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
         }
+        if (asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()) {
+            this.retryResultPredicate =
+                    asyncRetryStrategy
+                            .getRetryPredicate()
+                            .resultPredicate()
+                            .orElse(ignore -> false);
+        }
+        if (asyncRetryStrategy.getRetryPredicate().exceptionPredicate().isPresent()) {

Review Comment:
   The `if` here is wrong.
   
   The `orElse` supplies the default predicate when the user has not set one, but with the surrounding `if`, the predicate stays `null` in that case.
   
   This could also be wrapped in `if (retryEnabled) {... }`.
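
To illustrate the point in the comment above, here is a minimal, self-contained sketch of resolving an optional predicate without the `isPresent()` guard. The class `RetryPredicateDefaults` and the method `resolve` are hypothetical names used only for illustration; they are not part of the PR or of Flink. The sketch assumes that an absent predicate should mean "never retry".

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical stand-in for the operator's predicate setup; not the Flink class. */
class RetryPredicateDefaults {

    // Resolve an optional predicate to a never-null one.
    // Assumption: an absent predicate means "never retry", as in the diff's orElse default.
    static <T> Predicate<T> resolve(Optional<Predicate<T>> maybePredicate) {
        return maybePredicate.orElse(ignore -> false);
    }

    public static void main(String[] args) {
        // Case 1: the user did not configure a result predicate.
        Optional<Predicate<Collection<String>>> none = Optional.empty();
        Predicate<Collection<String>> absent = resolve(none);

        // Case 2: the user asked to retry on empty results.
        Predicate<Collection<String>> isEmpty = Collection::isEmpty;
        Predicate<Collection<String>> present = resolve(Optional.of(isEmpty));

        // Neither predicate is null, so both can be called without a guard.
        System.out.println(absent.test(Collections.emptyList()));
        System.out.println(present.test(Collections.emptyList()));
    }
}
```

Because `orElse` already covers the absent case, wrapping the assignment in `if (...isPresent())` only reintroduces the `null` that the default was meant to avoid.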





[GitHub] [flink] lincoln-lil commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r912973469


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+public class AsyncRetryStrategies {
+    public static final NoRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy();
+
+    /** NoRetryStrategy. */
+    public static class NoRetryStrategy implements AsyncRetryStrategy {
+        private static final long serialVersionUID = 1L;
+
+        private NoRetryStrategy() {}
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return false;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return -1;
+        }
+
+        @Override
+        public AsyncRetryPredicate getRetryPredicate() {
+            return new AsyncRetryPredicate() {
+                @Override
+                public Optional<Predicate<Collection>> resultPredicate() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    return Optional.empty();
+                }
+            };
+        }
+    }
+
+    /** FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategy<OUT> implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+        private final long backoffTimeMillis;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private FixedDelayRetryStrategy(
+                int maxAttempts,
+                long backoffTimeMillis,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public AsyncRetryPredicate<OUT> getRetryPredicate() {
+            return new AsyncRetryPredicate<OUT>() {
+                @Override
+                public Optional<Predicate<Collection<OUT>>> resultPredicate() {
+                    if (null == resultPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(resultPredicate);
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    if (null == exceptionPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(exceptionPredicate);
+                }
+            };
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return backoffTimeMillis;
+        }
+    }
+
+    /** FixedDelayRetryStrategyBuilder for building a FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategyBuilder<OUT> {
+        private int maxAttempts;
+        private long backoffTimeMillis;
+        private Predicate<Collection<OUT>> resultPredicate;
+        private Predicate<Throwable> exceptionPredicate;
+
+        public FixedDelayRetryStrategyBuilder(int maxAttempts, long backoffTimeMillis) {
+            Preconditions.checkArgument(
+                    maxAttempts > 0, "maxAttempts should be greater than zero.");
+            Preconditions.checkArgument(
+                    backoffTimeMillis > 0, "backoffTimeMillis should be greater than zero.");
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifResult(

Review Comment:
   done



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+public class AsyncRetryStrategies {
+    public static final NoRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy();
+
+    /** NoRetryStrategy. */
+    public static class NoRetryStrategy implements AsyncRetryStrategy {
+        private static final long serialVersionUID = 1L;
+
+        private NoRetryStrategy() {}
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return false;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return -1;
+        }
+
+        @Override
+        public AsyncRetryPredicate getRetryPredicate() {
+            return new AsyncRetryPredicate() {
+                @Override
+                public Optional<Predicate<Collection>> resultPredicate() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    return Optional.empty();
+                }
+            };
+        }
+    }
+
+    /** FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategy<OUT> implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+        private final long backoffTimeMillis;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private FixedDelayRetryStrategy(
+                int maxAttempts,
+                long backoffTimeMillis,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public AsyncRetryPredicate<OUT> getRetryPredicate() {
+            return new AsyncRetryPredicate<OUT>() {
+                @Override
+                public Optional<Predicate<Collection<OUT>>> resultPredicate() {
+                    if (null == resultPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(resultPredicate);
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    if (null == exceptionPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(exceptionPredicate);
+                }
+            };
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return backoffTimeMillis;
+        }
+    }
+
+    /** FixedDelayRetryStrategyBuilder for building a FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategyBuilder<OUT> {
+        private int maxAttempts;
+        private long backoffTimeMillis;
+        private Predicate<Collection<OUT>> resultPredicate;
+        private Predicate<Throwable> exceptionPredicate;
+
+        public FixedDelayRetryStrategyBuilder(int maxAttempts, long backoffTimeMillis) {
+            Preconditions.checkArgument(
+                    maxAttempts > 0, "maxAttempts should be greater than zero.");
+            Preconditions.checkArgument(
+                    backoffTimeMillis > 0, "backoffTimeMillis should be greater than zero.");
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifResult(
+                @Nonnull Predicate<Collection<OUT>> resultRetryPredicate) {
+            this.resultPredicate = resultRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifException(
+                @Nonnull Predicate<Throwable> exceptionRetryPredicate) {
+            this.exceptionPredicate = exceptionRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategy build() {
+            return new FixedDelayRetryStrategy(

Review Comment:
   done



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+public class AsyncRetryStrategies {
+    public static final NoRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy();
+
+    /** NoRetryStrategy. */
+    public static class NoRetryStrategy implements AsyncRetryStrategy {
+        private static final long serialVersionUID = 1L;
+
+        private NoRetryStrategy() {}
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return false;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return -1;
+        }
+
+        @Override
+        public AsyncRetryPredicate getRetryPredicate() {
+            return new AsyncRetryPredicate() {
+                @Override
+                public Optional<Predicate<Collection>> resultPredicate() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    return Optional.empty();
+                }
+            };
+        }
+    }
+
+    /** FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategy<OUT> implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+        private final long backoffTimeMillis;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private FixedDelayRetryStrategy(
+                int maxAttempts,
+                long backoffTimeMillis,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public AsyncRetryPredicate<OUT> getRetryPredicate() {
+            return new AsyncRetryPredicate<OUT>() {
+                @Override
+                public Optional<Predicate<Collection<OUT>>> resultPredicate() {

Review Comment:
   Agree with you that an inner class here is better



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+public class AsyncRetryStrategies {
+    public static final NoRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy();
+
+    /** NoRetryStrategy. */
+    public static class NoRetryStrategy implements AsyncRetryStrategy {
+        private static final long serialVersionUID = 1L;
+
+        private NoRetryStrategy() {}
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return false;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return -1;
+        }
+
+        @Override
+        public AsyncRetryPredicate getRetryPredicate() {
+            return new AsyncRetryPredicate() {
+                @Override
+                public Optional<Predicate<Collection>> resultPredicate() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    return Optional.empty();
+                }
+            };
+        }
+    }
+
+    /** FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategy<OUT> implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;

Review Comment:
   Already marked as final.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+public class AsyncRetryStrategies {
+    public static final NoRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy();
+
+    /** NoRetryStrategy. */
+    public static class NoRetryStrategy implements AsyncRetryStrategy {
+        private static final long serialVersionUID = 1L;
+
+        private NoRetryStrategy() {}
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return false;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return -1;
+        }
+
+        @Override
+        public AsyncRetryPredicate getRetryPredicate() {
+            return new AsyncRetryPredicate() {
+                @Override
+                public Optional<Predicate<Collection>> resultPredicate() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    return Optional.empty();
+                }
+            };
+        }
+    }
+
+    /** FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategy<OUT> implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+        private final long backoffTimeMillis;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private FixedDelayRetryStrategy(
+                int maxAttempts,
+                long backoffTimeMillis,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public AsyncRetryPredicate<OUT> getRetryPredicate() {
+            return new AsyncRetryPredicate<OUT>() {
+                @Override
+                public Optional<Predicate<Collection<OUT>>> resultPredicate() {
+                    if (null == resultPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(resultPredicate);
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    if (null == exceptionPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(exceptionPredicate);
+                }
+            };
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return backoffTimeMillis;
+        }
+    }
+
+    /** FixedDelayRetryStrategyBuilder for building a FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategyBuilder<OUT> {
+        private int maxAttempts;
+        private long backoffTimeMillis;
+        private Predicate<Collection<OUT>> resultPredicate;
+        private Predicate<Throwable> exceptionPredicate;
+
+        public FixedDelayRetryStrategyBuilder(int maxAttempts, long backoffTimeMillis) {
+            Preconditions.checkArgument(
+                    maxAttempts > 0, "maxAttempts should be greater than zero.");
+            Preconditions.checkArgument(
+                    backoffTimeMillis > 0, "backoffTimeMillis should be greater than zero.");
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifResult(
+                @Nonnull Predicate<Collection<OUT>> resultRetryPredicate) {
+            this.resultPredicate = resultRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifException(
+                @Nonnull Predicate<Throwable> exceptionRetryPredicate) {
+            this.exceptionPredicate = exceptionRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategy build() {
+            return new FixedDelayRetryStrategy(
+                    maxAttempts, backoffTimeMillis, resultPredicate, exceptionPredicate);
+        }
+    }
+
+    /** ExponentialBackoffDelayRetryStrategy. */
+    public static class ExponentialBackoffDelayRetryStrategy<OUT>
+            implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+
+        private final long initialDelay;
+        private final long maxRetryDelay;
+
+        private final double multiplier;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private long lastRetryDelay;
+
+        public ExponentialBackoffDelayRetryStrategy(
+                int maxAttempts,
+                long initialDelay,
+                long maxRetryDelay,
+                double multiplier,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.initialDelay = initialDelay;
+            this.maxRetryDelay = maxRetryDelay;
+            this.multiplier = multiplier;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+            this.lastRetryDelay = initialDelay;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            long backoff = Math.min((long) (lastRetryDelay * multiplier), maxRetryDelay);

Review Comment:
   That was my oversight: the `initialDelay` should take effect for the first retry.
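
As a rough illustration of the intended delay sequence, the sketch below computes the backoff from the attempt number so that the first retry waits exactly `initialDelay`. This is not the fix applied in the PR; `BackoffSketch` and `backoffMillis` are hypothetical names, and the sketch assumes attempts are counted from 1.

```java
/** Hypothetical helper illustrating the intended delay sequence; not the PR's class. */
class BackoffSketch {

    // Delay before the given retry attempt (assumed 1-based).
    // The first retry waits initialDelay; each later retry multiplies the
    // previous delay by the multiplier, capped at maxRetryDelay.
    static long backoffMillis(
            int attempt, long initialDelay, long maxRetryDelay, double multiplier) {
        long delay = Math.min(initialDelay, maxRetryDelay);
        for (int i = 1; i < attempt; i++) {
            delay = Math.min((long) (delay * multiplier), maxRetryDelay);
        }
        return delay;
    }

    public static void main(String[] args) {
        // With initialDelay=100, maxRetryDelay=1000, multiplier=2.0
        // the delays are 100, 200, 400, 800, 1000.
        for (int attempt = 1; attempt <= 5; attempt++) {
            System.out.println(attempt + " -> " + backoffMillis(attempt, 100L, 1000L, 2.0));
        }
    }
}
```

By contrast, the line under review multiplies `lastRetryDelay` before returning it, so with the same parameters the first retry would wait 200 ms rather than 100 ms.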



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+public class AsyncRetryStrategies {
+    public static final NoRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy();
+
+    /** NoRetryStrategy. */
+    public static class NoRetryStrategy implements AsyncRetryStrategy {
+        private static final long serialVersionUID = 1L;
+
+        private NoRetryStrategy() {}
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return false;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return -1;
+        }
+
+        @Override
+        public AsyncRetryPredicate getRetryPredicate() {
+            return new AsyncRetryPredicate() {
+                @Override
+                public Optional<Predicate<Collection>> resultPredicate() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    return Optional.empty();
+                }
+            };
+        }
+    }
+
+    /** FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategy<OUT> implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+        private final long backoffTimeMillis;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private FixedDelayRetryStrategy(
+                int maxAttempts,
+                long backoffTimeMillis,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public AsyncRetryPredicate<OUT> getRetryPredicate() {
+            return new AsyncRetryPredicate<OUT>() {
+                @Override
+                public Optional<Predicate<Collection<OUT>>> resultPredicate() {
+                    if (null == resultPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(resultPredicate);
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    if (null == exceptionPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(exceptionPredicate);
+                }
+            };
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return backoffTimeMillis;
+        }
+    }
+
+    /** FixedDelayRetryStrategyBuilder for building a FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategyBuilder<OUT> {
+        private int maxAttempts;
+        private long backoffTimeMillis;
+        private Predicate<Collection<OUT>> resultPredicate;
+        private Predicate<Throwable> exceptionPredicate;
+
+        public FixedDelayRetryStrategyBuilder(int maxAttempts, long backoffTimeMillis) {
+            Preconditions.checkArgument(
+                    maxAttempts > 0, "maxAttempts should be greater than zero.");
+            Preconditions.checkArgument(
+                    backoffTimeMillis > 0, "backoffTimeMillis should be greater than zero.");
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifResult(
+                @Nonnull Predicate<Collection<OUT>> resultRetryPredicate) {
+            this.resultPredicate = resultRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifException(
+                @Nonnull Predicate<Throwable> exceptionRetryPredicate) {
+            this.exceptionPredicate = exceptionRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategy build() {
+            return new FixedDelayRetryStrategy(
+                    maxAttempts, backoffTimeMillis, resultPredicate, exceptionPredicate);
+        }
+    }
+
+    /** ExponentialBackoffDelayRetryStrategy. */
+    public static class ExponentialBackoffDelayRetryStrategy<OUT>
+            implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+
+        private final long initialDelay;
+        private final long maxRetryDelay;
+
+        private final double multiplier;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private long lastRetryDelay;
+
+        public ExponentialBackoffDelayRetryStrategy(
+                int maxAttempts,
+                long initialDelay,
+                long maxRetryDelay,
+                double multiplier,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.initialDelay = initialDelay;
+            this.maxRetryDelay = maxRetryDelay;
+            this.multiplier = multiplier;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+            this.lastRetryDelay = initialDelay;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            long backoff = Math.min((long) (lastRetryDelay * multiplier), maxRetryDelay);
+            this.lastRetryDelay = backoff;
+            return backoff;
+        }
+
+        @Override
+        public AsyncRetryPredicate<OUT> getRetryPredicate() {
+            return new AsyncRetryPredicate<OUT>() {
+                @Override
+                public Optional<Predicate<Collection<OUT>>> resultPredicate() {
+                    if (null == resultPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(resultPredicate);
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    if (null == exceptionPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(exceptionPredicate);
+                }
+            };
+        }
+    }
+
+    /**
+     * ExponentialBackoffDelayRetryStrategyBuilder for building a
+     * ExponentialBackoffDelayRetryStrategy.
+     */
+    public static class ExponentialBackoffDelayRetryStrategyBuilder<OUT> {
+        private final int maxAttempts;
+        private final long initialDelay;
+        private final long maxRetryDelay;
+        private final double multiplier;
+
+        private Predicate<Collection<OUT>> resultPredicate;
+        private Predicate<Throwable> exceptionPredicate;
+
+        public ExponentialBackoffDelayRetryStrategyBuilder(
+                int maxAttempts, long initialDelay, long maxRetryDelay, double multiplier) {
+            this.maxAttempts = maxAttempts;
+            this.initialDelay = initialDelay;
+            this.maxRetryDelay = maxRetryDelay;
+            this.multiplier = multiplier;
+        }
+
+        public ExponentialBackoffDelayRetryStrategyBuilder ifResult(

Review Comment:
   done



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/RetryPredicates.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete retry predicates. */
+public class RetryPredicates {
+
+    /** A predicate matches empty result which means an empty {@link Collection}. */
+    public static final EmptyResultPredicate EMPTY_RESULT_PREDICATE = new EmptyResultPredicate();
+
+    /** A predicate matches any exception which means a non-null{@link Throwable}. */
+    public static final HasExceptionPredicate HAS_EXCEPTION_PREDICATE = new HasExceptionPredicate();
+
+    /**
+     * Create a predicate on given exception type.

Review Comment:
   done



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+public class AsyncRetryStrategies {
+    public static final NoRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy();
+
+    /** NoRetryStrategy. */
+    public static class NoRetryStrategy implements AsyncRetryStrategy {
+        private static final long serialVersionUID = 1L;
+
+        private NoRetryStrategy() {}
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return false;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return -1;
+        }
+
+        @Override
+        public AsyncRetryPredicate getRetryPredicate() {
+            return new AsyncRetryPredicate() {
+                @Override
+                public Optional<Predicate<Collection>> resultPredicate() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    return Optional.empty();
+                }
+            };
+        }
+    }
+
+    /** FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategy<OUT> implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+        private final long backoffTimeMillis;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private FixedDelayRetryStrategy(
+                int maxAttempts,
+                long backoffTimeMillis,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public AsyncRetryPredicate<OUT> getRetryPredicate() {
+            return new AsyncRetryPredicate<OUT>() {
+                @Override
+                public Optional<Predicate<Collection<OUT>>> resultPredicate() {
+                    if (null == resultPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(resultPredicate);
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    if (null == exceptionPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(exceptionPredicate);
+                }
+            };
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return backoffTimeMillis;
+        }
+    }
+
+    /** FixedDelayRetryStrategyBuilder for building a FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategyBuilder<OUT> {
+        private int maxAttempts;
+        private long backoffTimeMillis;
+        private Predicate<Collection<OUT>> resultPredicate;
+        private Predicate<Throwable> exceptionPredicate;
+
+        public FixedDelayRetryStrategyBuilder(int maxAttempts, long backoffTimeMillis) {
+            Preconditions.checkArgument(
+                    maxAttempts > 0, "maxAttempts should be greater than zero.");
+            Preconditions.checkArgument(
+                    backoffTimeMillis > 0, "backoffTimeMillis should be greater than zero.");
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifResult(
+                @Nonnull Predicate<Collection<OUT>> resultRetryPredicate) {
+            this.resultPredicate = resultRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifException(
+                @Nonnull Predicate<Throwable> exceptionRetryPredicate) {
+            this.exceptionPredicate = exceptionRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategy build() {
+            return new FixedDelayRetryStrategy(
+                    maxAttempts, backoffTimeMillis, resultPredicate, exceptionPredicate);
+        }
+    }
+
+    /** ExponentialBackoffDelayRetryStrategy. */
+    public static class ExponentialBackoffDelayRetryStrategy<OUT>
+            implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+
+        private final long initialDelay;

Review Comment:
   You're right, `initialDelay` can be omitted here.





[GitHub] [flink] gaoyunhaii commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r907988175


##########
docs/content.zh/docs/dev/datastream/operators/asyncio.md:
##########
@@ -30,6 +30,8 @@ under the License.
 对于不熟悉异步或者事件驱动编程的用户,建议先储备一些关于 Future 和事件驱动编程的知识。
 
 提示:这篇文档 [FLIP-12: 异步 I/O 的设计和实现](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673)介绍了关于设计和实现异步 I/O 功能的细节。
+对于新增的重试支持涉及和实现细节可以参考[FLIP-232: 为 DataStream API 异步 I/O 操作增加重试支持](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963)

Review Comment:
   How about `对于新增的重试支持的实现细节可以参考` ("For the implementation details of the newly added retry support, see")?
   Also add a period at the end of the sentence.



##########
docs/content/docs/dev/datastream/operators/asyncio.md:
##########
@@ -32,7 +32,8 @@ event-driven programming may be useful preparation.
 
 Note: Details about the design and implementation of the asynchronous I/O utility can be found in the proposal and design document
 [FLIP-12: Asynchronous I/O Design and Implementation](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673).
-
+Details about the new retry support can be found in document
+[FLIP-232: Add Retry Support For Async I/O In DataStream API](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963)。

Review Comment:
   Use an English period (`.`) here instead of the full-width `。`.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java:
##########
@@ -66,7 +70,29 @@ private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
             long timeout,
             int bufSize,
             OutputMode mode) {
+        return addOperator(in, func, timeout, bufSize, mode, NO_RETRY_STRATEGY);
+    }
 
+    /**
+     * Add an AsyncWaitOperator.

Review Comment:
   Following the newer convention, method comments usually use the third person singular, namely `Adds ...`.
   
   The same might apply to the other existing methods and the newly added ones.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java:
##########
@@ -66,7 +70,29 @@ private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
             long timeout,
             int bufSize,
             OutputMode mode) {
+        return addOperator(in, func, timeout, bufSize, mode, NO_RETRY_STRATEGY);

Review Comment:
   I tend to think we should remove this version of `addOperator` directly: it is a private method, and there seems to be little need to keep the duplication just to avoid passing the extra parameter in the following four calls.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java:
##########
@@ -163,4 +193,134 @@ public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
         return addOperator(
                 in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.ORDERED);
     }
+
+    // ======= retryable ========

Review Comment:
   I tend to think we might not need this comment line.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java:
##########
@@ -163,4 +193,134 @@ public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
         return addOperator(
                 in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.ORDERED);
     }
+
+    // ======= retryable ========
+
+    /**
+     * Add an AsyncWaitOperator with an AsyncRetryStrategy to support retry of AsyncFunction. The
+     * order of output stream records may be reordered.
+     *
+     * @param in Input {@link DataStream}
+     * @param func {@link AsyncFunction}
+     * @param timeout from first invoke to final completion of asynchronous operation, may include
+     *     multiple retries, and will be reset in case of restart
+     * @param timeUnit of the given timeout
+     * @param asyncRetryStrategy The strategy of reattempt async i/o operation that can be triggered
+     * @param <IN> Type of input record
+     * @param <OUT> Type of output record
+     * @return A new {@link SingleOutputStreamOperator}.
+     */
+    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWaitWithRetry(
+            DataStream<IN> in,
+            AsyncFunction<IN, OUT> func,
+            long timeout,
+            TimeUnit timeUnit,
+            AsyncRetryStrategy asyncRetryStrategy) {
+        Preconditions.checkArgument(

Review Comment:
   Perhaps we could move this check into `addOperator`, so that it lives in one place?
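   
   A rough sketch of that shape, assuming nothing about the real check: the condition is cut off in the hunk above, so a hypothetical non-null check stands in for it, and plain strings stand in for the stream types. `CentralizedCheckSketch` and its methods are illustrative names only.

```java
import java.util.Objects;

/** Hypothetical sketch: the public entry points delegate to one private helper. */
class CentralizedCheckSketch {

    static String unorderedWaitWithRetry(String in, Object retryStrategy) {
        return addOperator(in, "UNORDERED", retryStrategy);
    }

    static String orderedWaitWithRetry(String in, Object retryStrategy) {
        return addOperator(in, "ORDERED", retryStrategy);
    }

    private static String addOperator(String in, String mode, Object retryStrategy) {
        // Stand-in validation (an assumption): performed once here for every entry point.
        Objects.requireNonNull(retryStrategy, "retryStrategy must not be null");
        return in + ":" + mode;
    }
}
```

   Every caller then gets the same validation, and a new `...WithRetry` variant cannot forget it.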



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +392,143 @@ private void outputCompletedElement() {
         }
     }
 
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.incrementAttempts();
+
+        // fire a new attempt
+        userFunction.asyncInvoke(resultHandlerDelegator.getInputRecord(), resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        private long backoffTimeMillis;
+
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {

Review Comment:
   Should we also add some checks to guard against users calling `complete` incorrectly? For example, users might call `complete` multiple times in a single attempt; this would be similar to what is done in the `ResultFuture`.
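   
   One possible shape for such a guard, as a minimal sketch: a per-attempt flag that accepts only the first completion and is reset when the next attempt starts. `AttemptGuardSketch`, `startNextAttempt` and `tryComplete` are hypothetical names and not part of the operator.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: accepts at most one completion per attempt. */
class AttemptGuardSketch {

    private final AtomicBoolean attemptCompleted = new AtomicBoolean(false);

    /** Called when a new attempt is fired, so that it may complete once again. */
    void startNextAttempt() {
        attemptCompleted.set(false);
    }

    /** Returns true for the first completion of the current attempt, false for any repeat. */
    boolean tryComplete() {
        return attemptCompleted.compareAndSet(false, true);
    }
}
```

   A caller would proceed with the retry-or-complete decision only when `tryComplete()` returns true and ignore (or log) the call otherwise.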



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java:
##########
@@ -163,4 +193,134 @@ public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
         return addOperator(
                 in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.ORDERED);
     }
+
+    // ======= retryable ========
+
+    /**
+     * Add an AsyncWaitOperator with an AsyncRetryStrategy to support retry of AsyncFunction. The
+     * order of output stream records may be reordered.
+     *
+     * @param in Input {@link DataStream}
+     * @param func {@link AsyncFunction}
+     * @param timeout from first invoke to final completion of asynchronous operation, may include
+     *     multiple retries, and will be reset in case of restart
+     * @param timeUnit of the given timeout
+     * @param asyncRetryStrategy The strategy of reattempt async i/o operation that can be triggered
+     * @param <IN> Type of input record
+     * @param <OUT> Type of output record
+     * @return A new {@link SingleOutputStreamOperator}.
+     */
+    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWaitWithRetry(
+            DataStream<IN> in,
+            AsyncFunction<IN, OUT> func,
+            long timeout,
+            TimeUnit timeUnit,
+            AsyncRetryStrategy asyncRetryStrategy) {

Review Comment:
   `AsyncRetryStrategy` -> `AsyncRetryStrategy<OUT>`
   
   The same applies to the following method parameters.
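   
   To show why the type parameter matters, here is a small sketch with a hypothetical stand-in interface (`RetryStrategySketch` is not the real `AsyncRetryStrategy`). With the parameterized type the compiler ties the strategy's result predicate to the function's output type; with a raw type the same call would compile only with unchecked warnings.

```java
import java.util.Collection;
import java.util.function.Predicate;

/** Hypothetical stand-in for a retry strategy that is generic in the output type. */
interface RetryStrategySketch<OUT> {
    Predicate<Collection<OUT>> resultPredicate();
}

class TypedRetrySketch {

    /** The strategy's OUT must match the element type of the results being tested. */
    static <OUT> boolean shouldRetry(RetryStrategySketch<OUT> strategy, Collection<OUT> results) {
        return strategy.resultPredicate().test(results);
    }
}
```

   Passing a `RetryStrategySketch<Integer>` together with a `Collection<String>` is then rejected at compile time.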



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +392,143 @@ private void outputCompletedElement() {
         }
     }
 
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.incrementAttempts();
+
+        // fire a new attempt
+        userFunction.asyncInvoke(resultHandlerDelegator.getInputRecord(), resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        private long backoffTimeMillis;
+
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            if (retryEnabled) {
+                mailboxExecutor.submit(() -> cleanupLastRetryInMailbox(), "cleanup last retry");
+                if (!resultHandler.completed.get() && ifRetryOrCompleted(results, null)) {
+                    return;
+                }
+            }
+            resultHandler.complete(results);
+        }
+
+        @Override
+        public void completeExceptionally(Throwable error) {
+            if (retryEnabled) {
+                mailboxExecutor.submit(() -> cleanupLastRetryInMailbox(), "cleanup last retry");
+                if (ifRetryOrCompleted(null, error)) {
+                    return;
+                }
+            }
+            resultHandler.completeExceptionally(error);
+        }
+
+        private void cleanupLastRetryInMailbox() {
+            if (retryInFlight.compareAndSet(true, false)) {
+                assert incompleteDelayRetryHandlers.contains(this);
+                // remove from delayed retry queue
+                incompleteDelayRetryHandlers.remove(this);
+                delayedRetryTimer = null;
+            }
+        }
+
+        private boolean ifRetryOrCompleted(Collection<OUT> results, Throwable error) {

Review Comment:
   Could we also move `ifRetryOrCompleted` into the main thread? The method does not seem to contain any slow logic, and if so we might be able to simplify logic such as `delayedRetryAvailable`.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -197,11 +248,12 @@ public void processElement(StreamRecord<IN> record) throws Exception {
         // add element first to the queue
         final ResultFuture<OUT> entry = addToWorkQueue(element);
 
-        final ResultHandler resultHandler = new ResultHandler(element, entry);
+        final RetryableResultHandlerDelegator resultHandler =

Review Comment:
   Should we directly distinguish the two cases based on `retryEnabled`?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +392,143 @@ private void outputCompletedElement() {
         }
     }
 
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.incrementAttempts();
+
+        // fire a new attempt
+        userFunction.asyncInvoke(resultHandlerDelegator.getInputRecord(), resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        private long backoffTimeMillis;
+
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            if (retryEnabled) {
+                mailboxExecutor.submit(() -> cleanupLastRetryInMailbox(), "cleanup last retry");
+                if (!resultHandler.completed.get() && ifRetryOrCompleted(results, null)) {
+                    return;
+                }
+            }
+            resultHandler.complete(results);
+        }
+
+        @Override
+        public void completeExceptionally(Throwable error) {
+            if (retryEnabled) {
+                mailboxExecutor.submit(() -> cleanupLastRetryInMailbox(), "cleanup last retry");
+                if (ifRetryOrCompleted(null, error)) {
+                    return;
+                }
+            }
+            resultHandler.completeExceptionally(error);
+        }
+
+        private void cleanupLastRetryInMailbox() {
+            if (retryInFlight.compareAndSet(true, false)) {
+                assert incompleteDelayRetryHandlers.contains(this);
+                // remove from delayed retry queue
+                incompleteDelayRetryHandlers.remove(this);
+                delayedRetryTimer = null;
+            }
+        }
+
+        private boolean ifRetryOrCompleted(Collection<OUT> results, Throwable error) {
+            if (delayedRetryAvailable.get() && resultHandler.inputRecord.isRecord()) {
+                boolean satisfy = false;
+                if (null != results && null != retryResultPredicate) {
+                    satisfy = (satisfy || retryResultPredicate.test(results));
+                }
+                if (null != error && null != retryExceptionPredicate) {
+                    satisfy = (satisfy || retryExceptionPredicate.test(error));
+                }
+                if (satisfy) {
+                    if (asyncRetryStrategy.canRetry(getCurrentAttempts())) {
+                        long nextBackoffTimeMillis =
+                                asyncRetryStrategy.getBackoffTimeMillis(getCurrentAttempts());
+                        setBackoffTimeMillis(nextBackoffTimeMillis);
+                        if (delayedRetryAvailable.get()) {
+                            // timer thread will finally dispatch the task to mailbox executor
+                            mailboxExecutor.submit(
+                                    () -> trySubmitRetryInMailboxOrComplete(this, results, error),
+                                    "delayed retry or give up retry");
+                            return true;
+                        }
+                    }
+                }
+            }
+            return false;
+        }
+
+        private void trySubmitRetryInMailboxOrComplete(
+                RetryableResultHandlerDelegator resultHandlerDelegator,
+                Collection<OUT> results,
+                Throwable error) {
+            if (delayedRetryAvailable.get()) {
+                incompleteDelayRetryHandlers.add(resultHandlerDelegator);
+                retryInFlight.set(true);
+                final long delayedRetry =
+                        resultHandlerDelegator.getBackoffTimeMillis()
+                                + getProcessingTimeService().getCurrentProcessingTime();
+                delayedRetryTimer =
+                        processingTimeService.registerTimer(
+                                delayedRetry, timestamp -> doRetry(resultHandlerDelegator));
+            } else {
+                // give up retry and complete immediately
+                if (null != results) {
+                    resultHandler.complete(results);
+                } else {
+                    resultHandler.completeExceptionally(error);
+                }
+            }
+        }
+
+        public long getBackoffTimeMillis() {

Review Comment:
   We could probably remove some of the getters / setters, since they are only used in this class?





[GitHub] [flink] flinkbot commented on pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19983:
URL: https://github.com/apache/flink/pull/19983#issuecomment-1157166737

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7cac33be1271b73b8f28069ee17db53a790bef29",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "7cac33be1271b73b8f28069ee17db53a790bef29",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7cac33be1271b73b8f28069ee17db53a790bef29 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] lincoln-lil commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r908429783


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +392,143 @@ private void outputCompletedElement() {
         }
     }
 
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.incrementAttempts();
+
+        // fire a new attempt
+        userFunction.asyncInvoke(resultHandlerDelegator.getInputRecord(), resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        private long backoffTimeMillis;
+
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            if (retryEnabled) {
+                mailboxExecutor.submit(() -> cleanupLastRetryInMailbox(), "cleanup last retry");
+                if (!resultHandler.completed.get() && ifRetryOrCompleted(results, null)) {
+                    return;
+                }
+            }
+            resultHandler.complete(results);
+        }
+
+        @Override
+        public void completeExceptionally(Throwable error) {
+            if (retryEnabled) {
+                mailboxExecutor.submit(() -> cleanupLastRetryInMailbox(), "cleanup last retry");
+                if (ifRetryOrCompleted(null, error)) {
+                    return;
+                }
+            }
+            resultHandler.completeExceptionally(error);
+        }
+
+        private void cleanupLastRetryInMailbox() {
+            if (retryInFlight.compareAndSet(true, false)) {
+                assert incompleteDelayRetryHandlers.contains(this);
+                // remove from delayed retry queue
+                incompleteDelayRetryHandlers.remove(this);
+                delayedRetryTimer = null;
+            }
+        }
+
+        private boolean ifRetryOrCompleted(Collection<OUT> results, Throwable error) {

Review Comment:
   I'm not sure that every user's retry predicate will be cheap enough to avoid slowing down the main thread, so I'd prefer to keep it here.





[GitHub] [flink] gaoyunhaii commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r919556536


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -128,6 +156,15 @@ public AsyncWaitOperator(
 
         this.timeout = timeout;
 
+        this.asyncRetryStrategy = asyncRetryStrategy;
+
+        this.retryEnabled =
+                // construct from utility class
+                asyncRetryStrategy != NO_RETRY_STRATEGY
+                        // construct from api
+                        || asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()

Review Comment:
   Should it be 
   
   `asyncRetryStrategy != NO_RETRY_STRATEGY && (asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent() || asyncRetryStrategy.getRetryPredicate().exceptionPredicate().isPresent())` ?
   
   Otherwise, if a user constructs a new strategy that returns empty for both methods, retry should indeed not be enabled?
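   
   A minimal, self-contained sketch of that check follows. The `RetryStrategy` and `RetryPredicate` interfaces and the `strategyOf` helper are simplified stand-ins invented for illustration; they are not the PR's actual classes, only the shape of the boolean expression is taken from the suggestion above.
   
   ```java
   import java.util.Collection;
   import java.util.Optional;
   import java.util.function.Predicate;
   
   public class RetryEnabledSketch {
   
       /** Hypothetical stand-in for the retry predicate holder. */
       public interface RetryPredicate<OUT> {
           Optional<Predicate<Collection<OUT>>> resultPredicate();
   
           Optional<Predicate<Throwable>> exceptionPredicate();
       }
   
       /** Hypothetical stand-in for the retry strategy; only the part needed here. */
       public interface RetryStrategy<OUT> {
           RetryPredicate<OUT> getRetryPredicate();
       }
   
       /** Illustrative helper: builds a strategy from two nullable predicates. */
       public static <OUT> RetryStrategy<OUT> strategyOf(
               Predicate<Collection<OUT>> onResult, Predicate<Throwable> onError) {
           RetryPredicate<OUT> predicate =
                   new RetryPredicate<OUT>() {
                       @Override
                       public Optional<Predicate<Collection<OUT>>> resultPredicate() {
                           return Optional.ofNullable(onResult);
                       }
   
                       @Override
                       public Optional<Predicate<Throwable>> exceptionPredicate() {
                           return Optional.ofNullable(onError);
                       }
                   };
           return () -> predicate;
       }
   
       /** Sentinel playing the role of NO_RETRY_STRATEGY. */
       public static final RetryStrategy<?> NO_RETRY_STRATEGY = strategyOf(null, null);
   
       /** Retry is enabled only for a non-sentinel strategy with at least one predicate. */
       public static <OUT> boolean isRetryEnabled(RetryStrategy<OUT> strategy) {
           return strategy != NO_RETRY_STRATEGY
                   && (strategy.getRetryPredicate().resultPredicate().isPresent()
                           || strategy.getRetryPredicate().exceptionPredicate().isPresent());
       }
   
       public static void main(String[] args) {
           // A user-built strategy without any predicate stays disabled.
           System.out.println(isRetryEnabled(RetryEnabledSketch.<String>strategyOf(null, null)));
           // A strategy with only an exception predicate is enabled.
           System.out.println(
                   isRetryEnabled(RetryEnabledSketch.<String>strategyOf(null, e -> true)));
       }
   }
   ```
   
   With `&&`, a freshly constructed strategy that returns empty for both predicates is treated the same as the sentinel, which is the case the `||` form would miss.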
   
   



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -154,6 +190,14 @@ public void setup(
             default:
                 throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
         }
+        if (asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()) {

Review Comment:
   I'd prefer that we not leave retryResultPredicate / retryExceptionPredicate as null, to avoid possible errors. It could be 
   
   ```
           retryResultPredicate = asyncRetryStrategy.getRetryPredicate().resultPredicate()
               .orElse(ignored -> false);
           retryExceptionPredicate = asyncRetryStrategy.getRetryPredicate().exceptionPredicate()
               .orElse(ignored -> false);
   ```
   
   Then we could also simplify the test to 
   ```
   boolean satisfy =
           (null != results && retryResultPredicate.test(results))
                   || (null != error && retryExceptionPredicate.test(error));
   ```
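   
   For reference, the two snippets above can be combined into a runnable sketch. The class name `DefaultPredicateSketch` and the `shouldRetry` method are made up for illustration, and the operator would resolve the predicates once during setup rather than on every call as done here.
   
   ```java
   import java.util.Collection;
   import java.util.Collections;
   import java.util.Optional;
   import java.util.function.Predicate;
   
   public class DefaultPredicateSketch {
   
       /** Falls back to "never retry" predicates, so no null check on the predicates is needed. */
       public static <OUT> boolean shouldRetry(
               Optional<Predicate<Collection<OUT>>> resultPredicate,
               Optional<Predicate<Throwable>> exceptionPredicate,
               Collection<OUT> results,
               Throwable error) {
           Predicate<Collection<OUT>> retryResultPredicate =
                   resultPredicate.orElse(ignored -> false);
           Predicate<Throwable> retryExceptionPredicate =
                   exceptionPredicate.orElse(ignored -> false);
   
           // Only the argument that is actually present is tested.
           return (null != results && retryResultPredicate.test(results))
                   || (null != error && retryExceptionPredicate.test(error));
       }
   
       public static void main(String[] args) {
           Predicate<Collection<String>> retryOnEmpty = Collection::isEmpty;
           // Empty result with a "retry on empty" predicate: retry.
           System.out.println(
                   shouldRetry(
                           Optional.of(retryOnEmpty),
                           Optional.empty(),
                           Collections.<String>emptyList(),
                           null));
           // Error without an exception predicate: no retry.
           System.out.println(
                   DefaultPredicateSketch.<String>shouldRetry(
                           Optional.empty(),
                           Optional.empty(),
                           null,
                           new RuntimeException("lookup failed")));
       }
   }
   ```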
   
   
   



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(results, null);
+            } else {
+                resultHandler.complete(results);
+            }
+        }
+
+        private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+            mailboxExecutor.submit(
+                    () -> processRetry(this, results, error), "delayed retry or complete");
+        }
+
+        private void processRetry(
+                RetryableResultHandlerDelegator resultHandlerDelegator,
+                Collection<OUT> results,
+                Throwable error) {
+            boolean satisfy = false;
+            if (null != results && null != retryResultPredicate) {
+                satisfy = (satisfy || retryResultPredicate.test(results));
+            }
+            if (null != error && null != retryExceptionPredicate) {
+                satisfy = (satisfy || retryExceptionPredicate.test(error));
+            }
+            if (satisfy) {
+                if (asyncRetryStrategy.canRetry(currentAttempts)) {
+                    long nextBackoffTimeMillis =
+                            asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+                    if (delayedRetryAvailable.get()) {
+                        final long delayedRetry =
+                                nextBackoffTimeMillis
+                                        + getProcessingTimeService().getCurrentProcessingTime();
+
+                        // timer thread will finally dispatch the task to mailbox executor,
+                        // and it can only be submitted once for one attempt.
+                        delayedRetryTimer =
+                                processingTimeService.registerTimer(
+                                        delayedRetry,
+                                        timestamp -> doRetryWithCleanup(resultHandlerDelegator));
+
+                        // add to incomplete retry handlers, will remove it after retry fired
+                        inFlightDelayRetryHandlers.add(resultHandlerDelegator);
+
+                        return;

Review Comment:
   nit: the `return` here seems a bit non-intuitive. Could we remove the nested conditions with
   
   ```
   if (satisfy
                       && asyncRetryStrategy.canRetry(currentAttempts)
                       && delayedRetryAvailable.get()) {
   ...
   }
   
   ```



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(results, null);
+            } else {
+                resultHandler.complete(results);
+            }
+        }
+
+        private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+            mailboxExecutor.submit(
+                    () -> processRetry(this, results, error), "delayed retry or complete");
+        }
+
+        private void processRetry(
+                RetryableResultHandlerDelegator resultHandlerDelegator,
+                Collection<OUT> results,
+                Throwable error) {
+            boolean satisfy = false;
+            if (null != results && null != retryResultPredicate) {
+                satisfy = (satisfy || retryResultPredicate.test(results));
+            }
+            if (null != error && null != retryExceptionPredicate) {
+                satisfy = (satisfy || retryExceptionPredicate.test(error));
+            }
+            if (satisfy) {
+                if (asyncRetryStrategy.canRetry(currentAttempts)) {
+                    long nextBackoffTimeMillis =
+                            asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+                    if (delayedRetryAvailable.get()) {
+                        final long delayedRetry =
+                                nextBackoffTimeMillis
+                                        + getProcessingTimeService().getCurrentProcessingTime();
+
+                        // timer thread will finally dispatch the task to mailbox executor,
+                        // and it can only be submitted once for one attempt.
+                        delayedRetryTimer =
+                                processingTimeService.registerTimer(
+                                        delayedRetry,
+                                        timestamp -> doRetryWithCleanup(resultHandlerDelegator));
+
+                        // add to incomplete retry handlers, will remove it after retry fired

Review Comment:
   Is it necessary to remove / add it each time? Perhaps we could add it only once, on the first retry, and remove it when no further retry happens? 
   
   Also, it seems that we currently do not remove the handler if the retry stops?
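   
   A rough sketch of the bookkeeping I have in mind, assuming the handlers are tracked in a plain `Set`. The class `RetryHandlerTrackingSketch` and its methods are hypothetical and only illustrate the add-once / remove-on-stop idea, not the operator's real code.
   
   ```java
   import java.util.HashSet;
   import java.util.Set;
   
   public class RetryHandlerTrackingSketch {
   
       // Hypothetical stand-in for the operator's set of handlers that still have a retry pending.
       private final Set<Object> inFlightDelayRetryHandlers = new HashSet<>();
   
       /** Called each time an attempt for the given handler finishes. */
       public void onAttemptFinished(Object handler, boolean willRetry) {
           if (willRetry) {
               // Set.add is a no-op when the handler is already tracked,
               // so consecutive retries need no remove / add pair.
               inFlightDelayRetryHandlers.add(handler);
           } else {
               // Retry stopped (result accepted or attempts exhausted): stop tracking.
               inFlightDelayRetryHandlers.remove(handler);
           }
       }
   
       public int trackedHandlers() {
           return inFlightDelayRetryHandlers.size();
       }
   
       public static void main(String[] args) {
           RetryHandlerTrackingSketch tracker = new RetryHandlerTrackingSketch();
           Object handler = new Object();
           tracker.onAttemptFinished(handler, true);
           tracker.onAttemptFinished(handler, true);
           System.out.println(tracker.trackedHandlers()); // prints 1
           tracker.onAttemptFinished(handler, false);
           System.out.println(tracker.trackedHandlers()); // prints 0
       }
   }
   ```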



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {

Review Comment:
   This seems a bit non-intuitive, since on completion `retryInFlight` would be expected to be `false`. Could we rename the variable to something like `lastRetryCompleted`?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -197,11 +248,12 @@ public void processElement(StreamRecord<IN> record) throws Exception {
         // add element first to the queue
         final ResultFuture<OUT> entry = addToWorkQueue(element);
 
-        final ResultHandler resultHandler = new ResultHandler(element, entry);
+        final RetryableResultHandlerDelegator resultHandler =

Review Comment:
   I still think we should distinguish the two cases, since this is on the critical path (record-level operations). For each record we would pay the cost of the extra method calls and atomic variable reads, so I think we should still use different handlers.
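   
   To illustrate, here is a simplified sketch of choosing the handler once per record based on a retry flag. All names (`ResultSink`, `PlainHandler`, `RetryingHandler`, `createHandler`) are hypothetical, and the retry itself is reduced to counting attempts; the point is only that the non-retry path never touches the wrapper.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.List;
   import java.util.function.Predicate;
   
   public class HandlerSelectionSketch {
   
       /** Hypothetical, reduced version of a result callback. */
       public interface ResultSink<OUT> {
           void complete(Collection<OUT> results);
       }
   
       /** Direct path: emits results without any retry bookkeeping. */
       public static class PlainHandler<OUT> implements ResultSink<OUT> {
           public final List<OUT> emitted = new ArrayList<>();
   
           @Override
           public void complete(Collection<OUT> results) {
               emitted.addAll(results);
           }
       }
   
       /** Retry path: swallows results matching the predicate until attempts run out. */
       public static class RetryingHandler<OUT> implements ResultSink<OUT> {
           public final PlainHandler<OUT> delegate = new PlainHandler<>();
           private final Predicate<Collection<OUT>> retryIf;
           private final int maxAttempts;
           public int attempts = 1;
   
           public RetryingHandler(Predicate<Collection<OUT>> retryIf, int maxAttempts) {
               this.retryIf = retryIf;
               this.maxAttempts = maxAttempts;
           }
   
           @Override
           public void complete(Collection<OUT> results) {
               if (attempts < maxAttempts && retryIf.test(results)) {
                   attempts++; // a real operator would schedule the next attempt here
                   return;
               }
               delegate.complete(results);
           }
       }
   
       /** The retry flag is evaluated once; the plain path pays no extra cost. */
       public static <OUT> ResultSink<OUT> createHandler(
               boolean retryEnabled, Predicate<Collection<OUT>> retryIf, int maxAttempts) {
           return retryEnabled ? new RetryingHandler<>(retryIf, maxAttempts) : new PlainHandler<>();
       }
   
       public static void main(String[] args) {
           ResultSink<String> plain = createHandler(false, Collection::isEmpty, 3);
           System.out.println(plain instanceof PlainHandler); // prints true
       }
   }
   ```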



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -109,11 +130,18 @@
     /** Whether object reuse has been enabled or disabled. */
     private transient boolean isObjectReuseEnabled;
 
+    private transient Predicate<Collection<OUT>> retryResultPredicate;
+
+    private transient Predicate<Throwable> retryExceptionPredicate;
+
+    private transient AtomicBoolean delayedRetryAvailable;

Review Comment:
   Could we add a comment for this flag?
   
   Also, if possible, I'd lean towards renaming the variable to `retryDisabledOnFinish` and reversing the check to make it more visible. 



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)

Review Comment:
   This could be moved into the `RetryableResultHandlerDelegator`. 



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(results, null);
+            } else {
+                resultHandler.complete(results);
+            }
+        }
+
+        private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+            mailboxExecutor.submit(
+                    () -> processRetry(this, results, error), "delayed retry or complete");

Review Comment:
   It is not necessary to pass `this` as a separate parameter. Mixing property access via `this` and via `resultHandlerDelegator` in `processRetry` would complicate the implementation. 



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(results, null);
+            } else {
+                resultHandler.complete(results);
+            }
+        }
+
+        private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+            mailboxExecutor.submit(
+                    () -> processRetry(this, results, error), "delayed retry or complete");
+        }
+
+        private void processRetry(
+                RetryableResultHandlerDelegator resultHandlerDelegator,
+                Collection<OUT> results,
+                Throwable error) {
+            boolean satisfy = false;
+            if (null != results && null != retryResultPredicate) {
+                satisfy = (satisfy || retryResultPredicate.test(results));
+            }
+            if (null != error && null != retryExceptionPredicate) {
+                satisfy = (satisfy || retryExceptionPredicate.test(error));
+            }
+            if (satisfy) {
+                if (asyncRetryStrategy.canRetry(currentAttempts)) {
+                    long nextBackoffTimeMillis =
+                            asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+                    if (delayedRetryAvailable.get()) {
+                        final long delayedRetry =
+                                nextBackoffTimeMillis
+                                        + getProcessingTimeService().getCurrentProcessingTime();
+
+                        // timer thread will finally dispatch the task to mailbox executor,
+                        // and it can only be submitted once for one attempt.
+                        delayedRetryTimer =
+                                processingTimeService.registerTimer(
+                                        delayedRetry,
+                                        timestamp -> doRetryWithCleanup(resultHandlerDelegator));
+
+                        // add to incomplete retry handlers, will remove it after retry fired
+                        inFlightDelayRetryHandlers.add(resultHandlerDelegator);
+
+                        return;
+                    }
+                }
+            }
+            // retry unsatisfied, complete it
+            if (null != results) {
+                resultHandlerDelegator.resultHandler.complete(results);
+            } else {
+                resultHandlerDelegator.resultHandler.completeExceptionally(error);
+            }
+        }
+
+        @Override
+        public void completeExceptionally(Throwable error) {

Review Comment:
   nit: move this to right after the method complete(...)?





[GitHub] [flink] lincoln-lil commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r919683662


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -109,11 +130,18 @@
     /** Whether object reuse has been enabled or disabled. */
     private transient boolean isObjectReuseEnabled;
 
+    private transient Predicate<Collection<OUT>> retryResultPredicate;
+
+    private transient Predicate<Throwable> retryExceptionPredicate;
+
+    private transient AtomicBoolean delayedRetryAvailable;

Review Comment:
   `retryDisabledOnFinish` seems like a better choice!



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -154,6 +190,14 @@ public void setup(
             default:
                 throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
         }
+        if (asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()) {

Review Comment:
   This simplification is great!



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -128,6 +156,15 @@ public AsyncWaitOperator(
 
         this.timeout = timeout;
 
+        this.asyncRetryStrategy = asyncRetryStrategy;
+
+        this.retryEnabled =
+                // construct from utility class
+                asyncRetryStrategy != NO_RETRY_STRATEGY
+                        // construct from api
+                        || asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()

Review Comment:
   You're right, the top level '||' should be '&&'
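
   To make the difference concrete, here is a minimal sketch. It is not the operator's code: `Strategy`, `NO_RETRY`, `enabledWithOr` and `enabledWithAnd` are hypothetical stand-ins, and it assumes the intent is that retry is enabled only when the strategy is not the no-retry constant and at least one predicate is present.

```java
import java.util.Optional;
import java.util.function.Predicate;

public class RetryEnabledSketch {

    /** Hypothetical stand-in for a retry strategy; not Flink's AsyncRetryStrategy. */
    static final class Strategy {
        final Optional<Predicate<Object>> resultPredicate;
        final Optional<Predicate<Throwable>> exceptionPredicate;

        Strategy(
                Optional<Predicate<Object>> resultPredicate,
                Optional<Predicate<Throwable>> exceptionPredicate) {
            this.resultPredicate = resultPredicate;
            this.exceptionPredicate = exceptionPredicate;
        }
    }

    /** Hypothetical shared constant meaning "never retry". */
    static final Strategy NO_RETRY = new Strategy(Optional.empty(), Optional.empty());

    /** Top-level '||': any strategy other than the constant enables retry. */
    static boolean enabledWithOr(Strategy s) {
        return s != NO_RETRY
                || s.resultPredicate.isPresent()
                || s.exceptionPredicate.isPresent();
    }

    /** Top-level '&&': a non-constant strategy also needs at least one predicate. */
    static boolean enabledWithAnd(Strategy s) {
        return s != NO_RETRY
                && (s.resultPredicate.isPresent() || s.exceptionPredicate.isPresent());
    }

    public static void main(String[] args) {
        // A strategy built by hand with no predicates can never trigger a retry.
        Strategy noPredicates = new Strategy(Optional.empty(), Optional.empty());
        System.out.println("with ||: " + enabledWithOr(noPredicates));
        System.out.println("with &&: " + enabledWithAnd(noPredicates));
    }
}
```

   With '||', a strategy that has no predicates at all would still pay for the retry path even though nothing can ever satisfy it.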



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(results, null);
+            } else {
+                resultHandler.complete(results);
+            }
+        }
+
+        private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+            mailboxExecutor.submit(
+                    () -> processRetry(this, results, error), "delayed retry or complete");

Review Comment:
   This is debt left over from a previous refactoring; it should indeed be cleaned up.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {

Review Comment:
   I struggled with the variable name for a while. How about naming it `retryAwaiting`? It mainly indicates that a retry is being waited for but has not been triggered yet.
   The difference from `lastRetryCompleted` is that it does not mean the retry handler has completed/finished; it only indicates the waiting-for-retry status. WDYT?
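
   For reference, the guard semantics described here can be sketched in isolation. This is only an illustration: the class `RetryAwaitingSketch` and the methods `requestRetry`, `onRetryFired` and `scheduledCount` are hypothetical names, and the counter stands in for registering the delayed timer.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class RetryAwaitingSketch {

    /** True while a retry has been requested but has not fired yet. */
    private final AtomicBoolean retryAwaiting = new AtomicBoolean(false);

    /** Counts how many retries were actually scheduled (stand-in for the timer). */
    private int scheduled = 0;

    /**
     * Requests a retry. Returns true if this call scheduled one, false if a retry
     * was already waiting and the repeated call was ignored.
     */
    boolean requestRetry() {
        if (!retryAwaiting.compareAndSet(false, true)) {
            return false; // ignore repeated call(s)
        }
        scheduled++;
        return true;
    }

    /** Called once the delayed retry has fired; allows the next retry request. */
    void onRetryFired() {
        retryAwaiting.set(false);
    }

    int scheduledCount() {
        return scheduled;
    }

    public static void main(String[] args) {
        RetryAwaitingSketch guard = new RetryAwaitingSketch();
        System.out.println("first request accepted: " + guard.requestRetry());
        System.out.println("repeated request accepted: " + guard.requestRetry());
        guard.onRetryFired();
        System.out.println("request after firing accepted: " + guard.requestRetry());
    }
}
```

   The flag only says "a retry is pending"; it says nothing about whether the handler itself has completed, which is why a name like `lastRetryCompleted` would be misleading.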



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -197,11 +248,12 @@ public void processElement(StreamRecord<IN> record) throws Exception {
         // add element first to the queue
         final ResultFuture<OUT> entry = addToWorkQueue(element);
 
-        final ResultHandler resultHandler = new ResultHandler(element, entry);
+        final RetryableResultHandlerDelegator resultHandler =

Review Comment:
   From a performance perspective, I accept this approach, so the `retryEnabled` flag can be removed from `RetryableResultHandlerDelegator`.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(results, null);
+            } else {
+                resultHandler.complete(results);
+            }
+        }
+
+        private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+            mailboxExecutor.submit(
+                    () -> processRetry(this, results, error), "delayed retry or complete");
+        }
+
+        private void processRetry(
+                RetryableResultHandlerDelegator resultHandlerDelegator,
+                Collection<OUT> results,
+                Throwable error) {
+            boolean satisfy = false;
+            if (null != results && null != retryResultPredicate) {
+                satisfy = (satisfy || retryResultPredicate.test(results));
+            }
+            if (null != error && null != retryExceptionPredicate) {
+                satisfy = (satisfy || retryExceptionPredicate.test(error));
+            }
+            if (satisfy) {
+                if (asyncRetryStrategy.canRetry(currentAttempts)) {
+                    long nextBackoffTimeMillis =
+                            asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+                    if (delayedRetryAvailable.get()) {
+                        final long delayedRetry =
+                                nextBackoffTimeMillis
+                                        + getProcessingTimeService().getCurrentProcessingTime();
+
+                        // timer thread will finally dispatch the task to mailbox executor,
+                        // and it can only be submitted once for one attempt.
+                        delayedRetryTimer =
+                                processingTimeService.registerTimer(
+                                        delayedRetry,
+                                        timestamp -> doRetryWithCleanup(resultHandlerDelegator));
+
+                        // add to incomplete retry handlers, will remove it after retry fired
+                        inFlightDelayRetryHandlers.add(resultHandlerDelegator);
+
+                        return;
+                    }
+                }
+            }
+            // retry unsatisfied, complete it
+            if (null != results) {
+                resultHandlerDelegator.resultHandler.complete(results);
+            } else {
+                resultHandlerDelegator.resultHandler.completeExceptionally(error);
+            }
+        }
+
+        @Override
+        public void completeExceptionally(Throwable error) {

Review Comment:
   Yes, it is more intuitive to keep them together.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(results, null);
+            } else {
+                resultHandler.complete(results);
+            }
+        }
+
+        private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+            mailboxExecutor.submit(
+                    () -> processRetry(this, results, error), "delayed retry or complete");
+        }
+
+        private void processRetry(
+                RetryableResultHandlerDelegator resultHandlerDelegator,
+                Collection<OUT> results,
+                Throwable error) {
+            boolean satisfy = false;
+            if (null != results && null != retryResultPredicate) {
+                satisfy = (satisfy || retryResultPredicate.test(results));
+            }
+            if (null != error && null != retryExceptionPredicate) {
+                satisfy = (satisfy || retryExceptionPredicate.test(error));
+            }
+            if (satisfy) {
+                if (asyncRetryStrategy.canRetry(currentAttempts)) {
+                    long nextBackoffTimeMillis =
+                            asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+                    if (delayedRetryAvailable.get()) {
+                        final long delayedRetry =
+                                nextBackoffTimeMillis
+                                        + getProcessingTimeService().getCurrentProcessingTime();
+
+                        // timer thread will finally dispatch the task to mailbox executor,
+                        // and it can only be submitted once for one attempt.
+                        delayedRetryTimer =
+                                processingTimeService.registerTimer(
+                                        delayedRetry,
+                                        timestamp -> doRetryWithCleanup(resultHandlerDelegator));
+
+                        // add to incomplete retry handlers, will remove it after retry fired

Review Comment:
   Do you mean that we add it from the first attempt (in processElement)? I tried that and gave it up, considering that retries are probably much less frequent than normal processing.
   But for the missing removal when the retry finally completes, I'd like to change it like this:
   ```
   if (satisfy retry) {
       ....
       if (currentAttempts == 1) {
           // add to incomplete retry handlers only for first time
           inFlightDelayRetryHandlers.add(this);
       }
   } else {
       // remove handler that has been retried from incomplete retry handlers
       if (currentAttempts > 1) {
           inFlightDelayRetryHandlers.remove(this);
       }
       // resultHandler complete...
   }
   ```
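
   The bookkeeping proposed above can be exercised in a small standalone sketch. This is an assumption-laden illustration, not the operator's code: `Handler` and `onAttemptFinished` are hypothetical, and the attempt counter is bumped inline here, whereas in the PR it is incremented in `doRetry` when the timer fires.

```java
import java.util.HashSet;
import java.util.Set;

public class InFlightRetryTrackingSketch {

    /** Hypothetical stand-in for the retryable handler; attempts start from 1. */
    static final class Handler {
        int currentAttempts = 1;
    }

    private final Set<Handler> inFlightDelayRetryHandlers = new HashSet<>();

    /**
     * Handles the outcome of one attempt. Returns true if another retry was
     * scheduled, false if the handler completed.
     */
    boolean onAttemptFinished(Handler handler, boolean retrySatisfied) {
        if (retrySatisfied) {
            if (handler.currentAttempts == 1) {
                // add to incomplete retry handlers only the first time
                inFlightDelayRetryHandlers.add(handler);
            }
            // simulate the retry firing; the real code does this in doRetry
            handler.currentAttempts++;
            return true;
        }
        if (handler.currentAttempts > 1) {
            // the handler was retried at least once, so it was tracked; remove it
            inFlightDelayRetryHandlers.remove(handler);
        }
        // the wrapped result handler would be completed here
        return false;
    }

    int inFlightCount() {
        return inFlightDelayRetryHandlers.size();
    }

    public static void main(String[] args) {
        InFlightRetryTrackingSketch tracker = new InFlightRetryTrackingSketch();
        Handler handler = new Handler();
        tracker.onAttemptFinished(handler, true);
        tracker.onAttemptFinished(handler, true);
        System.out.println("tracked while retrying: " + tracker.inFlightCount());
        tracker.onAttemptFinished(handler, false);
        System.out.println("tracked after completion: " + tracker.inFlightCount());
    }
}
```

   A record that completes on its first attempt never touches the set, which keeps the common path free of extra bookkeeping.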
   
   



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(results, null);
+            } else {
+                resultHandler.complete(results);
+            }
+        }
+
+        private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+            mailboxExecutor.submit(
+                    () -> processRetry(this, results, error), "delayed retry or complete");
+        }
+
+        private void processRetry(
+                RetryableResultHandlerDelegator resultHandlerDelegator,
+                Collection<OUT> results,
+                Throwable error) {
+            boolean satisfy = false;
+            if (null != results && null != retryResultPredicate) {
+                satisfy = (satisfy || retryResultPredicate.test(results));
+            }
+            if (null != error && null != retryExceptionPredicate) {
+                satisfy = (satisfy || retryExceptionPredicate.test(error));
+            }
+            if (satisfy) {
+                if (asyncRetryStrategy.canRetry(currentAttempts)) {
+                    long nextBackoffTimeMillis =
+                            asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+                    if (delayedRetryAvailable.get()) {
+                        final long delayedRetry =
+                                nextBackoffTimeMillis
+                                        + getProcessingTimeService().getCurrentProcessingTime();
+
+                        // timer thread will finally dispatch the task to mailbox executor,
+                        // and it can only be submitted once for one attempt.
+                        delayedRetryTimer =
+                                processingTimeService.registerTimer(
+                                        delayedRetry,
+                                        timestamp -> doRetryWithCleanup(resultHandlerDelegator));
+
+                        // add to incomplete retry handlers, will remove it after retry fired
+                        inFlightDelayRetryHandlers.add(resultHandlerDelegator);
+
+                        return;

Review Comment:
   Makes sense.





[GitHub] [flink] lincoln-lil commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r921778909


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java:
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+public class AsyncRetryStrategies {
+    public static final NoRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy();
+
+    /** NoRetryStrategy. */
+    public static class NoRetryStrategy implements AsyncRetryStrategy {

Review Comment:
   ok



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -154,6 +193,20 @@ public void setup(
             default:
                 throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
         }
+        if (asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()) {
+            this.retryResultPredicate =
+                    asyncRetryStrategy
+                            .getRetryPredicate()
+                            .resultPredicate()
+                            .orElse(ignore -> false);
+        }
+        if (asyncRetryStrategy.getRetryPredicate().exceptionPredicate().isPresent()) {

Review Comment:
   Oh, my mistake here.





[GitHub] [flink] gaoyunhaii commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r909442128


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+public class AsyncRetryStrategies {
+    public static final NoRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy();
+
+    /** NoRetryStrategy. */
+    public static class NoRetryStrategy implements AsyncRetryStrategy {
+        private static final long serialVersionUID = 1L;
+
+        private NoRetryStrategy() {}
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return false;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return -1;
+        }
+
+        @Override
+        public AsyncRetryPredicate getRetryPredicate() {
+            return new AsyncRetryPredicate() {
+                @Override
+                public Optional<Predicate<Collection>> resultPredicate() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    return Optional.empty();
+                }
+            };
+        }
+    }
+
+    /** FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategy<OUT> implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;

Review Comment:
   `maxAttempts` and `backoffTimeMillis` could be marked as final.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+public class AsyncRetryStrategies {
+    public static final NoRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy();
+
+    /** NoRetryStrategy. */
+    public static class NoRetryStrategy implements AsyncRetryStrategy {
+        private static final long serialVersionUID = 1L;
+
+        private NoRetryStrategy() {}
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return false;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return -1;
+        }
+
+        @Override
+        public AsyncRetryPredicate getRetryPredicate() {
+            return new AsyncRetryPredicate() {
+                @Override
+                public Optional<Predicate<Collection>> resultPredicate() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    return Optional.empty();
+                }
+            };
+        }
+    }
+
+    /** FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategy<OUT> implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+        private final long backoffTimeMillis;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private FixedDelayRetryStrategy(
+                int maxAttempts,
+                long backoffTimeMillis,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public AsyncRetryPredicate<OUT> getRetryPredicate() {
+            return new AsyncRetryPredicate<OUT>() {
+                @Override
+                public Optional<Predicate<Collection<OUT>>> resultPredicate() {
+                    if (null == resultPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(resultPredicate);
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    if (null == exceptionPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(exceptionPredicate);
+                }
+            };
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return backoffTimeMillis;
+        }
+    }
+
+    /** FixedDelayRetryStrategyBuilder for building a FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategyBuilder<OUT> {
+        private int maxAttempts;
+        private long backoffTimeMillis;
+        private Predicate<Collection<OUT>> resultPredicate;
+        private Predicate<Throwable> exceptionPredicate;
+
+        public FixedDelayRetryStrategyBuilder(int maxAttempts, long backoffTimeMillis) {
+            Preconditions.checkArgument(
+                    maxAttempts > 0, "maxAttempts should be greater than zero.");
+            Preconditions.checkArgument(
+                    backoffTimeMillis > 0, "backoffTimeMillis should be greater than zero.");
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifResult(
+                @Nonnull Predicate<Collection<OUT>> resultRetryPredicate) {
+            this.resultPredicate = resultRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifException(
+                @Nonnull Predicate<Throwable> exceptionRetryPredicate) {
+            this.exceptionPredicate = exceptionRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategy build() {
+            return new FixedDelayRetryStrategy(
+                    maxAttempts, backoffTimeMillis, resultPredicate, exceptionPredicate);
+        }
+    }
+
+    /** ExponentialBackoffDelayRetryStrategy. */
+    public static class ExponentialBackoffDelayRetryStrategy<OUT>
+            implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+
+        private final long initialDelay;
+        private final long maxRetryDelay;
+
+        private final double multiplier;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private long lastRetryDelay;
+
+        public ExponentialBackoffDelayRetryStrategy(
+                int maxAttempts,
+                long initialDelay,
+                long maxRetryDelay,
+                double multiplier,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.initialDelay = initialDelay;
+            this.maxRetryDelay = maxRetryDelay;
+            this.multiplier = multiplier;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+            this.lastRetryDelay = initialDelay;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            long backoff = Math.min((long) (lastRetryDelay * multiplier), maxRetryDelay);
+            this.lastRetryDelay = backoff;
+            return backoff;
+        }
+
+        @Override
+        public AsyncRetryPredicate<OUT> getRetryPredicate() {
+            return new AsyncRetryPredicate<OUT>() {
+                @Override
+                public Optional<Predicate<Collection<OUT>>> resultPredicate() {
+                    if (null == resultPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(resultPredicate);
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    if (null == exceptionPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(exceptionPredicate);
+                }
+            };
+        }
+    }
+
+    /**
+     * ExponentialBackoffDelayRetryStrategyBuilder for building a
+     * ExponentialBackoffDelayRetryStrategy.
+     */
+    public static class ExponentialBackoffDelayRetryStrategyBuilder<OUT> {
+        private final int maxAttempts;
+        private final long initialDelay;
+        private final long maxRetryDelay;
+        private final double multiplier;
+
+        private Predicate<Collection<OUT>> resultPredicate;
+        private Predicate<Throwable> exceptionPredicate;
+
+        public ExponentialBackoffDelayRetryStrategyBuilder(
+                int maxAttempts, long initialDelay, long maxRetryDelay, double multiplier) {
+            this.maxAttempts = maxAttempts;
+            this.initialDelay = initialDelay;
+            this.maxRetryDelay = maxRetryDelay;
+            this.multiplier = multiplier;
+        }
+
+        public ExponentialBackoffDelayRetryStrategyBuilder ifResult(

Review Comment:
   Similarly `ExponentialBackoffDelayRetryStrategyBuilder` -> `ExponentialBackoffDelayRetryStrategyBuilder<OUT>`. 



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+public class AsyncRetryStrategies {
+    public static final NoRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy();
+
+    /** NoRetryStrategy. */
+    public static class NoRetryStrategy implements AsyncRetryStrategy {
+        private static final long serialVersionUID = 1L;
+
+        private NoRetryStrategy() {}
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return false;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return -1;
+        }
+
+        @Override
+        public AsyncRetryPredicate getRetryPredicate() {
+            return new AsyncRetryPredicate() {
+                @Override
+                public Optional<Predicate<Collection>> resultPredicate() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    return Optional.empty();
+                }
+            };
+        }
+    }
+
+    /** FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategy<OUT> implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+        private final long backoffTimeMillis;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private FixedDelayRetryStrategy(
+                int maxAttempts,
+                long backoffTimeMillis,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public AsyncRetryPredicate<OUT> getRetryPredicate() {
+            return new AsyncRetryPredicate<OUT>() {
+                @Override
+                public Optional<Predicate<Collection<OUT>>> resultPredicate() {
+                    if (null == resultPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(resultPredicate);
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    if (null == exceptionPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(exceptionPredicate);
+                }
+            };
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return backoffTimeMillis;
+        }
+    }
+
+    /** FixedDelayRetryStrategyBuilder for building a FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategyBuilder<OUT> {
+        private int maxAttempts;
+        private long backoffTimeMillis;
+        private Predicate<Collection<OUT>> resultPredicate;
+        private Predicate<Throwable> exceptionPredicate;
+
+        public FixedDelayRetryStrategyBuilder(int maxAttempts, long backoffTimeMillis) {
+            Preconditions.checkArgument(
+                    maxAttempts > 0, "maxAttempts should be greater than zero.");
+            Preconditions.checkArgument(
+                    backoffTimeMillis > 0, "backoffTimeMillis should be greater than zero.");
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifResult(

Review Comment:
   `FixedDelayRetryStrategyBuilder` -> `FixedDelayRetryStrategyBuilder<OUT>`, similarly for the remaining ones



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+public class AsyncRetryStrategies {
+    public static final NoRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy();
+
+    /** NoRetryStrategy. */
+    public static class NoRetryStrategy implements AsyncRetryStrategy {
+        private static final long serialVersionUID = 1L;
+
+        private NoRetryStrategy() {}
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return false;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return -1;
+        }
+
+        @Override
+        public AsyncRetryPredicate getRetryPredicate() {
+            return new AsyncRetryPredicate() {
+                @Override
+                public Optional<Predicate<Collection>> resultPredicate() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    return Optional.empty();
+                }
+            };
+        }
+    }
+
+    /** FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategy<OUT> implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+        private final long backoffTimeMillis;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private FixedDelayRetryStrategy(
+                int maxAttempts,
+                long backoffTimeMillis,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public AsyncRetryPredicate<OUT> getRetryPredicate() {
+            return new AsyncRetryPredicate<OUT>() {
+                @Override
+                public Optional<Predicate<Collection<OUT>>> resultPredicate() {
+                    if (null == resultPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(resultPredicate);
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    if (null == exceptionPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(exceptionPredicate);
+                }
+            };
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return backoffTimeMillis;
+        }
+    }
+
+    /** FixedDelayRetryStrategyBuilder for building a FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategyBuilder<OUT> {
+        private int maxAttempts;
+        private long backoffTimeMillis;
+        private Predicate<Collection<OUT>> resultPredicate;
+        private Predicate<Throwable> exceptionPredicate;
+
+        public FixedDelayRetryStrategyBuilder(int maxAttempts, long backoffTimeMillis) {
+            Preconditions.checkArgument(
+                    maxAttempts > 0, "maxAttempts should be greater than zero.");
+            Preconditions.checkArgument(
+                    backoffTimeMillis > 0, "backoffTimeMillis should be greater than zero.");
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifResult(
+                @Nonnull Predicate<Collection<OUT>> resultRetryPredicate) {
+            this.resultPredicate = resultRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifException(
+                @Nonnull Predicate<Throwable> exceptionRetryPredicate) {
+            this.exceptionPredicate = exceptionRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategy build() {
+            return new FixedDelayRetryStrategy(

Review Comment:
   `FixedDelayRetryStrategy` -> `FixedDelayRetryStrategy<OUT>`, also for the return type.
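
   To illustrate the raw-type remarks on the builders, here is a minimal sketch of a builder whose fluent methods and `build()` keep the `OUT` type parameter. The names `GenericBuilderSketch`, `FixedDelayStrategy` and `FixedDelayStrategyBuilder` are hypothetical, simplified stand-ins, not the classes in this PR, and the exception predicate and backoff time are left out for brevity.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.function.Predicate;

// Hypothetical, simplified stand-ins for the classes under review; not the Flink sources.
final class GenericBuilderSketch {

    /** Simplified strategy that only carries the attempt limit and the result predicate. */
    static final class FixedDelayStrategy<OUT> {
        final int maxAttempts;
        final Predicate<Collection<OUT>> resultPredicate;

        private FixedDelayStrategy(int maxAttempts, Predicate<Collection<OUT>> resultPredicate) {
            this.maxAttempts = maxAttempts;
            this.resultPredicate = resultPredicate;
        }
    }

    /** Builder whose fluent methods return the parameterized type instead of the raw type. */
    static final class FixedDelayStrategyBuilder<OUT> {
        private final int maxAttempts;
        private Predicate<Collection<OUT>> resultPredicate;

        FixedDelayStrategyBuilder(int maxAttempts) {
            this.maxAttempts = maxAttempts;
        }

        // Returning FixedDelayStrategyBuilder<OUT> keeps OUT visible to the next call in the chain.
        FixedDelayStrategyBuilder<OUT> ifResult(Predicate<Collection<OUT>> predicate) {
            this.resultPredicate = predicate;
            return this;
        }

        // Parameterized return type plus the diamond operator: no raw type, no unchecked warning.
        FixedDelayStrategy<OUT> build() {
            return new FixedDelayStrategy<>(maxAttempts, resultPredicate);
        }
    }

    public static void main(String[] args) {
        // The chain type-checks without casts because every step carries <String>.
        FixedDelayStrategy<String> strategy =
                new FixedDelayStrategyBuilder<String>(3)
                        .ifResult(Collection::isEmpty)
                        .build();
        System.out.println(strategy.resultPredicate.test(Collections.emptyList()));
    }
}
```

   With raw return types, the assignment to `FixedDelayStrategy<String>` would still compile, but only with an unchecked conversion warning, and any mismatch in the element type would go unnoticed by the compiler.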



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+public class AsyncRetryStrategies {
+    public static final NoRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy();
+
+    /** NoRetryStrategy. */
+    public static class NoRetryStrategy implements AsyncRetryStrategy {
+        private static final long serialVersionUID = 1L;
+
+        private NoRetryStrategy() {}
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return false;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return -1;
+        }
+
+        @Override
+        public AsyncRetryPredicate getRetryPredicate() {
+            return new AsyncRetryPredicate() {
+                @Override
+                public Optional<Predicate<Collection>> resultPredicate() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    return Optional.empty();
+                }
+            };
+        }
+    }
+
+    /** FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategy<OUT> implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+        private final long backoffTimeMillis;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private FixedDelayRetryStrategy(
+                int maxAttempts,
+                long backoffTimeMillis,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public AsyncRetryPredicate<OUT> getRetryPredicate() {
+            return new AsyncRetryPredicate<OUT>() {
+                @Override
+                public Optional<Predicate<Collection<OUT>>> resultPredicate() {

Review Comment:
   Might be simplified with `return Optional.ofNullable(resultPredicate);`. The same applies to the other methods.
   
   Also, setting aside possible changes to the API, I think we could extract a named inner `AsyncRetryPredicate` class composed of the two separate predicates.
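
   A minimal sketch of both suggestions, under stated assumptions: `RetryPredicate` is a hypothetical local interface that mirrors the two methods of `AsyncRetryPredicate` visible in the diff, and `CompositeRetryPredicate` is an illustrative name for the extracted class. Serialization concerns, which the real strategy classes have to handle, are ignored here.

```java
import java.util.Collection;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical sketch; RetryPredicate only mirrors the method shapes shown in the diff.
final class RetryPredicateSketch {

    interface RetryPredicate<OUT> {
        Optional<Predicate<Collection<OUT>>> resultPredicate();

        Optional<Predicate<Throwable>> exceptionPredicate();
    }

    /** Named class composed of two separate predicates, either of which may be null. */
    static final class CompositeRetryPredicate<OUT> implements RetryPredicate<OUT> {
        private final Predicate<Collection<OUT>> resultPredicate;
        private final Predicate<Throwable> exceptionPredicate;

        CompositeRetryPredicate(
                Predicate<Collection<OUT>> resultPredicate,
                Predicate<Throwable> exceptionPredicate) {
            this.resultPredicate = resultPredicate;
            this.exceptionPredicate = exceptionPredicate;
        }

        @Override
        public Optional<Predicate<Collection<OUT>>> resultPredicate() {
            // ofNullable returns Optional.empty() for null, replacing the explicit null check.
            return Optional.ofNullable(resultPredicate);
        }

        @Override
        public Optional<Predicate<Throwable>> exceptionPredicate() {
            return Optional.ofNullable(exceptionPredicate);
        }
    }

    public static void main(String[] args) {
        RetryPredicate<String> predicate =
                new CompositeRetryPredicate<String>(Collection::isEmpty, null);
        System.out.println(predicate.resultPredicate().isPresent());    // prints "true"
        System.out.println(predicate.exceptionPredicate().isPresent()); // prints "false"
    }
}
```

   Each strategy's `getRetryPredicate()` could then return one such instance instead of declaring its own anonymous class, which would remove the duplicated null checks in the fixed-delay and exponential-backoff strategies.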



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/RetryPredicates.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete retry predicates. */
+public class RetryPredicates {
+
+    /** A predicate matches empty result which means an empty {@link Collection}. */
+    public static final EmptyResultPredicate EMPTY_RESULT_PREDICATE = new EmptyResultPredicate();
+
+    /** A predicate matches any exception which means a non-null{@link Throwable}. */
+    public static final HasExceptionPredicate HAS_EXCEPTION_PREDICATE = new HasExceptionPredicate();
+
+    /**
+     * Create a predicate on given exception type.

Review Comment:
   `Create` -> `Creates`. 



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+public class AsyncRetryStrategies {
+    public static final NoRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy();
+
+    /** NoRetryStrategy. */
+    public static class NoRetryStrategy implements AsyncRetryStrategy {
+        private static final long serialVersionUID = 1L;
+
+        private NoRetryStrategy() {}
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return false;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return -1;
+        }
+
+        @Override
+        public AsyncRetryPredicate getRetryPredicate() {
+            return new AsyncRetryPredicate() {
+                @Override
+                public Optional<Predicate<Collection>> resultPredicate() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    return Optional.empty();
+                }
+            };
+        }
+    }
+
+    /** FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategy<OUT> implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+        private final long backoffTimeMillis;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private FixedDelayRetryStrategy(
+                int maxAttempts,
+                long backoffTimeMillis,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public AsyncRetryPredicate<OUT> getRetryPredicate() {
+            return new AsyncRetryPredicate<OUT>() {
+                @Override
+                public Optional<Predicate<Collection<OUT>>> resultPredicate() {
+                    if (null == resultPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(resultPredicate);
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    if (null == exceptionPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(exceptionPredicate);
+                }
+            };
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return backoffTimeMillis;
+        }
+    }
+
+    /** FixedDelayRetryStrategyBuilder for building a FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategyBuilder<OUT> {
+        private int maxAttempts;
+        private long backoffTimeMillis;
+        private Predicate<Collection<OUT>> resultPredicate;
+        private Predicate<Throwable> exceptionPredicate;
+
+        public FixedDelayRetryStrategyBuilder(int maxAttempts, long backoffTimeMillis) {
+            Preconditions.checkArgument(
+                    maxAttempts > 0, "maxAttempts should be greater than zero.");
+            Preconditions.checkArgument(
+                    backoffTimeMillis > 0, "backoffTimeMillis should be greater than zero.");
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifResult(
+                @Nonnull Predicate<Collection<OUT>> resultRetryPredicate) {
+            this.resultPredicate = resultRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifException(
+                @Nonnull Predicate<Throwable> exceptionRetryPredicate) {
+            this.exceptionPredicate = exceptionRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategy build() {
+            return new FixedDelayRetryStrategy(
+                    maxAttempts, backoffTimeMillis, resultPredicate, exceptionPredicate);
+        }
+    }
+
+    /** ExponentialBackoffDelayRetryStrategy. */
+    public static class ExponentialBackoffDelayRetryStrategy<OUT>
+            implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+
+        private final long initialDelay;
+        private final long maxRetryDelay;
+
+        private final double multiplier;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private long lastRetryDelay;
+
+        public ExponentialBackoffDelayRetryStrategy(
+                int maxAttempts,
+                long initialDelay,
+                long maxRetryDelay,
+                double multiplier,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.initialDelay = initialDelay;
+            this.maxRetryDelay = maxRetryDelay;
+            this.multiplier = multiplier;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+            this.lastRetryDelay = initialDelay;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            long backoff = Math.min((long) (lastRetryDelay * multiplier), maxRetryDelay);

Review Comment:
   Should the delay for the first retry be equal to `initialDelay`?
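
The concern above is that the quoted line multiplies before the first delay is ever returned, so the first retry would already wait `initialDelay * multiplier`. A minimal sketch of one way to make the first retry wait exactly `initialDelay` follows. The class `BackoffSketch` and the method `backoffMillis` are hypothetical names used only for illustration; this is a stateless variant under those assumptions, not the code from the PR.

```java
/** Hypothetical stand-alone sketch; it is not the class under review. */
final class BackoffSketch {
    private BackoffSketch() {}

    /**
     * Delay before the given retry (1-based). Retry 1 waits initialDelay;
     * each later retry multiplies the previous delay by multiplier.
     * Every delay is capped at maxRetryDelay.
     */
    static long backoffMillis(
            int retryNumber, long initialDelay, long maxRetryDelay, double multiplier) {
        if (retryNumber < 1) {
            throw new IllegalArgumentException("retryNumber must be >= 1");
        }
        // The first retry uses the initial delay unchanged (apart from the cap).
        long delay = Math.min(initialDelay, maxRetryDelay);
        for (int i = 1; i < retryNumber; i++) {
            delay = Math.min((long) (delay * multiplier), maxRetryDelay);
        }
        return delay;
    }
}
```

With `initialDelay = 100`, `maxRetryDelay = 1000` and `multiplier = 2.0`, retries 1 to 5 wait 100, 200, 400, 800 and 1000 ms.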



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java:
##########
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+public class AsyncRetryStrategies {
+    public static final NoRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy();
+
+    /** NoRetryStrategy. */
+    public static class NoRetryStrategy implements AsyncRetryStrategy {
+        private static final long serialVersionUID = 1L;
+
+        private NoRetryStrategy() {}
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return false;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return -1;
+        }
+
+        @Override
+        public AsyncRetryPredicate getRetryPredicate() {
+            return new AsyncRetryPredicate() {
+                @Override
+                public Optional<Predicate<Collection>> resultPredicate() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    return Optional.empty();
+                }
+            };
+        }
+    }
+
+    /** FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategy<OUT> implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+        private final long backoffTimeMillis;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private FixedDelayRetryStrategy(
+                int maxAttempts,
+                long backoffTimeMillis,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public AsyncRetryPredicate<OUT> getRetryPredicate() {
+            return new AsyncRetryPredicate<OUT>() {
+                @Override
+                public Optional<Predicate<Collection<OUT>>> resultPredicate() {
+                    if (null == resultPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(resultPredicate);
+                }
+
+                @Override
+                public Optional<Predicate<Throwable>> exceptionPredicate() {
+                    if (null == exceptionPredicate) {
+                        return Optional.empty();
+                    }
+                    return Optional.of(exceptionPredicate);
+                }
+            };
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return backoffTimeMillis;
+        }
+    }
+
+    /** FixedDelayRetryStrategyBuilder for building a FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategyBuilder<OUT> {
+        private int maxAttempts;
+        private long backoffTimeMillis;
+        private Predicate<Collection<OUT>> resultPredicate;
+        private Predicate<Throwable> exceptionPredicate;
+
+        public FixedDelayRetryStrategyBuilder(int maxAttempts, long backoffTimeMillis) {
+            Preconditions.checkArgument(
+                    maxAttempts > 0, "maxAttempts should be greater than zero.");
+            Preconditions.checkArgument(
+                    backoffTimeMillis > 0, "backoffTimeMillis should be greater than zero.");
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifResult(
+                @Nonnull Predicate<Collection<OUT>> resultRetryPredicate) {
+            this.resultPredicate = resultRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategyBuilder ifException(
+                @Nonnull Predicate<Throwable> exceptionRetryPredicate) {
+            this.exceptionPredicate = exceptionRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategy build() {
+            return new FixedDelayRetryStrategy(
+                    maxAttempts, backoffTimeMillis, resultPredicate, exceptionPredicate);
+        }
+    }
+
+    /** ExponentialBackoffDelayRetryStrategy. */
+    public static class ExponentialBackoffDelayRetryStrategy<OUT>
+            implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+
+        private final long initialDelay;

Review Comment:
   Does `initialDelay` need to be a class field?





[GitHub] [flink] lincoln-lil commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r908406445


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java:
##########
@@ -163,4 +193,134 @@ public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
         return addOperator(
                 in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.ORDERED);
     }
+
+    // ======= retryable ========

Review Comment:
   I'll remove it, both here and in the Scala API.





[GitHub] [flink] lincoln-lil commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r899684838


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/RetryPredicates.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete retry predicates. */
+public class RetryPredicates {
+
+    /** A predicate matches empty result which means an empty {@link Collection}. */
+    public static final EmptyResultPredicate EMPTY_RESULT_PREDICATE = new EmptyResultPredicate();
+
+    /** A predicate matches any exception which means a non-null{@link Throwable}. */
+    public static final HasExceptionPredicate HAS_EXCEPTION_PREDICATE = new HasExceptionPredicate();
+
+    /**
+     * Create a predicate on given exception type.
+     *
+     * @param exceptionClass
+     * @return predicate on exception type.
+     */
+    public static ExceptionTypePredicate createExceptionTypePredicate(
+            @Nonnull Class<? extends Throwable> exceptionClass) {
+        return new ExceptionTypePredicate(exceptionClass);
+    }
+
+    public static ExceptionTypePredicate exceptionTypePredicate(

Review Comment:
   Good catch! This candidate method was discarded; I'll remove it. Thanks for reviewing!





[GitHub] [flink] lincoln-lil commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r908405217


##########
docs/content.zh/docs/dev/datastream/operators/asyncio.md:
##########
@@ -30,6 +30,8 @@ under the License.
 对于不熟悉异步或者事件驱动编程的用户,建议先储备一些关于 Future 和事件驱动编程的知识。
 
 提示:这篇文档 [FLIP-12: 异步 I/O 的设计和实现](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673)介绍了关于设计和实现异步 I/O 功能的细节。
+对于新增的重试支持涉及和实现细节可以参考[FLIP-232: 为 DataStream API 异步 I/O 操作增加重试支持](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963)

Review Comment:
   Good catch!



##########
docs/content/docs/dev/datastream/operators/asyncio.md:
##########
@@ -32,7 +32,8 @@ event-driven programming may be useful preparation.
 
 Note: Details about the design and implementation of the asynchronous I/O utility can be found in the proposal and design document
 [FLIP-12: Asynchronous I/O Design and Implementation](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673).
-
+Details about the new retry support can be found in document
+[FLIP-232: Add Retry Support For Async I/O In DataStream API](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963)。

Review Comment:
   addressed.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java:
##########
@@ -66,7 +70,29 @@ private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
             long timeout,
             int bufSize,
             OutputMode mode) {
+        return addOperator(in, func, timeout, bufSize, mode, NO_RETRY_STRATEGY);

Review Comment:
   Makes sense to me. I'll delete it and pass NO_RETRY_STRATEGY in the direct calls.





[GitHub] [flink] gaoyunhaii closed pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
gaoyunhaii closed pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API
URL: https://github.com/apache/flink/pull/19983




[GitHub] [flink] chenzihao5 commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
chenzihao5 commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r898925689


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/RetryPredicates.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete retry predicates. */
+public class RetryPredicates {
+
+    /** A predicate matches empty result which means an empty {@link Collection}. */
+    public static final EmptyResultPredicate EMPTY_RESULT_PREDICATE = new EmptyResultPredicate();
+
+    /** A predicate matches any exception which means a non-null{@link Throwable}. */
+    public static final HasExceptionPredicate HAS_EXCEPTION_PREDICATE = new HasExceptionPredicate();
+
+    /**
+     * Create a predicate on given exception type.
+     *
+     * @param exceptionClass
+     * @return predicate on exception type.
+     */
+    public static ExceptionTypePredicate createExceptionTypePredicate(
+            @Nonnull Class<? extends Throwable> exceptionClass) {
+        return new ExceptionTypePredicate(exceptionClass);
+    }
+
+    public static ExceptionTypePredicate exceptionTypePredicate(

Review Comment:
   Hi, this method duplicates the one above.





[GitHub] [flink] ericxiao251 commented on pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
ericxiao251 commented on PR #19983:
URL: https://github.com/apache/flink/pull/19983#issuecomment-1256536753

   Hi @lincoln-lil, how would one use the retry strategies in a Scala Flink pipeline? We noticed that the retry strategy builder classes are only exposed in the Java codebase.




[GitHub] [flink] lincoln-lil commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r908419613


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java:
##########
@@ -163,4 +193,134 @@ public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
         return addOperator(
                 in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.ORDERED);
     }
+
+    // ======= retryable ========
+
+    /**
+     * Add an AsyncWaitOperator with an AsyncRetryStrategy to support retry of AsyncFunction. The
+     * order of output stream records may be reordered.
+     *
+     * @param in Input {@link DataStream}
+     * @param func {@link AsyncFunction}
+     * @param timeout from first invoke to final completion of asynchronous operation, may include
+     *     multiple retries, and will be reset in case of restart
+     * @param timeUnit of the given timeout
+     * @param asyncRetryStrategy The strategy of reattempt async i/o operation that can be triggered
+     * @param <IN> Type of input record
+     * @param <OUT> Type of output record
+     * @return A new {@link SingleOutputStreamOperator}.
+     */
+    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWaitWithRetry(
+            DataStream<IN> in,
+            AsyncFunction<IN, OUT> func,
+            long timeout,
+            TimeUnit timeUnit,
+            AsyncRetryStrategy asyncRetryStrategy) {
+        Preconditions.checkArgument(

Review Comment:
   The `timeout` parameter does not need to be greater than 0 when retry is disabled, so I'll move this check into `addOperator` and guard it with the condition `asyncRetryStrategy != NO_RETRY_STRATEGY`.
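
The guarded check described above can be sketched roughly as follows. `TimeoutCheckSketch`, `NO_RETRY` and `checkTimeout` are hypothetical stand-ins for illustration (`NO_RETRY` plays the role of `NO_RETRY_STRATEGY`), and the exception message is invented; the real check lives in `addOperator` and may look different.

```java
/** Hypothetical stand-alone sketch of a timeout check that only applies when retry is enabled. */
final class TimeoutCheckSketch {
    /** Stand-in for the shared "retry disabled" strategy instance. */
    static final Object NO_RETRY = new Object();

    private TimeoutCheckSketch() {}

    static void checkTimeout(long timeoutMillis, Object retryStrategy) {
        // Identity comparison: only the shared NO_RETRY instance disables the check.
        if (retryStrategy != NO_RETRY && timeoutMillis <= 0) {
            throw new IllegalArgumentException(
                    "timeout must be positive when retry is enabled");
        }
    }
}
```

Because the comparison is by identity, a non-positive timeout is accepted only when the caller passes the shared no-retry instance itself.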





[GitHub] [flink] lincoln-lil commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r908408465


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java:
##########
@@ -66,7 +70,29 @@ private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
             long timeout,
             int bufSize,
             OutputMode mode) {
+        return addOperator(in, func, timeout, bufSize, mode, NO_RETRY_STRATEGY);
+    }
 
+    /**
+     * Add an AsyncWaitOperator.

Review Comment:
   will make it 





[GitHub] [flink] lincoln-lil commented on pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on PR #19983:
URL: https://github.com/apache/flink/pull/19983#issuecomment-1178431723

   @flinkbot run azure
   




[GitHub] [flink] lincoln-lil commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r908410799


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java:
##########
@@ -163,4 +193,134 @@ public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
         return addOperator(
                 in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.ORDERED);
     }
+
+    // ======= retryable ========
+
+    /**
+     * Add an AsyncWaitOperator with an AsyncRetryStrategy to support retry of AsyncFunction. The
+     * order of output stream records may be reordered.
+     *
+     * @param in Input {@link DataStream}
+     * @param func {@link AsyncFunction}
+     * @param timeout from first invoke to final completion of asynchronous operation, may include
+     *     multiple retries, and will be reset in case of restart
+     * @param timeUnit of the given timeout
+     * @param asyncRetryStrategy The strategy of reattempt async i/o operation that can be triggered
+     * @param <IN> Type of input record
+     * @param <OUT> Type of output record
+     * @return A new {@link SingleOutputStreamOperator}.
+     */
+    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWaitWithRetry(
+            DataStream<IN> in,
+            AsyncFunction<IN, OUT> func,
+            long timeout,
+            TimeUnit timeUnit,
+            AsyncRetryStrategy asyncRetryStrategy) {

Review Comment:
   addressed





[GitHub] [flink] lincoln-lil commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r908426718


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -197,11 +248,12 @@ public void processElement(StreamRecord<IN> record) throws Exception {
         // add element first to the queue
         final ResultFuture<OUT> entry = addToWorkQueue(element);
 
-        final ResultHandler resultHandler = new ResultHandler(element, entry);
+        final RetryableResultHandlerDelegator resultHandler =

Review Comment:
   I tried it this way, but the code seems a little redundant:
   ```
           if (retryEnabled) {
               final RetryableResultHandlerDelegator resultHandler =
                       new RetryableResultHandlerDelegator(element, entry, getProcessingTimeService());
   
               // register a timeout for the entry
               assert timeout > 0L;
               resultHandler.registerTimeout(timeout);
   
               userFunction.asyncInvoke(element.getValue(), resultHandler);
           } else {
               final ResultHandler resultHandler = new ResultHandler(element, entry);
   
               // register a timeout for the entry if timeout is configured
               if (timeout > 0L) {
                   resultHandler.registerTimeout(getProcessingTimeService(), timeout);
               }
   
               userFunction.asyncInvoke(element.getValue(), resultHandler);
           }
   ```
   So I use the unified RetryableResultHandlerDelegator to handle both branches according to retryEnabled, and the JVM 'should' inline the extra function call.
   WDYT?
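
The trade-off can be illustrated with a much-reduced sketch of the unified approach: one handler type whose completion path branches on a `retryEnabled` flag, so the caller does not need two code paths. `UnifiedHandlerSketch`, its `log` field and its retry rule (`currentAttempts < maxAttempts`) are hypothetical simplifications, not the delegator from the PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified single handler that covers both the retry and the no-retry case. */
final class UnifiedHandlerSketch {
    /** Records each decision so the flow can be inspected. */
    final List<String> log = new ArrayList<>();

    private final boolean retryEnabled;
    private final int maxAttempts;
    /** Starts at 1 because creating the handler corresponds to the first attempt. */
    private int currentAttempts = 1;

    UnifiedHandlerSketch(boolean retryEnabled, int maxAttempts) {
        this.retryEnabled = retryEnabled;
        this.maxAttempts = maxAttempts;
    }

    /**
     * Called once per finished attempt.
     *
     * @return true if the caller should fire another attempt
     */
    boolean complete(boolean success) {
        if (retryEnabled && !success && currentAttempts < maxAttempts) {
            currentAttempts++;
            log.add("retry");
            return true;
        }
        // Retry disabled, attempt succeeded, or attempts exhausted: finish the entry.
        log.add(success ? "emit" : "fail");
        return false;
    }
}
```

With retry disabled the handler finishes on the first completion, which mirrors the behaviour of the plain `ResultHandler` branch in the snippet above.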





[GitHub] [flink] lincoln-lil commented on a diff in pull request #19983: [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r908601534


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +392,143 @@ private void outputCompletedElement() {
         }
     }
 
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.incrementAttempts();
+
+        // fire a new attempt
+        userFunction.asyncInvoke(resultHandlerDelegator.getInputRecord(), resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        private long backoffTimeMillis;
+
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {

Review Comment:
   Yes, this should be done.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +392,143 @@ private void outputCompletedElement() {
         }
     }
 
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.incrementAttempts();
+
+        // fire a new attempt
+        userFunction.asyncInvoke(resultHandlerDelegator.getInputRecord(), resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        private long backoffTimeMillis;
+
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            if (retryEnabled) {
+                mailboxExecutor.submit(() -> cleanupLastRetryInMailbox(), "cleanup last retry");
+                if (!resultHandler.completed.get() && ifRetryOrCompleted(results, null)) {
+                    return;
+                }
+            }
+            resultHandler.complete(results);
+        }
+
+        @Override
+        public void completeExceptionally(Throwable error) {
+            if (retryEnabled) {
+                mailboxExecutor.submit(() -> cleanupLastRetryInMailbox(), "cleanup last retry");
+                if (ifRetryOrCompleted(null, error)) {
+                    return;
+                }
+            }
+            resultHandler.completeExceptionally(error);
+        }
+
+        private void cleanupLastRetryInMailbox() {
+            if (retryInFlight.compareAndSet(true, false)) {
+                assert incompleteDelayRetryHandlers.contains(this);
+                // remove from delayed retry queue
+                incompleteDelayRetryHandlers.remove(this);
+                delayedRetryTimer = null;
+            }
+        }
+
+        private boolean ifRetryOrCompleted(Collection<OUT> results, Throwable error) {
+            if (delayedRetryAvailable.get() && resultHandler.inputRecord.isRecord()) {
+                boolean satisfy = false;
+                if (null != results && null != retryResultPredicate) {
+                    satisfy = (satisfy || retryResultPredicate.test(results));
+                }
+                if (null != error && null != retryExceptionPredicate) {
+                    satisfy = (satisfy || retryExceptionPredicate.test(error));
+                }
+                if (satisfy) {
+                    if (asyncRetryStrategy.canRetry(getCurrentAttempts())) {
+                        long nextBackoffTimeMillis =
+                                asyncRetryStrategy.getBackoffTimeMillis(getCurrentAttempts());
+                        setBackoffTimeMillis(nextBackoffTimeMillis);
+                        if (delayedRetryAvailable.get()) {
+                            // timer thread will finally dispatch the task to mailbox executor
+                            mailboxExecutor.submit(
+                                    () -> trySubmitRetryInMailboxOrComplete(this, results, error),
+                                    "delayed retry or give up retry");
+                            return true;
+                        }
+                    }
+                }
+            }
+            return false;
+        }
+
+        private void trySubmitRetryInMailboxOrComplete(
+                RetryableResultHandlerDelegator resultHandlerDelegator,
+                Collection<OUT> results,
+                Throwable error) {
+            if (delayedRetryAvailable.get()) {
+                incompleteDelayRetryHandlers.add(resultHandlerDelegator);
+                retryInFlight.set(true);
+                final long delayedRetry =
+                        resultHandlerDelegator.getBackoffTimeMillis()
+                                + getProcessingTimeService().getCurrentProcessingTime();
+                delayedRetryTimer =
+                        processingTimeService.registerTimer(
+                                delayedRetry, timestamp -> doRetry(resultHandlerDelegator));
+            } else {
+                // give up retry and complete immediately
+                if (null != results) {
+                    resultHandler.complete(results);
+                } else {
+                    resultHandler.completeExceptionally(error);
+                }
+            }
+        }
+
+        public long getBackoffTimeMillis() {

Review Comment:
   Makes sense, will update.
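
   For readers following the quoted hunk, the decision made in `ifRetryOrCompleted` can be sketched roughly as below. This is a simplified, standalone approximation and not the operator code: `RetryDecisionSketch`, `FixedDelay`, `decide`, and the `-1` return convention are hypothetical names and choices, and mailbox and timer handling is left out entirely.

```java
import java.util.function.Predicate;

/** Hypothetical sketch of the retry-or-complete decision; not Flink code. */
class RetryDecisionSketch {

    /** Assumed fixed-delay strategy; the real AsyncRetryStrategy interface is richer. */
    static final class FixedDelay {
        final int maxAttempts;
        final long delayMillis;

        FixedDelay(int maxAttempts, long delayMillis) {
            this.maxAttempts = maxAttempts;
            this.delayMillis = delayMillis;
        }

        boolean canRetry(int currentAttempts) {
            return currentAttempts <= maxAttempts;
        }

        long backoffMillis(int currentAttempts) {
            return delayMillis;
        }
    }

    /**
     * Returns the backoff in milliseconds if a retry should be scheduled,
     * or -1 if the result or error should be completed immediately.
     */
    static <T> long decide(
            T result,
            Throwable error,
            Predicate<T> resultPredicate,
            Predicate<Throwable> errorPredicate,
            FixedDelay strategy,
            int currentAttempts) {
        boolean satisfy = false;
        // A result is only tested when a result predicate is configured.
        if (result != null && resultPredicate != null) {
            satisfy = resultPredicate.test(result);
        }
        // Likewise, an error is only tested when an exception predicate is configured.
        if (error != null && errorPredicate != null) {
            satisfy = satisfy || errorPredicate.test(error);
        }
        if (satisfy && strategy.canRetry(currentAttempts)) {
            return strategy.backoffMillis(currentAttempts);
        }
        return -1L;
    }
}
```

   In the hunk the retry itself is then dispatched through the mailbox executor and a processing-time timer; the sketch covers only the predicate and attempt-count check.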


