You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/19 06:40:40 UTC

flink git commit: [FLINK-9872][tests] Properly cancel test job

Repository: flink
Updated Branches:
  refs/heads/master e984168e2 -> e022acbde


[FLINK-9872][tests] Properly cancel test job

This closes #6349.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e022acbd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e022acbd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e022acbd

Branch: refs/heads/master
Commit: e022acbde54114b747fe8f1b32a164ce7af761f6
Parents: e984168
Author: zentol <ch...@apache.org>
Authored: Tue Jul 17 11:34:29 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Jul 19 08:40:31 2018 +0200

----------------------------------------------------------------------
 .../flink/test/checkpointing/SavepointITCase.java       | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e022acbd/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 7eb5b8f..fa8fa34 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -592,7 +592,11 @@ public class SavepointITCase extends TestLogger {
 				latch.await();
 			}
 			savepointPath = client.triggerSavepoint(jobGraph.getJobID(), null).get();
-			source.cancel();
+
+			client.cancel(jobGraph.getJobID());
+			while (!client.getJobStatus(jobGraph.getJobID()).get().isGloballyTerminalState()) {
+				Thread.sleep(100);
+			}
 
 			jobGraph = streamGraph.getJobGraph();
 			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
@@ -602,7 +606,11 @@ public class SavepointITCase extends TestLogger {
 			for (OneShotLatch latch : iterTestRestoreWait) {
 				latch.await();
 			}
-			source.cancel();
+
+			client.cancel(jobGraph.getJobID());
+			while (!client.getJobStatus(jobGraph.getJobID()).get().isGloballyTerminalState()) {
+				Thread.sleep(100);
+			}
 		} finally {
 			if (null != savepointPath) {
 				client.disposeSavepoint(savepointPath);