You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/07/18 08:41:50 UTC

flink git commit: [hotfix] [tests] Workaround for WaitForAllVerticesToBeRunning not working in TestingCluster

Repository: flink
Updated Branches:
  refs/heads/master b8d6778f7 -> d7f9f9e3e


[hotfix] [tests] Workaround for WaitForAllVerticesToBeRunning not working in TestingCluster


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7f9f9e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7f9f9e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7f9f9e3

Branch: refs/heads/master
Commit: d7f9f9e3ef4c3906ac9816fb9d68bdfe3be1f08f
Parents: b8d6778
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Jul 17 15:16:25 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Tue Jul 18 10:41:25 2017 +0200

----------------------------------------------------------------------
 .../runtime/testingUtils/TestingCluster.scala    | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d7f9f9e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 0e3eae5..48c4d85 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -391,13 +391,20 @@ class TestingCluster(
     result match {
       case success: CheckpointRequestSuccess => success.path
       case fail: CheckpointRequestFailure => {
-        if (fail.cause.getMessage.contains("tasks not ready")) {
-          // retry if the tasks are not ready yet.
-          Thread.sleep(50)
-          requestCheckpoint(jobId, options)
-        } else {
-          throw new IOException(fail.cause)
+        // TODO right now, this is a dirty way to detect whether the checkpoint
+        // failed because tasks were not ready. This would not be required if
+        // TestingJobManagerMessages.WaitForAllVerticesToBeRunning(...) works
+        // properly.
+        if (fail.cause != null) {
+          val innerCause = fail.cause.getCause
+          if (innerCause != null
+            && innerCause.getMessage.contains("tasks not ready")) {
+            // retry if the tasks are not ready yet.
+            Thread.sleep(50)
+            return requestCheckpoint(jobId, options)
+          }
         }
+        throw new IOException(fail.cause)
       }
       case _ => throw new IllegalStateException("Trigger checkpoint failed")
     }