You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/03/09 20:16:00 UTC
[nifi] branch main updated: NIFI-9761 Correct PeerChannel processing for TLS 1.3 (#5836)
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c73573b NIFI-9761 Correct PeerChannel processing for TLS 1.3 (#5836)
c73573b is described below
commit c73573b325a6e370bb3031e6906c786bcbf72ebc
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Wed Mar 9 14:15:52 2022 -0600
NIFI-9761 Correct PeerChannel processing for TLS 1.3 (#5836)
* NIFI-9761 Corrected PeerChannel processing for TLS 1.3
- Added TestPeerChannel with methods for TLS 1.2 and TLS 1.3
- Updated PeerChannel.close() to process SSLEngine close notification
- Improved logging and corrected handling after decryption
---
.../nifi-framework/nifi-framework-core/pom.xml | 5 +
.../clustered/client/async/nio/PeerChannel.java | 169 ++++++++++----
.../client/async/nio/TestPeerChannel.java | 254 +++++++++++++++++++++
3 files changed, 385 insertions(+), 43 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index 7a39b22..1eb6665 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -265,6 +265,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.8.1</version>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/PeerChannel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/PeerChannel.java
index 67afb4a..5bee319 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/PeerChannel.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/client/async/nio/PeerChannel.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSession;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -30,6 +31,10 @@ import java.nio.channels.SocketChannel;
import java.util.OptionalInt;
public class PeerChannel implements Closeable {
+ private static final int END_OF_FILE = -1;
+
+ private static final int EMPTY_BUFFER = 0;
+
private static final Logger logger = LoggerFactory.getLogger(PeerChannel.class);
private final SocketChannel socketChannel;
@@ -38,7 +43,7 @@ public class PeerChannel implements Closeable {
private final ByteBuffer singleByteBuffer = ByteBuffer.allocate(1);
private ByteBuffer destinationBuffer = ByteBuffer.allocate(16 * 1024); // buffer that SSLEngine is to write into
- private ByteBuffer streamBuffer = ByteBuffer.allocate(16 * 1024); // buffer for data that is read from SocketChannel
+ private final ByteBuffer streamBuffer = ByteBuffer.allocate(16 * 1024); // buffer for data that is read from SocketChannel
private ByteBuffer applicationBuffer = ByteBuffer.allocate(0); // buffer for application-level data that is ready to be served up (i.e., already decrypted if necessary)
public PeerChannel(final SocketChannel socketChannel, final SSLEngine sslEngine, final String peerDescription) {
@@ -47,10 +52,45 @@ public class PeerChannel implements Closeable {
this.peerDescription = peerDescription;
}
-
+ /**
+ * Close Socket Channel and process SSLEngine close notifications when configured
+ *
+ * @throws IOException Thrown on failure to close Socket Channel or process SSLEngine operations
+ */
@Override
public void close() throws IOException {
- socketChannel.close();
+ try {
+ if (sslEngine == null) {
+ logger.debug("Closing Peer Channel [{}] SSLEngine not configured", peerDescription);
+ } else {
+ logger.debug("Closing Peer Channel [{}] SSLEngine close started", peerDescription);
+ sslEngine.closeOutbound();
+
+ // Send TLS close notification packets available after initiating SSLEngine.closeOutbound()
+ final ByteBuffer inputBuffer = ByteBuffer.allocate(0);
+ final ByteBuffer outputBuffer = ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize());
+
+ SSLEngineResult wrapResult = sslEngine.wrap(inputBuffer, outputBuffer);
+ SSLEngineResult.Status status = wrapResult.getStatus();
+ outputBuffer.flip();
+ if (SSLEngineResult.Status.OK == status) {
+ write(outputBuffer);
+ outputBuffer.clear();
+ wrapResult = sslEngine.wrap(inputBuffer, outputBuffer);
+ status = wrapResult.getStatus();
+ }
+ if (SSLEngineResult.Status.CLOSED == status) {
+ write(outputBuffer);
+ } else {
+ throw new SSLException(String.format("Closing Peer Channel [%s] Invalid Wrap Result Status [%s]", peerDescription, status));
+ }
+
+ logger.debug("Closing Peer Channel [{}] SSLEngine close completed", peerDescription);
+ }
+ } finally {
+ logger.debug("Closing Peer Channel [{}] Socket Channel close started", peerDescription);
+ socketChannel.close();
+ }
}
public boolean isConnected() {
@@ -65,6 +105,13 @@ public class PeerChannel implements Closeable {
return peerDescription;
}
+ /**
+ * Write one byte to the channel
+ *
+ * @param b Byte to be written
+ * @return Status of write operation returns true on success
+ * @throws IOException Thrown on failure to write to the Socket Channel
+ */
public boolean write(final byte b) throws IOException {
singleByteBuffer.clear();
singleByteBuffer.put(b);
@@ -75,13 +122,18 @@ public class PeerChannel implements Closeable {
return bytesWritten > 0;
}
+ /**
+ * Read one byte as an unsigned integer from the channel
+ *
+ * @return Returns empty when zero bytes are available and returns negative one when the channel is closed
+ * @throws IOException Thrown on failure to read from Socket Channel
+ */
public OptionalInt read() throws IOException {
singleByteBuffer.clear();
final int bytesRead = read(singleByteBuffer);
if (bytesRead < 0) {
- return OptionalInt.of(-1);
- }
- if (bytesRead == 0) {
+ return OptionalInt.of(END_OF_FILE);
+ } else if (bytesRead == EMPTY_BUFFER) {
return OptionalInt.empty();
}
@@ -91,9 +143,6 @@ public class PeerChannel implements Closeable {
return OptionalInt.of(read & 0xFF);
}
-
-
-
/**
* Reads the given ByteBuffer of data and returns a new ByteBuffer (which is "flipped" / ready to be read). The newly returned
* ByteBuffer will be written to be written via the {@link #write(ByteBuffer)} method. I.e., it will have already been encrypted, if
@@ -104,11 +153,11 @@ public class PeerChannel implements Closeable {
* @throws IOException if a failure occurs while encrypting the data
*/
public ByteBuffer prepareForWrite(final ByteBuffer plaintext) throws IOException {
+ logger.trace("Channel [{}] Buffer wrap started: Input Bytes [{}]", peerDescription, plaintext.remaining());
if (sslEngine == null) {
return plaintext;
}
-
ByteBuffer prepared = ByteBuffer.allocate(Math.min(85, plaintext.capacity() - plaintext.position()));
while (plaintext.hasRemaining()) {
encrypt(plaintext);
@@ -125,14 +174,28 @@ public class PeerChannel implements Closeable {
}
prepared.flip();
+ logger.trace("Channel [{}] Buffer wrap completed: Prepared Bytes [{}]", peerDescription, prepared.remaining());
return prepared;
}
+ /**
+ * Write prepared buffer to Socket Channel
+ *
+ * @param preparedBuffer Buffer must contain bytes processed through prepareForWrite() when TLS is enabled
+ * @return Number of bytes written according to SocketChannel.write()
+ * @throws IOException Thrown on failure to write to the Socket Channel
+ */
public int write(final ByteBuffer preparedBuffer) throws IOException {
return socketChannel.write(preparedBuffer);
}
-
+ /**
+ * Read application data bytes into the provided buffer
+ *
+ * @param dst Buffer to be populated with application data bytes
+ * @return Number of bytes read into the provided buffer
+ * @throws IOException Thrown on failure to read from the Socket Channel
+ */
public int read(final ByteBuffer dst) throws IOException {
// If we have data ready to go, then go ahead and copy it.
final int bytesCopied = copy(applicationBuffer, dst);
@@ -141,12 +204,11 @@ public class PeerChannel implements Closeable {
}
final int bytesRead = socketChannel.read(streamBuffer);
- if (bytesRead < 1) {
- return bytesRead;
- }
-
- if (bytesRead > 0) {
- logger.trace("Read {} bytes from SocketChannel", bytesRead);
+ logger.trace("Channel [{}] Socket read completed: bytes [{}]", peerDescription, bytesRead);
+ if (bytesRead == END_OF_FILE) {
+ return END_OF_FILE;
+ } else if (streamBuffer.remaining() == EMPTY_BUFFER) {
+ return EMPTY_BUFFER;
}
streamBuffer.flip();
@@ -157,7 +219,7 @@ public class PeerChannel implements Closeable {
return copy(applicationBuffer, dst);
} else {
final boolean decrypted = decrypt(streamBuffer);
- logger.trace("Decryption after reading those bytes successful = {}", decrypted);
+ logger.trace("Channel [{}] Decryption completed [{}]", peerDescription, decrypted);
if (decrypted) {
cloneToApplicationBuffer(destinationBuffer);
@@ -167,9 +229,8 @@ public class PeerChannel implements Closeable {
} else {
// Not enough data to decrypt. Compact the buffer so that we keep the data we have
// but prepare the buffer to be written to again.
- logger.debug("Not enough data to decrypt. Will need to consume more data before decrypting");
- streamBuffer.compact();
- return 0;
+ logger.trace("Channel [{}] Socket Channel read required", peerDescription);
+ return EMPTY_BUFFER;
}
}
} finally {
@@ -219,6 +280,7 @@ public class PeerChannel implements Closeable {
while (true) {
final SSLEngineResult result = sslEngine.wrap(plaintext, destinationBuffer);
+ logOperationResult("WRAP", result);
switch (result.getStatus()) {
case OK:
@@ -240,9 +302,6 @@ public class PeerChannel implements Closeable {
}
}
-
-
-
/**
* Attempts to decrypt the given buffer of data, writing the result into {@link #destinationBuffer}. If successful, will return <code>true</code>.
* If more data is needed in order to perform the decryption, will return <code>false</code>.
@@ -260,15 +319,21 @@ public class PeerChannel implements Closeable {
while (true) {
final SSLEngineResult result = sslEngine.unwrap(encrypted, destinationBuffer);
+ logOperationResult("UNWRAP", result);
switch (result.getStatus()) {
case OK:
+ if (SSLEngineResult.HandshakeStatus.FINISHED == result.getHandshakeStatus()) {
+ // RFC 8446 Section 4.6 describes Post-Handshake Messages for TLS 1.3
+ // Break out of switch statement to call SSLEngine.unwrap() again
+ break;
+ }
destinationBuffer.flip();
return true;
case CLOSED:
throw new IOException("Failed to decrypt data from Peer " + peerDescription + " because Peer unexpectedly closed connection");
case BUFFER_OVERFLOW:
- // ecnryptedBuffer is not large enough. Need to increase the size.
+ // encryptedBuffer is not large enough. Need to increase the size.
final ByteBuffer tempBuffer = ByteBuffer.allocate(encrypted.position() + sslEngine.getSession().getApplicationBufferSize());
destinationBuffer.flip();
tempBuffer.put(destinationBuffer);
@@ -282,7 +347,11 @@ public class PeerChannel implements Closeable {
}
}
-
+ /**
+ * Perform TLS handshake when SSLEngine configured
+ *
+ * @throws IOException Thrown on failure to handle socket communication or TLS packet processing
+ */
public void performHandshake() throws IOException {
if (sslEngine == null) {
return;
@@ -295,18 +364,17 @@ public class PeerChannel implements Closeable {
while (true) {
final SSLEngineResult.HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
+ logHandshakeStatus(handshakeStatus);
switch (handshakeStatus) {
case FINISHED:
case NOT_HANDSHAKING:
streamBuffer.clear();
destinationBuffer.clear();
- logger.debug("Completed SSL Handshake with Peer {}", peerDescription);
+ logHandshakeCompleted();
return;
case NEED_TASK:
- logger.debug("SSL Handshake with Peer {} Needs Task", peerDescription);
-
Runnable runnable;
while ((runnable = sslEngine.getDelegatedTask()) != null) {
runnable.run();
@@ -314,27 +382,22 @@ public class PeerChannel implements Closeable {
break;
case NEED_WRAP:
- logger.trace("SSL Handshake with Peer {} Needs Wrap", peerDescription);
-
encrypt(emptyMessage);
final int bytesWritten = write(destinationBuffer);
- logger.debug("Wrote {} bytes for NEED_WRAP portion of Handshake", bytesWritten);
+ logHandshakeStatusBytes(handshakeStatus, "Socket write completed", bytesWritten);
break;
case NEED_UNWRAP:
- logger.trace("SSL Handshake with Peer {} Needs Unwrap", peerDescription);
-
while (sslEngine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
final boolean decrypted = decrypt(unwrapBuffer);
- if (decrypted) {
- logger.trace("Decryption was successful for NEED_UNWRAP portion of Handshake");
+ final SSLEngineResult.HandshakeStatus unwrapHandshakeStatus = sslEngine.getHandshakeStatus();
+ if (decrypted || unwrapHandshakeStatus == SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
+ logHandshakeStatus(unwrapHandshakeStatus, "Decryption completed");
break;
}
if (unwrapBuffer.capacity() - unwrapBuffer.position() < 1) {
- logger.trace("Enlarging size of Buffer for NEED_UNWRAP portion of Handshake");
-
- // destinationBuffer is not large enough. Need to increase the size.
+ logHandshakeStatus(unwrapHandshakeStatus, "Increasing unwrap buffer for decryption");
final ByteBuffer tempBuffer = ByteBuffer.allocate(unwrapBuffer.capacity() + sslEngine.getSession().getApplicationBufferSize());
tempBuffer.put(unwrapBuffer);
unwrapBuffer = tempBuffer;
@@ -342,17 +405,37 @@ public class PeerChannel implements Closeable {
continue;
}
- logger.trace("Need to read more bytes for NEED_UNWRAP portion of Handshake");
-
- // Need to read more data.
+ logHandshakeStatus(unwrapHandshakeStatus, "Socket read started");
unwrapBuffer.compact();
final int bytesRead = socketChannel.read(unwrapBuffer);
unwrapBuffer.flip();
- logger.debug("Read {} bytes for NEED_UNWRAP portion of Handshake", bytesRead);
+
+ logHandshakeStatusBytes(unwrapHandshakeStatus, "Socket read completed", bytesRead);
}
break;
}
}
}
+
+ private void logOperationResult(final String operation, final SSLEngineResult sslEngineResult) {
+ logger.trace("Channel [{}] {} [{}]", peerDescription, operation, sslEngineResult);
+ }
+
+ private void logHandshakeCompleted() {
+ final SSLSession sslSession = sslEngine.getSession();
+ logger.debug("Channel [{}] Handshake Completed Protocol [{}] Cipher Suite [{}]", peerDescription, sslSession.getProtocol(), sslSession.getCipherSuite());
+ }
+
+ private void logHandshakeStatus(final SSLEngineResult.HandshakeStatus handshakeStatus) {
+ logger.debug("Channel [{}] Handshake Status [{}]", peerDescription, handshakeStatus);
+ }
+
+ private void logHandshakeStatus(final SSLEngineResult.HandshakeStatus handshakeStatus, final String operation) {
+ logger.debug("Channel [{}] Handshake Status [{}] {}", peerDescription, handshakeStatus, operation);
+ }
+
+ private void logHandshakeStatusBytes(final SSLEngineResult.HandshakeStatus handshakeStatus, final String operation, final int bytes) {
+ logger.debug("Channel [{}] Handshake Status [{}] {} Bytes [{}]", peerDescription, handshakeStatus, operation, bytes);
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestPeerChannel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestPeerChannel.java
new file mode 100644
index 0000000..888dcbf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestPeerChannel.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.queue.clustered.client.async.nio;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
+import org.apache.nifi.security.util.TlsConfiguration;
+import org.apache.nifi.security.util.TlsPlatform;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIf;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.security.GeneralSecurityException;
+import java.util.OptionalInt;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestPeerChannel {
+ private static final String LOCALHOST = "localhost";
+
+ private static final int GROUP_THREADS = 1;
+
+ private static final boolean CLIENT_CHANNEL = true;
+
+ private static final boolean SERVER_CHANNEL = false;
+
+ private static final long READ_SLEEP_INTERVAL = 500;
+
+ private static final int CHANNEL_TIMEOUT = 15000;
+
+ private static final int SOCKET_TIMEOUT = 5000;
+
+ private static final long SHUTDOWN_TIMEOUT = 100;
+
+ private static final String TLS_1_3 = "TLSv1.3";
+
+ private static final String TLS_1_2 = "TLSv1.2";
+
+ private static final String TLS_1_3_SUPPORTED = "isTls13Supported";
+
+ private static final int PROTOCOL_VERSION = 1;
+
+ private static final int VERSION_ACCEPTED = 0x10;
+
+ private static SSLContext sslContext;
+
+ public static boolean isTls13Supported() {
+ return TlsPlatform.getSupportedProtocols().contains(TLS_1_3);
+ }
+
+ @BeforeAll
+ public static void setConfiguration() throws GeneralSecurityException {
+ final TlsConfiguration tlsConfiguration = new TemporaryKeyStoreBuilder().build();
+ sslContext = SslContextFactory.createSslContext(tlsConfiguration);
+ }
+
+ @Test
+ @Timeout(value = CHANNEL_TIMEOUT, unit = TimeUnit.MILLISECONDS)
+ public void testConnectedClose() throws IOException {
+ final String enabledProtocol = getEnabledProtocol();
+
+ processChannel(enabledProtocol, peerChannel -> {});
+ }
+
+ @Test
+ @Timeout(value = CHANNEL_TIMEOUT, unit = TimeUnit.MILLISECONDS)
+ public void testConnectedWriteReadCloseTls12() throws IOException {
+ assertWriteReadSuccess(TLS_1_2);
+ }
+
+ @EnabledIf(TLS_1_3_SUPPORTED)
+ @Test
+ @Timeout(value = CHANNEL_TIMEOUT, unit = TimeUnit.MILLISECONDS)
+ public void testConnectedWriteReadCloseTls13() throws IOException {
+ assertWriteReadSuccess(TLS_1_3);
+ }
+
+ private void assertWriteReadSuccess(final String enabledProtocol) throws IOException {
+ processChannel(enabledProtocol, peerChannel -> {
+ try {
+ peerChannel.performHandshake();
+
+ final byte[] version = new byte[]{PROTOCOL_VERSION};
+ final ByteBuffer versionBuffer = ByteBuffer.wrap(version);
+ final ByteBuffer encryptedVersionBuffer = peerChannel.prepareForWrite(versionBuffer);
+ peerChannel.write(encryptedVersionBuffer);
+
+ final int firstByteRead = read(peerChannel);
+ assertEquals(PROTOCOL_VERSION, firstByteRead, "Peer Channel first byte read not matched");
+
+ final byte[] versionAccepted = new byte[]{VERSION_ACCEPTED};
+ final ByteBuffer versionAcceptedBuffer = ByteBuffer.wrap(versionAccepted);
+ final ByteBuffer encryptedVersionAcceptedBuffer = peerChannel.prepareForWrite(versionAcceptedBuffer);
+ peerChannel.write(encryptedVersionAcceptedBuffer);
+
+ final int secondByteRead = read(peerChannel);
+ assertEquals(VERSION_ACCEPTED, secondByteRead, "Peer Channel second byte read not matched");
+ } catch (final IOException e) {
+ throw new UncheckedIOException(String.format("Channel Failed for %s", enabledProtocol), e);
+ }
+ });
+ }
+
+ private int read(final PeerChannel peerChannel) throws IOException {
+ OptionalInt read = peerChannel.read();
+ while (!read.isPresent()) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(READ_SLEEP_INTERVAL);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Peer Channel read sleep interrupted", e);
+ }
+ read = peerChannel.read();
+ }
+ return read.getAsInt();
+ }
+
+ private void processChannel(final String enabledProtocol, final Consumer<PeerChannel> channelConsumer) throws IOException {
+ final EventLoopGroup group = new NioEventLoopGroup(GROUP_THREADS);
+
+ try (final SocketChannel socketChannel = SocketChannel.open()) {
+ final Socket socket = socketChannel.socket();
+ socket.setSoTimeout(SOCKET_TIMEOUT);
+
+ final InetSocketAddress serverSocketAddress = getServerSocketAddress();
+ startServer(group, serverSocketAddress.getPort(), enabledProtocol);
+
+ socketChannel.connect(serverSocketAddress);
+ final SSLEngine sslEngine = createSslEngine(enabledProtocol, CLIENT_CHANNEL);
+
+ final PeerChannel peerChannel = new PeerChannel(socketChannel, sslEngine, serverSocketAddress.toString());
+ assertConnectedOpen(peerChannel);
+
+ socketChannel.configureBlocking(false);
+ channelConsumer.accept(peerChannel);
+
+ peerChannel.close();
+ assertNotConnectedNotOpen(peerChannel);
+ } finally {
+ shutdownGroup(group);
+ }
+ }
+
+ private void assertConnectedOpen(final PeerChannel peerChannel) {
+ assertTrue(peerChannel.isConnected(), "Channel not connected");
+ assertTrue(peerChannel.isOpen(), "Channel not open");
+ }
+
+ private void assertNotConnectedNotOpen(final PeerChannel peerChannel) {
+ assertFalse(peerChannel.isConnected(), "Channel connected");
+ assertFalse(peerChannel.isOpen(), "Channel open");
+ }
+
+ private void startServer(final EventLoopGroup group, final int port, final String enabledProtocol) {
+ final ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.group(group);
+ bootstrap.channel(NioServerSocketChannel.class);
+ bootstrap.childHandler(new ChannelInitializer<Channel>() {
+ @Override
+ protected void initChannel(final Channel channel) {
+ final ChannelPipeline pipeline = channel.pipeline();
+ final SSLEngine sslEngine = createSslEngine(enabledProtocol, SERVER_CHANNEL);
+ setPipelineHandlers(pipeline, sslEngine);
+ pipeline.addLast(new SimpleChannelInboundHandler<ByteBuf>() {
+ private int protocolVersion;
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
+ if (byteBuf.readableBytes() == 1) {
+ final int read = byteBuf.readByte();
+ if (PROTOCOL_VERSION == read) {
+ protocolVersion = read;
+ channelHandlerContext.writeAndFlush(Unpooled.wrappedBuffer(new byte[]{PROTOCOL_VERSION}));
+ } else if (protocolVersion == PROTOCOL_VERSION) {
+ channelHandlerContext.writeAndFlush(Unpooled.wrappedBuffer(new byte[]{VERSION_ACCEPTED}));
+ } else {
+ throw new SocketException(String.format("Unexpected Integer [%d] read", read));
+ }
+ }
+ }
+ });
+ }
+ });
+
+ final ChannelFuture bindFuture = bootstrap.bind(LOCALHOST, port);
+ bindFuture.syncUninterruptibly();
+ }
+
+ private SSLEngine createSslEngine(final String enabledProtocol, final boolean useClientMode) {
+ final SSLEngine sslEngine = sslContext.createSSLEngine();
+ sslEngine.setUseClientMode(useClientMode);
+ sslEngine.setEnabledProtocols(new String[]{enabledProtocol});
+ return sslEngine;
+ }
+
+
+ private void setPipelineHandlers(final ChannelPipeline pipeline, final SSLEngine sslEngine) {
+ pipeline.addLast(new SslHandler(sslEngine));
+ }
+
+ private void shutdownGroup(final EventLoopGroup group) {
+ group.shutdownGracefully(SHUTDOWN_TIMEOUT, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS).syncUninterruptibly();
+ }
+
+ private InetSocketAddress getServerSocketAddress() {
+ final int port = NetworkUtils.getAvailableTcpPort();
+ return new InetSocketAddress(LOCALHOST, port);
+ }
+
+ private String getEnabledProtocol() {
+ return isTls13Supported() ? TLS_1_3 : TLS_1_2;
+ }
+}