You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/18 21:26:08 UTC

flink git commit: [FLINK-7639] [tests] Harden TaskExecutorITCase

Repository: flink
Updated Branches:
  refs/heads/master 51d9a748d -> b23c37eef


[FLINK-7639] [tests] Harden TaskExecutorITCase

Previously, the TaskExecutorITCase did not wait for the leader election to finish
before sending remote messages to the ResourceManager. Waiting for the leader
election to complete fixes the test instability.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b23c37ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b23c37ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b23c37ee

Branch: refs/heads/master
Commit: b23c37eef04ee00b93eeebc2c70b7740caf303cb
Parents: 51d9a74
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Sep 18 23:22:37 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Sep 18 23:25:56 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java    | 5 ++++-
 .../flink/runtime/taskexecutor/TaskExecutorITCase.java       | 8 +++++++-
 2 files changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b23c37ee/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index b10f7de..1ace3b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -58,7 +58,10 @@ public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpo
 						"not match the expected fencing token {}.", message, fencingToken, rpcEndpoint.getFencingToken());
 				}
 
-				sendErrorIfSender(new FencingTokenMismatchException("Expected fencing token " + rpcEndpoint.getFencingToken() + ", actual fencing token " + fencingToken));
+				sendErrorIfSender(
+					new FencingTokenMismatchException("Fencing token mismatch: Ignoring message " + message +
+						" because the fencing token " + fencingToken + " did not match the expected fencing token " +
+						rpcEndpoint.getFencingToken() + '.'));
 			}
 		} else if (message instanceof UnfencedMessage) {
 			super.handleMessage(((UnfencedMessage<?>) message).getPayload());

http://git-wip-us.apache.org/repos/asf/flink/blob/b23c37ee/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 348dce6..e448ccc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -68,6 +68,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
@@ -188,7 +189,10 @@ public class TaskExecutorITCase extends TestLogger {
 			final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
 
 			// notify the RM that it is the leader
-			rmLeaderElectionService.isLeader(rmLeaderId);
+			CompletableFuture<UUID> isLeaderFuture = rmLeaderElectionService.isLeader(rmLeaderId);
+
+			// wait for the completion of the leader election
+			assertEquals(rmLeaderId, isLeaderFuture.get());
 
 			// notify the TM about the new RM leader
 			rmLeaderRetrievalService.notifyListener(rmAddress, rmLeaderId);
@@ -216,6 +220,8 @@ public class TaskExecutorITCase extends TestLogger {
 			if (testingFatalErrorHandler.hasExceptionOccurred()) {
 				testingFatalErrorHandler.rethrowError();
 			}
+
+			rpcService.stopService();
 		}