You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/11/17 09:54:09 UTC
[flink] branch master updated: [FLINK-29498][datastream] Add Scala Async Retry Strategies and ResultPredicates Helper Classes
This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d426489c9e5 [FLINK-29498][datastream] Add Scala Async Retry Strategies and ResultPredicates Helper Classes
d426489c9e5 is described below
commit d426489c9e5c634e2eec8fde6c71356700b7d4b2
Author: Eric Xiao <er...@shopify.com>
AuthorDate: Sun Oct 16 18:12:27 2022 -0400
[FLINK-29498][datastream] Add Scala Async Retry Strategies and ResultPredicates Helper Classes
This closes #21077.
---
.../docs/dev/datastream/operators/asyncio.md | 10 +-
.../docs/dev/datastream/operators/asyncio.md | 10 +-
.../api/operators/async/AsyncWaitOperator.java | 2 +-
.../api/scala/async/AsyncRetryStrategies.scala | 130 +++++++++++++++++++++
.../api/scala/async/RetryPredicates.scala | 38 ++++++
.../api/scala/AsyncDataStreamITCase.scala | 42 ++-----
6 files changed, 194 insertions(+), 38 deletions(-)
diff --git a/docs/content.zh/docs/dev/datastream/operators/asyncio.md b/docs/content.zh/docs/dev/datastream/operators/asyncio.md
index 6dbddab5824..0caa5e8ce3b 100644
--- a/docs/content.zh/docs/dev/datastream/operators/asyncio.md
+++ b/docs/content.zh/docs/dev/datastream/operators/asyncio.md
@@ -125,8 +125,8 @@ DataStream<Tuple2<String, String>> resultStream =
// 通过工具类创建一个异步重试策略, 或用户实现自定义的策略
AsyncRetryStrategy asyncRetryStrategy =
new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
- .retryIfResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
- .retryIfException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
+ .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
+ .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
.build();
// 应用异步 I/O 转换操作并启用重试
@@ -170,7 +170,11 @@ val resultStream: DataStream[(String, String)] =
// 或 应用异步 I/O 转换操作并启用重试
// 创建一个异步重试策略
-val asyncRetryStrategy: AsyncRetryStrategy[OUT] = ...
+val asyncRetryStrategy: AsyncRetryStrategy[String] =
+  new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder[String](3, 100L) // maxAttempts=3, fixedDelay=100ms
+    .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE[String])
+    .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
+    .build()
// 应用异步 I/O 转换操作并启用重试
val resultStream: DataStream[(String, String)] =
diff --git a/docs/content/docs/dev/datastream/operators/asyncio.md b/docs/content/docs/dev/datastream/operators/asyncio.md
index a6d218fff55..631c83eaa1b 100644
--- a/docs/content/docs/dev/datastream/operators/asyncio.md
+++ b/docs/content/docs/dev/datastream/operators/asyncio.md
@@ -140,8 +140,8 @@ DataStream<Tuple2<String, String>> resultStream =
// create an async retry strategy via utility class or a user defined strategy
AsyncRetryStrategy asyncRetryStrategy =
new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
- .retryIfResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
- .retryIfException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
+ .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
+ .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
.build();
// apply the async I/O transformation with retry
@@ -185,7 +185,11 @@ val resultStream: DataStream[(String, String)] =
// apply the async I/O transformation with retry
// create an AsyncRetryStrategy
-val asyncRetryStrategy: AsyncRetryStrategy[OUT] = ...
+val asyncRetryStrategy: AsyncRetryStrategy[String] =
+  new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder[String](3, 100L) // maxAttempts=3, fixedDelay=100ms
+    .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE[String])
+    .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
+    .build()
// apply the async I/O transformation with retry
val resultStream: DataStream[(String, String)] =
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 0d88943b21e..7d2685f7828 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -430,7 +430,7 @@ public class AsyncWaitOperator<IN, OUT>
/**
* A guard similar to ResultHandler#complete to prevent repeated complete calls from
* ill-written AsyncFunction. This flag indicates a retry is in-flight, new retry will be
- * rejected if it is ture, and it will be reset to false after the retry fired.
+ * rejected if it is true, and it will be reset to false after the retry fired.
*/
private final AtomicBoolean retryAwaiting = new AtomicBoolean(false);
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryStrategies.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryStrategies.scala
new file mode 100644
index 00000000000..2e317b574bc
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryStrategies.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.async
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.streaming.api.functions.async
+import org.apache.flink.streaming.api.functions.async.{AsyncRetryStrategy => JAsyncRetryStrategy}
+import org.apache.flink.streaming.util.retryable.{AsyncRetryStrategies => JAsyncRetryStrategies}
+
+import java.{util => ju}
+import java.util.function.Predicate
+
+/**
+ * Utility object to create concrete [[AsyncRetryStrategy]] instances for the Scala DataStream
+ * API.
+ *
+ * Each builder delegates to its Java counterpart in
+ * `org.apache.flink.streaming.util.retryable.AsyncRetryStrategies` and wraps the resulting Java
+ * strategy so it can be used wherever the Scala API expects an [[AsyncRetryStrategy]].
+ */
+@PublicEvolving
+object AsyncRetryStrategies {
+
+  /**
+   * Adapts a Java [[JAsyncRetryStrategy]] to the Scala [[AsyncRetryStrategy]] trait by forwarding
+   * every call to the wrapped instance.
+   *
+   * @param retryStrategy
+   *   the Java strategy all calls are delegated to
+   */
+  final private class JavaToScalaRetryStrategy[T](retryStrategy: JAsyncRetryStrategy[T])
+    extends AsyncRetryStrategy[T] {
+
+    /** @return whether the next attempt can happen, as decided by the wrapped Java strategy */
+    override def canRetry(currentAttempts: Int): Boolean = retryStrategy.canRetry(currentAttempts)
+
+    /** @return the delay in milliseconds before the next attempt */
+    override def getBackoffTimeMillis(currentAttempts: Int): Long =
+      retryStrategy.getBackoffTimeMillis(currentAttempts)
+
+    /**
+     * @return
+     *   the retry predicate of the wrapped Java strategy, with its optional Java predicates exposed
+     *   as Scala [[Option]]s
+     */
+    override def getRetryPredicate(): AsyncRetryPredicate[T] = new AsyncRetryPredicate[T] {
+      // `async` here is the imported Java package org.apache.flink.streaming.api.functions.async.
+      val retryPredicates: async.AsyncRetryPredicate[T] = retryStrategy.getRetryPredicate
+
+      /**
+       * An optional Java `Predicate` that defines a condition on the async function's future
+       * result which will trigger a later reattempt operation; it is called before the user's
+       * `ResultFuture#complete`.
+       *
+       * @return
+       *   predicate on the result [[java.util.Collection]], or `None` if the Java strategy defines
+       *   no result predicate
+       */
+      override def resultPredicate: Option[Predicate[ju.Collection[T]]] =
+        // Option(x.orElse(null)) converts Java's Optional without scala.jdk.OptionConverters,
+        // which only exists from Scala 2.13 on.
+        Option(retryPredicates.resultPredicate.orElse(null))
+
+      /**
+       * An optional Java `Predicate` that defines a condition on the async function's exception
+       * which will trigger a later reattempt operation; it is called before the user's
+       * `ResultFuture#completeExceptionally`.
+       *
+       * @return
+       *   predicate on the [[Throwable]], or `None` if the Java strategy defines no exception
+       *   predicate
+       */
+      override def exceptionPredicate: Option[Predicate[Throwable]] =
+        Option(retryPredicates.exceptionPredicate.orElse(null))
+    }
+  }
+
+  /**
+   * Builder for an [[AsyncRetryStrategy]] that waits a fixed delay between attempts.
+   *
+   * @param maxAttempts
+   *   maximum number of attempts, forwarded to the Java builder
+   * @param backoffTimeMillis
+   *   fixed delay in milliseconds between attempts
+   * @tparam OUT
+   *   output type of the async function. It does not appear in the constructor arguments and so
+   *   cannot be inferred from them; give it explicitly, e.g.
+   *   `new FixedDelayRetryStrategyBuilder[String](3, 100L)`.
+   */
+  @PublicEvolving
+  @SerialVersionUID(1L)
+  class FixedDelayRetryStrategyBuilder[OUT](
+      private val maxAttempts: Int,
+      private val backoffTimeMillis: Long
+  ) {
+    private var builder =
+      new JAsyncRetryStrategies.FixedDelayRetryStrategyBuilder[OUT](maxAttempts, backoffTimeMillis)
+
+    /**
+     * Retries when `resultRetryPredicate` matches the async function's result.
+     *
+     * @return
+     *   this builder, for chaining
+     */
+    def ifResult(resultRetryPredicate: Predicate[ju.Collection[OUT]])
+        : FixedDelayRetryStrategyBuilder[OUT] = {
+      // Keep whatever the Java builder returns so chaining works either way.
+      this.builder = this.builder.ifResult(resultRetryPredicate)
+      this
+    }
+
+    /**
+     * Retries when `exceptionRetryPredicate` matches the async function's exception.
+     *
+     * @return
+     *   this builder, for chaining
+     */
+    def ifException(
+        exceptionRetryPredicate: Predicate[Throwable]): FixedDelayRetryStrategyBuilder[OUT] = {
+      this.builder = this.builder.ifException(exceptionRetryPredicate)
+      this
+    }
+
+    /** Builds the Java strategy and wraps it for the Scala API. */
+    def build(): AsyncRetryStrategy[OUT] = new JavaToScalaRetryStrategy[OUT](builder.build())
+  }
+
+  /**
+   * Builder for an [[AsyncRetryStrategy]] whose delay between attempts grows exponentially.
+   *
+   * @param maxAttempts
+   *   maximum number of attempts, forwarded to the Java builder
+   * @param initialDelay
+   *   delay before the first retry (presumably milliseconds, like `backoffTimeMillis` — confirm
+   *   against the Java builder)
+   * @param maxRetryDelay
+   *   upper bound for the delay between attempts (same unit as `initialDelay`)
+   * @param multiplier
+   *   growth factor for the delay, forwarded to the Java builder
+   * @tparam OUT
+   *   output type of the async function. It does not appear in the constructor arguments and so
+   *   cannot be inferred from them; give it explicitly.
+   */
+  @PublicEvolving
+  @SerialVersionUID(1L)
+  class ExponentialBackoffDelayRetryStrategyBuilder[OUT](
+      private val maxAttempts: Int,
+      private val initialDelay: Long,
+      private val maxRetryDelay: Long,
+      private val multiplier: Double
+  ) {
+    private var builder =
+      new JAsyncRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder[OUT](
+        maxAttempts,
+        initialDelay,
+        maxRetryDelay,
+        multiplier)
+
+    /**
+     * Retries when `resultRetryPredicate` matches the async function's result.
+     *
+     * @return
+     *   this builder, for chaining
+     */
+    def ifResult(resultRetryPredicate: Predicate[ju.Collection[OUT]])
+        : ExponentialBackoffDelayRetryStrategyBuilder[OUT] = {
+      // Keep whatever the Java builder returns so chaining works either way.
+      this.builder = this.builder.ifResult(resultRetryPredicate)
+      this
+    }
+
+    /**
+     * Retries when `exceptionRetryPredicate` matches the async function's exception.
+     *
+     * @return
+     *   this builder, for chaining
+     */
+    def ifException(exceptionRetryPredicate: Predicate[Throwable])
+        : ExponentialBackoffDelayRetryStrategyBuilder[OUT] = {
+      this.builder = this.builder.ifException(exceptionRetryPredicate)
+      this
+    }
+
+    /** Builds the Java strategy and wraps it for the Scala API. */
+    def build(): AsyncRetryStrategy[OUT] = new JavaToScalaRetryStrategy[OUT](builder.build())
+  }
+}
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/RetryPredicates.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/RetryPredicates.scala
new file mode 100644
index 00000000000..56a37eaca12
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/RetryPredicates.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.async
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.streaming.util.retryable.{RetryPredicates => JRetryPredicates}
+
+import java.util
+import java.util.function.Predicate
+
+/**
+ * Utility object providing common retry predicates for the builders in [[AsyncRetryStrategies]].
+ */
+@PublicEvolving
+object RetryPredicates {
+
+  /**
+   * A predicate that matches an empty result, i.e. an empty [[java.util.Collection]].
+   *
+   * The Java constant is cast (unchecked) to the requested element type; this is presumably safe
+   * because an emptiness check does not depend on the element type.
+   *
+   * @tparam T
+   *   element type of the result collection; it must match the `OUT` type of the retry strategy
+   *   builder it is passed to
+   */
+  def EMPTY_RESULT_PREDICATE[T]: Predicate[util.Collection[T]] =
+    JRetryPredicates.EMPTY_RESULT_PREDICATE.asInstanceOf[Predicate[util.Collection[T]]]
+
+  /** A predicate that matches any exception, i.e. a non-null [[Throwable]]. */
+  def HAS_EXCEPTION_PREDICATE: Predicate[Throwable] =
+    JRetryPredicates.HAS_EXCEPTION_PREDICATE.asInstanceOf[Predicate[Throwable]]
+
+}
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
index f09e499be55..edd53b660d7 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._
-import org.apache.flink.streaming.api.scala.async.{AsyncRetryPredicate, AsyncRetryStrategy, ResultFuture, RichAsyncFunction}
+import org.apache.flink.streaming.api.scala.async.{AsyncRetryStrategies, ResultFuture, RetryPredicates, RichAsyncFunction}
import org.apache.flink.test.util.AbstractTestBase
import org.junit.Assert._
@@ -31,7 +31,6 @@ import org.junit.runners.Parameterized.Parameters
import java.{util => ju}
import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.function.Predicate
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
@@ -174,7 +173,11 @@ class AsyncDataStreamITCase(ordered: Boolean) extends AbstractTestBase {
val asyncFunction = new OddInputReturnEmptyAsyncFunc
- val asyncRetryStrategy = createFixedRetryStrategy[Int](3, 10)
+ val asyncRetryStrategy =
+ new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 10)
+ .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE[Int])
+ .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
+ .build()
val timeout = 10000L
val asyncMapped = if (ordered) {
@@ -196,33 +199,6 @@ class AsyncDataStreamITCase(ordered: Boolean) extends AbstractTestBase {
executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2, 4, 6))
}
- private def createFixedRetryStrategy[OUT](
- maxAttempts: Int,
- fixedDelayMs: Long): AsyncRetryStrategy[OUT] = {
- new AsyncRetryStrategy[OUT] {
-
- override def canRetry(currentAttempts: Int): Boolean = {
- currentAttempts <= maxAttempts
- }
-
- override def getBackoffTimeMillis(currentAttempts: Int): Long = fixedDelayMs
-
- override def getRetryPredicate(): AsyncRetryPredicate[OUT] = {
- new AsyncRetryPredicate[OUT] {
- override def resultPredicate: Option[Predicate[ju.Collection[OUT]]] = {
- Option(
- new Predicate[ju.Collection[OUT]] {
- override def test(t: ju.Collection[OUT]): Boolean = t.isEmpty
- }
- )
- }
-
- override def exceptionPredicate: Option[Predicate[Throwable]] = Option.empty
- }
- }
- }
- }
-
@Test
def testAsyncWaitWithRetryUsingAnonymousFunction(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -245,7 +221,8 @@ class AsyncDataStreamITCase(ordered: Boolean) extends AbstractTestBase {
}
val timeout = 10000L
- val asyncRetryStrategy = createFixedRetryStrategy[Int](3, 10)
+ val asyncRetryStrategy = new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder[Int](3, 10)
+ .build()
val asyncMapped = if (ordered) {
AsyncDataStream.orderedWaitWithRetry(
@@ -283,6 +260,7 @@ class AsyncFunctionWithTimeoutExpired extends RichAsyncFunction[Int, Int] {
resultFuture.complete(Seq(input * 2))
}(ExecutionContext.global)
}
+
override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = {
resultFuture.complete(Seq(input * 3))
invokeLatch.countDown()
@@ -307,6 +285,7 @@ class AsyncFunctionWithoutTimeoutExpired extends RichAsyncFunction[Int, Int] {
timeoutLatch.countDown()
}(ExecutionContext.global)
}
+
override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = {
// this sleeping helps reproducing race condition with cancellation
Thread.sleep(10)
@@ -326,6 +305,7 @@ class MyRichAsyncFunction extends RichAsyncFunction[Int, Int] {
resultFuture.complete(Seq(input * 2))
}(ExecutionContext.global)
}
+
override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = {
resultFuture.complete(Seq(input * 3))
}