You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/09/28 20:08:12 UTC

[kafka] branch trunk updated: KAFKA-7453: Expire registered channels not selected within idle timeout (#5712)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0a8205c  KAFKA-7453: Expire registered channels not selected within idle timeout (#5712)
0a8205c is described below

commit 0a8205cb45667fd7d001f21e4e0412220b1abec1
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Sep 28 21:07:57 2018 +0100

    KAFKA-7453: Expire registered channels not selected within idle timeout (#5712)
    
    Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>
---
 .../main/java/org/apache/kafka/common/network/Selector.java |  2 ++
 .../java/org/apache/kafka/common/network/SelectorTest.java  | 13 +++++++++++++
 2 files changed, 15 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 44223e7..93325d5 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -319,6 +319,8 @@ public class Selector implements Selectable, AutoCloseable {
         SelectionKey key = socketChannel.register(nioSelector, interestedOps);
         KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
         this.channels.put(id, channel);
+        if (idleExpiryManager != null)
+            idleExpiryManager.update(channel.id(), time.nanoseconds());
         return key;
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index cfd7fb3..cef7c7f 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -364,6 +364,19 @@ public class SelectorTest {
     }
 
     @Test
+    public void testIdleExpiryWithoutReadyKeys() throws IOException {
+        String id = "0";
+        selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
+        KafkaChannel channel = selector.channel(id);
+        channel.selectionKey().interestOps(0);
+
+        time.sleep(6000); // The max idle time is 5000ms
+        selector.poll(0);
+        assertTrue("The idle connection should have been closed", selector.disconnected().containsKey(id));
+        assertEquals(ChannelState.EXPIRED, selector.disconnected().get(id));
+    }
+
+    @Test
     public void testImmediatelyConnectedCleaned() throws Exception {
         Metrics metrics = new Metrics(); // new metrics object to avoid metric registration conflicts
         Selector selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext()) {