You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/05/10 10:08:14 UTC

[flink] branch release-1.13 updated: [FLINK-22577][tests] Harden KubernetesLeaderElectionAndRetrievalITCase

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 3ff9eb7  [FLINK-22577][tests] Harden KubernetesLeaderElectionAndRetrievalITCase
3ff9eb7 is described below

commit 3ff9eb7029784349fb135e6849b745ba82c7b8c0
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu May 6 15:29:01 2021 +0200

    [FLINK-22577][tests] Harden KubernetesLeaderElectionAndRetrievalITCase
    
    This commit introduces closing logic to the TestingLeaderElectionEventHandler which would
    otherwise forward calls after the KubernetesLeaderElectionDriver is closed.
    
    This closes #15856.
---
 .../KubernetesHighAvailabilityTestBase.java        |  1 +
 ...KubernetesLeaderElectionAndRetrievalITCase.java |  6 +-
 .../TestingLeaderElectionEventHandler.java         | 70 +++++++++++++++-------
 .../ZooKeeperLeaderElectionTest.java               |  5 ++
 4 files changed, 60 insertions(+), 22 deletions(-)

diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
index 18aad55..a89cb50 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
@@ -122,6 +122,7 @@ public class KubernetesHighAvailabilityTestBase extends TestLogger {
             electionEventHandler.init(leaderElectionDriver);
             testMethod.run();
 
+            electionEventHandler.close();
             leaderElectionDriver.close();
             leaderRetrievalDriver.close();
         }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java
index eb78138..a22432b 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java
@@ -52,9 +52,10 @@ public class KubernetesLeaderElectionAndRetrievalITCase extends TestLogger {
         KubernetesLeaderElectionDriver leaderElectionDriver = null;
         KubernetesLeaderRetrievalDriver leaderRetrievalDriver = null;
 
+        final TestingLeaderElectionEventHandler electionEventHandler =
+                new TestingLeaderElectionEventHandler(LEADER_INFORMATION);
+
         try {
-            final TestingLeaderElectionEventHandler electionEventHandler =
-                    new TestingLeaderElectionEventHandler(LEADER_INFORMATION);
             leaderElectionDriver =
                     new KubernetesLeaderElectionDriver(
                             kubernetesResource.getFlinkKubeClient(),
@@ -88,6 +89,7 @@ public class KubernetesLeaderElectionAndRetrievalITCase extends TestLogger {
             assertThat(
                     retrievalEventHandler.getAddress(), is(LEADER_INFORMATION.getLeaderAddress()));
         } finally {
+            electionEventHandler.close();
             if (leaderElectionDriver != null) {
                 leaderElectionDriver.close();
             }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java
index a659b19..ff91f42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionEventHandler.java
@@ -32,6 +32,8 @@ import java.util.function.Consumer;
 public class TestingLeaderElectionEventHandler extends TestingLeaderBase
         implements LeaderElectionEventHandler {
 
+    private final Object lock = new Object();
+
     private final LeaderInformation leaderInformation;
 
     private final OneShotLatch initializationLatch;
@@ -40,6 +42,8 @@ public class TestingLeaderElectionEventHandler extends TestingLeaderBase
 
     private LeaderInformation confirmedLeaderInformation = LeaderInformation.empty();
 
+    private boolean running = true;
+
     public TestingLeaderElectionEventHandler(LeaderInformation leaderInformation) {
         this.leaderInformation = leaderInformation;
         this.initializationLatch = new OneShotLatch();
@@ -51,35 +55,53 @@ public class TestingLeaderElectionEventHandler extends TestingLeaderBase
         initializationLatch.trigger();
     }
 
+    private void ifRunning(Runnable action) {
+        synchronized (lock) {
+            if (running) {
+                action.run();
+            }
+        }
+    }
+
     @Override
     public void onGrantLeadership() {
-        waitForInitialization(
-                leaderElectionDriver -> {
-                    confirmedLeaderInformation = leaderInformation;
-                    leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
-                    leaderEventQueue.offer(confirmedLeaderInformation);
-                });
+        ifRunning(
+                () ->
+                        waitForInitialization(
+                                leaderElectionDriver -> {
+                                    confirmedLeaderInformation = leaderInformation;
+                                    leaderElectionDriver.writeLeaderInformation(
+                                            confirmedLeaderInformation);
+                                    leaderEventQueue.offer(confirmedLeaderInformation);
+                                }));
     }
 
     @Override
     public void onRevokeLeadership() {
-        waitForInitialization(
-                (leaderElectionDriver) -> {
-                    confirmedLeaderInformation = LeaderInformation.empty();
-                    leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
-                    leaderEventQueue.offer(confirmedLeaderInformation);
-                });
+        ifRunning(
+                () ->
+                        waitForInitialization(
+                                (leaderElectionDriver) -> {
+                                    confirmedLeaderInformation = LeaderInformation.empty();
+                                    leaderElectionDriver.writeLeaderInformation(
+                                            confirmedLeaderInformation);
+                                    leaderEventQueue.offer(confirmedLeaderInformation);
+                                }));
     }
 
     @Override
     public void onLeaderInformationChange(LeaderInformation leaderInformation) {
-        waitForInitialization(
-                leaderElectionDriver -> {
-                    if (confirmedLeaderInformation.getLeaderSessionID() != null
-                            && !this.confirmedLeaderInformation.equals(leaderInformation)) {
-                        leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
-                    }
-                });
+        ifRunning(
+                () ->
+                        waitForInitialization(
+                                leaderElectionDriver -> {
+                                    if (confirmedLeaderInformation.getLeaderSessionID() != null
+                                            && !this.confirmedLeaderInformation.equals(
+                                                    leaderInformation)) {
+                                        leaderElectionDriver.writeLeaderInformation(
+                                                confirmedLeaderInformation);
+                                    }
+                                }));
     }
 
     private void waitForInitialization(Consumer<? super LeaderElectionDriver> operation) {
@@ -94,6 +116,14 @@ public class TestingLeaderElectionEventHandler extends TestingLeaderBase
     }
 
     public LeaderInformation getConfirmedLeaderInformation() {
-        return confirmedLeaderInformation;
+        synchronized (lock) {
+            return confirmedLeaderInformation;
+        }
+    }
+
+    public void close() {
+        synchronized (lock) {
+            running = false;
+        }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 0c26ad5..d8acb0e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -154,6 +154,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
                     is(TEST_LEADER.getLeaderSessionID()));
             assertThat(retrievalEventHandler.getAddress(), is(TEST_LEADER.getLeaderAddress()));
         } finally {
+            electionEventHandler.close();
             if (leaderElectionDriver != null) {
                 leaderElectionDriver.close();
             }
@@ -401,6 +402,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
                     is(TEST_LEADER.getLeaderSessionID()));
             assertThat(retrievalEventHandler.getAddress(), is(TEST_LEADER.getLeaderAddress()));
         } finally {
+            electionEventHandler.close();
             if (leaderElectionDriver != null) {
                 leaderElectionDriver.close();
             }
@@ -450,6 +452,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
                             .isPresent(),
                     is(true));
         } finally {
+            electionEventHandler.close();
             if (leaderElectionDriver != null) {
                 leaderElectionDriver.close();
             }
@@ -528,6 +531,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
                 // that was expected
             }
         } finally {
+            electionEventHandler.close();
             if (leaderRetrievalDriver != null) {
                 leaderRetrievalDriver.close();
             }
@@ -577,6 +581,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
                     is(TEST_LEADER.getLeaderSessionID()));
             assertThat(retrievalEventHandler.getAddress(), is(TEST_LEADER.getLeaderAddress()));
         } finally {
+            electionEventHandler.close();
             if (leaderElectionDriver != null) {
                 leaderElectionDriver.close();
             }