You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/06/12 19:24:57 UTC

kafka git commit: kafka-2266; Client Selector can drop idle connections without notifying NetworkClient; patched by Jason Gustafson; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk ab76dbd1f -> 017c00caf


kafka-2266; Client Selector can drop idle connections without notifying NetworkClient; patched by Jason Gustafson; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/017c00ca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/017c00ca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/017c00ca

Branch: refs/heads/trunk
Commit: 017c00caf44aaad3418cb99d3ef42c4d1b066ddd
Parents: ab76dbd
Author: Jason Gustafson <as...@confluent.io>
Authored: Fri Jun 12 10:24:54 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Jun 12 10:24:54 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/network/Selector.java  |  2 ++
 .../apache/kafka/common/network/SelectorTest.java  | 17 ++++++++++++++++-
 2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/017c00ca/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index effb1e6..1da215b 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -393,6 +393,8 @@ public class Selector implements Selectable {
                     if (log.isTraceEnabled())
                         log.trace("About to close the idle connection from " + connectionId
                                 + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis");
+
+                    disconnected.add(connectionId);
                     close(connectionId);
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/017c00ca/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index d23b4b6..158f982 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -26,6 +26,7 @@ import java.util.*;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -40,13 +41,15 @@ public class SelectorTest {
     private static final int BUFFER_SIZE = 4 * 1024;
 
     private EchoServer server;
+    private Time time;
     private Selectable selector;
 
     @Before
     public void setup() throws Exception {
         this.server = new EchoServer();
         this.server.start();
-        this.selector = new Selector(5000, new Metrics(), new MockTime() , "MetricGroup", new LinkedHashMap<String, String>());
+        this.time = new MockTime();
+        this.selector = new Selector(5000, new Metrics(), time, "MetricGroup", new LinkedHashMap<String, String>());
     }
 
     @After
@@ -244,6 +247,18 @@ public class SelectorTest {
         assertEquals("The response should be from the previously muted node", "1", selector.completedReceives().get(0).source());
     }
 
+
+    @Test
+    public void testCloseOldestConnection() throws Exception {
+        String id = "0";
+        blockingConnect(id);
+
+        time.sleep(6000); // exceed the 5000 ms max idle time passed to the Selector in setup()
+        selector.poll(0);
+
+        assertTrue("The idle connection should have been closed", selector.disconnected().contains(id));
+    }
+
     private String blockingRequest(String node, String s) throws IOException {
         selector.send(createSend(node, s));
         selector.poll(1000L);