You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/09/29 06:10:13 UTC

[kafka] branch 2.0 updated: KAFKA-7454: Use lazy allocation for SslTransportLayer buffers and null them on close (#5713)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new f256dfd  KAFKA-7454: Use lazy allocation for SslTransportLayer buffers and null them on close (#5713)
f256dfd is described below

commit f256dfdaad90545f9e1f57c678c7247568110bee
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Sat Sep 29 07:09:10 2018 +0100

    KAFKA-7454: Use lazy allocation for SslTransportLayer buffers and null them on close (#5713)
    
    Lazy allocation helps when there are a large number of connections
    that have been accepted, but where no data has been received from
    the clients. Each buffer is often around 16k (max TLS record size).
    
    Nulling the buffers should not make a difference in the current
    implementation, since we release our references to the channel
    and transport layer after we close them, but it is good practice
    to release medium/large buffers once `close` has been called.
    
    Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>
---
 .../apache/kafka/common/network/SslTransportLayer.java   | 16 +++++++++++-----
 .../org/apache/kafka/common/network/SslSelectorTest.java |  1 -
 .../kafka/common/network/SslTransportLayerTest.java      |  7 ++++++-
 3 files changed, 17 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 08a39e7..5dc8086 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -47,6 +47,7 @@ import org.slf4j.Logger;
  */
 public class SslTransportLayer implements TransportLayer {
     private enum State {
+        NOT_INITALIZED,
         HANDSHAKE,
         HANDSHAKE_FAILED,
         READY,
@@ -70,9 +71,7 @@ public class SslTransportLayer implements TransportLayer {
     private ByteBuffer emptyBuf = ByteBuffer.allocate(0);
 
     public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
-        SslTransportLayer transportLayer = new SslTransportLayer(channelId, key, sslEngine);
-        transportLayer.startHandshake();
-        return transportLayer;
+        return new SslTransportLayer(channelId, key, sslEngine);
     }
 
     // Prefer `create`, only use this in tests
@@ -81,6 +80,7 @@ public class SslTransportLayer implements TransportLayer {
         this.key = key;
         this.socketChannel = (SocketChannel) key.channel();
         this.sslEngine = sslEngine;
+        this.state = State.NOT_INITALIZED;
 
         final LogContext logContext = new LogContext(String.format("[SslTransportLayer channelId=%s key=%s] ", channelId, key));
         this.log = logContext.logger(getClass());
@@ -88,7 +88,7 @@ public class SslTransportLayer implements TransportLayer {
 
     // Visible for testing
     protected void startHandshake() throws IOException {
-        if (state != null)
+        if (state != State.NOT_INITALIZED)
             throw new IllegalStateException("startHandshake() can only be called once, state " + state);
 
         this.netReadBuffer = ByteBuffer.allocate(netReadBufferSize());
@@ -156,11 +156,12 @@ public class SslTransportLayer implements TransportLayer {
     */
     @Override
     public void close() throws IOException {
+        State prevState = state;
         if (state == State.CLOSING) return;
         state = State.CLOSING;
         sslEngine.closeOutbound();
         try {
-            if (isConnected()) {
+            if (prevState != State.NOT_INITALIZED && isConnected()) {
                 if (!flush(netWriteBuffer)) {
                     throw new IOException("Remaining data in the network buffer, can't send SSL close message.");
                 }
@@ -181,6 +182,9 @@ public class SslTransportLayer implements TransportLayer {
         } finally {
             socketChannel.socket().close();
             socketChannel.close();
+            netReadBuffer = null;
+            netWriteBuffer = null;
+            appReadBuffer = null;
         }
     }
 
@@ -242,6 +246,8 @@ public class SslTransportLayer implements TransportLayer {
     */
     @Override
     public void handshake() throws IOException {
+        if (state == State.NOT_INITALIZED)
+            startHandshake();
         if (state == State.READY)
             throw renegotiationException();
         if (state == State.CLOSING)
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 3bdb07a..1f9739b 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -318,7 +318,6 @@ public class SslSelectorTest extends SelectorTest {
             SocketChannel socketChannel = (SocketChannel) key.channel();
             SSLEngine sslEngine = sslFactory.createSslEngine(host, socketChannel.socket().getPort());
             TestSslTransportLayer transportLayer = new TestSslTransportLayer(id, key, sslEngine);
-            transportLayer.startHandshake();
             return transportLayer;
         }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index d70a448..ca80dd9 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -1071,7 +1071,6 @@ public class SslTransportLayerTest {
             SocketChannel socketChannel = (SocketChannel) key.channel();
             SSLEngine sslEngine = sslFactory.createSslEngine(host, socketChannel.socket().getPort());
             TestSslTransportLayer transportLayer = newTransportLayer(id, key, sslEngine);
-            transportLayer.startHandshake();
             return transportLayer;
         }
 
@@ -1146,6 +1145,12 @@ public class SslTransportLayerTest {
                 return super.flush(buf);
             }
 
+            @Override
+            protected void startHandshake() throws IOException {
+                assertTrue("SSL handshake initialized too early", socketChannel().isConnected());
+                super.startHandshake();
+            }
+
             private void resetDelayedFlush() {
                 numDelayedFlushesRemaining.set(flushDelayCount);
             }