You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by di...@apache.org on 2023/07/10 14:11:03 UTC

[kafka] 03/08: MINOR: Fix flaky DistributedHerderTest cases related to zombie fencing (#13806)

This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit cafefe708530992c58924cae50270befee3672dd
Author: Chris Egerton <ch...@aiven.io>
AuthorDate: Mon Jun 5 12:50:54 2023 -0700

    MINOR: Fix flaky DistributedHerderTest cases related to zombie fencing (#13806)
    
    Reviewers: Yash Mayya <ya...@gmail.com>, Chris Egerton <ch...@aiven.io>
---
 .../runtime/distributed/DistributedHerder.java     |  7 +++++
 .../runtime/distributed/DistributedHerderTest.java | 35 ++++++++++++----------
 2 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 88f92f53f17..98543e6fc21 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -2146,6 +2146,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     DistributedHerderRequest addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) {
         DistributedHerderRequest req = new DistributedHerderRequest(time.milliseconds() + delayMs, requestSeqNum.incrementAndGet(), action, callback);
         requests.add(req);
+        // We don't need to synchronize here
+        // If the condition evaluates to true, we can and should trigger a wakeup
+        // If it evaluates to false because our request has suddenly been popped off of the queue, then
+        // the herder is already running the request and no wakeup is necessary
+        // If it evaluates to false because our request was not at the head of the queue, then a wakeup
+        // should already have been triggered when the request that is currently at the head of the
+        // queue was added
         if (peekWithoutException() == req)
             member.wakeup();
         return req;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 33b8f966e2e..cc4147737df 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -3337,8 +3337,17 @@ public class DistributedHerderTest {
 
         expectAnyTicks();
 
+        // There's a race condition in DistributedHerder::addRequest that isn't triggered
+        // often under normal circumstances, but becomes more likely during this test
+        // because we mock out member::poll and the herder no longer blocks when invoking
+        // that method
+        // This can cause the herder to skip a call to member::wakeup in addRequest, and
+        // since the number of calls to member::wakeup isn't vital to our testing coverage, we
+        // permit any number of calls
+        // The race condition itself is benign and should have no negative impact on herder logic;
+        // see DistributedHerder::addRequest for more detail
         member.wakeup();
-        EasyMock.expectLastCall();
+        EasyMock.expectLastCall().anyTimes();
 
         org.easymock.IExpectationSetters<RestClient.HttpResponse<Object>> expectRequest = EasyMock.expect(
                 restClient.httpRequest(
@@ -3357,7 +3366,7 @@ public class DistributedHerderTest {
             return null;
         });
 
-        expectHerderShutdown(true);
+        expectHerderShutdown();
         forwardRequestExecutor.shutdown();
         EasyMock.expectLastCall();
         EasyMock.expect(forwardRequestExecutor.awaitTermination(anyLong(), anyObject())).andReturn(true);
@@ -3442,7 +3451,7 @@ public class DistributedHerderTest {
             EasyMock.expectLastCall();
         }
 
-        expectHerderShutdown(false);
+        expectHerderShutdown();
 
         PowerMock.replayAll();
 
@@ -3477,7 +3486,7 @@ public class DistributedHerderTest {
         expectAnyTicks();
 
         member.wakeup();
-        EasyMock.expectLastCall();
+        EasyMock.expectLastCall().anyTimes();
 
         ClusterConfigState configState = exactlyOnceSnapshot(
                 sessionKey,
@@ -3515,7 +3524,7 @@ public class DistributedHerderTest {
         configBackingStore.putTaskCountRecord(CONN1, 1);
         EasyMock.expectLastCall();
 
-        expectHerderShutdown(true);
+        expectHerderShutdown();
 
         PowerMock.replayAll(workerFencingFuture, herderFencingFuture);
 
@@ -3548,7 +3557,7 @@ public class DistributedHerderTest {
         expectAnyTicks();
 
         member.wakeup();
-        EasyMock.expectLastCall();
+        EasyMock.expectLastCall().anyTimes();
 
         ClusterConfigState configState = exactlyOnceSnapshot(
                 sessionKey,
@@ -3563,7 +3572,7 @@ public class DistributedHerderTest {
         EasyMock.expect(worker.fenceZombies(EasyMock.eq(CONN1), EasyMock.eq(2), EasyMock.eq(CONN1_CONFIG)))
                 .andThrow(fencingException);
 
-        expectHerderShutdown(true);
+        expectHerderShutdown();
 
         PowerMock.replayAll();
 
@@ -3598,7 +3607,7 @@ public class DistributedHerderTest {
         expectAnyTicks();
 
         member.wakeup();
-        EasyMock.expectLastCall();
+        EasyMock.expectLastCall().anyTimes();
 
         ClusterConfigState configState = exactlyOnceSnapshot(
                 sessionKey,
@@ -3630,7 +3639,7 @@ public class DistributedHerderTest {
             });
         }
 
-        expectHerderShutdown(true);
+        expectHerderShutdown();
 
         PowerMock.replayAll(workerFencingFuture, herderFencingFuture);
 
@@ -3752,7 +3761,7 @@ public class DistributedHerderTest {
             EasyMock.expectLastCall();
         });
 
-        expectHerderShutdown(false);
+        expectHerderShutdown();
 
         PowerMock.replayAll();
 
@@ -4333,11 +4342,7 @@ public class DistributedHerderTest {
         EasyMock.expectLastCall();
     }
 
-    private void expectHerderShutdown(boolean wakeup) {
-        if (wakeup) {
-            member.wakeup();
-            EasyMock.expectLastCall();
-        }
+    private void expectHerderShutdown() {
         worker.stopAndAwaitConnectors();
         EasyMock.expectLastCall();
         worker.stopAndAwaitTasks();