You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/06/21 03:51:13 UTC
[2/3] flink git commit: [FLINK-6918] [tests] Harden AbstractOperatorRestoreTestBase by retrying CancelWithSavepoint messages
[FLINK-6918] [tests] Harden AbstractOperatorRestoreTestBase by retrying CancelWithSavepoint messages
The problem is that a StreamTask can be in state RUNNING without actually running internally.
As a consequence, checkpoint messages will be discarded. This problem will be solved once
FLINK-4714 has been addressed. Until then, we harden the test case by retrying the
CancelWithSavepoint message.
This closes #4129.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47862afb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47862afb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47862afb
Branch: refs/heads/master
Commit: 47862afbef98faee61e07ca4a00f41f34a764cf5
Parents: 6fdeed3
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jun 15 08:54:42 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jun 21 11:33:13 2017 +0800
----------------------------------------------------------------------
.../restore/AbstractOperatorRestoreTestBase.java | 16 ++++++++++++++--
1 file changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/47862afb/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index f087cf4..b6dfb02 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -194,8 +194,20 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
// Trigger savepoint
File targetDirectory = tmpFolder.newFolder();
msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
- Future<Object> future = jobManager.ask(msg, timeout);
- result = Await.result(future, timeout);
+
+ // FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
+ // TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
+ boolean retry = true;
+ for (int i = 0; retry && i < 10; i++) {
+ Future<Object> future = jobManager.ask(msg, timeout);
+ result = Await.result(future, timeout);
+
+ if (result instanceof JobManagerMessages.CancellationFailure) {
+ Thread.sleep(50L);
+ } else {
+ retry = false;
+ }
+ }
if (result instanceof JobManagerMessages.CancellationFailure) {
JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;