You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/09/29 09:03:25 UTC
[kafka] branch 2.0 updated: KAFKA-7453: Expire registered channels
not selected within idle timeout (#5712)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new aea7b6d KAFKA-7453: Expire registered channels not selected within idle timeout (#5712)
aea7b6d is described below
commit aea7b6dc8b78bb30b2eb0ea88e4c865ef009d028
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Sep 28 21:07:57 2018 +0100
KAFKA-7453: Expire registered channels not selected within idle timeout (#5712)
Reviewers: Jun Rao <ju...@gmail.com>. Ismael Juma <is...@juma.me.uk>
---
.../main/java/org/apache/kafka/common/network/Selector.java | 2 ++
.../java/org/apache/kafka/common/network/SelectorTest.java | 13 +++++++++++++
2 files changed, 15 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 7e32509..941a615 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -279,6 +279,8 @@ public class Selector implements Selectable, AutoCloseable {
SelectionKey key = socketChannel.register(nioSelector, interestedOps);
KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
this.channels.put(id, channel);
+ if (idleExpiryManager != null)
+ idleExpiryManager.update(channel.id(), time.nanoseconds());
return key;
}
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 3bb6244..2f1437d 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -364,6 +364,19 @@ public class SelectorTest {
}
@Test
+ public void testIdleExpiryWithoutReadyKeys() throws IOException {
+ String id = "0";
+ selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
+ KafkaChannel channel = selector.channel(id);
+ channel.selectionKey().interestOps(0);
+
+ time.sleep(6000); // The max idle time is 5000ms
+ selector.poll(0);
+ assertTrue("The idle connection should have been closed", selector.disconnected().containsKey(id));
+ assertEquals(ChannelState.EXPIRED, selector.disconnected().get(id));
+ }
+
+ @Test
public void testImmediatelyConnectedCleaned() throws Exception {
Metrics metrics = new Metrics(); // new metrics object to avoid metric registration conflicts
Selector selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext()) {