You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/03/14 08:21:49 UTC

[flink] branch release-1.14 updated: [FLINK-24538][runtime][tests] Fix race condition when offering information to leader event queue in TestingRetrievalBase

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new faad65c  [FLINK-24538][runtime][tests] Fix race condition when offering information to leader event queue in TestingRetrievalBase
faad65c is described below

commit faad65c0b1ed5ec1438eaec3eb417c29c282dc0b
Author: Mika Naylor <ma...@autophagy.io>
AuthorDate: Thu Mar 10 13:51:09 2022 +0100

    [FLINK-24538][runtime][tests] Fix race condition when offering information to leader event queue in TestingRetrievalBase
---
 .../highavailability/KubernetesLeaderRetrievalDriverTest.java        | 1 +
 .../apache/flink/runtime/leaderelection/TestingRetrievalBase.java    | 1 -
 .../runtime/leaderretrieval/SettableLeaderRetrievalServiceTest.java  | 5 +++++
 3 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverTest.java
index 6e1e972..46500a8 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriverTest.java
@@ -98,6 +98,7 @@ public class KubernetesLeaderRetrievalDriverTest extends KubernetesHighAvailabil
                             getLeaderConfigMap().getData().clear();
                             callbackHandler.onModified(
                                     Collections.singletonList(getLeaderConfigMap()));
+                            retrievalEventHandler.waitForEmptyLeaderInformation(TIMEOUT);
                             assertThat(retrievalEventHandler.getAddress(), is(nullValue()));
                         });
             }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingRetrievalBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingRetrievalBase.java
index e9b0fb5..449eba9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingRetrievalBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingRetrievalBase.java
@@ -110,7 +110,6 @@ public class TestingRetrievalBase {
 
     public void offerToLeaderQueue(LeaderInformation leaderInformation) {
         leaderEventQueue.offer(leaderInformation);
-        this.leader = leaderInformation;
     }
 
     public int getLeaderEventQueueSize() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalServiceTest.java
index c0305ad..9ca3153 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/SettableLeaderRetrievalServiceTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.Duration;
+
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
 
@@ -32,6 +34,7 @@ import static org.junit.Assert.assertThat;
 public class SettableLeaderRetrievalServiceTest extends TestLogger {
 
     private SettableLeaderRetrievalService settableLeaderRetrievalService;
+    private static final Duration TIMEOUT = Duration.ofHours(1);
 
     @Before
     public void setUp() {
@@ -47,6 +50,7 @@ public class SettableLeaderRetrievalServiceTest extends TestLogger {
         final TestingListener listener = new TestingListener();
         settableLeaderRetrievalService.start(listener);
 
+        listener.waitForNewLeader(TIMEOUT.toMillis());
         assertThat(listener.getAddress(), equalTo(localhost));
         assertThat(
                 listener.getLeaderSessionID(), equalTo(HighAvailabilityServices.DEFAULT_LEADER_ID));
@@ -61,6 +65,7 @@ public class SettableLeaderRetrievalServiceTest extends TestLogger {
         settableLeaderRetrievalService.notifyListener(
                 localhost, HighAvailabilityServices.DEFAULT_LEADER_ID);
 
+        listener.waitForNewLeader(TIMEOUT.toMillis());
         assertThat(listener.getAddress(), equalTo(localhost));
         assertThat(
                 listener.getLeaderSessionID(), equalTo(HighAvailabilityServices.DEFAULT_LEADER_ID));