You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/07/18 08:41:50 UTC
flink git commit: [hotfix] [tests] Workaround for
WaitForAllVerticesToBeRunning not working in TestingCluster
Repository: flink
Updated Branches:
refs/heads/master b8d6778f7 -> d7f9f9e3e
[hotfix] [tests] Workaround for WaitForAllVerticesToBeRunning not working in TestingCluster
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7f9f9e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7f9f9e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7f9f9e3
Branch: refs/heads/master
Commit: d7f9f9e3ef4c3906ac9816fb9d68bdfe3be1f08f
Parents: b8d6778
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Jul 17 15:16:25 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Jul 18 10:41:25 2017 +0200
----------------------------------------------------------------------
.../runtime/testingUtils/TestingCluster.scala | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d7f9f9e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 0e3eae5..48c4d85 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -391,13 +391,20 @@ class TestingCluster(
result match {
case success: CheckpointRequestSuccess => success.path
case fail: CheckpointRequestFailure => {
- if (fail.cause.getMessage.contains("tasks not ready")) {
- // retry if the tasks are not ready yet.
- Thread.sleep(50)
- requestCheckpoint(jobId, options)
- } else {
- throw new IOException(fail.cause)
+ // TODO right now, this is a dirty way to detect whether the checkpoint
+ // failed because tasks were not ready.This would not be required if
+ // TestingJobManagerMessages.WaitForAllVerticesToBeRunning(...) works
+ // properly.
+ if (fail.cause != null) {
+ val innerCause = fail.cause.getCause
+ if (innerCause != null
+ && innerCause.getMessage.contains("tasks not ready")) {
+ // retry if the tasks are not ready yet.
+ Thread.sleep(50)
+ return requestCheckpoint(jobId, options)
+ }
}
+ throw new IOException(fail.cause)
}
case _ => throw new IllegalStateException("Trigger checkpoint failed")
}