You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by di...@apache.org on 2023/07/10 14:11:03 UTC
[kafka] 03/08: MINOR: Fix flaky DistributedHerderTest cases related to zombie fencing (#13806)
This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit cafefe708530992c58924cae50270befee3672dd
Author: Chris Egerton <ch...@aiven.io>
AuthorDate: Mon Jun 5 12:50:54 2023 -0700
MINOR: Fix flaky DistributedHerderTest cases related to zombie fencing (#13806)
Reviewers: Yash Mayya <ya...@gmail.com>, Chris Egerton <ch...@aiven.io>
---
.../runtime/distributed/DistributedHerder.java | 7 +++++
.../runtime/distributed/DistributedHerderTest.java | 35 ++++++++++++----------
2 files changed, 27 insertions(+), 15 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 88f92f53f17..98543e6fc21 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -2146,6 +2146,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
DistributedHerderRequest addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) {
DistributedHerderRequest req = new DistributedHerderRequest(time.milliseconds() + delayMs, requestSeqNum.incrementAndGet(), action, callback);
requests.add(req);
+ // We don't need to synchronize here
+ // If the condition evaluates to true, we can and should trigger a wakeup
+ // If it evaluates to false because our request has suddenly been popped off of the queue, then
+ // the herder is already running the request and no wakeup is necessary
+ // If it evaluates to false because our request was not at the head of the queue, then a wakeup
+ // should already have been triggered when the request that is currently at the head of the
+ // queue was added
if (peekWithoutException() == req)
member.wakeup();
return req;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 33b8f966e2e..cc4147737df 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -3337,8 +3337,17 @@ public class DistributedHerderTest {
expectAnyTicks();
+ // There's a race condition in DistributedHerder::addRequest that isn't triggered
+ // often under normal circumstances, but becomes more likely during this test
+ // because we mock out member::poll and the herder no longer blocks when invoking
+ // that method
+ // This can cause the herder to skip a call to member::wakeup inside DistributedHerder::addRequest, and
+ // since the number of calls to member::wakeup isn't vital to our testing coverage, we permit
+ // any number of calls
+ // The race condition itself is benign and should have no negative impact on herder logic;
+ // see DistributedHerder::addRequest for more detail
member.wakeup();
- EasyMock.expectLastCall();
+ EasyMock.expectLastCall().anyTimes();
org.easymock.IExpectationSetters<RestClient.HttpResponse<Object>> expectRequest = EasyMock.expect(
restClient.httpRequest(
@@ -3357,7 +3366,7 @@ public class DistributedHerderTest {
return null;
});
- expectHerderShutdown(true);
+ expectHerderShutdown();
forwardRequestExecutor.shutdown();
EasyMock.expectLastCall();
EasyMock.expect(forwardRequestExecutor.awaitTermination(anyLong(), anyObject())).andReturn(true);
@@ -3442,7 +3451,7 @@ public class DistributedHerderTest {
EasyMock.expectLastCall();
}
- expectHerderShutdown(false);
+ expectHerderShutdown();
PowerMock.replayAll();
@@ -3477,7 +3486,7 @@ public class DistributedHerderTest {
expectAnyTicks();
member.wakeup();
- EasyMock.expectLastCall();
+ EasyMock.expectLastCall().anyTimes();
ClusterConfigState configState = exactlyOnceSnapshot(
sessionKey,
@@ -3515,7 +3524,7 @@ public class DistributedHerderTest {
configBackingStore.putTaskCountRecord(CONN1, 1);
EasyMock.expectLastCall();
- expectHerderShutdown(true);
+ expectHerderShutdown();
PowerMock.replayAll(workerFencingFuture, herderFencingFuture);
@@ -3548,7 +3557,7 @@ public class DistributedHerderTest {
expectAnyTicks();
member.wakeup();
- EasyMock.expectLastCall();
+ EasyMock.expectLastCall().anyTimes();
ClusterConfigState configState = exactlyOnceSnapshot(
sessionKey,
@@ -3563,7 +3572,7 @@ public class DistributedHerderTest {
EasyMock.expect(worker.fenceZombies(EasyMock.eq(CONN1), EasyMock.eq(2), EasyMock.eq(CONN1_CONFIG)))
.andThrow(fencingException);
- expectHerderShutdown(true);
+ expectHerderShutdown();
PowerMock.replayAll();
@@ -3598,7 +3607,7 @@ public class DistributedHerderTest {
expectAnyTicks();
member.wakeup();
- EasyMock.expectLastCall();
+ EasyMock.expectLastCall().anyTimes();
ClusterConfigState configState = exactlyOnceSnapshot(
sessionKey,
@@ -3630,7 +3639,7 @@ public class DistributedHerderTest {
});
}
- expectHerderShutdown(true);
+ expectHerderShutdown();
PowerMock.replayAll(workerFencingFuture, herderFencingFuture);
@@ -3752,7 +3761,7 @@ public class DistributedHerderTest {
EasyMock.expectLastCall();
});
- expectHerderShutdown(false);
+ expectHerderShutdown();
PowerMock.replayAll();
@@ -4333,11 +4342,7 @@ public class DistributedHerderTest {
EasyMock.expectLastCall();
}
- private void expectHerderShutdown(boolean wakeup) {
- if (wakeup) {
- member.wakeup();
- EasyMock.expectLastCall();
- }
+ private void expectHerderShutdown() {
worker.stopAndAwaitConnectors();
EasyMock.expectLastCall();
worker.stopAndAwaitTasks();