You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/09 14:48:15 UTC

flink git commit: [FLINK-5037] Fixed instability in AbstractUdfStreamOperatorLifecycleTest

Repository: flink
Updated Branches:
  refs/heads/master 7a5189525 -> ddba618d9


[FLINK-5037] Fixed instability in AbstractUdfStreamOperatorLifecycleTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ddba618d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ddba618d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ddba618d

Branch: refs/heads/master
Commit: ddba618d9b9be9bab3da2544eabb2a9975bc8d9c
Parents: 7a51895
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Nov 9 11:19:30 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Nov 9 15:47:18 2016 +0100

----------------------------------------------------------------------
 .../AbstractUdfStreamOperatorLifecycleTest.java | 20 ++++++++------------
 1 file changed, 8 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ddba618d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index cbb833b..965aec6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -231,19 +231,15 @@ public class AbstractUdfStreamOperatorLifecycleTest {
 				testCheckpointer = new Thread() {
 					@Override
 					public void run() {
-						long id = 0;
-						while (true) {
-							try {
-								Thread.sleep(50);
-								if (getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint(
-										new CheckpointMetaData(id++, System.currentTimeMillis()))) {
-									LifecycleTrackingStreamSource.runFinish.trigger();
-									break;
-								}
-							} catch (Exception e) {
-								e.printStackTrace();
-								Assert.fail();
+						try {
+							runStarted.await();
+							if (getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint(
+									new CheckpointMetaData(0, System.currentTimeMillis()))) {
+								LifecycleTrackingStreamSource.runFinish.trigger();
 							}
+						} catch (Exception e) {
+							e.printStackTrace();
+							Assert.fail();
 						}
 					}
 				};