You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/08/07 09:18:05 UTC

[flink] branch release-1.9 updated: [FLINK-13605][tests] Fix unstable case AsyncDataStreamITCase#testUnorderedWait

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 7cb3c5f  [FLINK-13605][tests] Fix unstable case AsyncDataStreamITCase#testUnorderedWait
7cb3c5f is described below

commit 7cb3c5f21976f8e811c92db09a21f3c290f97bb7
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Wed Aug 7 15:59:08 2019 +0800

    [FLINK-13605][tests] Fix unstable case AsyncDataStreamITCase#testUnorderedWait
---
 .../api/scala/AsyncDataStreamITCase.scala          | 25 ++++++----------------
 1 file changed, 6 insertions(+), 19 deletions(-)

diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
index 8d53b4d..a4a9772 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
@@ -51,7 +51,7 @@ class AsyncDataStreamITCase extends AbstractTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setParallelism(1)
 
-    val source = env.fromElements(1, 2)
+    val source = env.fromElements(1)
 
     val timeout = 1L
     val asyncMapped = if (ordered) {
@@ -62,7 +62,7 @@ class AsyncDataStreamITCase extends AbstractTestBase {
         source, new AsyncFunctionWithTimeoutExpired(), timeout, TimeUnit.MILLISECONDS)
     }
 
-    executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2, 6))
+    executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](3))
   }
 
   private def executeAndValidate(ordered: Boolean,
@@ -139,34 +139,21 @@ class AsyncDataStreamITCase extends AbstractTestBase {
 }
 
 class AsyncFunctionWithTimeoutExpired extends RichAsyncFunction[Int, Int] {
-  @transient var timeoutLatch: CountDownLatch = _
   @transient var invokeLatch: CountDownLatch = _
 
   override def open(parameters: Configuration): Unit = {
-    timeoutLatch = new CountDownLatch(1)
     invokeLatch = new CountDownLatch(1)
   }
 
   override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit = {
     Future {
-      // trigger the timeout of the even input number
-      if (input % 2 == 0) {
-        invokeLatch.await()
-        resultFuture.complete(Seq(input * 2))
-      } else {
-        resultFuture.complete(Seq(input * 2))
-        timeoutLatch.countDown()
-      }
+      invokeLatch.await()
+      resultFuture.complete(Seq(input * 2))
     } (ExecutionContext.global)
   }
   override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = {
-    if (input % 2 == 0) {
-      resultFuture.complete(Seq(input * 3))
-      invokeLatch.countDown()
-    } else {
-      timeoutLatch.await()
-      resultFuture.complete(Seq(input * 3))
-    }
+    resultFuture.complete(Seq(input * 3))
+    invokeLatch.countDown()
   }
 }