You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/17 15:06:25 UTC
[kafka] branch trunk updated: MINOR: Add test for
ConsumerNetworkClient.trySend (#6739)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 64c2d49 MINOR: Add test for ConsumerNetworkClient.trySend (#6739)
64c2d49 is described below
commit 64c2d49cf5c8b1cecbd7601dc2d1577fc606ab1e
Author: Shaobo Liu <la...@gmail.com>
AuthorDate: Fri May 17 23:06:06 2019 +0800
MINOR: Add test for ConsumerNetworkClient.trySend (#6739)
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../consumer/internals/ConsumerNetworkClient.java | 3 +-
.../internals/ConsumerNetworkClientTest.java | 35 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 753fdb0..2a6e8b5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -459,7 +459,8 @@ public class ConsumerNetworkClient implements Closeable {
}
}
- private long trySend(long now) {
+ // Visible for testing
+ long trySend(long now) {
long pollDelayMs = maxPollTimeoutMs;
// send any requests that can be sent now
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index f3750aa..d6c5c2e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -44,6 +44,7 @@ import org.junit.Test;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -379,6 +380,40 @@ public class ConsumerNetworkClientTest {
assertEquals(0, consumerClient.pendingRequestCount(node));
}
+ @Test
+ public void testTrySend() {
+ final AtomicBoolean isReady = new AtomicBoolean();
+ final AtomicInteger checkCount = new AtomicInteger();
+ client = new MockClient(time, metadata) {
+ @Override
+ public boolean ready(Node node, long now) {
+ checkCount.incrementAndGet();
+ if (isReady.get())
+ return super.ready(node, now);
+ else
+ return false;
+ }
+ };
+ consumerClient = new ConsumerNetworkClient(new LogContext(), client, metadata, time, 100, 10, Integer.MAX_VALUE);
+ consumerClient.send(node, heartbeat());
+ consumerClient.send(node, heartbeat());
+ assertEquals(2, consumerClient.pendingRequestCount(node));
+ assertEquals(0, client.inFlightRequestCount(node.idString()));
+
+ consumerClient.trySend(time.milliseconds());
+ // only check one time when the node doesn't ready
+ assertEquals(1, checkCount.getAndSet(0));
+ assertEquals(2, consumerClient.pendingRequestCount(node));
+ assertEquals(0, client.inFlightRequestCount(node.idString()));
+
+ isReady.set(true);
+ consumerClient.trySend(time.milliseconds());
+ // check node ready or not for every request
+ assertEquals(2, checkCount.getAndSet(0));
+ assertEquals(2, consumerClient.pendingRequestCount(node));
+ assertEquals(2, client.inFlightRequestCount(node.idString()));
+ }
+
private HeartbeatRequest.Builder heartbeat() {
return new HeartbeatRequest.Builder(new HeartbeatRequestData()
.setGroupId("group")