You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/05/10 10:08:14 UTC
[flink] branch release-1.13 updated: [FLINK-22577][tests] Harden
KubernetesLeaderElectionAndRetrievalITCase
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 3ff9eb7 [FLINK-22577][tests] Harden KubernetesLeaderElectionAndRetrievalITCase
3ff9eb7 is described below
commit 3ff9eb7029784349fb135e6849b745ba82c7b8c0
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu May 6 15:29:01 2021 +0200
[FLINK-22577][tests] Harden KubernetesLeaderElectionAndRetrievalITCase
This commit adds closing logic to the TestingLeaderElectionEventHandler, which would
otherwise keep forwarding calls to the KubernetesLeaderElectionDriver after the driver has been closed.
This closes #15856.
---
.../KubernetesHighAvailabilityTestBase.java | 1 +
...KubernetesLeaderElectionAndRetrievalITCase.java | 6 +-
.../TestingLeaderElectionEventHandler.java | 70 +++++++++++++++-------
.../ZooKeeperLeaderElectionTest.java | 5 ++
4 files changed, 60 insertions(+), 22 deletions(-)
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
index 18aad55..a89cb50 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
@@ -122,6 +122,7 @@ public class KubernetesHighAvailabilityTestBase extends TestLogger {
electionEventHandler.init(leaderElectionDriver);
testMethod.run();
+ electionEventHandler.close();
leaderElectionDriver.close();
leaderRetrievalDriver.close();
}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java
index eb78138..a22432b 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java
@@ -52,9 +52,10 @@ public class KubernetesLeaderElectionAndRetrievalITCase extends TestLogger {
KubernetesLeaderElectionDriver leaderElectionDriver = null;
KubernetesLeaderRetrievalDriver leaderRetrievalDriver = null;
+ final TestingLeaderElectionEventHandler electionEventHandler =
+ new TestingLeaderElectionEventHandler(LEADER_INFORMATION);
+
try {
- final TestingLeaderElectionEventHandler electionEventHandler =
- new TestingLeaderElectionEventHandler(LEADER_INFORMATION);
leaderElectionDriver =
new KubernetesLeaderElectionDriver(
kubernetesResource.getFlinkKubeClient(),
@@ -88,6 +89,7 @@ public class KubernetesLeaderElectionAndRetrievalITCase extends TestLogger {
assertThat(
retrievalEventHandler.getAddress(), is(LEADER_INFORMATION.getLeaderAddress()));
} finally {
+ electionEventHandler.close();
if (leaderElectionDriver != null) {
leaderElectionDriver.close();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java
index a659b19..ff91f42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java
@@ -32,6 +32,8 @@ import java.util.function.Consumer;
public class TestingLeaderElectionEventHandler extends TestingLeaderBase
implements LeaderElectionEventHandler {
+ private final Object lock = new Object();
+
private final LeaderInformation leaderInformation;
private final OneShotLatch initializationLatch;
@@ -40,6 +42,8 @@ public class TestingLeaderElectionEventHandler extends TestingLeaderBase
private LeaderInformation confirmedLeaderInformation = LeaderInformation.empty();
+ private boolean running = true;
+
public TestingLeaderElectionEventHandler(LeaderInformation leaderInformation) {
this.leaderInformation = leaderInformation;
this.initializationLatch = new OneShotLatch();
@@ -51,35 +55,53 @@ public class TestingLeaderElectionEventHandler extends TestingLeaderBase
initializationLatch.trigger();
}
+ private void ifRunning(Runnable action) {
+ synchronized (lock) {
+ if (running) {
+ action.run();
+ }
+ }
+ }
+
@Override
public void onGrantLeadership() {
- waitForInitialization(
- leaderElectionDriver -> {
- confirmedLeaderInformation = leaderInformation;
- leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
- leaderEventQueue.offer(confirmedLeaderInformation);
- });
+ ifRunning(
+ () ->
+ waitForInitialization(
+ leaderElectionDriver -> {
+ confirmedLeaderInformation = leaderInformation;
+ leaderElectionDriver.writeLeaderInformation(
+ confirmedLeaderInformation);
+ leaderEventQueue.offer(confirmedLeaderInformation);
+ }));
}
@Override
public void onRevokeLeadership() {
- waitForInitialization(
- (leaderElectionDriver) -> {
- confirmedLeaderInformation = LeaderInformation.empty();
- leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
- leaderEventQueue.offer(confirmedLeaderInformation);
- });
+ ifRunning(
+ () ->
+ waitForInitialization(
+ (leaderElectionDriver) -> {
+ confirmedLeaderInformation = LeaderInformation.empty();
+ leaderElectionDriver.writeLeaderInformation(
+ confirmedLeaderInformation);
+ leaderEventQueue.offer(confirmedLeaderInformation);
+ }));
}
@Override
public void onLeaderInformationChange(LeaderInformation leaderInformation) {
- waitForInitialization(
- leaderElectionDriver -> {
- if (confirmedLeaderInformation.getLeaderSessionID() != null
- && !this.confirmedLeaderInformation.equals(leaderInformation)) {
- leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
- }
- });
+ ifRunning(
+ () ->
+ waitForInitialization(
+ leaderElectionDriver -> {
+ if (confirmedLeaderInformation.getLeaderSessionID() != null
+ && !this.confirmedLeaderInformation.equals(
+ leaderInformation)) {
+ leaderElectionDriver.writeLeaderInformation(
+ confirmedLeaderInformation);
+ }
+ }));
}
private void waitForInitialization(Consumer<? super LeaderElectionDriver> operation) {
@@ -94,6 +116,14 @@ public class TestingLeaderElectionEventHandler extends TestingLeaderBase
}
public LeaderInformation getConfirmedLeaderInformation() {
- return confirmedLeaderInformation;
+ synchronized (lock) {
+ return confirmedLeaderInformation;
+ }
+ }
+
+ public void close() {
+ synchronized (lock) {
+ running = false;
+ }
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 0c26ad5..d8acb0e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -154,6 +154,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
is(TEST_LEADER.getLeaderSessionID()));
assertThat(retrievalEventHandler.getAddress(), is(TEST_LEADER.getLeaderAddress()));
} finally {
+ electionEventHandler.close();
if (leaderElectionDriver != null) {
leaderElectionDriver.close();
}
@@ -401,6 +402,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
is(TEST_LEADER.getLeaderSessionID()));
assertThat(retrievalEventHandler.getAddress(), is(TEST_LEADER.getLeaderAddress()));
} finally {
+ electionEventHandler.close();
if (leaderElectionDriver != null) {
leaderElectionDriver.close();
}
@@ -450,6 +452,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
.isPresent(),
is(true));
} finally {
+ electionEventHandler.close();
if (leaderElectionDriver != null) {
leaderElectionDriver.close();
}
@@ -528,6 +531,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
// that was expected
}
} finally {
+ electionEventHandler.close();
if (leaderRetrievalDriver != null) {
leaderRetrievalDriver.close();
}
@@ -577,6 +581,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
is(TEST_LEADER.getLeaderSessionID()));
assertThat(retrievalEventHandler.getAddress(), is(TEST_LEADER.getLeaderAddress()));
} finally {
+ electionEventHandler.close();
if (leaderElectionDriver != null) {
leaderElectionDriver.close();
}