You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/07/17 23:01:12 UTC
kafka git commit: KAFKA-5587; Remove channel only after staged receives are delivered
Repository: kafka
Updated Branches:
refs/heads/trunk e2fe19d22 -> 28c83d966
KAFKA-5587; Remove channel only after staged receives are delivered
When idle connections are closed, ensure that channels with staged
receives are retained in `closingChannels` until all staged receives
are completed. Also ensure that only one staged receive is completed
in each poll, even when channels are closed.
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #3526 from rajinisivaram/KAFKA-5587
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/28c83d96
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/28c83d96
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/28c83d96
Branch: refs/heads/trunk
Commit: 28c83d9667676515607713d4ccfd3757a8afcba2
Parents: e2fe19d
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Mon Jul 17 20:29:26 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Jul 17 23:58:22 2017 +0100
----------------------------------------------------------------------
clients/out/test/resources/log4j.properties | 21 ++++++++
.../offsetAndMetadataSerializedfile | Bin 0 -> 144 bytes
.../serializedData/topicPartitionSerializedfile | Bin 0 -> 125 bytes
.../apache/kafka/common/network/Selector.java | 18 ++++---
.../kafka/common/network/SelectorTest.java | 54 +++++++++++++++++++
5 files changed, 86 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/28c83d96/clients/out/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/clients/out/test/resources/log4j.properties b/clients/out/test/resources/log4j.properties
new file mode 100644
index 0000000..b1d5b7f
--- /dev/null
+++ b/clients/out/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=OFF, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
+
+log4j.logger.org.apache.kafka=ERROR
http://git-wip-us.apache.org/repos/asf/kafka/blob/28c83d96/clients/out/test/resources/serializedData/offsetAndMetadataSerializedfile
----------------------------------------------------------------------
diff --git a/clients/out/test/resources/serializedData/offsetAndMetadataSerializedfile b/clients/out/test/resources/serializedData/offsetAndMetadataSerializedfile
new file mode 100644
index 0000000..95319cb
Binary files /dev/null and b/clients/out/test/resources/serializedData/offsetAndMetadataSerializedfile differ
http://git-wip-us.apache.org/repos/asf/kafka/blob/28c83d96/clients/out/test/resources/serializedData/topicPartitionSerializedfile
----------------------------------------------------------------------
diff --git a/clients/out/test/resources/serializedData/topicPartitionSerializedfile b/clients/out/test/resources/serializedData/topicPartitionSerializedfile
new file mode 100644
index 0000000..2c1c501
Binary files /dev/null and b/clients/out/test/resources/serializedData/topicPartitionSerializedfile differ
http://git-wip-us.apache.org/repos/asf/kafka/blob/28c83d96/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 5dbe83b..da3de80 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -327,14 +327,16 @@ public class Selector implements Selectable, AutoCloseable {
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
}
- addToCompletedReceives();
-
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
// we use the time at the end of select to ensure that we don't close any connections that
// have just been processed in pollSelectionKeys
maybeCloseOldestConnection(endSelect);
+
+ // Add to completedReceives after closing expired connections to avoid removing
+ // channels with completed receives until all staged receives are completed.
+ addToCompletedReceives();
}
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
@@ -563,11 +565,7 @@ public class Selector implements Selectable, AutoCloseable {
// are tracked to ensure that requests are processed one-by-one by the broker to preserve ordering.
Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
if (processOutstanding && deque != null && !deque.isEmpty()) {
- if (!channel.isMute()) {
- addToCompletedReceives(channel, deque);
- if (deque.isEmpty())
- this.stagedReceives.remove(channel);
- }
+ // stagedReceives will be moved to completedReceives later along with receives from other channels
closingChannels.put(channel.id(), channel);
} else
doClose(channel, processOutstanding);
@@ -697,6 +695,12 @@ public class Selector implements Selectable, AutoCloseable {
return new HashSet<>(nioSelector.keys());
}
+ // only for testing
+ int numStagedReceives(KafkaChannel channel) {
+ Deque<NetworkReceive> deque = stagedReceives.get(channel);
+ return deque == null ? 0 : deque.size();
+ }
+
private class SelectorMetrics {
private final Metrics metrics;
private final String metricGrpPrefix;
http://git-wip-us.apache.org/repos/asf/kafka/blob/28c83d96/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 33959fd..76ebb21 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -17,6 +17,9 @@
package org.apache.kafka.common.network;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
@@ -268,6 +271,57 @@ public class SelectorTest {
assertEquals(ChannelState.EXPIRED, selector.disconnected().get(id));
}
+ @Test
+ public void testCloseOldestConnectionWithOneStagedReceive() throws Exception {
+ verifyCloseOldestConnectionWithStagedReceives(1);
+ }
+
+ @Test
+ public void testCloseOldestConnectionWithMultipleStagedReceives() throws Exception {
+ verifyCloseOldestConnectionWithStagedReceives(5);
+ }
+
+ private void verifyCloseOldestConnectionWithStagedReceives(int maxStagedReceives) throws Exception {
+ String id = "0";
+ blockingConnect(id);
+ KafkaChannel channel = selector.channel(id);
+
+ selector.mute(id);
+ for (int i = 0; i <= maxStagedReceives; i++) {
+ selector.send(createSend(id, String.valueOf(i)));
+ selector.poll(1000);
+ }
+
+ selector.unmute(id);
+ do {
+ selector.poll(1000);
+ } while (selector.completedReceives().isEmpty());
+
+ int stagedReceives = selector.numStagedReceives(channel);
+ int completedReceives = 0;
+ while (selector.disconnected().isEmpty()) {
+ time.sleep(6000); // The max idle time is 5000ms
+ selector.poll(0);
+ completedReceives += selector.completedReceives().size();
+ // With SSL, more receives may be staged from buffered data
+ int newStaged = selector.numStagedReceives(channel) - (stagedReceives - completedReceives);
+ if (newStaged > 0) {
+ stagedReceives += newStaged;
+ assertNotNull("Channel should not have been expired", selector.channel(id));
+ assertFalse("Channel should not have been disconnected", selector.disconnected().containsKey(id));
+ } else if (!selector.completedReceives().isEmpty()) {
+ assertEquals(1, selector.completedReceives().size());
+ assertTrue("Channel not found", selector.closingChannel(id) != null || selector.channel(id) != null);
+ assertFalse("Disconnect notified too early", selector.disconnected().containsKey(id));
+ }
+ }
+ assertEquals(maxStagedReceives, completedReceives);
+ assertEquals(stagedReceives, completedReceives);
+ assertNull("Channel not removed", selector.channel(id));
+ assertNull("Channel not removed", selector.closingChannel(id));
+ assertTrue("Disconnect not notified", selector.disconnected().containsKey(id));
+ assertTrue("Unexpected receive", selector.completedReceives().isEmpty());
+ }
private String blockingRequest(String node, String s) throws IOException {
selector.send(createSend(node, s));