You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/06/21 04:05:19 UTC

[1/2] flink git commit: [FLINK-6918] [tests] Harden AbstractOperatorRestoreTestBase by retrying CancelWithSavepoint messages

Repository: flink
Updated Branches:
  refs/heads/release-1.3 088232c2e -> a27686959


[FLINK-6918] [tests] Harden AbstractOperatorRestoreTestBase by retrying CancelWithSavepoint messages

The problem is that a StreamTask can be in state RUNNING without actually running internally.
As a consequence, the checkpoint message will be discarded. This problem will be solved once
FLINK-4714 has been addressed. Until then, we harden the test case by retrying the
CancelWithSavepoint message.

This closes #4129.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70f33441
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70f33441
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70f33441

Branch: refs/heads/release-1.3
Commit: 70f33441f87e600f7b5197b87ac34c81604f8ee9
Parents: 088232c
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jun 15 08:54:42 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jun 21 11:54:17 2017 +0800

----------------------------------------------------------------------
 .../restore/AbstractOperatorRestoreTestBase.java    | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/70f33441/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index f087cf4..b6dfb02 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -194,8 +194,20 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 		// Trigger savepoint
 		File targetDirectory = tmpFolder.newFolder();
 		msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
-		Future<Object> future = jobManager.ask(msg, timeout);
-		result = Await.result(future, timeout);
+
+		// FLINK-6918: Retry cancel with savepoint message in case that StreamTasks were not running
+		// TODO: The retry logic should be removed once the StreamTask lifecycle has been fixed (see FLINK-4714)
+		boolean retry = true;
+		for (int i = 0; retry && i < 10; i++) {
+			Future<Object> future = jobManager.ask(msg, timeout);
+			result = Await.result(future, timeout);
+
+			if (result instanceof JobManagerMessages.CancellationFailure) {
+				Thread.sleep(50L);
+			} else {
+				retry = false;
+			}
+		}
 
 		if (result instanceof JobManagerMessages.CancellationFailure) {
 			JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;


[2/2] flink git commit: [FLINK-6945] Fix TaskCancelAsyncProducerConsumerITCase by removing race condition

Posted by tr...@apache.org.
[FLINK-6945] Fix TaskCancelAsyncProducerConsumerITCase by removing race condition

The TaskCancelAsyncProducerConsumerITCase#testCancelAsyncProducerAndConsumer test case
sometimes failed with an NPE because of a race condition. The problem was that some
invokables set static fields which are checked in the main thread. After making sure that
the producer was running, we accessed the wrong field (the consumer's thread instead of the
producer's), which could lead to an NPE if the consumer wasn't running yet.

This closes #4139.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2768695
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2768695
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2768695

Branch: refs/heads/release-1.3
Commit: a2768695923dc48ba02fa7714d82d833ca98a092
Parents: 70f3344
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jun 19 09:54:35 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jun 21 11:54:18 2017 +0800

----------------------------------------------------------------------
 .../taskmanager/TaskCancelAsyncProducerConsumerITCase.java     | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a2768695/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index 4ea6511..69f1a49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -126,12 +126,12 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 					break;
 				} else {
 					// Retry
-					Thread.sleep(500);
+					Thread.sleep(500L);
 				}
 			}
 
 			// Verify that async producer is in blocking request
-			assertTrue("Producer thread is not blocked: " + Arrays.toString(ASYNC_CONSUMER_THREAD.getStackTrace()), producerBlocked);
+			assertTrue("Producer thread is not blocked: " + Arrays.toString(ASYNC_PRODUCER_THREAD.getStackTrace()), producerBlocked);
 
 			boolean consumerWaiting = false;
 			for (int i = 0; i < 50; i++) {
@@ -145,7 +145,7 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 					break;
 				} else {
 					// Retry
-					Thread.sleep(500);
+					Thread.sleep(500L);
 				}
 			}