You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/09 14:48:15 UTC
flink git commit: [FLINK-5037] Fixed instability in AbstractUdfStreamOperatorLifecycleTest
Repository: flink
Updated Branches:
refs/heads/master 7a5189525 -> ddba618d9
[FLINK-5037] Fixed instability in AbstractUdfStreamOperatorLifecycleTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ddba618d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ddba618d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ddba618d
Branch: refs/heads/master
Commit: ddba618d9b9be9bab3da2544eabb2a9975bc8d9c
Parents: 7a51895
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Nov 9 11:19:30 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Nov 9 15:47:18 2016 +0100
----------------------------------------------------------------------
.../AbstractUdfStreamOperatorLifecycleTest.java | 20 ++++++++------------
1 file changed, 8 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ddba618d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index cbb833b..965aec6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -231,19 +231,15 @@ public class AbstractUdfStreamOperatorLifecycleTest {
testCheckpointer = new Thread() {
@Override
public void run() {
- long id = 0;
- while (true) {
- try {
- Thread.sleep(50);
- if (getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint(
- new CheckpointMetaData(id++, System.currentTimeMillis()))) {
- LifecycleTrackingStreamSource.runFinish.trigger();
- break;
- }
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
+ try {
+ runStarted.await();
+ if (getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint(
+ new CheckpointMetaData(0, System.currentTimeMillis()))) {
+ LifecycleTrackingStreamSource.runFinish.trigger();
}
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
}
}
};