You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/09/29 09:03:25 UTC

[kafka] branch 2.0 updated: KAFKA-7453: Expire registered channels not selected within idle timeout (#5712)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new aea7b6d  KAFKA-7453: Expire registered channels not selected within idle timeout (#5712)
aea7b6d is described below

commit aea7b6dc8b78bb30b2eb0ea88e4c865ef009d028
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Sep 28 21:07:57 2018 +0100

    KAFKA-7453: Expire registered channels not selected within idle timeout (#5712)
    
    Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>
---
 .../main/java/org/apache/kafka/common/network/Selector.java |  2 ++
 .../java/org/apache/kafka/common/network/SelectorTest.java  | 13 +++++++++++++
 2 files changed, 15 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 7e32509..941a615 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -279,6 +279,8 @@ public class Selector implements Selectable, AutoCloseable {
         SelectionKey key = socketChannel.register(nioSelector, interestedOps);
         KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
         this.channels.put(id, channel);
+        if (idleExpiryManager != null)
+            idleExpiryManager.update(channel.id(), time.nanoseconds());
         return key;
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 3bb6244..2f1437d 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -364,6 +364,19 @@ public class SelectorTest {
     }
 
     @Test
+    public void testIdleExpiryWithoutReadyKeys() throws IOException {
+        String id = "0";
+        selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
+        KafkaChannel channel = selector.channel(id);
+        channel.selectionKey().interestOps(0);
+
+        time.sleep(6000); // The max idle time is 5000ms
+        selector.poll(0);
+        assertTrue("The idle connection should have been closed", selector.disconnected().containsKey(id));
+        assertEquals(ChannelState.EXPIRED, selector.disconnected().get(id));
+    }
+
+    @Test
     public void testImmediatelyConnectedCleaned() throws Exception {
         Metrics metrics = new Metrics(); // new metrics object to avoid metric registration conflicts
         Selector selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext()) {