You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/06 00:48:09 UTC

[kafka] branch trunk updated: MINOR: Complete inflight requests in order on disconnect (#4642)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8f2c087  MINOR: Complete inflight requests in order on disconnect (#4642)
8f2c087 is described below

commit 8f2c08716630eba7e3badacc79be4c8c413a00da
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Mar 5 16:48:05 2018 -0800

    MINOR: Complete inflight requests in order on disconnect (#4642)
    
    NetworkClient should use FIFO order when completing inflight requests following a disconnect.
    
    I've added new unit tests for `InFlightRequests` and `NetworkClient` which verify completion order.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 .../org/apache/kafka/clients/InFlightRequests.java | 11 ++--
 .../apache/kafka/clients/InFlightRequestsTest.java | 63 ++++++++++++++++++----
 .../apache/kafka/clients/NetworkClientTest.java    | 52 ++++++++++++++++++
 .../java/org/apache/kafka/test/MockSelector.java   | 52 +++++++++++++-----
 4 files changed, 151 insertions(+), 27 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index a062818..5caee2d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -20,12 +20,12 @@ import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
-
 /**
  * The set of requests which have been sent or are being sent but haven't yet received a response
  */
@@ -151,9 +151,14 @@ final class InFlightRequests {
         if (reqs == null) {
             return Collections.emptyList();
         } else {
-            Deque<NetworkClient.InFlightRequest> clearedRequests = requests.remove(node);
+            final Deque<NetworkClient.InFlightRequest> clearedRequests = requests.remove(node);
             inFlightRequestCount.getAndAdd(-clearedRequests.size());
-            return clearedRequests;
+            return new Iterable<NetworkClient.InFlightRequest>() {
+                @Override
+                public Iterator<NetworkClient.InFlightRequest> iterator() {
+                    return clearedRequests.descendingIterator();
+                }
+            };
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/InFlightRequestsTest.java b/clients/src/test/java/org/apache/kafka/clients/InFlightRequestsTest.java
index e00ca08..600e5dc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/InFlightRequestsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/InFlightRequestsTest.java
@@ -17,44 +17,85 @@
 
 package org.apache.kafka.clients;
 
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.List;
+
 import static org.junit.Assert.assertEquals;
 
 public class InFlightRequestsTest {
 
     private InFlightRequests inFlightRequests;
+    private int correlationId;
+    private String dest = "dest";
+
     @Before
     public void setup() {
         inFlightRequests = new InFlightRequests(12);
-        NetworkClient.InFlightRequest ifr =
-                new NetworkClient.InFlightRequest(null, 0, "dest", null, false, false, null, null, 0);
-        inFlightRequests.add(ifr);
+        correlationId = 0;
     }
 
     @Test
-    public void checkIncrementAndDecrementOnLastSent() {
+    public void testCompleteLastSent() {
+        int correlationId1 = addRequest(dest);
+        int correlationId2 = addRequest(dest);
+        assertEquals(2, inFlightRequests.count());
+
+        assertEquals(correlationId2, inFlightRequests.completeLastSent(dest).header.correlationId());
         assertEquals(1, inFlightRequests.count());
 
-        inFlightRequests.completeLastSent("dest");
+        assertEquals(correlationId1, inFlightRequests.completeLastSent(dest).header.correlationId());
         assertEquals(0, inFlightRequests.count());
     }
 
     @Test
-    public void checkDecrementOnClear() {
-        inFlightRequests.clearAll("dest");
+    public void testClearAll() {
+        int correlationId1 = addRequest(dest);
+        int correlationId2 = addRequest(dest);
+
+        List<NetworkClient.InFlightRequest> clearedRequests = TestUtils.toList(this.inFlightRequests.clearAll(dest));
         assertEquals(0, inFlightRequests.count());
+        assertEquals(2, clearedRequests.size());
+        assertEquals(correlationId1, clearedRequests.get(0).header.correlationId());
+        assertEquals(correlationId2, clearedRequests.get(1).header.correlationId());
     }
 
     @Test
-    public void checkDecrementOnCompleteNext() {
-        inFlightRequests.completeNext("dest");
+    public void testCompleteNext() {
+        int correlationId1 = addRequest(dest);
+        int correlationId2 = addRequest(dest);
+        assertEquals(2, inFlightRequests.count());
+
+        assertEquals(correlationId1, inFlightRequests.completeNext(dest).header.correlationId());
+        assertEquals(1, inFlightRequests.count());
+
+        assertEquals(correlationId2, inFlightRequests.completeNext(dest).header.correlationId());
         assertEquals(0, inFlightRequests.count());
     }
 
     @Test(expected = IllegalStateException.class)
-    public void throwExceptionOnNeverBeforeSeenNode() {
-        inFlightRequests.completeNext("not-added");
+    public void testCompleteNextThrowsIfNoInflights() {
+        inFlightRequests.completeNext(dest);
     }
+
+    @Test(expected = IllegalStateException.class)
+    public void testCompleteLastSentThrowsIfNoInFlights() {
+        inFlightRequests.completeLastSent(dest);
+    }
+
+    private int addRequest(String destination) {
+        int correlationId = this.correlationId;
+        this.correlationId += 1;
+
+        RequestHeader requestHeader = new RequestHeader(ApiKeys.METADATA, (short) 0, "clientId", correlationId);
+        NetworkClient.InFlightRequest ifr = new NetworkClient.InFlightRequest(requestHeader, 0,
+                destination, null, false, false, null, null, 0);
+        inFlightRequests.add(ifr);
+        return correlationId;
+    }
+
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 8c2428e..77b36df 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -37,12 +37,14 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 public class NetworkClientTest {
@@ -82,6 +84,7 @@ public class NetworkClientTest {
 
     @Before
     public void setup() {
+        selector.reset();
         metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
     }
 
@@ -351,6 +354,55 @@ public class NetworkClientTest {
     }
 
     @Test
+    public void testDisconnectWithMultipleInFlights() throws Exception {
+        NetworkClient client = this.clientWithNoVersionDiscovery;
+        awaitReady(client, node);
+        assertTrue("Expected NetworkClient to be ready to send to node " + node.idString(),
+                client.isReady(node, time.milliseconds()));
+
+        MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.<String>emptyList(), true);
+        long now = time.milliseconds();
+
+        final List<ClientResponse> callbackResponses = new ArrayList<>();
+        RequestCompletionHandler callback = new RequestCompletionHandler() {
+            @Override
+            public void onComplete(ClientResponse response) {
+                callbackResponses.add(response);
+            }
+        };
+
+        ClientRequest request1 = client.newClientRequest(node.idString(), builder, now, true, callback);
+        client.send(request1, now);
+        client.poll(0, now);
+
+        ClientRequest request2 = client.newClientRequest(node.idString(), builder, now, true, callback);
+        client.send(request2, now);
+        client.poll(0, now);
+
+        assertNotEquals(request1.correlationId(), request2.correlationId());
+
+        assertEquals(2, client.inFlightRequestCount());
+        assertEquals(2, client.inFlightRequestCount(node.idString()));
+
+        client.disconnect(node.idString());
+
+        List<ClientResponse> responses = client.poll(0, time.milliseconds());
+        assertEquals(2, responses.size());
+        assertEquals(responses, callbackResponses);
+        assertEquals(0, client.inFlightRequestCount());
+        assertEquals(0, client.inFlightRequestCount(node.idString()));
+
+        // Ensure that the responses are returned in the order they were sent
+        ClientResponse response1 = responses.get(0);
+        assertTrue(response1.wasDisconnected());
+        assertEquals(request1.correlationId(), response1.requestHeader().correlationId());
+
+        ClientResponse response2 = responses.get(1);
+        assertTrue(response2.wasDisconnected());
+        assertEquals(request2.correlationId(), response2.requestHeader().correlationId());
+    }
+
+    @Test
     public void testCallDisconnect() throws Exception {
         awaitReady(client, node);
         assertTrue("Expected NetworkClient to be ready to send to node " + node.idString(),
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index 6fc1b1b..bd27d5c 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -16,6 +16,14 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.common.network.ChannelState;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.requests.ByteBufferChannel;
+import org.apache.kafka.common.utils.Time;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -25,24 +33,17 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kafka.common.network.ChannelState;
-import org.apache.kafka.common.network.NetworkReceive;
-import org.apache.kafka.common.network.NetworkSend;
-import org.apache.kafka.common.network.Selectable;
-import org.apache.kafka.common.network.Send;
-import org.apache.kafka.common.utils.Time;
-
 /**
  * A fake selector to use for testing
  */
 public class MockSelector implements Selectable {
 
     private final Time time;
-    private final List<Send> initiatedSends = new ArrayList<Send>();
-    private final List<Send> completedSends = new ArrayList<Send>();
-    private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>();
+    private final List<Send> initiatedSends = new ArrayList<>();
+    private final List<Send> completedSends = new ArrayList<>();
+    private final List<NetworkReceive> completedReceives = new ArrayList<>();
     private final Map<String, ChannelState> disconnected = new HashMap<>();
-    private final List<String> connected = new ArrayList<String>();
+    private final List<String> connected = new ArrayList<>();
     private final List<DelayedReceive> delayedReceives = new ArrayList<>();
 
     public MockSelector(Time time) {
@@ -109,8 +110,28 @@ public class MockSelector implements Selectable {
 
     @Override
     public void poll(long timeout) throws IOException {
-        this.completedSends.addAll(this.initiatedSends);
+        completeInitiatedSends();
+        completeDelayedReceives();
+        time.sleep(timeout);
+    }
+
+    private void completeInitiatedSends() throws IOException {
+        for (Send send : initiatedSends) {
+            completeSend(send);
+        }
         this.initiatedSends.clear();
+    }
+
+    private void completeSend(Send send) throws IOException {
+        // Consume the send so that we will be able to send more requests to the destination
+        ByteBufferChannel discardChannel = new ByteBufferChannel(send.size());
+        while (!send.completed()) {
+            send.writeTo(discardChannel);
+        }
+        completedSends.add(send);
+    }
+
+    private void completeDelayedReceives() {
         for (Send completedSend : completedSends) {
             Iterator<DelayedReceive> delayedReceiveIterator = delayedReceives.iterator();
             while (delayedReceiveIterator.hasNext()) {
@@ -121,7 +142,6 @@ public class MockSelector implements Selectable {
                 }
             }
         }
-        time.sleep(timeout);
     }
 
     @Override
@@ -178,4 +198,10 @@ public class MockSelector implements Selectable {
     public boolean isChannelReady(String id) {
         return true;
     }
+
+    public void reset() {
+        clear();
+        initiatedSends.clear();
+        delayedReceives.clear();
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.