You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/08/07 09:19:00 UTC
[flink] branch master updated: [FLINK-13605][tests] Fix unstable case AsyncDataStreamITCase#testUnorderedWait
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8b3430b [FLINK-13605][tests] Fix unstable case AsyncDataStreamITCase#testUnorderedWait
8b3430b is described below
commit 8b3430bb288500ed7ff381458a2ff8519876c993
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Wed Aug 7 15:59:08 2019 +0800
[FLINK-13605][tests] Fix unstable case AsyncDataStreamITCase#testUnorderedWait
This closes #9378
---
.../api/scala/AsyncDataStreamITCase.scala | 25 ++++++----------------
1 file changed, 6 insertions(+), 19 deletions(-)
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
index 8d53b4d..a4a9772 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
@@ -51,7 +51,7 @@ class AsyncDataStreamITCase extends AbstractTestBase {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
- val source = env.fromElements(1, 2)
+ val source = env.fromElements(1)
val timeout = 1L
val asyncMapped = if (ordered) {
@@ -62,7 +62,7 @@ class AsyncDataStreamITCase extends AbstractTestBase {
source, new AsyncFunctionWithTimeoutExpired(), timeout, TimeUnit.MILLISECONDS)
}
- executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2, 6))
+ executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](3))
}
private def executeAndValidate(ordered: Boolean,
@@ -139,34 +139,21 @@ class AsyncDataStreamITCase extends AbstractTestBase {
}
class AsyncFunctionWithTimeoutExpired extends RichAsyncFunction[Int, Int] {
- @transient var timeoutLatch: CountDownLatch = _
@transient var invokeLatch: CountDownLatch = _
override def open(parameters: Configuration): Unit = {
- timeoutLatch = new CountDownLatch(1)
invokeLatch = new CountDownLatch(1)
}
override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit = {
Future {
- // trigger the timeout of the even input number
- if (input % 2 == 0) {
- invokeLatch.await()
- resultFuture.complete(Seq(input * 2))
- } else {
- resultFuture.complete(Seq(input * 2))
- timeoutLatch.countDown()
- }
+ invokeLatch.await()
+ resultFuture.complete(Seq(input * 2))
} (ExecutionContext.global)
}
override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = {
- if (input % 2 == 0) {
- resultFuture.complete(Seq(input * 3))
- invokeLatch.countDown()
- } else {
- timeoutLatch.await()
- resultFuture.complete(Seq(input * 3))
- }
+ resultFuture.complete(Seq(input * 3))
+ invokeLatch.countDown()
}
}