You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/06/12 19:24:57 UTC
kafka git commit: kafka-2266;
Client Selector can drop idle connections without notifying
NetworkClient; patched by Jason Gustafson; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk ab76dbd1f -> 017c00caf
kafka-2266; Client Selector can drop idle connections without notifying NetworkClient; patched by Jason Gustafson; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/017c00ca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/017c00ca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/017c00ca
Branch: refs/heads/trunk
Commit: 017c00caf44aaad3418cb99d3ef42c4d1b066ddd
Parents: ab76dbd
Author: Jason Gustafson <as...@confluent.io>
Authored: Fri Jun 12 10:24:54 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Jun 12 10:24:54 2015 -0700
----------------------------------------------------------------------
.../org/apache/kafka/common/network/Selector.java | 2 ++
.../apache/kafka/common/network/SelectorTest.java | 17 ++++++++++++++++-
2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/017c00ca/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index effb1e6..1da215b 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -393,6 +393,8 @@ public class Selector implements Selectable {
if (log.isTraceEnabled())
log.trace("About to close the idle connection from " + connectionId
+ " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis");
+
+ disconnected.add(connectionId);
close(connectionId);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/017c00ca/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index d23b4b6..158f982 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -26,6 +26,7 @@ import java.util.*;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -40,13 +41,15 @@ public class SelectorTest {
private static final int BUFFER_SIZE = 4 * 1024;
private EchoServer server;
+ private Time time;
private Selectable selector;
@Before
public void setup() throws Exception {
this.server = new EchoServer();
this.server.start();
- this.selector = new Selector(5000, new Metrics(), new MockTime() , "MetricGroup", new LinkedHashMap<String, String>());
+ this.time = new MockTime();
+ this.selector = new Selector(5000, new Metrics(), time, "MetricGroup", new LinkedHashMap<String, String>());
}
@After
@@ -244,6 +247,18 @@ public class SelectorTest {
assertEquals("The response should be from the previously muted node", "1", selector.completedReceives().get(0).source());
}
+
+ @Test
+ public void testCloseOldestConnection() throws Exception {
+ String id = "0";
+ blockingConnect(id);
+
+ time.sleep(6000); // The max idle time is 5000ms
+ selector.poll(0);
+
+ assertTrue("The idle connection should have been closed", selector.disconnected().contains(id));
+ }
+
private String blockingRequest(String node, String s) throws IOException {
selector.send(createSend(node, s));
selector.poll(1000L);