You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/06/02 13:42:08 UTC

flink git commit: [FLINK-7789] Add handler for Async IO operator timeouts

Repository: flink
Updated Branches:
  refs/heads/master b03805501 -> 56df69046


[FLINK-7789] Add handler for Async IO operator timeouts

This closes #6091.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/56df6904
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/56df6904
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/56df6904

Branch: refs/heads/master
Commit: 56df6904688642b1c8f9a287646c163dfae7edfd
Parents: b038055
Author: blueszheng <ki...@163.com>
Authored: Thu May 10 08:41:34 2018 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Sat Jun 2 15:41:41 2018 +0200

----------------------------------------------------------------------
 docs/dev/stream/operators/asyncio.md            |   6 +
 .../api/functions/async/AsyncFunction.java      |  14 ++
 .../api/operators/async/AsyncWaitOperator.java  |   4 +-
 .../operators/async/AsyncWaitOperatorTest.java  |  51 +++++--
 .../streaming/api/scala/AsyncDataStream.scala   |   6 +
 .../api/scala/async/AsyncFunction.scala         |  14 ++
 .../api/scala/AsyncDataStreamITCase.scala       | 137 +++++++++++++++++++
 7 files changed, 221 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/56df6904/docs/dev/stream/operators/asyncio.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/asyncio.md b/docs/dev/stream/operators/asyncio.md
index d27bf62..e92e7a9 100644
--- a/docs/dev/stream/operators/asyncio.md
+++ b/docs/dev/stream/operators/asyncio.md
@@ -190,6 +190,12 @@ The following two parameters control the asynchronous operations:
     is exhausted.
 
 
+### Timeout Handling
+
+When an async I/O request times out, by default an exception is thrown and the job is restarted.
+If you want to handle timeouts, you can override the `AsyncFunction#timeout` method.
+
+
 ### Order of Results
 
 The concurrent requests issued by the `AsyncFunction` frequently complete in some undefined order, based on which request finished first.

http://git-wip-us.apache.org/repos/asf/flink/blob/56df6904/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
index 2ac218d..14a7a84 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
 
 import java.io.Serializable;
+import java.util.concurrent.TimeoutException;
 
 /**
  * A function to trigger Async I/O operation.
@@ -84,4 +85,17 @@ public interface AsyncFunction<IN, OUT> extends Function, Serializable {
 	 * trigger fail-over process.
 	 */
 	void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
+
+	/**
+	 * Called when the {@link AsyncFunction#asyncInvoke} call for {@code input} has timed out.
+	 * By default, the result future is exceptionally completed with a {@link TimeoutException}.
+	 *
+	 * @param input element coming from an upstream task
+	 * @param resultFuture to be completed with the result data or an exception
+	 */
+	default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
+		resultFuture.completeExceptionally(
+			new TimeoutException("Async function call has timed out."));
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/56df6904/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index a7b9438..2555c3b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -53,7 +53,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 /**
  * The {@link AsyncWaitOperator} allows to asynchronously process incoming stream records. For that
@@ -209,8 +208,7 @@ public class AsyncWaitOperator<IN, OUT>
 				new ProcessingTimeCallback() {
 					@Override
 					public void onProcessingTime(long timestamp) throws Exception {
-						streamRecordBufferEntry.completeExceptionally(
-							new TimeoutException("Async function call has timed out."));
+						userFunction.timeout(element.getValue(), streamRecordBufferEntry);
 					}
 				});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/56df6904/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 17d654e..bd229bc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -76,9 +76,11 @@ import javax.annotation.Nonnull;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -215,6 +217,19 @@ public class AsyncWaitOperatorTest extends TestLogger {
 	}
 
 	/**
+	 * A special {@link LazyAsyncFunction} for timeout handling.
+	 * Completes the result future with 3 times the input when a timeout occurs.
+	 */
+	private static class IgnoreTimeoutLazyAsyncFunction extends LazyAsyncFunction {
+		private static final long serialVersionUID = 1428714561365346128L;
+
+		@Override
+		public void timeout(Integer input, ResultFuture<Integer> resultFuture) throws Exception {
+			resultFuture.complete(Collections.singletonList(input * 3));
+		}
+	}
+
+	/**
 	 * A {@link Comparator} to compare {@link StreamRecord} while sorting them.
 	 */
 	private class StreamRecordComparator implements Comparator<Object> {
@@ -601,11 +616,29 @@ public class AsyncWaitOperatorTest extends TestLogger {
 	}
 
 	@Test
-	public void testAsyncTimeout() throws Exception {
+	public void testAsyncTimeoutFailure() throws Exception {
+		testAsyncTimeout(
+			new LazyAsyncFunction(),
+			Optional.of(TimeoutException.class),
+			new StreamRecord<>(2, 5L));
+	}
+
+	@Test
+	public void testAsyncTimeoutIgnore() throws Exception {
+		testAsyncTimeout(
+			new IgnoreTimeoutLazyAsyncFunction(),
+			Optional.empty(),
+			new StreamRecord<>(3, 0L),
+			new StreamRecord<>(2, 5L));
+	}
+
+	private void testAsyncTimeout(LazyAsyncFunction lazyAsyncFunction,
+			Optional<Class<? extends Throwable>> expectedException,
+			StreamRecord<Integer>... expectedRecords) throws Exception {
 		final long timeout = 10L;
 
 		final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(
-			new LazyAsyncFunction(),
+			lazyAsyncFunction,
 			timeout,
 			2,
 			AsyncDataStream.OutputMode.ORDERED);
@@ -633,21 +666,23 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		testHarness.setProcessingTime(initialTime + timeout + 1L);
 
 		// allow the second async stream record to be processed
-		LazyAsyncFunction.countDown();
+		lazyAsyncFunction.countDown();
 
 		// wait until all async collectors in the buffer have been emitted out.
 		synchronized (testHarness.getCheckpointLock()) {
 			testHarness.close();
 		}
 
-		expectedOutput.add(new StreamRecord<>(2, initialTime + 5L));
+		expectedOutput.addAll(Arrays.asList(expectedRecords));
 
 		TestHarnessUtil.assertOutputEquals("Output with watermark was not correct.", expectedOutput, testHarness.getOutput());
 
-		ArgumentCaptor<Throwable> argumentCaptor = ArgumentCaptor.forClass(Throwable.class);
-
-		assertTrue(mockEnvironment.getActualExternalFailureCause().isPresent());
-		ExceptionUtils.findThrowable(mockEnvironment.getActualExternalFailureCause().get(), TimeoutException.class);
+		if (expectedException.isPresent()) {
+			assertTrue(mockEnvironment.getActualExternalFailureCause().isPresent());
+			assertTrue(ExceptionUtils.findThrowable(
+				mockEnvironment.getActualExternalFailureCause().get(),
+				expectedException.get()).isPresent());
+		}
 	}
 
 	@Nonnull

http://git-wip-us.apache.org/repos/asf/flink/blob/56df6904/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
index e91922a..a1568c2 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
@@ -71,6 +71,9 @@ object AsyncDataStream {
       override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
         asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper(resultFuture))
       }
+      override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
+        asyncFunction.timeout(input, new JavaResultFutureWrapper(resultFuture))
+      }
     }
 
     val outType : TypeInformation[OUT] = implicitly[TypeInformation[OUT]]
@@ -198,6 +201,9 @@ object AsyncDataStream {
       override def asyncInvoke(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
         asyncFunction.asyncInvoke(input, new JavaResultFutureWrapper[OUT](resultFuture))
       }
+      override def timeout(input: IN, resultFuture: JavaResultFuture[OUT]): Unit = {
+        asyncFunction.timeout(input, new JavaResultFutureWrapper[OUT](resultFuture))
+      }
     }
 
     val outType : TypeInformation[OUT] = implicitly[TypeInformation[OUT]]

http://git-wip-us.apache.org/repos/asf/flink/blob/56df6904/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
index d5e9e28..d6965b7 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncFunction.scala
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.scala.async
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.functions.Function
 
+import java.util.concurrent.TimeoutException
+
 /**
   * A function to trigger async I/O operations.
   *
@@ -46,4 +48,16 @@ trait AsyncFunction[IN, OUT] extends Function {
     * @param resultFuture to be completed with the result data
     */
   def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit
+
+  /**
+    * Called when the [[AsyncFunction.asyncInvoke]] call for `input` has timed out.
+    * By default, the result future is exceptionally completed with a timeout exception.
+    *
+    * @param input element coming from an upstream task
+    * @param resultFuture to be completed with the result data or an exception
+    */
+  def timeout(input: IN, resultFuture: ResultFuture[OUT]): Unit = {
+    resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."))
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/56df6904/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
new file mode 100644
index 0000000..d0a2cec
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._
+import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
+import org.apache.flink.test.util.AbstractTestBase
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+
+object AsyncDataStreamITCase {
+  val timeout = 1000L // async operation timeout; the tests pass it with TimeUnit.MILLISECONDS
+  private var testResult: mutable.ArrayBuffer[Int] = _ // filled by the test sink, reset per run
+}
+
+class AsyncDataStreamITCase extends AbstractTestBase { // tests ordered and unordered async waits
+
+  @Test
+  def testOrderedWait(): Unit = {
+    testAsyncWait(true)
+  }
+
+  @Test
+  def testUnorderedWait(): Unit = {
+    testAsyncWait(false)
+  }
+
+  private def testAsyncWait(ordered: Boolean): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+
+    val source = env.fromElements(1, 2) // 1 completes normally (1 * 2); 2 times out (2 * 3)
+
+    val asyncMapped = if (ordered) {
+      AsyncDataStream.orderedWait(
+        source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
+    } else {
+      AsyncDataStream.unorderedWait(
+        source, new MyAsyncFunction(), timeout, TimeUnit.MILLISECONDS)
+    }
+
+    executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2, 6))
+  }
+
+  private def executeAndValidate(ordered: Boolean,
+      env: StreamExecutionEnvironment,
+      dataStream: DataStream[Int],
+      expectedResult: mutable.ArrayBuffer[Int]): Unit = {
+
+    testResult = mutable.ArrayBuffer[Int]()
+    dataStream.addSink(new SinkFunction[Int]() {
+      override def invoke(value: Int) {
+        testResult += value // collected into the companion object's buffer
+      }
+    })
+
+    env.execute("testAsyncDataStream")
+
+    if (ordered) {
+      assertEquals(expectedResult, testResult)
+    } else {
+      assertEquals(expectedResult, testResult.sorted) // compare regardless of arrival order
+    }
+  }
+
+  @Test
+  def testOrderedWaitUsingAnonymousFunction(): Unit = {
+    testAsyncWaitUsingAnonymousFunction(true)
+  }
+
+  @Test
+  def testUnorderedWaitUsingAnonymousFunction(): Unit = {
+    testAsyncWaitUsingAnonymousFunction(false)
+  }
+
+  private def testAsyncWaitUsingAnonymousFunction(ordered: Boolean): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+
+    val source = env.fromElements(1, 2) // both inputs are simply doubled by the function below
+
+    val asyncFunction: (Int, ResultFuture[Int]) => Unit =
+      (input, collector: ResultFuture[Int]) => Future {
+          collector.complete(Seq(input * 2))
+      }(ExecutionContext.global)
+    val asyncMapped = if (ordered) {
+      AsyncDataStream.orderedWait(source, timeout, TimeUnit.MILLISECONDS) {
+        asyncFunction
+      }
+    } else {
+      AsyncDataStream.unorderedWait(source, timeout, TimeUnit.MILLISECONDS) {
+        asyncFunction
+      }
+    }
+
+    executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2, 4))
+  }
+
+}
+
+class MyAsyncFunction extends AsyncFunction[Int, Int] {
+  override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit = {
+    Future {
+      // sleep past the timeout for even inputs so that timeout() is triggered for them
+      if (input % 2 == 0) {
+        Thread.sleep(AsyncDataStreamITCase.timeout + 1000)
+      }
+
+      resultFuture.complete(Seq(input * 2))
+    } (ExecutionContext.global)
+  }
+  override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = {
+    resultFuture.complete(Seq(input * 3)) // * 3 (not * 2) marks results produced on timeout
+  }
+}