You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/18 21:26:08 UTC
flink git commit: [FLINK-7639] [tests] Harden TaskExecutorITCase
Repository: flink
Updated Branches:
refs/heads/master 51d9a748d -> b23c37eef
[FLINK-7639] [tests] Harden TaskExecutorITCase
The TaskExecutorITCase does not wait for the leader election to have finished
before sending remote messages to the ResourceManager. Waiting on the completion
fixed the test instability.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b23c37ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b23c37ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b23c37ee
Branch: refs/heads/master
Commit: b23c37eef04ee00b93eeebc2c70b7740caf303cb
Parents: 51d9a74
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Sep 18 23:22:37 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Sep 18 23:25:56 2017 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java | 5 ++++-
.../flink/runtime/taskexecutor/TaskExecutorITCase.java | 8 +++++++-
2 files changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b23c37ee/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
index b10f7de..1ace3b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -58,7 +58,10 @@ public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpo
"not match the expected fencing token {}.", message, fencingToken, rpcEndpoint.getFencingToken());
}
- sendErrorIfSender(new FencingTokenMismatchException("Expected fencing token " + rpcEndpoint.getFencingToken() + ", actual fencing token " + fencingToken));
+ sendErrorIfSender(
+ new FencingTokenMismatchException("Fencing token mismatch: Ignoring message " + message +
+ " because the fencing token " + fencingToken + " did not match the expected fencing token " +
+ rpcEndpoint.getFencingToken() + '.'));
}
} else if (message instanceof UnfencedMessage) {
super.handleMessage(((UnfencedMessage<?>) message).getPayload());
http://git-wip-us.apache.org/repos/asf/flink/blob/b23c37ee/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 348dce6..e448ccc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -68,6 +68,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
@@ -188,7 +189,10 @@ public class TaskExecutorITCase extends TestLogger {
final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
// notify the RM that it is the leader
- rmLeaderElectionService.isLeader(rmLeaderId);
+ CompletableFuture<UUID> isLeaderFuture = rmLeaderElectionService.isLeader(rmLeaderId);
+
+ // wait for the completion of the leader election
+ assertEquals(rmLeaderId, isLeaderFuture.get());
// notify the TM about the new RM leader
rmLeaderRetrievalService.notifyListener(rmAddress, rmLeaderId);
@@ -216,6 +220,8 @@ public class TaskExecutorITCase extends TestLogger {
if (testingFatalErrorHandler.hasExceptionOccurred()) {
testingFatalErrorHandler.rethrowError();
}
+
+ rpcService.stopService();
}