You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/06 00:48:09 UTC
[kafka] branch trunk updated: MINOR: Complete inflight requests in order on disconnect (#4642)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8f2c087 MINOR: Complete inflight requests in order on disconnect (#4642)
8f2c087 is described below
commit 8f2c08716630eba7e3badacc79be4c8c413a00da
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Mar 5 16:48:05 2018 -0800
MINOR: Complete inflight requests in order on disconnect (#4642)
NetworkClient should use FIFO order when completing inflight requests following a disconnect.
I've added new unit tests for `InFlightRequests` and `NetworkClient` which verify completion order.
Reviewers: Jun Rao <ju...@gmail.com>
---
.../org/apache/kafka/clients/InFlightRequests.java | 11 ++--
.../apache/kafka/clients/InFlightRequestsTest.java | 63 ++++++++++++++++++----
.../apache/kafka/clients/NetworkClientTest.java | 52 ++++++++++++++++++
.../java/org/apache/kafka/test/MockSelector.java | 52 +++++++++++++-----
4 files changed, 151 insertions(+), 27 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index a062818..5caee2d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -20,12 +20,12 @@ import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-
/**
* The set of requests which have been sent or are being sent but haven't yet received a response
*/
@@ -151,9 +151,14 @@ final class InFlightRequests {
if (reqs == null) {
return Collections.emptyList();
} else {
- Deque<NetworkClient.InFlightRequest> clearedRequests = requests.remove(node);
+ final Deque<NetworkClient.InFlightRequest> clearedRequests = requests.remove(node);
inFlightRequestCount.getAndAdd(-clearedRequests.size());
- return clearedRequests;
+ return new Iterable<NetworkClient.InFlightRequest>() {
+ @Override
+ public Iterator<NetworkClient.InFlightRequest> iterator() {
+ return clearedRequests.descendingIterator();
+ }
+ };
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/InFlightRequestsTest.java b/clients/src/test/java/org/apache/kafka/clients/InFlightRequestsTest.java
index e00ca08..600e5dc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/InFlightRequestsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/InFlightRequestsTest.java
@@ -17,44 +17,85 @@
package org.apache.kafka.clients;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
+import java.util.List;
+
import static org.junit.Assert.assertEquals;
public class InFlightRequestsTest {
private InFlightRequests inFlightRequests;
+ private int correlationId;
+ private String dest = "dest";
+
@Before
public void setup() {
inFlightRequests = new InFlightRequests(12);
- NetworkClient.InFlightRequest ifr =
- new NetworkClient.InFlightRequest(null, 0, "dest", null, false, false, null, null, 0);
- inFlightRequests.add(ifr);
+ correlationId = 0;
}
@Test
- public void checkIncrementAndDecrementOnLastSent() {
+ public void testCompleteLastSent() {
+ int correlationId1 = addRequest(dest);
+ int correlationId2 = addRequest(dest);
+ assertEquals(2, inFlightRequests.count());
+
+ assertEquals(correlationId2, inFlightRequests.completeLastSent(dest).header.correlationId());
assertEquals(1, inFlightRequests.count());
- inFlightRequests.completeLastSent("dest");
+ assertEquals(correlationId1, inFlightRequests.completeLastSent(dest).header.correlationId());
assertEquals(0, inFlightRequests.count());
}
@Test
- public void checkDecrementOnClear() {
- inFlightRequests.clearAll("dest");
+ public void testClearAll() {
+ int correlationId1 = addRequest(dest);
+ int correlationId2 = addRequest(dest);
+
+ List<NetworkClient.InFlightRequest> clearedRequests = TestUtils.toList(this.inFlightRequests.clearAll(dest));
assertEquals(0, inFlightRequests.count());
+ assertEquals(2, clearedRequests.size());
+ assertEquals(correlationId1, clearedRequests.get(0).header.correlationId());
+ assertEquals(correlationId2, clearedRequests.get(1).header.correlationId());
}
@Test
- public void checkDecrementOnCompleteNext() {
- inFlightRequests.completeNext("dest");
+ public void testCompleteNext() {
+ int correlationId1 = addRequest(dest);
+ int correlationId2 = addRequest(dest);
+ assertEquals(2, inFlightRequests.count());
+
+ assertEquals(correlationId1, inFlightRequests.completeNext(dest).header.correlationId());
+ assertEquals(1, inFlightRequests.count());
+
+ assertEquals(correlationId2, inFlightRequests.completeNext(dest).header.correlationId());
assertEquals(0, inFlightRequests.count());
}
@Test(expected = IllegalStateException.class)
- public void throwExceptionOnNeverBeforeSeenNode() {
- inFlightRequests.completeNext("not-added");
+ public void testCompleteNextThrowsIfNoInflights() {
+ inFlightRequests.completeNext(dest);
}
+
+ @Test(expected = IllegalStateException.class)
+ public void testCompleteLastSentThrowsIfNoInFlights() {
+ inFlightRequests.completeLastSent(dest);
+ }
+
+ private int addRequest(String destination) {
+ int correlationId = this.correlationId;
+ this.correlationId += 1;
+
+ RequestHeader requestHeader = new RequestHeader(ApiKeys.METADATA, (short) 0, "clientId", correlationId);
+ NetworkClient.InFlightRequest ifr = new NetworkClient.InFlightRequest(requestHeader, 0,
+ destination, null, false, false, null, null, 0);
+ inFlightRequests.add(ifr);
+ return correlationId;
+ }
+
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 8c2428e..77b36df 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -37,12 +37,14 @@ import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
public class NetworkClientTest {
@@ -82,6 +84,7 @@ public class NetworkClientTest {
@Before
public void setup() {
+ selector.reset();
metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
}
@@ -351,6 +354,55 @@ public class NetworkClientTest {
}
@Test
+ public void testDisconnectWithMultipleInFlights() throws Exception {
+ NetworkClient client = this.clientWithNoVersionDiscovery;
+ awaitReady(client, node);
+ assertTrue("Expected NetworkClient to be ready to send to node " + node.idString(),
+ client.isReady(node, time.milliseconds()));
+
+ MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.<String>emptyList(), true);
+ long now = time.milliseconds();
+
+ final List<ClientResponse> callbackResponses = new ArrayList<>();
+ RequestCompletionHandler callback = new RequestCompletionHandler() {
+ @Override
+ public void onComplete(ClientResponse response) {
+ callbackResponses.add(response);
+ }
+ };
+
+ ClientRequest request1 = client.newClientRequest(node.idString(), builder, now, true, callback);
+ client.send(request1, now);
+ client.poll(0, now);
+
+ ClientRequest request2 = client.newClientRequest(node.idString(), builder, now, true, callback);
+ client.send(request2, now);
+ client.poll(0, now);
+
+ assertNotEquals(request1.correlationId(), request2.correlationId());
+
+ assertEquals(2, client.inFlightRequestCount());
+ assertEquals(2, client.inFlightRequestCount(node.idString()));
+
+ client.disconnect(node.idString());
+
+ List<ClientResponse> responses = client.poll(0, time.milliseconds());
+ assertEquals(2, responses.size());
+ assertEquals(responses, callbackResponses);
+ assertEquals(0, client.inFlightRequestCount());
+ assertEquals(0, client.inFlightRequestCount(node.idString()));
+
+ // Ensure that the responses are returned in the order they were sent
+ ClientResponse response1 = responses.get(0);
+ assertTrue(response1.wasDisconnected());
+ assertEquals(request1.correlationId(), response1.requestHeader().correlationId());
+
+ ClientResponse response2 = responses.get(1);
+ assertTrue(response2.wasDisconnected());
+ assertEquals(request2.correlationId(), response2.requestHeader().correlationId());
+ }
+
+ @Test
public void testCallDisconnect() throws Exception {
awaitReady(client, node);
assertTrue("Expected NetworkClient to be ready to send to node " + node.idString(),
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index 6fc1b1b..bd27d5c 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -16,6 +16,14 @@
*/
package org.apache.kafka.test;
+import org.apache.kafka.common.network.ChannelState;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.requests.ByteBufferChannel;
+import org.apache.kafka.common.utils.Time;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -25,24 +33,17 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.kafka.common.network.ChannelState;
-import org.apache.kafka.common.network.NetworkReceive;
-import org.apache.kafka.common.network.NetworkSend;
-import org.apache.kafka.common.network.Selectable;
-import org.apache.kafka.common.network.Send;
-import org.apache.kafka.common.utils.Time;
-
/**
* A fake selector to use for testing
*/
public class MockSelector implements Selectable {
private final Time time;
- private final List<Send> initiatedSends = new ArrayList<Send>();
- private final List<Send> completedSends = new ArrayList<Send>();
- private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>();
+ private final List<Send> initiatedSends = new ArrayList<>();
+ private final List<Send> completedSends = new ArrayList<>();
+ private final List<NetworkReceive> completedReceives = new ArrayList<>();
private final Map<String, ChannelState> disconnected = new HashMap<>();
- private final List<String> connected = new ArrayList<String>();
+ private final List<String> connected = new ArrayList<>();
private final List<DelayedReceive> delayedReceives = new ArrayList<>();
public MockSelector(Time time) {
@@ -109,8 +110,28 @@ public class MockSelector implements Selectable {
@Override
public void poll(long timeout) throws IOException {
- this.completedSends.addAll(this.initiatedSends);
+ completeInitiatedSends();
+ completeDelayedReceives();
+ time.sleep(timeout);
+ }
+
+ private void completeInitiatedSends() throws IOException {
+ for (Send send : initiatedSends) {
+ completeSend(send);
+ }
this.initiatedSends.clear();
+ }
+
+ private void completeSend(Send send) throws IOException {
+ // Consume the send so that we will be able to send more requests to the destination
+ ByteBufferChannel discardChannel = new ByteBufferChannel(send.size());
+ while (!send.completed()) {
+ send.writeTo(discardChannel);
+ }
+ completedSends.add(send);
+ }
+
+ private void completeDelayedReceives() {
for (Send completedSend : completedSends) {
Iterator<DelayedReceive> delayedReceiveIterator = delayedReceives.iterator();
while (delayedReceiveIterator.hasNext()) {
@@ -121,7 +142,6 @@ public class MockSelector implements Selectable {
}
}
}
- time.sleep(timeout);
}
@Override
@@ -178,4 +198,10 @@ public class MockSelector implements Selectable {
public boolean isChannelReady(String id) {
return true;
}
+
+ public void reset() {
+ clear();
+ initiatedSends.clear();
+ delayedReceives.clear();
+ }
}
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.