You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/06/02 13:42:08 UTC
flink git commit: [FLINK-7789] Add handler for Async IO operator
timeouts
Repository: flink
Updated Branches:
refs/heads/master b03805501 -> 56df69046
[FLINK-7789] Add handler for Async IO operator timeouts
This closes #6091.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56df6904
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56df6904
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56df6904
Branch: refs/heads/master
Commit: 56df6904688642b1c8f9a287646c163dfae7edfd
Parents: b038055
Author: blueszheng <ki...@163.com>
Authored: Thu May 10 08:41:34 2018 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Sat Jun 2 15:41:41 2018 +0200
----------------------------------------------------------------------
docs/dev/stream/operators/asyncio.md | 6 +
.../api/functions/async/AsyncFunction.java | 14 ++
.../api/operators/async/AsyncWaitOperator.java | 4 +-
.../operators/async/AsyncWaitOperatorTest.java | 51 +++++--
.../streaming/api/scala/AsyncDataStream.scala | 6 +
.../api/scala/async/AsyncFunction.scala | 14 ++
.../api/scala/AsyncDataStreamITCase.scala | 137 +++++++++++++++++++
7 files changed, 221 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/56df6904/docs/dev/stream/operators/asyncio.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/asyncio.md b/docs/dev/stream/operators/asyncio.md
index d27bf62..e92e7a9 100644
--- a/docs/dev/stream/operators/asyncio.md
+++ b/docs/dev/stream/operators/asyncio.md
@@ -190,6 +190,12 @@ The following two parameters control the asynchronous operations:
is exhausted.
+### Timeout Handling
+
+When an async I/O request times out, by default an exception is thrown and the job is restarted.
+If you want to handle timeouts, you can override the `AsyncFunction#timeout` method.
+
+
### Order of Results
The concurrent requests issued by the `AsyncFunction` frequently complete in some undefined order, based on which request finished first.
http://git-wip-us.apache.org/repos/asf/flink/blob/56df6904/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
index 2ac218d..14a7a84 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;
import java.io.Serializable;
+import java.util.concurrent.TimeoutException;
/**
* A function to trigger Async I/O operation.
@@ -84,4 +85,17 @@ public interface AsyncFunction<IN, OUT> extends Function, Serializable {
* trigger fail-over process.
*/
void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
+
+ /**
+ * Called when the {@link AsyncFunction#asyncInvoke} call for the given input has timed out.
+ * By default, the result future is completed exceptionally with a {@link TimeoutException}.
+ *
+ * @param input element coming from an upstream task
+ * @param resultFuture to be completed with the result data
+ */
+ default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
+ resultFuture.completeExceptionally(
+ new TimeoutException("Async function call has timed out."));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/56df6904/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index a7b9438..2555c3b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -53,7 +53,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
/**
* The {@link AsyncWaitOperator} allows to asynchronously process incoming stream records. For that
@@ -209,8 +208,7 @@ public class AsyncWaitOperator<IN, OUT>
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
- streamRecordBufferEntry.completeExceptionally(
- new TimeoutException("Async function call has timed out."));
+ userFunction.timeout(element.getValue(), streamRecordBufferEntry);
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/56df6904/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 17d654e..bd229bc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -76,9 +76,11 @@ import javax.annotation.Nonnull;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -215,6 +217,19 @@ public class AsyncWaitOperatorTest extends TestLogger {
}
/**
+ * A special {@link LazyAsyncFunction} for timeout handling.
+ * Completes the result future with 3 times the input when the timeout occurs.
+ */
+ private static class IgnoreTimeoutLazyAsyncFunction extends LazyAsyncFunction {
+ private static final long serialVersionUID = 1428714561365346128L;
+
+ @Override
+ public void timeout(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
+ resultFuture.complete(Collections.singletonList(input * 3));
+ }
+ }
+
+ /**
* A {@link Comparator} to compare {@link StreamRecord} while sorting them.
*/
private class StreamRecordComparator implements Comparator<Object> {
@@ -601,11 +616,29 @@ public class AsyncWaitOperatorTest extends TestLogger {
}
@Test
- public void testAsyncTimeout() throws Exception {
+ public void testAsyncTimeoutFailure() throws Exception {
+ testAsyncTimeout(
+ new LazyAsyncFunction(),
+ Optional.of(TimeoutException.class),
+ new StreamRecord<>(2, 5L));
+ }
+
+ @Test
+ public void testAsyncTimeoutIgnore() throws Exception {
+ testAsyncTimeout(
+ new IgnoreTimeoutLazyAsyncFunction(),
+ Optional.empty(),
+ new StreamRecord<>(3, 0L),
+ new StreamRecord<>(2, 5L));
+ }
+
+ private void testAsyncTimeout(LazyAsyncFunction lazyAsyncFunction,
+ Optional<Class<? extends Throwable>> expectedException,
+ StreamRecord<Integer>... expectedRecords) throws Exception {
final long timeout = 10L;
final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(
- new LazyAsyncFunction(),
+ lazyAsyncFunction,
timeout,
2,
AsyncDataStream.OutputMode.ORDERED);
@@ -633,21 +666,23 @@ public class AsyncWaitOperatorTest extends TestLogger {
testHarness.setProcessingTime(initialTime + timeout + 1L);
// allow the second async stream record to be processed
- LazyAsyncFunction.countDown();
+ lazyAsyncFunction.countDown();
// wait until all async collectors in the buffer have been emitted out.
synchronized (testHarness.getCheckpointLock()) {
testHarness.close();
}
- expectedOutput.add(new StreamRecord<>(2, initialTime + 5L));
+ expectedOutput.addAll(Arrays.asList(expectedRecords));
TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expectedOutput, testHarness.getOutput());
- ArgumentCaptor<Throwable> argumentCaptor = ArgumentCaptor.forClass(Throwable.class);
-
- assertTrue(mockEnvironment.getActualExternalFailureCause().isPresent());
- ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class);
+ if (expectedException.isPresent()) {
+ assertTrue(mockEnvironment.getActualExternalFailureCause().isPresent());
+ assertTrue(ExceptionUtils.findThrowable(
+ mockEnvironment.getActualExternalFailureCause().get(),
+ expectedException.get()).isPresent());
+ }
}
@Nonnull
http://git-wip-us.apache.org/repos/asf/flink/blob/56df6904/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
index e91922a..a1568c2 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
@@ -71,6 +71,9 @@ object AsyncDataStream {
override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture))
}
+ override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
+ asyncFunction.timeout(input, new JavaResultFutureWrapper(resultFuture))
+ }
}
val outType : TypeInformation[OUT] = implicitly[TypeInformation[OUT]]
@@ -198,6 +201,9 @@ object AsyncDataStream {
override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper[OUT](resultFuture))
}
+ override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
+ asyncFunction.timeout(input, new JavaResultFutureWrapper[OUT](resultFuture))
+ }
}
val outType : TypeInformation[OUT] = implicitly[TypeInformation[OUT]]
http://git-wip-us.apache.org/repos/asf/flink/blob/56df6904/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
index d5e9e28..d6965b7 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.scala.async
import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.api.common.functions.Function
+import java.util.concurrent.TimeoutException
+
/**
* A function to trigger async I/O operations.
*
@@ -46,4 +48,16 @@ trait AsyncFunction[IN, OUT] extends Function {
* @param resultFuture to be completed with the result data
*/
def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit
+
+ /**
+ * Called when the [[AsyncFunction.asyncInvoke]] call for the given input has timed out.
+ * By default, the result future is completed exceptionally with a [[TimeoutException]].
+ *
+ * @param input element coming from an upstream task
+ * @param resultFuture to be completed with the result data
+ */
+ def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = {
+ resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."))
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/56df6904/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
new file mode 100644
index 0000000..d0a2cec
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._
+import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
+import org.apache.flink.test.util.AbstractTestBase
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+
+object AsyncDataStreamITCase {
+ val timeout = 1000L
+ private var testResult: mutable.ArrayBuffer[Int] = _
+}
+
+class AsyncDataStreamITCase extends AbstractTestBase {
+
+ @Test
+ def testOrderedWait(): Unit = {
+ testAsyncWait(true)
+ }
+
+ @Test
+ def testUnorderedWait(): Unit = {
+ testAsyncWait(false)
+ }
+
+ private def testAsyncWait(ordered: Boolean): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+
+ val source = env.fromElements(1, 2)
+
+ val asyncMapped = if (ordered) {
+ AsyncDataStream.orderedWait(
+ source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
+ } else {
+ AsyncDataStream.unorderedWait(
+ source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
+ }
+
+ executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2, 6))
+ }
+
+ private def executeAndValidate(ordered: Boolean,
+ env: StreamExecutionEnvironment,
+ dataStream: DataStream[Int],
+ expectedResult: mutable.ArrayBuffer[Int]): Unit = {
+
+ testResult = mutable.ArrayBuffer[Int]()
+ dataStream.addSink(new SinkFunction[Int]() {
+ override def invoke(value: Int) {
+ testResult += value
+ }
+ })
+
+ env.execute("testAsyncDataStream")
+
+ if (ordered) {
+ assertEquals(expectedResult, testResult)
+ } else {
+ assertEquals(expectedResult, testResult.sorted)
+ }
+ }
+
+ @Test
+ def testOrderedWaitUsingAnonymousFunction(): Unit = {
+ testAsyncWaitUsingAnonymousFunction(true)
+ }
+
+ @Test
+ def testUnorderedWaitUsingAnonymousFunction(): Unit = {
+ testAsyncWaitUsingAnonymousFunction(false)
+ }
+
+ private def testAsyncWaitUsingAnonymousFunction(ordered: Boolean): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+
+ val source = env.fromElements(1, 2)
+
+ val asyncFunction: (Int, ResultFuture[Int]) => Unit =
+ (input, collector: ResultFuture[Int]) => Future {
+ collector.complete(Seq(input * 2))
+ }(ExecutionContext.global)
+ val asyncMapped = if (ordered) {
+ AsyncDataStream.orderedWait(source, timeout, TimeUnit.MILLISECONDS) {
+ asyncFunction
+ }
+ } else {
+ AsyncDataStream.unorderedWait(source, timeout, TimeUnit.MILLISECONDS) {
+ asyncFunction
+ }
+ }
+
+ executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2, 4))
+ }
+
+}
+
+class MyAsyncFunction extends AsyncFunction[Int, Int] {
+ override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit = {
+ Future {
+ // trigger the timeout of the even input number
+ if (input % 2 == 0) {
+ Thread.sleep(AsyncDataStreamITCase.timeout + 1000)
+ }
+
+ resultFuture.complete(Seq(input * 2))
+ } (ExecutionContext.global)
+ }
+ override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = {
+ resultFuture.complete(Seq(input * 3))
+ }
+}