You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/06/20 08:44:41 UTC

[kafka] branch 1.1 updated: KAFKA-7012: Don't process SSL channels without data to process (#5237)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 2ea3e55  KAFKA-7012: Don't process SSL channels without data to process (#5237)
2ea3e55 is described below

commit 2ea3e55a162cd0bc7df25633a59064b1c7e2f170
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue Jun 19 15:16:16 2018 +0100

    KAFKA-7012: Don't process SSL channels without data to process (#5237)
    
    Avoid unnecessary processing of SSL channels when there are some bytes buffered, but not enough to make progress.
    
    Reviewers: Radai Rosenblatt <ra...@gmail.com>, Jun Rao <ju...@gmail.com>
---
 .../org/apache/kafka/common/network/Selector.java  |  4 +-
 .../kafka/common/network/SslTransportLayer.java    | 27 ++++++-
 .../kafka/common/network/TransportLayer.java       |  1 +
 .../kafka/common/network/SslSelectorTest.java      | 94 +++++++++++++++++++++-
 4 files changed, 119 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 1b2d1a2..be02b5e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -498,7 +498,9 @@ public class Selector implements Selectable, AutoCloseable {
                     //this channel has bytes enqueued in intermediary buffers that we could not read
                     //(possibly because no memory). it may be the case that the underlying socket will
                     //not come up in the next poll() and so we need to remember this channel for the
-                    //next poll call otherwise data may be stuck in said buffers forever.
+                    //next poll call otherwise data may be stuck in said buffers forever. If we attempt
+                    //to process buffered data and no progress is made, the channel buffered status is
+                    //cleared to avoid the overhead of checking every time.
                     keysWithBufferedRead.add(key);
                 }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 49f1d66..31cf50f 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -65,6 +65,7 @@ public class SslTransportLayer implements TransportLayer {
     private ByteBuffer netReadBuffer;
     private ByteBuffer netWriteBuffer;
     private ByteBuffer appReadBuffer;
+    private boolean hasBytesBuffered;
     private ByteBuffer emptyBuf = ByteBuffer.allocate(0);
 
     public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
@@ -501,13 +502,17 @@ public class SslTransportLayer implements TransportLayer {
             read = readFromAppBuffer(dst);
         }
 
+        boolean readFromNetwork = false;
         boolean isClosed = false;
         // Each loop reads at most once from the socket.
         while (dst.remaining() > 0) {
             int netread = 0;
             netReadBuffer = Utils.ensureCapacity(netReadBuffer, netReadBufferSize());
-            if (netReadBuffer.remaining() > 0)
+            if (netReadBuffer.remaining() > 0) {
                 netread = readFromSocketChannel();
+                if (netread > 0)
+                    readFromNetwork = true;
+            }
 
             while (netReadBuffer.position() > 0) {
                 netReadBuffer.flip();
@@ -561,6 +566,7 @@ public class SslTransportLayer implements TransportLayer {
             if (netread <= 0 || isClosed)
                 break;
         }
+        updateBytesBuffered(readFromNetwork || read > 0);
         // If data has been read and unwrapped, return the data even if end-of-stream, channel will be closed
         // on a subsequent poll.
         return read;
@@ -791,6 +797,11 @@ public class SslTransportLayer implements TransportLayer {
         return netReadBuffer;
     }
 
+    // Visibility for testing
+    protected ByteBuffer appReadBuffer() {
+        return appReadBuffer;
+    }
+
     /**
      * SSL exceptions are propagated as authentication failures so that clients can avoid
      * retries and report the failure. If `flush` is true, exceptions are propagated after
@@ -824,12 +835,22 @@ public class SslTransportLayer implements TransportLayer {
 
     @Override
     public boolean hasBytesBuffered() {
-        return netReadBuffer.position() != 0 || appReadBuffer.position() != 0;
+        return hasBytesBuffered;
+    }
+
+    // Update `hasBytesBuffered` status. If the last read made progress (bytes were read from
+    // the network or data was returned from read), `hasBytesBuffered` is set to true only if
+    // buffered data still remains. If no progress was made, it is set to false since no progress
+    // can be made until more data is available to read from the network.
+    private void updateBytesBuffered(boolean madeProgress) {
+        if (madeProgress)
+            hasBytesBuffered = netReadBuffer.position() != 0 || appReadBuffer.position() != 0;
+        else
+            hasBytesBuffered = false;
     }
 
     @Override
     public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
         return fileChannel.transferTo(position, count, this);
     }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
index 3673d21..a8a4b87 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
@@ -94,6 +94,7 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
 
     /**
      * @return true if channel has bytes to be read in any intermediate buffers
+     * which may be processed without reading additional data from the network.
      */
     boolean hasBytesBuffered();
 
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 1d78e5a..8198493 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -42,6 +42,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -96,6 +98,13 @@ public class SslSelectorTest extends SelectorTest {
         connect(node, new InetSocketAddress("localhost", server.port));
         selector.send(createSend(node, request));
 
+        waitForBytesBuffered(selector, node);
+
+        selector.close(node);
+        verifySelectorEmpty();
+    }
+
+    private void waitForBytesBuffered(final Selector selector, final String node) throws Exception {
         TestUtils.waitForCondition(new TestCondition() {
             @Override
             public boolean conditionMet() {
@@ -107,8 +116,76 @@ public class SslSelectorTest extends SelectorTest {
                 }
             }
         }, 2000L, "Failed to reach socket state with bytes buffered");
+    }
 
-        selector.close(node);
+    @Test
+    public void testBytesBufferedChannelWithNoIncomingBytes() throws Exception {
+        verifyNoUnnecessaryPollWithBytesBuffered(false);
+    }
+
+    @Test
+    public void testBytesBufferedChannelAfterMute() throws Exception {
+        verifyNoUnnecessaryPollWithBytesBuffered(true);
+    }
+
+    private void verifyNoUnnecessaryPollWithBytesBuffered(boolean explicitlyMute) throws Exception {
+        this.selector.close();
+
+        final String node1 = "1";
+        String node2 = "2";
+        final AtomicInteger node1Polls = new AtomicInteger();
+
+        this.channelBuilder = new TestSslChannelBuilder(Mode.CLIENT);
+        this.channelBuilder.configure(sslClientConfigs);
+        this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext()) {
+            @Override
+            void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
+                for (SelectionKey key : selectionKeys) {
+                    KafkaChannel channel = (KafkaChannel) key.attachment();
+                    if (channel != null && channel.id().equals(node1))
+                        node1Polls.incrementAndGet();
+                }
+                super.pollSelectionKeys(selectionKeys, isImmediatelyConnected, currentTimeNanos);
+            }
+        };
+
+        // Get node1 into bytes buffered state and then disable read on the socket.
+        // Truncate the read buffers to ensure that there is buffered data, but not enough to make progress.
+        int largeRequestSize = 100 * 1024;
+        connect(node1, new InetSocketAddress("localhost", server.port));
+        selector.send(createSend(node1,  TestUtils.randomString(largeRequestSize)));
+        waitForBytesBuffered(selector, node1);
+        TestSslChannelBuilder.TestSslTransportLayer.transportLayers.get(node1).truncateReadBuffer();
+        KafkaChannel channel1 = selector.channel(node1);
+        if (explicitlyMute) {
+            channel1.mute();
+        } else {
+            SelectionKey key = channel1.selectionKey();
+            key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
+        }
+
+        // Clear poll count and count the polls from now on
+        node1Polls.set(0);
+
+        // Process sends and receives on node2. Test verifies that we don't process node1
+        // unnecessarily on each of these polls.
+        connect(node2, new InetSocketAddress("localhost", server.port));
+        int received = 0;
+        String request = TestUtils.randomString(10);
+        selector.send(createSend(node2, request));
+        while (received < 100) {
+            received += selector.completedReceives().size();
+            if (!selector.completedSends().isEmpty()) {
+                selector.send(createSend(node2, request));
+            }
+            selector.poll(5);
+        }
+
+        // Verify that pollSelectionKeys was invoked once to process buffered data
+        // but not again since there isn't sufficient data to process.
+        assertEquals(1, node1Polls.get());
+        selector.close(node1);
+        selector.close(node2);
         verifySelectorEmpty();
     }
 
@@ -252,22 +329,33 @@ public class SslSelectorTest extends SelectorTest {
          * TestSslTransportLayer will read from socket once every two tries. This increases
          * the chance that there will be bytes buffered in the transport layer after read().
          */
-        class TestSslTransportLayer extends SslTransportLayer {
+        static class TestSslTransportLayer extends SslTransportLayer {
+            static Map<String, TestSslTransportLayer> transportLayers = new HashMap<>();
             boolean muteSocket = false;
 
             public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
                 super(channelId, key, sslEngine);
+                transportLayers.put(channelId, this);
             }
 
             @Override
             protected int readFromSocketChannel() throws IOException {
                 if (muteSocket) {
-                    muteSocket = false;
+                    if ((selectionKey().interestOps() & SelectionKey.OP_READ) != 0)
+                        muteSocket = false;
                     return 0;
                 }
                 muteSocket = true;
                 return super.readFromSocketChannel();
             }
+
+            // Leave one byte in network read buffer so that some buffered bytes are present,
+            // but not enough to make progress on a read.
+            void truncateReadBuffer() throws Exception {
+                netReadBuffer().position(1);
+                appReadBuffer().position(0);
+                muteSocket = true;
+            }
         }
     }