You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/17 15:06:25 UTC

[kafka] branch trunk updated: MINOR: Add test for ConsumerNetworkClient.trySend (#6739)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 64c2d49  MINOR: Add test for ConsumerNetworkClient.trySend (#6739)
64c2d49 is described below

commit 64c2d49cf5c8b1cecbd7601dc2d1577fc606ab1e
Author: Shaobo Liu <la...@gmail.com>
AuthorDate: Fri May 17 23:06:06 2019 +0800

    MINOR: Add test for ConsumerNetworkClient.trySend (#6739)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../consumer/internals/ConsumerNetworkClient.java  |  3 +-
 .../internals/ConsumerNetworkClientTest.java       | 35 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 753fdb0..2a6e8b5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -459,7 +459,8 @@ public class ConsumerNetworkClient implements Closeable {
         }
     }
 
-    private long trySend(long now) {
+    // Visible for testing
+    long trySend(long now) {
         long pollDelayMs = maxPollTimeoutMs;
 
         // send any requests that can be sent now
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index f3750aa..d6c5c2e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -44,6 +44,7 @@ import org.junit.Test;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -379,6 +380,40 @@ public class ConsumerNetworkClientTest {
         assertEquals(0, consumerClient.pendingRequestCount(node));
     }
 
+    @Test
+    public void testTrySend() { // pins how often trySend() calls ready() on the underlying client
+        final AtomicBoolean isReady = new AtomicBoolean(); // starts false: node reported as not ready
+        final AtomicInteger checkCount = new AtomicInteger(); // number of ready() calls since last reset
+        client = new MockClient(time, metadata) {
+            @Override
+            public boolean ready(Node node, long now) {
+                checkCount.incrementAndGet(); // count every call, whatever the outcome
+                if (isReady.get())
+                    return super.ready(node, now);
+                else
+                    return false;
+            }
+        };
+        consumerClient = new ConsumerNetworkClient(new LogContext(), client, metadata, time, 100, 10, Integer.MAX_VALUE);
+        consumerClient.send(node, heartbeat());
+        consumerClient.send(node, heartbeat());
+        assertEquals(2, consumerClient.pendingRequestCount(node)); // both requests are queued
+        assertEquals(0, client.inFlightRequestCount(node.idString())); // nothing handed to the client yet
+
+        consumerClient.trySend(time.milliseconds());
+        // ready() is checked only once while the node is not ready
+        assertEquals(1, checkCount.getAndSet(0)); // getAndSet(0) also resets the counter for the next phase
+        assertEquals(2, consumerClient.pendingRequestCount(node));
+        assertEquals(0, client.inFlightRequestCount(node.idString()));
+
+        isReady.set(true);
+        consumerClient.trySend(time.milliseconds());
+        // ready() is checked once per request when the node is ready
+        assertEquals(2, checkCount.getAndSet(0));
+        assertEquals(2, consumerClient.pendingRequestCount(node)); // expected count is unchanged after the send
+        assertEquals(2, client.inFlightRequestCount(node.idString())); // both requests are now in flight
+    }
+
     private HeartbeatRequest.Builder heartbeat() {
         return new HeartbeatRequest.Builder(new HeartbeatRequestData()
                 .setGroupId("group")