You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/11/17 09:54:09 UTC

[flink] branch master updated: [FLINK-29498][datastream] Add Scala Async Retry Strategies and ResultPredicates Helper Classes

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d426489c9e5 [FLINK-29498][datastream] Add Scala Async Retry Strategies and ResultPredicates Helper Classes
d426489c9e5 is described below

commit d426489c9e5c634e2eec8fde6c71356700b7d4b2
Author: Eric Xiao <er...@shopify.com>
AuthorDate: Sun Oct 16 18:12:27 2022 -0400

    [FLINK-29498][datastream] Add Scala Async Retry Strategies and ResultPredicates Helper Classes
    
    This closes #21077.
---
 .../docs/dev/datastream/operators/asyncio.md       |  10 +-
 .../docs/dev/datastream/operators/asyncio.md       |  10 +-
 .../api/operators/async/AsyncWaitOperator.java     |   2 +-
 .../api/scala/async/AsyncRetryStrategies.scala     | 130 +++++++++++++++++++++
 .../api/scala/async/RetryPredicates.scala          |  38 ++++++
 .../api/scala/AsyncDataStreamITCase.scala          |  42 ++-----
 6 files changed, 194 insertions(+), 38 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/operators/asyncio.md b/docs/content.zh/docs/dev/datastream/operators/asyncio.md
index 6dbddab5824..0caa5e8ce3b 100644
--- a/docs/content.zh/docs/dev/datastream/operators/asyncio.md
+++ b/docs/content.zh/docs/dev/datastream/operators/asyncio.md
@@ -125,8 +125,8 @@ DataStream<Tuple2<String, String>> resultStream =
 // 通过工具类创建一个异步重试策略, 或用户实现自定义的策略
 AsyncRetryStrategy asyncRetryStrategy =
 	new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
-		.retryIfResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
-		.retryIfException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
+		.ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
+		.ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
 		.build();
 
 // 应用异步 I/O 转换操作并启用重试
@@ -170,7 +170,11 @@ val resultStream: DataStream[(String, String)] =
 
 // 或 应用异步 I/O 转换操作并启用重试
 // 创建一个异步重试策略
-val asyncRetryStrategy: AsyncRetryStrategy[OUT] = ...
+val asyncRetryStrategy: AsyncRetryStrategy[String] =
+  new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
+    .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
+    .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
+    .build();
 
 // 应用异步 I/O 转换操作并启用重试
 val resultStream: DataStream[(String, String)] =
diff --git a/docs/content/docs/dev/datastream/operators/asyncio.md b/docs/content/docs/dev/datastream/operators/asyncio.md
index a6d218fff55..631c83eaa1b 100644
--- a/docs/content/docs/dev/datastream/operators/asyncio.md
+++ b/docs/content/docs/dev/datastream/operators/asyncio.md
@@ -140,8 +140,8 @@ DataStream<Tuple2<String, String>> resultStream =
 // create an async retry strategy via utility class or a user defined strategy
 AsyncRetryStrategy asyncRetryStrategy =
 	new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
-		.retryIfResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
-		.retryIfException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
+		.ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
+		.ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
 		.build();
 
 // apply the async I/O transformation with retry
@@ -185,7 +185,11 @@ val resultStream: DataStream[(String, String)] =
 
 // apply the async I/O transformation with retry
 // create an AsyncRetryStrategy
-val asyncRetryStrategy: AsyncRetryStrategy[OUT] = ...
+val asyncRetryStrategy: AsyncRetryStrategy[String] =
+  new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
+    .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
+    .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
+    .build();
 
 // apply the async I/O transformation with retry
 val resultStream: DataStream[(String, String)] =
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 0d88943b21e..7d2685f7828 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -430,7 +430,7 @@ public class AsyncWaitOperator<IN, OUT>
         /**
          * A guard similar to ResultHandler#complete to prevent repeated complete calls from
          * ill-written AsyncFunction. This flag indicates a retry is in-flight, new retry will be
-         * rejected if it is ture, and it will be reset to false after the retry fired.
+         * rejected if it is true, and it will be reset to false after the retry fired.
          */
         private final AtomicBoolean retryAwaiting = new AtomicBoolean(false);
 
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryStrategies.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryStrategies.scala
new file mode 100644
index 00000000000..2e317b574bc
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryStrategies.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.async
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.streaming.api.functions.async
+import org.apache.flink.streaming.api.functions.async.{AsyncRetryStrategy => JAsyncRetryStrategy}
+import org.apache.flink.streaming.util.retryable.{AsyncRetryStrategies => JAsyncRetryStrategies}
+
+import java.{util => ju}
+import java.util.function.Predicate
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+object AsyncRetryStrategies {
+
+  final private class JavaToScalaRetryStrategy[T](retryStrategy: JAsyncRetryStrategy[T])
+    extends AsyncRetryStrategy[T] {
+
+    /** @return whether the next attempt can happen */
+    override def canRetry(currentAttempts: Int): Boolean = retryStrategy.canRetry(currentAttempts)
+
+    /** @return the delay time of next attempt */
+    override def getBackoffTimeMillis(currentAttempts: Int): Long =
+      retryStrategy.getBackoffTimeMillis(currentAttempts)
+
+    /** @return the defined retry predicate {@link AsyncRetryPredicate} */
+    override def getRetryPredicate(): AsyncRetryPredicate[T] = new AsyncRetryPredicate[T] {
+      val retryPredicates: async.AsyncRetryPredicate[T] = retryStrategy.getRetryPredicate
+
+      /**
+       * An optional Java {@link Predicate} that defines a condition on the asyncFunction's future
+       * result which will trigger a later reattempt operation. It will be called before the user's
+       * ResultFuture#complete.
+       *
+       * @return
+       *   predicate on result of {@link ju.Collection}
+       */
+      override def resultPredicate: Option[Predicate[ju.Collection[T]]] = Option(
+        retryPredicates.resultPredicate.orElse(null))
+
+      /**
+       * An optional Java {@link Predicate} that defines a condition on the asyncFunction's
+       * exception which will trigger a later reattempt operation. It will be called before the
+       * user's ResultFuture#completeExceptionally.
+       *
+       * @return
+       *   predicate on {@link Throwable} exception
+       */
+      override def exceptionPredicate: Option[Predicate[Throwable]] = Option(
+        retryPredicates.exceptionPredicate.orElse(null))
+    }
+  }
+
+  /**
+   * FixedDelayRetryStrategyBuilder for building an {@link AsyncRetryStrategy} with fixed delay
+   * retrying behaviours.
+   */
+  @PublicEvolving
+  @SerialVersionUID(1L)
+  class FixedDelayRetryStrategyBuilder[OUT](
+      private val maxAttempts: Int,
+      private val backoffTimeMillis: Long
+  ) {
+    private var builder =
+      new JAsyncRetryStrategies.FixedDelayRetryStrategyBuilder[OUT](maxAttempts, backoffTimeMillis)
+
+    def ifResult(resultRetryPredicate: Predicate[ju.Collection[OUT]])
+        : FixedDelayRetryStrategyBuilder[OUT] = {
+      this.builder = this.builder.ifResult(resultRetryPredicate)
+      this
+    }
+
+    def ifException(
+        exceptionRetryPredicate: Predicate[Throwable]): FixedDelayRetryStrategyBuilder[OUT] = {
+      this.builder = this.builder.ifException(exceptionRetryPredicate)
+      this
+    }
+
+    def build(): AsyncRetryStrategy[OUT] = new JavaToScalaRetryStrategy[OUT](builder.build())
+  }
+
+  /**
+   * ExponentialBackoffDelayRetryStrategyBuilder for building an {@link AsyncRetryStrategy} with
+   * exponential delay retrying behaviours.
+   */
+  @PublicEvolving
+  @SerialVersionUID(1L)
+  class ExponentialBackoffDelayRetryStrategyBuilder[OUT](
+      private val maxAttempts: Int,
+      private val initialDelay: Long,
+      private val maxRetryDelay: Long,
+      private val multiplier: Double
+  ) {
+    private var builder =
+      new JAsyncRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder[OUT](
+        maxAttempts,
+        initialDelay,
+        maxRetryDelay,
+        multiplier)
+
+    def ifResult(resultRetryPredicate: Predicate[ju.Collection[OUT]])
+        : ExponentialBackoffDelayRetryStrategyBuilder[OUT] = {
+      this.builder = this.builder.ifResult(resultRetryPredicate)
+      this
+    }
+
+    def ifException(exceptionRetryPredicate: Predicate[Throwable])
+        : ExponentialBackoffDelayRetryStrategyBuilder[OUT] = {
+      this.builder = this.builder.ifException(exceptionRetryPredicate)
+      this
+    }
+
+    def build(): AsyncRetryStrategy[OUT] = new JavaToScalaRetryStrategy[OUT](builder.build())
+  }
+}
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/RetryPredicates.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/RetryPredicates.scala
new file mode 100644
index 00000000000..56a37eaca12
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/RetryPredicates.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.async
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.streaming.util.retryable.{RetryPredicates => JRetryPredicates}
+
+import java.util
+import java.util.function.Predicate
+
+/** Utility class to create concrete retry predicates. */
+@PublicEvolving
+object RetryPredicates {
+
+  /** A predicate that matches an empty result, i.e. an empty {@link Collection}. */
+  def EMPTY_RESULT_PREDICATE[T]: Predicate[util.Collection[T]] =
+    JRetryPredicates.EMPTY_RESULT_PREDICATE.asInstanceOf[Predicate[util.Collection[T]]]
+
+  /** A predicate that matches any exception, i.e. any non-null {@link Throwable}. */
+  def HAS_EXCEPTION_PREDICATE: Predicate[Throwable] =
+    JRetryPredicates.HAS_EXCEPTION_PREDICATE.asInstanceOf[Predicate[Throwable]]
+
+}
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
index f09e499be55..edd53b660d7 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.scala
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._
-import org.apache.flink.streaming.api.scala.async.{AsyncRetryPredicate, AsyncRetryStrategy, ResultFuture, RichAsyncFunction}
+import org.apache.flink.streaming.api.scala.async.{AsyncRetryStrategies, ResultFuture, RetryPredicates, RichAsyncFunction}
 import org.apache.flink.test.util.AbstractTestBase
 
 import org.junit.Assert._
@@ -31,7 +31,6 @@ import org.junit.runners.Parameterized.Parameters
 
 import java.{util => ju}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.function.Predicate
 
 import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, Future}
@@ -174,7 +173,11 @@ class AsyncDataStreamITCase(ordered: Boolean) extends AbstractTestBase {
 
     val asyncFunction = new OddInputReturnEmptyAsyncFunc
 
-    val asyncRetryStrategy = createFixedRetryStrategy[Int](3, 10)
+    val asyncRetryStrategy =
+      new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 10)
+        .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE[Int])
+        .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
+        .build()
 
     val timeout = 10000L
     val asyncMapped = if (ordered) {
@@ -196,33 +199,6 @@ class AsyncDataStreamITCase(ordered: Boolean) extends AbstractTestBase {
     executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2, 4, 6))
   }
 
-  private def createFixedRetryStrategy[OUT](
-      maxAttempts: Int,
-      fixedDelayMs: Long): AsyncRetryStrategy[OUT] = {
-    new AsyncRetryStrategy[OUT] {
-
-      override def canRetry(currentAttempts: Int): Boolean = {
-        currentAttempts <= maxAttempts
-      }
-
-      override def getBackoffTimeMillis(currentAttempts: Int): Long = fixedDelayMs
-
-      override def getRetryPredicate(): AsyncRetryPredicate[OUT] = {
-        new AsyncRetryPredicate[OUT] {
-          override def resultPredicate: Option[Predicate[ju.Collection[OUT]]] = {
-            Option(
-              new Predicate[ju.Collection[OUT]] {
-                override def test(t: ju.Collection[OUT]): Boolean = t.isEmpty
-              }
-            )
-          }
-
-          override def exceptionPredicate: Option[Predicate[Throwable]] = Option.empty
-        }
-      }
-    }
-  }
-
   @Test
   def testAsyncWaitWithRetryUsingAnonymousFunction(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -245,7 +221,8 @@ class AsyncDataStreamITCase(ordered: Boolean) extends AbstractTestBase {
       }
 
     val timeout = 10000L
-    val asyncRetryStrategy = createFixedRetryStrategy[Int](3, 10)
+    val asyncRetryStrategy = new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder[Int](3, 10)
+      .build()
 
     val asyncMapped = if (ordered) {
       AsyncDataStream.orderedWaitWithRetry(
@@ -283,6 +260,7 @@ class AsyncFunctionWithTimeoutExpired extends RichAsyncFunction[Int, Int] {
       resultFuture.complete(Seq(input * 2))
     }(ExecutionContext.global)
   }
+
   override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = {
     resultFuture.complete(Seq(input * 3))
     invokeLatch.countDown()
@@ -307,6 +285,7 @@ class AsyncFunctionWithoutTimeoutExpired extends RichAsyncFunction[Int, Int] {
       timeoutLatch.countDown()
     }(ExecutionContext.global)
   }
+
   override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = {
     // this sleeping helps reproducing race condition with cancellation
     Thread.sleep(10)
@@ -326,6 +305,7 @@ class MyRichAsyncFunction extends RichAsyncFunction[Int, Int] {
       resultFuture.complete(Seq(input * 2))
     }(ExecutionContext.global)
   }
+
   override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = {
     resultFuture.complete(Seq(input * 3))
   }