You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/06/19 14:20:41 UTC
[kafka] branch 2.0 updated: KAFKA-7012: Don't process SSL channels without data to process (#5237)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 6020173 KAFKA-7012: Don't process SSL channels without data to process (#5237)
6020173 is described below
commit 6020173fa0f900a96ca0126d713f13840ac3ee73
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Jun 19 15:16:16 2018 +0100
KAFKA-7012: Don't process SSL channels without data to process (#5237)
Avoid unnecessary processing of SSL channels when there are some bytes buffered, but not enough to make progress.
Reviewers: Radai Rosenblatt <ra...@gmail.com>, Jun Rao <ju...@gmail.com>
---
.../org/apache/kafka/common/network/Selector.java | 4 +-
.../kafka/common/network/SslTransportLayer.java | 27 ++++++-
.../kafka/common/network/TransportLayer.java | 1 +
.../kafka/common/network/SslSelectorTest.java | 91 +++++++++++++++++++++-
4 files changed, 116 insertions(+), 7 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 334ca79..a269f0f 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -498,7 +498,9 @@ public class Selector implements Selectable, AutoCloseable {
//this channel has bytes enqueued in intermediary buffers that we could not read
//(possibly because no memory). it may be the case that the underlying socket will
//not come up in the next poll() and so we need to remember this channel for the
- //next poll call otherwise data may be stuck in said buffers forever.
+ //next poll call otherwise data may be stuck in said buffers forever. If we attempt
+ //to process buffered data and no progress is made, the channel buffered status is
+ //cleared to avoid the overhead of checking every time.
keysWithBufferedRead.add(key);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 704a198..06e7e93 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -64,6 +64,7 @@ public class SslTransportLayer implements TransportLayer {
private ByteBuffer netReadBuffer;
private ByteBuffer netWriteBuffer;
private ByteBuffer appReadBuffer;
+ private boolean hasBytesBuffered;
private ByteBuffer emptyBuf = ByteBuffer.allocate(0);
public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
@@ -503,13 +504,17 @@ public class SslTransportLayer implements TransportLayer {
read = readFromAppBuffer(dst);
}
+ boolean readFromNetwork = false;
boolean isClosed = false;
// Each loop reads at most once from the socket.
while (dst.remaining() > 0) {
int netread = 0;
netReadBuffer = Utils.ensureCapacity(netReadBuffer, netReadBufferSize());
- if (netReadBuffer.remaining() > 0)
+ if (netReadBuffer.remaining() > 0) {
netread = readFromSocketChannel();
+ if (netread > 0)
+ readFromNetwork = true;
+ }
while (netReadBuffer.position() > 0) {
netReadBuffer.flip();
@@ -563,6 +568,7 @@ public class SslTransportLayer implements TransportLayer {
if (netread <= 0 || isClosed)
break;
}
+ updateBytesBuffered(readFromNetwork || read > 0);
// If data has been read and unwrapped, return the data even if end-of-stream, channel will be closed
// on a subsequent poll.
return read;
@@ -793,6 +799,11 @@ public class SslTransportLayer implements TransportLayer {
return netReadBuffer;
}
+ // Visibility for testing
+ protected ByteBuffer appReadBuffer() {
+ return appReadBuffer;
+ }
+
/**
* SSL exceptions are propagated as authentication failures so that clients can avoid
* retries and report the failure. If `flush` is true, exceptions are propagated after
@@ -826,12 +837,22 @@ public class SslTransportLayer implements TransportLayer {
@Override
public boolean hasBytesBuffered() {
- return netReadBuffer.position() != 0 || appReadBuffer.position() != 0;
+ return hasBytesBuffered;
+ }
+
+ // Update `hasBytesBuffered` status. If any bytes were read from the network or
+ // if data was returned from read, `hasBytesBuffered` is set to true if any buffered
+ // data is still remaining. If not, `hasBytesBuffered` is set to false since no progress
+ // can be made until more data is available to read from the network.
+ private void updateBytesBuffered(boolean madeProgress) {
+ if (madeProgress)
+ hasBytesBuffered = netReadBuffer.position() != 0 || appReadBuffer.position() != 0;
+ else
+ hasBytesBuffered = false;
}
@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
return fileChannel.transferTo(position, count, this);
}
-
}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
index 3673d21..a8a4b87 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
@@ -94,6 +94,7 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
/**
* @return true if channel has bytes to be read in any intermediate buffers
+ * which may be processed without reading additional data from the network.
*/
boolean hasBytesBuffered();
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 1d78e5a..3bdb07a 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -42,6 +42,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -96,6 +99,13 @@ public class SslSelectorTest extends SelectorTest {
connect(node, new InetSocketAddress("localhost", server.port));
selector.send(createSend(node, request));
+ waitForBytesBuffered(selector, node);
+
+ selector.close(node);
+ verifySelectorEmpty();
+ }
+
+ private void waitForBytesBuffered(Selector selector, String node) throws Exception {
TestUtils.waitForCondition(new TestCondition() {
@Override
public boolean conditionMet() {
@@ -107,8 +117,72 @@ public class SslSelectorTest extends SelectorTest {
}
}
}, 2000L, "Failed to reach socket state with bytes buffered");
+ }
- selector.close(node);
+ @Test
+ public void testBytesBufferedChannelWithNoIncomingBytes() throws Exception {
+ verifyNoUnnecessaryPollWithBytesBuffered(key ->
+ key.interestOps(key.interestOps() & ~SelectionKey.OP_READ));
+ }
+
+ @Test
+ public void testBytesBufferedChannelAfterMute() throws Exception {
+ verifyNoUnnecessaryPollWithBytesBuffered(key -> ((KafkaChannel) key.attachment()).mute());
+ }
+
+ private void verifyNoUnnecessaryPollWithBytesBuffered(Consumer<SelectionKey> disableRead)
+ throws Exception {
+ this.selector.close();
+
+ String node1 = "1";
+ String node2 = "2";
+ final AtomicInteger node1Polls = new AtomicInteger();
+
+ this.channelBuilder = new TestSslChannelBuilder(Mode.CLIENT);
+ this.channelBuilder.configure(sslClientConfigs);
+ this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext()) {
+ @Override
+ void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
+ for (SelectionKey key : selectionKeys) {
+ KafkaChannel channel = (KafkaChannel) key.attachment();
+ if (channel != null && channel.id().equals(node1))
+ node1Polls.incrementAndGet();
+ }
+ super.pollSelectionKeys(selectionKeys, isImmediatelyConnected, currentTimeNanos);
+ }
+ };
+
+ // Get node1 into bytes buffered state and then disable read on the socket.
+ // Truncate the read buffers to ensure that there is buffered data, but not enough to make progress.
+ int largeRequestSize = 100 * 1024;
+ connect(node1, new InetSocketAddress("localhost", server.port));
+ selector.send(createSend(node1, TestUtils.randomString(largeRequestSize)));
+ waitForBytesBuffered(selector, node1);
+ TestSslChannelBuilder.TestSslTransportLayer.transportLayers.get(node1).truncateReadBuffer();
+ disableRead.accept(selector.channel(node1).selectionKey());
+
+ // Clear poll count and count the polls from now on
+ node1Polls.set(0);
+
+ // Process sends and receives on node2. Test verifies that we don't process node1
+ // unnecessarily on each of these polls.
+ connect(node2, new InetSocketAddress("localhost", server.port));
+ int received = 0;
+ String request = TestUtils.randomString(10);
+ selector.send(createSend(node2, request));
+ while (received < 100) {
+ received += selector.completedReceives().size();
+ if (!selector.completedSends().isEmpty()) {
+ selector.send(createSend(node2, request));
+ }
+ selector.poll(5);
+ }
+
+ // Verify that pollSelectionKeys was invoked once to process buffered data
+ // but not again since there isn't sufficient data to process.
+ assertEquals(1, node1Polls.get());
+ selector.close(node1);
+ selector.close(node2);
verifySelectorEmpty();
}
@@ -252,22 +326,33 @@ public class SslSelectorTest extends SelectorTest {
* TestSslTransportLayer will read from socket once every two tries. This increases
* the chance that there will be bytes buffered in the transport layer after read().
*/
- class TestSslTransportLayer extends SslTransportLayer {
+ static class TestSslTransportLayer extends SslTransportLayer {
+ static Map<String, TestSslTransportLayer> transportLayers = new HashMap<>();
boolean muteSocket = false;
public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
super(channelId, key, sslEngine);
+ transportLayers.put(channelId, this);
}
@Override
protected int readFromSocketChannel() throws IOException {
if (muteSocket) {
- muteSocket = false;
+ if ((selectionKey().interestOps() & SelectionKey.OP_READ) != 0)
+ muteSocket = false;
return 0;
}
muteSocket = true;
return super.readFromSocketChannel();
}
+
+ // Leave one byte in network read buffer so that some buffered bytes are present,
+ // but not enough to make progress on a read.
+ void truncateReadBuffer() throws Exception {
+ netReadBuffer().position(1);
+ appReadBuffer().position(0);
+ muteSocket = true;
+ }
}
}