You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2022/04/26 21:54:09 UTC
[geode] branch support/1.14 updated: GEODE-10122: P2P Messaging Handles TLS KeyUpdate Message (#7449) (#7615)
This is an automated email from the ASF dual-hosted git repository.
burcham pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.14 by this push:
new 07c08e9502 GEODE-10122: P2P Messaging Handles TLS KeyUpdate Message (#7449) (#7615)
07c08e9502 is described below
commit 07c08e95025ff955c9b361db4b97902ce722be81
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Tue Apr 26 14:54:03 2022 -0700
GEODE-10122: P2P Messaging Handles TLS KeyUpdate Message (#7449) (#7615)
* Key expiration works for TLSv1.3 and GCM-based ciphers
* TLS KeyUpdate messages are processed correctly
* Removed dependencies on: Mockito 4, JUnit 5, GeodeParamsRunner
(cherry picked from commit d2535394a82ac5faf10f004f4e3c15f756f7b177)
---
.../internal/P2PMessagingConcurrencyDUnitTest.java | 2 +-
...P2pMessagingSslTlsKeyUpdateDistributedTest.java | 367 +++++++++++++++
.../tcp/ConnectionCloseSSLTLSDUnitTest.java | 8 +-
.../internal/net/NioSslEngineKeyUpdateTest.java | 497 +++++++++++++++++++++
.../apache/geode/internal/net/NioSslEngine.java | 67 +--
.../org/apache/geode/internal/tcp/Connection.java | 2 +-
.../geode/internal/net/NioSslEngineTest.java | 38 +-
7 files changed, 934 insertions(+), 47 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
index 0d7c2d389f..326e1c2ac4 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
@@ -167,7 +167,6 @@ public class P2PMessagingConcurrencyDUnitTest {
bytesTransferredAdder = new LongAdder();
final ClusterDistributionManager cdm = getCDM();
- final Random random = new Random(RANDOM_SEED);
final AtomicInteger nextSenderId = new AtomicInteger();
/*
@@ -194,6 +193,7 @@ public class P2PMessagingConcurrencyDUnitTest {
throw new RuntimeException("doSending failed", e);
}
final int firstMessageId = senderId * SENDER_COUNT;
+ final Random random = new Random(RANDOM_SEED);
for (int messageId = firstMessageId; messageId < firstMessageId
+ MESSAGES_PER_SENDER; messageId++) {
final TestMessage msg = new TestMessage(receiverMember, random, messageId);
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java
new file mode 100644
index 0000000000..3a887d5521
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.distributed.internal;
+
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_CIPHERS;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_PROTOCOLS;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.security.GeneralSecurityException;
+import java.security.Security;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.LongAdder;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.ssl.CertStores;
+import org.apache.geode.cache.ssl.CertificateBuilder;
+import org.apache.geode.cache.ssl.CertificateMaterial;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.test.dunit.SerializableRunnableIF;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.MembershipTest;
+import org.apache.geode.test.version.VersionManager;
+
+/**
+ * In TLSv1.3, when a GCM-based cipher is used, there is a limit on the number
+ * of bytes that may be encoded with a key. When that limit is reached, a TLS
+ * KeyUpdate message is generated (by the SSLEngine). That message causes a new
+ * key to be negotiated between the peer SSLEngines.
+ *
+ * This test arranges for a low encryption byte limit to be set for the sending member
+ * and then for the receiving member. With the low byte limit configured, the test
+ * sends P2P messages via TLS and verifies request-reply message processing.
+ */
+@Category({MembershipTest.class})
+@RunWith(JUnitParamsRunner.class)
+public class P2pMessagingSslTlsKeyUpdateDistributedTest {
+
+ private static final String TLS_PROTOCOL = "TLSv1.3";
+ private static final String TLS_CIPHER_SUITE = "TLS_AES_256_GCM_SHA384";
+
+ private static final int ENCRYPTED_BYTES_LIMIT = 64 * 1024;
+
+ private static final int MESSAGE_SIZE = 1024;
+
+ /*
+ * How many messages will be generated? We generate enough to cause KeyUpdate
+ * to be generated, and then we generate many more beyond that. Even with buggy
+ * wrap/unwrap logic, the retries in DirectChannel.sendToMany() and the transparent
+ * connection reestablishment in ConnectionTable can mask those bugs. So to reliably
+ * fail in the presence of bugs we need to generate lots of extra messages.
+ */
+ private static final int MESSAGES_PER_SENDER = ENCRYPTED_BYTES_LIMIT / MESSAGE_SIZE + 2000;
+
+ { // sanity-check the constants: total payload must far exceed the KeyUpdate byte limit
+ assertThat(MESSAGE_SIZE * MESSAGES_PER_SENDER > 10 * ENCRYPTED_BYTES_LIMIT).isTrue();
+ }
+
+ public static final int MAX_REPLY_WAIT_MILLIS = 1_000;
+
+ private static Properties geodeConfigurationProperties;
+
+ @Rule
+ public final ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
+
+ private MemberVM sender;
+ private MemberVM receiver;
+
+ @After
+ public void afterEach() {
+ clusterStartupRule.getVM(0).bounceForcibly();
+ clusterStartupRule.getVM(1).bounceForcibly();
+ clusterStartupRule.getVM(2).bounceForcibly();
+ }
+
+ /*
+ * bytes sent on sender JVM, bytes received on receiver JVM
+ * (not used in test JVM)
+ */
+ private static LongAdder bytesTransferredAdder;
+
+ // in receiver JVM only
+ private static LongAdder repliesGeneratedAdder;
+
+ // in sender JVM only
+ private static LongAdder repliesReceivedAdder;
+
+
+ private void configureJVMsAndStartClusterMembers(
+ final long locatorEncryptedBytesLimit,
+ final long senderEncryptedBytesLimit,
+ final long receiverEncryptedBytesLimit)
+ throws GeneralSecurityException, IOException {
+
+ clusterStartupRule.getVM(0).invoke(
+ setSecurityProperties(locatorEncryptedBytesLimit));
+ clusterStartupRule.getVM(1).invoke(
+ setSecurityProperties(senderEncryptedBytesLimit));
+ clusterStartupRule.getVM(2).invoke(
+ setSecurityProperties(receiverEncryptedBytesLimit));
+
+ final Properties senderConfiguration = geodeConfigurationProperties();
+ final Properties receiverConfiguration = geodeConfigurationProperties();
+
+ final MemberVM locator =
+ clusterStartupRule.startLocatorVM(0, 0, VersionManager.CURRENT_VERSION,
+ x -> x.withProperties(senderConfiguration).withConnectionToLocator()
+ .withoutClusterConfigurationService().withoutManagementRestService());
+
+ sender = clusterStartupRule.startServerVM(1, senderConfiguration, locator.getPort());
+ receiver = clusterStartupRule.startServerVM(2, receiverConfiguration, locator.getPort());
+ }
+
+ private @NotNull SerializableRunnableIF setSecurityProperties(final long encryptedBytesLimit) {
+ return () -> {
+ Security.setProperty("jdk.tls.keyLimits",
+ "AES/GCM/NoPadding KeyUpdate " + encryptedBytesLimit);
+
+ final Class<?> sslCipher = Class.forName("sun.security.ssl.SSLCipher");
+ final Field cipherLimits = sslCipher.getDeclaredField("cipherLimits");
+ cipherLimits.setAccessible(true);
+ assertThat((Map<String, Long>) cipherLimits.get(null)).containsEntry(
+ "AES/GCM/NOPADDING:KEYUPDATE",
+ encryptedBytesLimit);
+ };
+ }
+
+ @Test
+ @Parameters({
+ "137438953472, 65536, 137438953472",
+ "137438953472, 137438953472, 65536",
+ })
+ public void testP2PMessagingWithKeyUpdate(
+ final long locatorEncryptedBytesLimit,
+ final long senderEncryptedBytesLimit,
+ final long receiverEncryptedBytesLimit)
+ throws GeneralSecurityException, IOException {
+
+ configureJVMsAndStartClusterMembers(locatorEncryptedBytesLimit, senderEncryptedBytesLimit,
+ receiverEncryptedBytesLimit);
+
+ final InternalDistributedMember receiverMember =
+ receiver.invoke("get receiving member id", () -> {
+
+ bytesTransferredAdder = new LongAdder();
+ repliesGeneratedAdder = new LongAdder();
+
+ final ClusterDistributionManager cdm = getCDM();
+ final InternalDistributedMember localMember = cdm.getDistribution().getLocalMember();
+ return localMember;
+ });
+
+ // by returning a value from the invoked lambda we make invocation synchronous
+ final Boolean sendingComplete =
+ sender.invoke("message sending and reply counting", () -> {
+
+ bytesTransferredAdder = new LongAdder();
+ repliesReceivedAdder = new LongAdder();
+
+ final ClusterDistributionManager cdm = getCDM();
+
+ int failedRecipientCount = 0;
+ int droppedRepliesCount = 0;
+
+ final ReplyProcessor21[] replyProcessors = new ReplyProcessor21[MESSAGES_PER_SENDER];
+
+ // this loop sends request messages
+ for (int messageId = 0; messageId < MESSAGES_PER_SENDER; messageId++) {
+
+ final ReplyProcessor21 replyProcessor = new ReplyProcessor21(cdm, receiverMember);
+ replyProcessors[messageId] = replyProcessor;
+ final TestMessage msg = new TestMessage(messageId, receiverMember,
+ replyProcessor.getProcessorId());
+
+ final Set<InternalDistributedMember> failedRecipients = cdm.putOutgoing(msg);
+ if (failedRecipients == null) {
+ bytesTransferredAdder.add(MESSAGE_SIZE);
+ } else {
+ failedRecipientCount += failedRecipients.size();
+ }
+ }
+
+ // this loop counts reply arrivals
+ for (int messageId = 0; messageId < MESSAGES_PER_SENDER; messageId++) {
+ final ReplyProcessor21 replyProcessor = replyProcessors[messageId];
+ final boolean receivedReply =
+ replyProcessor.waitForRepliesUninterruptibly(MAX_REPLY_WAIT_MILLIS);
+ if (receivedReply) {
+ repliesReceivedAdder.increment();
+ } else {
+ droppedRepliesCount += 1;
+ }
+ }
+
+ assertThat((long) failedRecipientCount).as("message delivery failed N times").isZero();
+ assertThat((long) droppedRepliesCount).as("some replies were dropped").isZero();
+ return true;
+ });
+
+ // at this point, sender is done sending
+ final long bytesSent = getByteCount(sender);
+
+ await().untilAsserted(
+ () -> {
+ assertThat(getRepliesGenerated()).isEqualTo(MESSAGES_PER_SENDER);
+ assertThat(getRepliesReceived()).isEqualTo(MESSAGES_PER_SENDER);
+ assertThat(getByteCount(receiver))
+ .as("bytes received != bytes sent")
+ .isEqualTo(bytesSent);
+ });
+ }
+
+ private long getRepliesGenerated() {
+ return receiver.invoke(() -> repliesGeneratedAdder.sum());
+ }
+
+ private long getRepliesReceived() {
+ return sender.invoke(() -> repliesReceivedAdder.sum());
+ }
+
+ private long getByteCount(final MemberVM member) {
+ return member.invoke(() -> bytesTransferredAdder.sum());
+ }
+
+ private static ClusterDistributionManager getCDM() {
+ return (ClusterDistributionManager) ((InternalCache) CacheFactory.getAnyInstance())
+ .getDistributionManager();
+ }
+
+ private static class TestMessage extends DistributionMessage {
+ private volatile int messageId;
+ private volatile int replyProcessorId;
+ private volatile int length;
+
+ TestMessage(final int messageId,
+ final InternalDistributedMember receiver,
+ final int replyProcessorId) {
+ setRecipient(receiver);
+ this.messageId = messageId;
+ this.replyProcessorId = replyProcessorId;
+ }
+
+ // necessary for deserialization
+ public TestMessage() {
+ messageId = 0;
+ replyProcessorId = 0;
+ }
+
+ @Override
+ public int getProcessorType() {
+ return OperationExecutors.STANDARD_EXECUTOR;
+ }
+
+ @Override
+ protected void process(final ClusterDistributionManager dm) {
+
+ // In case bugs cause fromData to be called more times than this method,
+ // we don't count the bytes as "transferred" until we're in this method.
+ bytesTransferredAdder.add(length);
+
+ final ReplyMessage replyMsg = new ReplyMessage();
+ replyMsg.setRecipient(getSender());
+ replyMsg.setProcessorId(replyProcessorId);
+ replyMsg.setReturnValue("howdy!");
+ dm.putOutgoing(replyMsg);
+ repliesGeneratedAdder.increment();
+ }
+
+ @Override
+ public void toData(final DataOutput out, final SerializationContext context)
+ throws IOException {
+ super.toData(out, context);
+
+ out.writeInt(messageId);
+ out.writeInt(replyProcessorId);
+
+ final ThreadLocalRandom random = ThreadLocalRandom.current();
+ final int length = MESSAGE_SIZE;
+
+ out.writeInt(length);
+
+ final byte[] payload = new byte[length];
+ random.nextBytes(payload);
+
+ out.write(payload);
+ }
+
+ @Override
+ public void fromData(final DataInput in, final DeserializationContext context)
+ throws IOException, ClassNotFoundException {
+ super.fromData(in, context);
+
+ messageId = in.readInt();
+ replyProcessorId = in.readInt();
+
+ length = in.readInt();
+
+ final byte[] payload = new byte[length];
+
+ in.readFully(payload);
+ }
+
+ @Override
+ public int getDSFID() {
+ return NO_FIXED_ID; // for testing only!
+ }
+ }
+
+ private static @NotNull Properties geodeConfigurationProperties()
+ throws GeneralSecurityException, IOException {
+ // subsequent calls must return the same value so members agree on credentials
+ if (geodeConfigurationProperties == null) {
+ final CertificateMaterial ca = new CertificateBuilder()
+ .commonName("Test CA")
+ .isCA()
+ .generate();
+
+ final CertificateMaterial serverCertificate = new CertificateBuilder()
+ .commonName("member")
+ .issuedBy(ca)
+ .generate();
+
+ final CertStores memberStore = new CertStores("member");
+ memberStore.withCertificate("member", serverCertificate);
+ memberStore.trust("ca", ca);
+ // we want to exercise the ByteBufferSharing code paths; we don't care about client auth etc
+ final Properties props = memberStore.propertiesWith("all", false, false);
+ props.setProperty(SSL_PROTOCOLS, TLS_PROTOCOL);
+ props.setProperty(SSL_CIPHERS, TLS_CIPHER_SUITE);
+ geodeConfigurationProperties = props;
+ }
+ return geodeConfigurationProperties;
+ }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java
index 77fe9bf81f..72c79b03c0 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java
@@ -33,7 +33,6 @@ import static org.apache.geode.test.dunit.VM.getVM;
import static org.apache.geode.test.util.ResourceUtils.createTempFileFromResource;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.Fail.fail;
import java.io.File;
import java.io.Serializable;
@@ -62,6 +61,7 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.DistributedErrorCollector;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
import org.apache.geode.test.dunit.rules.DistributedRule;
@@ -96,6 +96,9 @@ public class ConnectionCloseSSLTLSDUnitTest implements Serializable {
public DistributedRestoreSystemProperties restoreSystemProperties =
new DistributedRestoreSystemProperties();
+ @Rule
+ public DistributedErrorCollector errorCollector = new DistributedErrorCollector();
+
private VM locator;
private VM sender;
private VM receiver;
@@ -139,9 +142,8 @@ public class ConnectionCloseSSLTLSDUnitTest implements Serializable {
blackboard.signalGate(UPDATE_ENTERED_GATE);
blackboard.waitForGate(SUSPEND_UPDATE_GATE);
} catch (TimeoutException | InterruptedException e) {
- fail("message observus interruptus");
+ errorCollector.addError(e);
}
- logger.info("BGB: got before process message: " + message);
});
}
};
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/NioSslEngineKeyUpdateTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/NioSslEngineKeyUpdateTest.java
new file mode 100644
index 0000000000..57e361ada3
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/NioSslEngineKeyUpdateTest.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.net;
+
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.security.GeneralSecurityException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.Security;
+import java.security.UnrecoverableKeyException;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.geode.cache.ssl.CertStores;
+import org.apache.geode.cache.ssl.CertificateBuilder;
+import org.apache.geode.cache.ssl.CertificateMaterial;
+import org.apache.geode.distributed.internal.DMStats;
+
+/**
+ * In TLSv1.3, when a GCM-based cipher is used, there is a limit on the number
+ * of bytes that may be encoded with a key. When that limit is reached, a TLS
+ * KeyUpdate message is generated (by the SSLEngine). That message causes a new
+ * key to be negotiated between the peer SSLEngines.
+ *
+ * Geode's {@link NioSslEngine} class (subclass of {@link NioFilter}) encapsulates
+ * Java's {@link SSLEngine}. Geode's MsgStreamer, Connection, and ConnectionTable
+ * classes interact with NioSslEngine to accomplish peer-to-peer (P2P) messaging.
+ *
+ * This test constructs a pair of SSLEngines and wraps them in NioSslEngine.
+ * Rather than relying on MsgStreamer, Connection, or ConnectionTable classes
+ * (which themselves have a lot of dependencies on other classes), this test
+ * implements simplified logic for driving the sending and receiving of data,
+ * i.e. calling NioSslEngine.wrap() and NioSslEngine.unwrap(). See
+ * {@link #send(int, NioFilter, SocketChannel, int)} and
+ * {@link #receive(int, NioFilter, SocketChannel, ByteBuffer)}.
+ *
+ * The {@link #keyUpdateDuringSecureDataTransferTest()} arranges for the encrypted bytes limit
+ * to be reached very quickly (see {@link #ENCRYPTED_BYTES_LIMIT}).
+ * The test verifies data transfer continues correctly, after the limit is reached.
+ * This indirectly verifies that the KeyUpdate protocol initiated by the sending
+ * SSLEngine is correctly handled by all the components involved.
+ */
+public class NioSslEngineKeyUpdateTest {
+
+ private static final String TLS_PROTOCOL = "TLSv1.3";
+ private static final String TLS_CIPHER_SUITE = "TLS_AES_256_GCM_SHA384";
+
+ // number of bytes the GCM cipher can encrypt before initiating a KeyUpdate
+ private static final int ENCRYPTED_BYTES_LIMIT = 1;
+
+ {
+ Security.setProperty("jdk.tls.keyLimits",
+ "AES/GCM/NoPadding KeyUpdate " + ENCRYPTED_BYTES_LIMIT);
+ }
+
+ private static BufferPool bufferPool;
+ private static SSLContext sslContext;
+ private static KeyStore keystore;
+ private static char[] keystorePassword;
+ private static KeyStore truststore;
+
+ private SSLEngine clientEngine;
+ private SSLEngine serverEngine;
+ private int packetBufferSize;
+
+ @BeforeClass
+ public static void beforeClass() throws GeneralSecurityException, IOException {
+ DMStats mockStats = mock(DMStats.class);
+ bufferPool = new BufferPool(mockStats);
+
+ final Properties securityProperties = createKeystoreAndTruststore();
+
+ keystore = KeyStore.getInstance("JKS");
+ keystorePassword = securityProperties.getProperty(SSL_KEYSTORE_PASSWORD).toCharArray();
+ keystore.load(new FileInputStream(securityProperties.getProperty(SSL_KEYSTORE)),
+ keystorePassword);
+
+ truststore = KeyStore.getInstance("JKS");
+ final char[] truststorePassword =
+ securityProperties.getProperty(SSL_TRUSTSTORE_PASSWORD).toCharArray();
+ truststore.load(new FileInputStream(securityProperties.getProperty(SSL_TRUSTSTORE)),
+ truststorePassword);
+ }
+
+ @Before
+ public void before() throws NoSuchAlgorithmException, UnrecoverableKeyException,
+ KeyStoreException, KeyManagementException {
+ final KeyManagerFactory kmf = KeyManagerFactory.getInstance("PKIX");
+ kmf.init(keystore, keystorePassword);
+ final TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX");
+ tmf.init(truststore);
+
+ sslContext = SSLContext.getInstance("TLS");
+ final SecureRandom random = new SecureRandom(new byte[] {1, 2, 3});
+ sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), random);
+
+ final SSLParameters defaultParameters = sslContext.getDefaultSSLParameters();
+ final String[] protocols = defaultParameters.getProtocols();
+ final String[] cipherSuites = defaultParameters.getCipherSuites();
+ System.out.println(
+ String.format("TLS settings (default) before handshake: Protocols: %s, Cipher Suites: %s",
+ Arrays.toString(protocols), Arrays.toString(cipherSuites)));
+
+ clientEngine = createSSLEngine("server-host", true, sslContext);
+ packetBufferSize = clientEngine.getSession().getPacketBufferSize();
+
+ serverEngine = createSSLEngine("client-host", false, sslContext);
+ }
+
+ /*
+ * Verify initial handshake succeeds in the presence of KeyUpdate messages
+ * (i.e. updating send-side cryptographic keys).
+ *
+ * This test verifies, primarily, the behavior of
+ * NioSslEngine.handshake(SocketChannel, int, ByteBuffer)
+ */
+ @Test
+ public void keyUpdateDuringInitialHandshakeTest() {
+ clientServerTest(
+ (channel, filter, peerNetData) -> {
+ handshakeTLS(channel, filter, peerNetData, "Client:");
+ return true;
+ },
+ (channel, filter, peerNetData1) -> {
+ handshakeTLS(channel, filter, peerNetData1,
+ "Server:");
+ return true;
+ });
+ }
+
+ /*
+ * Building on keyUpdateDuringInitialHandshakeTest(), this test verifies that
+ * after the handshake succeeds, subsequent data transfer succeeds in the presence
+ * of KeyUpdate messages (i.e. updating send-side cryptographic keys).
+ *
+ * This test verifies, primarily, the behavior of NioSslEngine#wrap(ByteBuffer).
+ */
+ @Test
+ public void keyUpdateDuringSecureDataTransferTest() {
+ clientServerTest(
+ (final SocketChannel channel,
+ final NioSslEngine filter,
+ final ByteBuffer peerNetData) -> {
+ handshakeTLS(channel, filter, peerNetData, "Client:");
+ /*
+ * In order to verify that KeyUpdate is properly handled in NioSslEngine.wrap()
+ * we must arrange for the KeyUpdate (generation and processing) to occur after
+ * the initial handshake and before the handshaking during NioSslEngine.close().
+ *
+ * If we call send() only once, regardless of the number of bytes wrapped (even
+ * if it exceeds the encryption byte limit set in jdk.tls.keyLimits), the status
+ * result from SSLEngine.wrap() will be OK. We will fail to encounter the situation
+ * where it is e.g. BUFFER_OVERFLOW.
+ *
+ * By calling send() with bytesToSend >= the limit, we can be sure that the
+ * subsequent call to send() will trigger the KeyUpdate and will require proper
+ * handling of that situation in NioSslEngine.wrap().
+ */
+ send(ENCRYPTED_BYTES_LIMIT, filter, channel, 0);
+ send(ENCRYPTED_BYTES_LIMIT, filter, channel, ENCRYPTED_BYTES_LIMIT);
+
+ return true;
+ },
+ (final SocketChannel channel,
+ final NioSslEngine filter,
+ final ByteBuffer peerNetData) -> {
+ handshakeTLS(channel, filter, peerNetData, "Server:");
+ /*
+ * Call receive() twice (like we did for send()) just to test that receive() is
+ * leaving buffers in the correct readable/writable state when it returns.
+ */
+ for (int i = 0; i < 2; i++) {
+ final byte[] received =
+ receive(ENCRYPTED_BYTES_LIMIT, filter, channel, peerNetData);
+ assertThat(received).hasSize(ENCRYPTED_BYTES_LIMIT);
+ for (int j = i * ENCRYPTED_BYTES_LIMIT; j < (i + 1)
+ * ENCRYPTED_BYTES_LIMIT; j++) {
+ assertThat(received[j % received.length]).isEqualTo((byte) j);
+ }
+ }
+ return true;
+ });
+ }
+
+ /*
+ * Building on keyUpdateDuringSecureDataTransferTest(), this test verifies that
+ * NioSslEngine.close() succeeds in the presence of KeyUpdate messages
+ * (i.e. updating send-side cryptographic keys). This test is important because
+ * NioSslEngine.close() involves some TLS handshaking.
+ *
+ * This test verifies, primarily, the behavior of NioSslEngine#close(SocketChannel).
+ */
+ @Test
+ public void keyUpdateDuringSocketCloseHandshakeTest() {
+ clientServerTest(
+ (final SocketChannel channel,
+ final NioSslEngine filter,
+ final ByteBuffer peerNetData) -> {
+ handshakeTLS(channel, filter, peerNetData, "Client:");
+ /*
+ * Leave send-side SSLEngine in a state where it will generate a KeyUpdate
+ * TLS message during (but not before) NioSslEngine.close().
+ */
+ send(ENCRYPTED_BYTES_LIMIT, filter, channel, 0);
+ return true;
+ },
+ (final SocketChannel channel,
+ final NioSslEngine filter,
+ final ByteBuffer peerNetData) -> {
+ handshakeTLS(channel, filter, peerNetData, "Server:");
+ receive(ENCRYPTED_BYTES_LIMIT, filter, channel, peerNetData);
+ /*
+ * No need to validate the received data since our purpose is only to verify that
+ * NioSslEngine.close() succeeds cleanly.
+ */
+ return true;
+ });
+ }
+
+ private static SSLEngine createSSLEngine(final String peerHost, final boolean useClientMode,
+ final SSLContext sslContext) {
+ final SSLEngine engine = sslContext.createSSLEngine(peerHost, 10001);
+ engine.setEnabledProtocols(new String[] {TLS_PROTOCOL});
+ engine.setEnabledCipherSuites(new String[] {TLS_CIPHER_SUITE});
+ engine.setUseClientMode(useClientMode);
+ return engine;
+ }
+
+ private void clientServerTest(final PeerAction clientAction, final PeerAction serverAction) {
+ final ExecutorService executorService = Executors.newFixedThreadPool(2);
+
+ final CompletableFuture<SocketAddress> boundAddress = new CompletableFuture<>();
+
+ final CountDownLatch serversWaiting = new CountDownLatch(1);
+
+ final CompletableFuture<Boolean> serverHandshakeFuture =
+ supplyAsync(
+ () -> server(boundAddress, packetBufferSize,
+ serverAction, serversWaiting, serverEngine),
+ executorService);
+
+ final CompletableFuture<Boolean> clientHandshakeFuture =
+ supplyAsync(
+ () -> client(boundAddress, packetBufferSize,
+ clientAction, serversWaiting, clientEngine),
+ executorService);
+ // release the pool's worker threads once both peers finish, whether or not they succeeded
+ CompletableFuture.allOf(serverHandshakeFuture, clientHandshakeFuture)
+ .whenComplete((ignoredResult, ignoredFailure) -> executorService.shutdown()).join();
+ }
+
+ /*
+ * An action taken on a client or server after the SocketChannel has been established.
+ */
+ private interface PeerAction {
+ boolean apply(final SocketChannel acceptedChannel,
+ final NioSslEngine filter,
+ final ByteBuffer peerNetData) throws IOException;
+ }
+
+ private static boolean client(
+ final CompletableFuture<SocketAddress> boundAddress,
+ final int packetBufferSize,
+ final PeerAction peerAction,
+ final CountDownLatch serversWaiting,
+ final SSLEngine engine) {
+ try {
+ try (final SocketChannel connectedChannel = SocketChannel.open()) {
+ connectedChannel.connect(boundAddress.get());
+ final ByteBuffer peerNetData =
+ ByteBuffer.allocateDirect(packetBufferSize);
+
+ final NioSslEngine filter = new NioSslEngine(engine, bufferPool);
+
+ final boolean result =
+ peerAction.apply(connectedChannel, filter, peerNetData);
+
+ serversWaiting.await(); // wait for last server to give up before closing our socket
+
+ filter.close(connectedChannel);
+
+ return result;
+ }
+ } catch (IOException | InterruptedException | ExecutionException e) {
+ printException("In client:", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+  * Runs the server side of an exchange: binds to an ephemeral port on the loopback
+  * address, publishes the bound address through {@code boundAddress}, accepts a single
+  * connection and runs {@code peerAction} on it before closing the filter.
+  *
+  * {@code serversWaiting} is counted down when this method exits, whether it returns
+  * normally or throws.
+  *
+  * @param boundAddress completed with the listening address once the bind succeeds, or
+  *        completed exceptionally if an IOException occurs before that
+  * @param packetBufferSize capacity of the direct buffer allocated for peer network data
+  * @param peerAction the action to run once a connection has been accepted
+  * @param serversWaiting latch counted down when this server is finished
+  * @param engine the SSLEngine wrapped by the NioSslEngine filter
+  * @return the value returned by {@code peerAction}
+  * @throws RuntimeException wrapping any IOException raised along the way
+  */
+ private static boolean server(
+     final CompletableFuture<SocketAddress> boundAddress,
+     final int packetBufferSize,
+     final PeerAction peerAction,
+     final CountDownLatch serversWaiting,
+     final SSLEngine engine) {
+   try (final ServerSocketChannel boundChannel = ServerSocketChannel.open()) {
+     final InetSocketAddress bindAddress =
+         new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+     boundChannel.bind(bindAddress);
+     boundAddress.complete(boundChannel.getLocalAddress());
+     try (final SocketChannel acceptedChannel = boundChannel.accept()) {
+       final ByteBuffer peerNetData =
+           ByteBuffer.allocateDirect(packetBufferSize);
+
+       final NioSslEngine filter = new NioSslEngine(engine, bufferPool);
+
+       final boolean result =
+           peerAction.apply(acceptedChannel, filter, peerNetData);
+
+       filter.close(acceptedChannel);
+
+       return result;
+     }
+   } catch (IOException e) {
+     // If the failure happened before the address was published, a client blocked in
+     // boundAddress.get() would otherwise wait forever. This is a no-op when the
+     // future has already been completed.
+     boundAddress.completeExceptionally(e);
+     printException("In server:", e);
+     throw new RuntimeException(e);
+   } finally {
+     serversWaiting.countDown();
+   }
+ }
+
+ private static void printException(final String context, final Exception e) {
+ System.out.println(context + "\n");
+ e.printStackTrace();
+ }
+
+ /**
+  * Generates a CA certificate ("Test CA") and a certificate ("server") issued by that
+  * CA, places the server certificate and the trusted CA into a server store, and
+  * returns the properties produced by that store.
+  */
+ private static Properties createKeystoreAndTruststore()
+     throws GeneralSecurityException, IOException {
+   final CertificateMaterial authority =
+       new CertificateBuilder().commonName("Test CA").isCA().generate();
+
+   final CertificateMaterial serverMaterial =
+       new CertificateBuilder().commonName("server").issuedBy(authority).generate();
+
+   final CertStores stores = CertStores.serverStore();
+   stores.withCertificate("server", serverMaterial);
+   stores.trust("ca", authority);
+
+   return stores.propertiesWith("all", true, false);
+ }
+
+ /**
+  * Performs the TLS handshake on {@code channel} with the channel temporarily switched
+  * to non-blocking mode, prints the negotiated protocol and cipher suite, and restores
+  * the channel's original blocking mode before returning.
+  *
+  * @param channel the channel to handshake on
+  * @param filter is a newly constructed and initialized object; it has not been used
+  *        for handshakes previously.
+  * @param peerNetData on entry: don't care about read/write state or contents
+  * @param context label prefixed to the printed TLS settings
+  * @return the value returned by the filter's handshake call
+  */
+ private static boolean handshakeTLS(final SocketChannel channel,
+     final NioSslEngine filter,
+     final ByteBuffer peerNetData,
+     final String context) throws IOException {
+   final boolean wasBlocking = channel.isBlocking();
+   try {
+     channel.configureBlocking(false);
+     final boolean handshakeResult = filter.handshake(channel, 6_000, peerNetData);
+     final String summary = String.format(
+         "%s TLS settings after successful handshake: Protocol: %s, Cipher Suite: %s",
+         context,
+         filter.engine.getSession().getProtocol(),
+         filter.engine.getSession().getCipherSuite());
+     System.out.println(summary);
+     return handshakeResult;
+   } finally {
+     // put the channel back the way the caller had it
+     channel.configureBlocking(wasBlocking);
+   }
+ }
+
+ /**
+  * This method is trying to do what Connection readMessages() and processInputBuffer() do
+  * together.
+  *
+  * Note well: peerNetData may contain content on entry to this method. Also the filter's
+  * buffers (e.g. the buffer returned by unwrap) may already contain data from previous
+  * calls to unwrap().
+  *
+  * @param bytesToReceive number of decrypted bytes to collect before returning
+  * @param filter the filter used to unwrap (decrypt) data read from the channel
+  * @param channel the channel to read encrypted data from
+  * @param peerNetData will be in write mode on entry and may already contain content;
+  *        it is in write mode again on normal return
+  * @return an array of exactly {@code bytesToReceive} decrypted bytes
+  * @throws IOException if unwrapping or reading fails, or if the channel reaches
+  *         end-of-stream before {@code bytesToReceive} bytes have been received
+  */
+ private static byte[] receive(
+     final int bytesToReceive,
+     final NioFilter filter,
+     final SocketChannel channel,
+     final ByteBuffer peerNetData) throws IOException {
+   final byte[] received = new byte[bytesToReceive];
+   // peerNetData in write mode
+   peerNetData.flip();
+   // peerNetData in read mode
+   int pos = 0;
+   while (pos < bytesToReceive) {
+     /*
+      * On first iteration unwrap() is called before the channel is read. This is necessary
+      * since a previous call to this method (receive()) could have left data in the filter
+      * and there might not be any more data coming on the channel (ever).
+      *
+      * If no data was already held in the filter's buffer, and peerNetData was empty
+      * before calling unwrap() then the buffer returned by unwrap will be empty. But before
+      * we start the second loop iteration, we'll read (from the channel into peerNetData).
+      *
+      * The filter's unwrap() method takes peerNetData in read mode and when the method returns
+      * the buffer is in write mode (ready for us to add data to it if needed).
+      */
+     try (final ByteBufferSharing appDataSharing = filter.unwrap(peerNetData)) {
+       // peerNetData in write mode
+       final ByteBuffer appData = appDataSharing.getBuffer();
+       // appData in write mode
+       appData.flip();
+       // appData in read mode
+       if (appData.hasRemaining()) {
+         final int newBytes = Math.min(appData.remaining(), received.length - pos);
+         assert pos + newBytes <= received.length;
+         appData.get(received, pos, newBytes);
+         pos += newBytes;
+       } else {
+         // Nothing was decrypted, so more network data is needed. A return value of -1
+         // means the peer closed the connection: no more data will ever arrive, and
+         // looping again would spin forever.
+         if (channel.read(peerNetData) < 0) {
+           throw new IOException("Channel reached end-of-stream after receiving " + pos
+               + " of " + bytesToReceive + " expected bytes");
+         }
+       }
+       peerNetData.flip();
+       // peerNetData in read mode ready for filter unwrap() call
+       appData.compact();
+       // appData in write mode ready for filter unwrap() call
+     }
+   }
+   peerNetData.compact();
+   // peerNetData in write mode
+   return received;
+ }
+
+ /*
+  * This method is trying to do what Connection.writeFully() does.
+  *
+  * It fills a direct buffer with bytesToSend consecutive byte values beginning at
+  * startingValue (each value truncated to a byte), wraps the buffer through the filter,
+  * and writes the wrapped bytes to the channel until none remain.
+  */
+ private static void send(
+     final int bytesToSend,
+     final NioFilter filter,
+     final SocketChannel channel,
+     final int startingValue)
+     throws IOException {
+   // a single buffer-full is sent; sending more would need an outer loop
+   final ByteBuffer plainText = ByteBuffer.allocateDirect(bytesToSend);
+   int offset = 0;
+   while (offset < bytesToSend) {
+     plainText.put((byte) (offset + startingValue));
+     offset++;
+   }
+   plainText.flip();
+   try (final ByteBufferSharing wrappedSharing = filter.wrap(plainText)) {
+     final ByteBuffer wrapped = wrappedSharing.getBuffer();
+     // a single write() may not drain the buffer, so keep writing until it is empty
+     while (wrapped.hasRemaining()) {
+       channel.write(wrapped);
+     }
+   }
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 7831444200..09d99bb0a7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -18,7 +18,7 @@ import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_TASK;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
import static javax.net.ssl.SSLEngineResult.Status.BUFFER_OVERFLOW;
-import static javax.net.ssl.SSLEngineResult.Status.OK;
+import static javax.net.ssl.SSLEngineResult.Status.CLOSED;
import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_RECEIVER;
import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_SENDER;
@@ -40,6 +40,7 @@ import javax.net.ssl.SSLSession;
import org.apache.logging.log4j.Logger;
import org.apache.geode.GemFireIOException;
+import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.internal.net.BufferPool.BufferType;
import org.apache.geode.internal.net.ByteBufferVendor.OpenAttemptTimedOut;
@@ -51,6 +52,8 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
* Its use should be confined to one thread or should be protected by external synchronization.
*/
public class NioSslEngine implements NioFilter {
+ @Immutable
+ private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new byte[0]);
private static final Logger logger = LogService.getLogger();
private final BufferPool bufferPool;
@@ -229,7 +232,8 @@ public class NioSslEngine implements NioFilter {
}
@Override
- public ByteBufferSharing wrap(ByteBuffer appData) throws IOException {
+ public ByteBufferSharing wrap(ByteBuffer appData)
+ throws IOException {
try (final ByteBufferSharing outputSharing = outputBufferVendor.open()) {
ByteBuffer myNetData = outputSharing.getBuffer();
@@ -237,24 +241,21 @@ public class NioSslEngine implements NioFilter {
myNetData.clear();
while (appData.hasRemaining()) {
- // ensure we have lots of capacity since encrypted data might
- // be larger than the app data
- int remaining = myNetData.capacity() - myNetData.position();
-
- if (remaining < (appData.remaining() * 2)) {
- int newCapacity = expandedCapacity(appData, myNetData);
- myNetData = outputSharing.expandWriteBufferIfNeeded(newCapacity);
+ final SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
+ switch (wrapResult.getStatus()) {
+ case BUFFER_OVERFLOW:
+ final int newCapacity =
+ myNetData.position() + engine.getSession().getPacketBufferSize();
+ myNetData = outputSharing.expandWriteBufferIfNeeded(newCapacity);
+ break;
+ case BUFFER_UNDERFLOW:
+ case CLOSED:
+ throw new SSLException("Error encrypting data: " + wrapResult);
}
- SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
-
if (wrapResult.getHandshakeStatus() == NEED_TASK) {
handleBlockingTasks();
}
-
- if (wrapResult.getStatus() != OK) {
- throw new SSLException("Error encrypting data: " + wrapResult);
- }
}
myNetData.flip();
@@ -373,6 +374,7 @@ public class NioSslEngine implements NioFilter {
@Override
public synchronized void close(SocketChannel socketChannel) {
+
if (closed) {
return;
}
@@ -381,19 +383,25 @@ public class NioSslEngine implements NioFilter {
try (final ByteBufferSharing outputSharing = outputBufferVendor.open(1, TimeUnit.MINUTES)) {
final ByteBuffer myNetData = outputSharing.getBuffer();
- if (!engine.isOutboundDone()) {
- ByteBuffer empty = ByteBuffer.wrap(new byte[0]);
- engine.closeOutbound();
+ engine.closeOutbound();
+
+ SSLEngineResult result = null;
+
+ while (!engine.isOutboundDone()) {
// clear the buffer to receive a CLOSE message from the SSLEngine
myNetData.clear();
// Get close message
- SSLEngineResult result = engine.wrap(empty, myNetData);
-
- if (result.getStatus() != SSLEngineResult.Status.CLOSED) {
- throw new SSLHandshakeException(
- "Error closing SSL session. Status=" + result.getStatus());
+ result = engine.wrap(EMPTY_BYTE_BUFFER, myNetData);
+
+ /*
+ * We would have liked to make this one of the while() conditions but
+ * if status is CLOSED we'll get a "Broken pipe" exception in the next write()
+ * so it needs to be handled here.
+ */
+ if (result.getStatus() == CLOSED) {
+ break;
}
// Send close message to peer
@@ -402,6 +410,10 @@ public class NioSslEngine implements NioFilter {
socketChannel.write(myNetData);
}
}
+ if (result != null && result.getStatus() != CLOSED) {
+ throw new SSLHandshakeException(
+ "Error closing SSL session. Status=" + result.getStatus());
+ }
} catch (ClosedChannelException e) {
// we can't send a close message if the channel is closed
} catch (IOException e) {
@@ -416,18 +428,13 @@ public class NioSslEngine implements NioFilter {
}
}
- private int expandedCapacity(ByteBuffer sourceBuffer, ByteBuffer targetBuffer) {
- return Math.max(targetBuffer.position() + sourceBuffer.remaining() * 2,
- targetBuffer.capacity() * 2);
- }
-
@VisibleForTesting
- public ByteBufferVendor getOutputBufferVendorForTestingOnly() throws IOException {
+ public ByteBufferVendor getOutputBufferVendorForTestingOnly() {
return outputBufferVendor;
}
@VisibleForTesting
- public ByteBufferVendor getInputBufferVendorForTestingOnly() throws IOException {
+ public ByteBufferVendor getInputBufferVendorForTestingOnly() {
return inputBufferVendor;
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 82e3b3da9e..03f3e77ab5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -1801,7 +1801,7 @@ public class Connection implements Runnable {
* checks to see if an exception should not be logged: i.e., "forcibly closed", "reset by peer",
* or "connection reset"
*/
- private static boolean isIgnorableIOException(Exception e) {
+ private static boolean isIgnorableIOException(IOException e) {
if (e instanceof ClosedChannelException) {
return true;
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index d6b9aa6485..7c72d6780b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
+import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
@@ -190,7 +191,7 @@ public class NioSslEngineTest {
}
@Test
- public void wrap() throws Exception {
+ public void engineWrapCausesResizeThenSucceeds() throws Exception {
try (final ByteBufferSharing outputSharing =
nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
@@ -206,6 +207,7 @@ public class NioSslEngineTest {
// buffer
TestSSLEngine testEngine = new TestSSLEngine();
testEngine.addReturnResult(
+ new SSLEngineResult(BUFFER_OVERFLOW, NEED_TASK, 0, 0),
new SSLEngineResult(OK, NEED_TASK, appData.remaining(), appData.remaining()));
spyNioSslEngine.engine = testEngine;
@@ -217,12 +219,12 @@ public class NioSslEngineTest {
appData.flip();
assertThat(wrappedBuffer).isEqualTo(appData);
}
- verify(spyNioSslEngine, times(1)).handleBlockingTasks();
+ verify(spyNioSslEngine, times(2)).handleBlockingTasks();
}
}
@Test
- public void wrapFails() throws IOException {
+ public void engineWrapCausesResizeThenCloses() throws IOException {
try (final ByteBufferSharing outputSharing =
nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
// make the application data too big to fit into the engine's encryption buffer
@@ -237,10 +239,12 @@ public class NioSslEngineTest {
// buffer
TestSSLEngine testEngine = new TestSSLEngine();
testEngine.addReturnResult(
+ new SSLEngineResult(BUFFER_OVERFLOW, NEED_TASK, 0, 0),
new SSLEngineResult(CLOSED, NEED_TASK, appData.remaining(), appData.remaining()));
spyNioSslEngine.engine = testEngine;
- assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)).isInstanceOf(SSLException.class)
+ assertThatThrownBy(() -> spyNioSslEngine.wrap(appData))
+ .isInstanceOf(SSLException.class)
.hasMessageContaining("Error encrypting data");
}
}
@@ -370,7 +374,7 @@ public class NioSslEngineTest {
when(mockChannel.socket()).thenReturn(mockSocket);
when(mockSocket.isClosed()).thenReturn(false);
- when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
+ when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE).thenReturn(Boolean.TRUE);
when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
new SSLEngineResult(CLOSED, FINISHED, 0, 0));
nioSslEngine.close(mockChannel);
@@ -384,13 +388,13 @@ public class NioSslEngineTest {
}
@Test
- public void closeWhenUnwrapError() throws Exception {
+ public void closeWhenWrapError() throws Exception {
SocketChannel mockChannel = mock(SocketChannel.class);
Socket mockSocket = mock(Socket.class);
when(mockChannel.socket()).thenReturn(mockSocket);
when(mockSocket.isClosed()).thenReturn(true);
- when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
+ when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE).thenReturn(Boolean.TRUE);
when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
new SSLEngineResult(BUFFER_OVERFLOW, FINISHED, 0, 0));
assertThatThrownBy(() -> nioSslEngine.close(mockChannel)).isInstanceOf(GemFireIOException.class)
@@ -405,14 +409,13 @@ public class NioSslEngineTest {
when(mockChannel.socket()).thenReturn(mockSocket);
when(mockSocket.isClosed()).thenReturn(true);
- when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenAnswer((x) -> {
try (final ByteBufferSharing outputSharing =
nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
// give the NioSslEngine something to write on its socket channel, simulating a TLS close
// message
outputSharing.getBuffer().put("Goodbye cruel world".getBytes());
- return new SSLEngineResult(CLOSED, FINISHED, 0, 0);
+ return new SSLEngineResult(OK, NEED_UNWRAP, 0, 0);
}
});
when(mockChannel.write(any(ByteBuffer.class))).thenThrow(new ClosedChannelException());
@@ -605,10 +608,19 @@ public class NioSslEngineTest {
@Override
public SSLEngineResult wrap(ByteBuffer[] sources, int i, int i1, ByteBuffer destination) {
- for (ByteBuffer source : sources) {
+ assertThat(sources.length)
+ .as("test unexpectedly tried to wrap with multiple sources")
+ .isEqualTo(1);
+ final ByteBuffer source = sources[0];
+ final SSLEngineResult nextResult = nextResult();
+ try {
destination.put(source);
+ } catch (final BufferOverflowException e) {
+ assertThat(BUFFER_OVERFLOW)
+ .as("got unexpected buffer overflow")
+ .isEqualTo(nextResult.getStatus());
}
- return nextResult();
+ return nextResult;
}
@Override
@@ -672,7 +684,9 @@ public class NioSslEngineTest {
@Override
public SSLSession getSession() {
- return null;
+ final SSLSession session = mock(SSLSession.class);
+ when(session.getPacketBufferSize()).thenReturn(16 * 1024);
+ return session;
}
@Override