You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/06/21 03:51:13 UTC

[2/3] flink git commit: [FLINK-6918] [tests] Harden AbstractOperatorRestoreTestBase by retrying CancelWithSavepoint messages

[FLINK-6918] [tests] Harden AbstractOperatorRestoreTestBase by retrying CancelWithSavepoint messages

The problem is that a StreamTask can be in state RUNNING without actually running internally.
As a consequence, checkpoint messages will be discarded. This problem will be solved once
FLINK-4714 has been addressed. Until then, we harden the test case by retrying the
CancelWithSavepoint message.

This closes #4129.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47862afb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47862afb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47862afb

Branch: refs/heads/master
Commit: 47862afbef98faee61e07ca4a00f41f34a764cf5
Parents: 6fdeed3
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jun 15 08:54:42 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jun 21 11:33:13 2017 +0800

----------------------------------------------------------------------
 .../restore/AbstractOperatorRestoreTestBase.java    | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/47862afb/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index f087cf4..b6dfb02 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -194,8 +194,20 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 		// Trigger savepoint
 		File targetDirectory = tmpFolder.newFolder();
 		msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
-		Future<Object> future = jobManager.ask(msg, timeout);
-		result = Await.result(future, timeout);
+
+		// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
+		// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
+		boolean retry = true;
+		for (int i = 0; retry && i < 10; i++) {
+			Future<Object> future = jobManager.ask(msg, timeout);
+			result = Await.result(future, timeout);
+
+			if (result instanceof JobManagerMessages.CancellationFailure) {
+				Thread.sleep(50L);
+			} else {
+				retry = false;
+			}
+		}
 
 		if (result instanceof JobManagerMessages.CancellationFailure) {
 			JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;