You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2022/04/26 21:54:09 UTC

[geode] branch support/1.14 updated: GEODE-10122: P2P Messaging Handles TLS KeyUpdate Message (#7449) (#7615)

This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.14 by this push:
     new 07c08e9502 GEODE-10122: P2P Messaging Handles TLS KeyUpdate Message  (#7449) (#7615)
07c08e9502 is described below

commit 07c08e95025ff955c9b361db4b97902ce722be81
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Tue Apr 26 14:54:03 2022 -0700

    GEODE-10122: P2P Messaging Handles TLS KeyUpdate Message  (#7449) (#7615)
    
    * Key expiration works for TLSv1.3 and GCM-based ciphers
    * TLS KeyUpdate messages are processed correctly
    * Removed dependencies on: Mockito 4, JUnit 5, GeodeParamsRunner
    
    (cherry picked from commit d2535394a82ac5faf10f004f4e3c15f756f7b177)
---
 .../internal/P2PMessagingConcurrencyDUnitTest.java |   2 +-
 ...P2pMessagingSslTlsKeyUpdateDistributedTest.java | 367 +++++++++++++++
 .../tcp/ConnectionCloseSSLTLSDUnitTest.java        |   8 +-
 .../internal/net/NioSslEngineKeyUpdateTest.java    | 497 +++++++++++++++++++++
 .../apache/geode/internal/net/NioSslEngine.java    |  67 +--
 .../org/apache/geode/internal/tcp/Connection.java  |   2 +-
 .../geode/internal/net/NioSslEngineTest.java       |  38 +-
 7 files changed, 934 insertions(+), 47 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
index 0d7c2d389f..326e1c2ac4 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
@@ -167,7 +167,6 @@ public class P2PMessagingConcurrencyDUnitTest {
       bytesTransferredAdder = new LongAdder();
 
       final ClusterDistributionManager cdm = getCDM();
-      final Random random = new Random(RANDOM_SEED);
       final AtomicInteger nextSenderId = new AtomicInteger();
 
       /*
@@ -194,6 +193,7 @@ public class P2PMessagingConcurrencyDUnitTest {
           throw new RuntimeException("doSending failed", e);
         }
         final int firstMessageId = senderId * SENDER_COUNT;
+        final Random random = new Random(RANDOM_SEED);
         for (int messageId = firstMessageId; messageId < firstMessageId
             + MESSAGES_PER_SENDER; messageId++) {
           final TestMessage msg = new TestMessage(receiverMember, random, messageId);
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java
new file mode 100644
index 0000000000..3a887d5521
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.distributed.internal;
+
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_CIPHERS;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_PROTOCOLS;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.security.GeneralSecurityException;
+import java.security.Security;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.LongAdder;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.ssl.CertStores;
+import org.apache.geode.cache.ssl.CertificateBuilder;
+import org.apache.geode.cache.ssl.CertificateMaterial;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.test.dunit.SerializableRunnableIF;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.MembershipTest;
+import org.apache.geode.test.version.VersionManager;
+
+/**
+ * In TLSv1.3, when a GCM-based cipher is used, there is a limit on the number
+ * of bytes that may be encoded with a key. When that limit is reached, a TLS
+ * KeyUpdate message is generated (by the SSLEngine). That message causes a new
+ * key to be negotiated between the peer SSLEngines.
+ *
+ * This test arranges for a low encryption byte limit to be set for the sending member
+ * and then for the receiving member. With the low byte limit configured, the test
+ * sends P2P messages via TLS and verifies request-reply message processing.
+ */
+@Category({MembershipTest.class})
+@RunWith(JUnitParamsRunner.class)
+public class P2pMessagingSslTlsKeyUpdateDistributedTest {
+
+  private static final String TLS_PROTOCOL = "TLSv1.3";
+  private static final String TLS_CIPHER_SUITE = "TLS_AES_256_GCM_SHA384";
+
+  private static final int ENCRYPTED_BYTES_LIMIT = 64 * 1024;
+
+  private static final int MESSAGE_SIZE = 1024;
+
+  /*
+   * How many messages will be generated? We generate enough to cause KeyUpdate
+   * to be generated, and then we generate many more beyond that. Even with buggy
+   * wrap/unwrap logic, the retries in DirectChannel.sendToMany() and the transparent
+   * connection reestablishment in ConnectionTable can mask those bugs. So to reliably
+   * fail in the presence of bugs we need to generate lots of extra messages.
+   */
+  private static final int MESSAGES_PER_SENDER = ENCRYPTED_BYTES_LIMIT / MESSAGE_SIZE + 2000;
+
+  { // sanity check on the constants above; a bare assertThat(boolean) would verify nothing
+    assertThat(MESSAGE_SIZE * MESSAGES_PER_SENDER > 10 * ENCRYPTED_BYTES_LIMIT).isTrue();
+  }
+
+  public static final int MAX_REPLY_WAIT_MILLIS = 1_000;
+
+  private static Properties geodeConfigurationProperties;
+
+  @Rule
+  public final ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
+
+  private MemberVM sender;
+  private MemberVM receiver;
+
+  @After
+  public void afterEach() {
+    for (int vmIndex = 0; vmIndex < 3; vmIndex++) { // the three VMs this test configures
+      clusterStartupRule.getVM(vmIndex).bounceForcibly();
+    }
+  }
+
+  /*
+   * bytes sent on sender JVM, bytes received on receiver JVM
+   * (not used in test JVM)
+   */
+  private static LongAdder bytesTransferredAdder;
+
+  // in receiver JVM only
+  private static LongAdder repliesGeneratedAdder;
+
+  // in sender JVM only
+  private static LongAdder repliesReceivedAdder;
+
+
+  private void configureJVMsAndStartClusterMembers(
+      final long locatorEncryptedBytesLimit,
+      final long senderEncryptedBytesLimit,
+      final long receiverEncryptedBytesLimit)
+      throws GeneralSecurityException, IOException {
+
+    clusterStartupRule.getVM(0).invoke(
+        setSecurityProperties(locatorEncryptedBytesLimit));
+    clusterStartupRule.getVM(1).invoke(
+        setSecurityProperties(senderEncryptedBytesLimit));
+    clusterStartupRule.getVM(2).invoke(
+        setSecurityProperties(receiverEncryptedBytesLimit));
+
+    final Properties senderConfiguration = geodeConfigurationProperties();
+    final Properties receiverConfiguration = geodeConfigurationProperties();
+
+    final MemberVM locator =
+        clusterStartupRule.startLocatorVM(0, 0, VersionManager.CURRENT_VERSION,
+            x -> x.withProperties(senderConfiguration).withConnectionToLocator()
+                .withoutClusterConfigurationService().withoutManagementRestService());
+
+    sender = clusterStartupRule.startServerVM(1, senderConfiguration, locator.getPort());
+    receiver = clusterStartupRule.startServerVM(2, receiverConfiguration, locator.getPort());
+  }
+
+  private @NotNull SerializableRunnableIF setSecurityProperties(final long encryptedBytesLimit) {
+    return () -> {
+      Security.setProperty("jdk.tls.keyLimits",
+          "AES/GCM/NoPadding KeyUpdate " + encryptedBytesLimit);
+
+      final Class<?> sslCipher = Class.forName("sun.security.ssl.SSLCipher");
+      final Field cipherLimits = sslCipher.getDeclaredField("cipherLimits");
+      cipherLimits.setAccessible(true);
+      assertThat((Map<String, Long>) cipherLimits.get(null)).containsEntry(
+          "AES/GCM/NOPADDING:KEYUPDATE",
+          encryptedBytesLimit);
+    };
+  }
+
+  @Test
+  @Parameters({
+      "137438953472, 65536, 137438953472",
+      "137438953472, 137438953472, 65536",
+  })
+  public void testP2PMessagingWithKeyUpdate(
+      final long locatorEncryptedBytesLimit,
+      final long senderEncryptedBytesLimit,
+      final long receiverEncryptedBytesLimit)
+      throws GeneralSecurityException, IOException {
+
+    configureJVMsAndStartClusterMembers(locatorEncryptedBytesLimit, senderEncryptedBytesLimit,
+        receiverEncryptedBytesLimit);
+
+    final InternalDistributedMember receiverMember =
+        receiver.invoke("get receiving member id", () -> {
+
+          bytesTransferredAdder = new LongAdder();
+          repliesGeneratedAdder = new LongAdder();
+
+          final ClusterDistributionManager cdm = getCDM();
+          final InternalDistributedMember localMember = cdm.getDistribution().getLocalMember();
+          return localMember;
+        });
+
+    // by returning a value from the invoked lambda we make invocation synchronous
+    final Boolean sendingComplete =
+        sender.invoke("message sending and reply counting", () -> {
+
+          bytesTransferredAdder = new LongAdder();
+          repliesReceivedAdder = new LongAdder();
+
+          final ClusterDistributionManager cdm = getCDM();
+
+          int failedRecipientCount = 0;
+          int droppedRepliesCount = 0;
+
+          final ReplyProcessor21[] replyProcessors = new ReplyProcessor21[MESSAGES_PER_SENDER];
+
+          // this loop sends request messages
+          for (int messageId = 0; messageId < MESSAGES_PER_SENDER; messageId++) {
+
+            final ReplyProcessor21 replyProcessor = new ReplyProcessor21(cdm, receiverMember);
+            replyProcessors[messageId] = replyProcessor;
+            final TestMessage msg = new TestMessage(messageId, receiverMember,
+                replyProcessor.getProcessorId());
+
+            final Set<InternalDistributedMember> failedRecipients = cdm.putOutgoing(msg);
+            if (failedRecipients == null) {
+              bytesTransferredAdder.add(MESSAGE_SIZE);
+            } else {
+              failedRecipientCount += failedRecipients.size();
+            }
+          }
+
+          // this loop counts reply arrivals
+          for (int messageId = 0; messageId < MESSAGES_PER_SENDER; messageId++) {
+            final ReplyProcessor21 replyProcessor = replyProcessors[messageId];
+            final boolean receivedReply =
+                replyProcessor.waitForRepliesUninterruptibly(MAX_REPLY_WAIT_MILLIS);
+            if (receivedReply) {
+              repliesReceivedAdder.increment();
+            } else {
+              droppedRepliesCount += 1;
+            }
+          }
+
+          assertThat((long) failedRecipientCount).as("message delivery failed N times").isZero();
+          assertThat((long) droppedRepliesCount).as("some replies were dropped").isZero();
+          return true;
+        });
+
+    // at this point, sender is done sending
+    final long bytesSent = getByteCount(sender);
+
+    await().untilAsserted(
+        () -> {
+          assertThat(getRepliesGenerated()).isEqualTo(MESSAGES_PER_SENDER);
+          assertThat(getRepliesReceived()).isEqualTo(MESSAGES_PER_SENDER);
+          assertThat(getByteCount(receiver))
+              .as("bytes received != bytes sent")
+              .isEqualTo(bytesSent);
+        });
+  }
+
+  private long getRepliesGenerated() {
+    return receiver.invoke(() -> repliesGeneratedAdder.sum());
+  }
+
+  private long getRepliesReceived() {
+    return sender.invoke(() -> repliesReceivedAdder.sum());
+  }
+
+  private long getByteCount(final MemberVM member) {
+    return member.invoke(() -> bytesTransferredAdder.sum());
+  }
+
+  private static ClusterDistributionManager getCDM() {
+    return (ClusterDistributionManager) ((InternalCache) CacheFactory.getAnyInstance())
+        .getDistributionManager();
+  }
+
+  private static class TestMessage extends DistributionMessage {
+    private volatile int messageId;
+    private volatile int replyProcessorId;
+    private volatile int length;
+
+    TestMessage(final int messageId,
+        final InternalDistributedMember receiver,
+        final int replyProcessorId) {
+      setRecipient(receiver);
+      this.messageId = messageId;
+      this.replyProcessorId = replyProcessorId;
+    }
+
+    // necessary for deserialization
+    public TestMessage() {
+      messageId = 0;
+      replyProcessorId = 0;
+    }
+
+    @Override
+    public int getProcessorType() {
+      return OperationExecutors.STANDARD_EXECUTOR;
+    }
+
+    @Override
+    protected void process(final ClusterDistributionManager dm) {
+
+      // In case bugs cause fromData to be called more times than this method,
+      // we don't count the bytes as "transferred" until we're in this method.
+      bytesTransferredAdder.add(length);
+
+      final ReplyMessage replyMsg = new ReplyMessage();
+      replyMsg.setRecipient(getSender());
+      replyMsg.setProcessorId(replyProcessorId);
+      replyMsg.setReturnValue("howdy!");
+      dm.putOutgoing(replyMsg);
+      repliesGeneratedAdder.increment();
+    }
+
+    @Override
+    public void toData(final DataOutput out, final SerializationContext context)
+        throws IOException {
+      super.toData(out, context);
+
+      out.writeInt(messageId);
+      out.writeInt(replyProcessorId);
+
+      final ThreadLocalRandom random = ThreadLocalRandom.current();
+      final int length = MESSAGE_SIZE;
+
+      out.writeInt(length);
+
+      final byte[] payload = new byte[length];
+      random.nextBytes(payload);
+
+      out.write(payload);
+    }
+
+    @Override
+    public void fromData(final DataInput in, final DeserializationContext context)
+        throws IOException, ClassNotFoundException {
+      super.fromData(in, context);
+
+      messageId = in.readInt();
+      replyProcessorId = in.readInt();
+
+      length = in.readInt();
+
+      final byte[] payload = new byte[length];
+
+      in.readFully(payload);
+    }
+
+    @Override
+    public int getDSFID() {
+      return NO_FIXED_ID; // for testing only!
+    }
+  }
+
+  private static @NotNull Properties geodeConfigurationProperties()
+      throws GeneralSecurityException, IOException {
+    // subsequent calls must return the same value so members agree on credentials,
+    // so the properties are built on first use and cached in the static field
+    if (geodeConfigurationProperties != null) {
+      return geodeConfigurationProperties;
+    }
+
+    // a test-only certificate authority, and a member certificate issued by it
+    final CertificateMaterial authority =
+        new CertificateBuilder().commonName("Test CA").isCA().generate();
+    final CertificateMaterial memberCertificate =
+        new CertificateBuilder().commonName("member").issuedBy(authority).generate();
+
+    final CertStores stores = new CertStores("member");
+    stores.withCertificate("member", memberCertificate);
+    stores.trust("ca", authority);
+
+    // we want to exercise the ByteBufferSharing code paths; we don't care about client auth etc
+    final Properties sslProperties = stores.propertiesWith("all", false, false);
+    sslProperties.setProperty(SSL_PROTOCOLS, TLS_PROTOCOL);
+    sslProperties.setProperty(SSL_CIPHERS, TLS_CIPHER_SUITE);
+    geodeConfigurationProperties = sslProperties;
+    return geodeConfigurationProperties;
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java
index 77fe9bf81f..72c79b03c0 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java
@@ -33,7 +33,6 @@ import static org.apache.geode.test.dunit.VM.getVM;
 import static org.apache.geode.test.util.ResourceUtils.createTempFileFromResource;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.Fail.fail;
 
 import java.io.File;
 import java.io.Serializable;
@@ -62,6 +61,7 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.DistributedErrorCollector;
 import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
 import org.apache.geode.test.dunit.rules.DistributedRule;
 
@@ -96,6 +96,9 @@ public class ConnectionCloseSSLTLSDUnitTest implements Serializable {
   public DistributedRestoreSystemProperties restoreSystemProperties =
       new DistributedRestoreSystemProperties();
 
+  @Rule
+  public DistributedErrorCollector errorCollector = new DistributedErrorCollector();
+
   private VM locator;
   private VM sender;
   private VM receiver;
@@ -139,9 +142,8 @@ public class ConnectionCloseSSLTLSDUnitTest implements Serializable {
                           blackboard.signalGate(UPDATE_ENTERED_GATE);
                           blackboard.waitForGate(SUSPEND_UPDATE_GATE);
                         } catch (TimeoutException | InterruptedException e) {
-                          fail("message observus interruptus");
+                          errorCollector.addError(e);
                         }
-                        logger.info("BGB: got before process message: " + message);
                       });
                     }
                   };
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/NioSslEngineKeyUpdateTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/NioSslEngineKeyUpdateTest.java
new file mode 100644
index 0000000000..57e361ada3
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/NioSslEngineKeyUpdateTest.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.net;
+
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.security.GeneralSecurityException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.Security;
+import java.security.UnrecoverableKeyException;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.geode.cache.ssl.CertStores;
+import org.apache.geode.cache.ssl.CertificateBuilder;
+import org.apache.geode.cache.ssl.CertificateMaterial;
+import org.apache.geode.distributed.internal.DMStats;
+
+/**
+ * In TLSv1.3, when a GCM-based cipher is used, there is a limit on the number
+ * of bytes that may be encoded with a key. When that limit is reached, a TLS
+ * KeyUpdate message is generated (by the SSLEngine). That message causes a new
+ * key to be negotiated between the peer SSLEngines.
+ *
+ * Geode's {@link NioSslEngine} class (subclass of {@link NioFilter}) encapsulates
+ * Java's {@link SSLEngine}. Geode's MsgStreamer, Connection, and ConnectionTable
+ * classes interact with NioSslEngine to accomplish peer-to-peer (P2P) messaging.
+ *
+ * This test constructs a pair of SSLEngines and wraps them in NioSslEngine.
+ * Rather than relying on MsgStreamer, Connection, or ConnectionTable classes
+ * (which themselves have a lot of dependencies on other classes), this test
+ * implements simplified logic for driving the sending and receiving of data,
+ * i.e. calling NioSslEngine.wrap() and NioSslEngine.unwrap(). See
+ * {@link #send(int, NioFilter, SocketChannel, int)} and
+ * {@link #receive(int, NioFilter, SocketChannel, ByteBuffer)}.
+ *
+ * The {@link #keyUpdateDuringSecureDataTransferTest()} arranges for the encrypted bytes limit
+ * to be reached very quickly (see {@link #ENCRYPTED_BYTES_LIMIT}).
+ * The test verifies data transfer continues correctly, after the limit is reached.
+ * This indirectly verifies that the KeyUpdate protocol initiated by the sending
+ * SSLEngine is correctly handled by all the components involved.
+ */
+public class NioSslEngineKeyUpdateTest {
+
+  private static final String TLS_PROTOCOL = "TLSv1.3";
+  private static final String TLS_CIPHER_SUITE = "TLS_AES_256_GCM_SHA384";
+
+  // number of bytes the GCM cipher can encrypt before initiating a KeyUpdate
+  private static final int ENCRYPTED_BYTES_LIMIT = 1;
+
+  {
+    Security.setProperty("jdk.tls.keyLimits",
+        "AES/GCM/NoPadding KeyUpdate " + ENCRYPTED_BYTES_LIMIT);
+  }
+
+  private static BufferPool bufferPool;
+  private static SSLContext sslContext;
+  private static KeyStore keystore;
+  private static char[] keystorePassword;
+  private static KeyStore truststore;
+
+  private SSLEngine clientEngine;
+  private SSLEngine serverEngine;
+  private int packetBufferSize;
+
+  @BeforeClass
+  public static void beforeClass() throws GeneralSecurityException, IOException {
+    // one-time setup: shared BufferPool, plus key/trust stores read from the files and
+    // passwords named in the properties returned by createKeystoreAndTruststore()
+    bufferPool = new BufferPool(mock(DMStats.class));
+    final Properties securityProperties = createKeystoreAndTruststore();
+
+    keystore = KeyStore.getInstance("JKS");
+    keystorePassword = securityProperties.getProperty(SSL_KEYSTORE_PASSWORD).toCharArray();
+    try (FileInputStream in = new FileInputStream(securityProperties.getProperty(SSL_KEYSTORE))) {
+      keystore.load(in, keystorePassword); // try-with-resources closes the file once loaded
+    }
+    truststore = KeyStore.getInstance("JKS");
+    final String trustPassword = securityProperties.getProperty(SSL_TRUSTSTORE_PASSWORD);
+    try (FileInputStream in = new FileInputStream(securityProperties.getProperty(SSL_TRUSTSTORE))) {
+      truststore.load(in, trustPassword.toCharArray());
+    }
+  }
+
+  @Before
+  public void before() throws NoSuchAlgorithmException, UnrecoverableKeyException,
+      KeyStoreException, KeyManagementException {
+    final KeyManagerFactory kmf = KeyManagerFactory.getInstance("PKIX");
+    kmf.init(keystore, keystorePassword);
+    final TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX");
+    tmf.init(truststore);
+
+    sslContext = SSLContext.getInstance("TLS");
+    final SecureRandom random = new SecureRandom(new byte[] {1, 2, 3});
+    sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), random);
+
+    final SSLParameters defaultParameters = sslContext.getDefaultSSLParameters();
+    final String[] protocols = defaultParameters.getProtocols();
+    final String[] cipherSuites = defaultParameters.getCipherSuites();
+    System.out.println(
+        String.format("TLS settings (default) before handshake: Protocols: %s, Cipher Suites: %s",
+            Arrays.toString(protocols), Arrays.toString(cipherSuites)));
+
+    clientEngine = createSSLEngine("server-host", true, sslContext);
+    packetBufferSize = clientEngine.getSession().getPacketBufferSize();
+
+    serverEngine = createSSLEngine("client-host", false, sslContext);
+  }
+
+  /*
+   * Verify initial handshake succeeds in the presence of KeyUpdate messages
+   * (i.e. updating send-side cryptographic keys).
+   *
+   * This test verifies, primarily, the behavior of
+   * NioSslEngine.handshake(SocketChannel, int, ByteBuffer)
+   */
+  @Test
+  public void keyUpdateDuringInitialHandshakeTest() {
+    clientServerTest(
+        (channel, filter, peerNetData) -> {
+          handshakeTLS(channel, filter, peerNetData, "Client:");
+          return true;
+        },
+        (channel, filter, peerNetData1) -> {
+          handshakeTLS(channel, filter, peerNetData1,
+              "Server:");
+          return true;
+        });
+  }
+
+  /*
+   * Building on keyUpdateDuringInitialHandshakeTest(), this test verifies that
+   * after the handshake succeeds, subsequent data transfer succeeds in the presence
+   * of KeyUpdate messages (i.e. updating send-side cryptographic keys).
+   *
+   * This test verifies, primarily, the behavior of NioSslEngine#wrap(ByteBuffer).
+   */
+  @Test
+  public void keyUpdateDuringSecureDataTransferTest() {
+    clientServerTest(
+        (final SocketChannel channel,
+            final NioSslEngine filter,
+            final ByteBuffer peerNetData) -> {
+          handshakeTLS(channel, filter, peerNetData, "Client:");
+          /*
+           * In order to verify that KeyUpdate is properly handled in NioSslEngine.wrap()
+           * we must arrange for the KeyUpdate (generation and processing) to occur after
+           * the initial handshake and before the handshaking during NioSslEngine.close().
+           *
+           * If we call send() only once, regardless of the number of bytes wrapped (even
+           * if it exceeds the encryption byte limit set in jdk.tls.keyLimits), the status
+           * result from SSLEngine.wrap() will be OK. We will fail to encounter the situation
+           * where it is e.g. BUFFER_OVERFLOW.
+           *
+           * By calling send() with bytesToSend >= the limit, we can be sure that the
+           * subsequent call to send() will trigger the KeyUpdate and will require proper
+           * handling of that situation in NioSslEngine.wrap().
+           */
+          send(ENCRYPTED_BYTES_LIMIT, filter, channel, 0);
+          send(ENCRYPTED_BYTES_LIMIT, filter, channel, ENCRYPTED_BYTES_LIMIT);
+
+          return true;
+        },
+        (final SocketChannel channel,
+            final NioSslEngine filter,
+            final ByteBuffer peerNetData) -> {
+          handshakeTLS(channel, filter, peerNetData, "Server:");
+          /*
+           * Call receive() twice (like we did for send()) just to test that receive() is
+           * leaving buffers in the correct readable/writable state when it returns.
+           */
+          for (int i = 0; i < 2; i++) {
+            final byte[] received =
+                receive(ENCRYPTED_BYTES_LIMIT, filter, channel, peerNetData);
+            assertThat(received).hasSize(ENCRYPTED_BYTES_LIMIT);
+            for (int j = i * ENCRYPTED_BYTES_LIMIT; j < (i + 1)
+                * ENCRYPTED_BYTES_LIMIT; j++) {
+              assertThat(received[j % received.length]).isEqualTo((byte) j);
+            }
+          }
+          return true;
+        });
+  }
+
+  /*
+   * Building on keyUpdateDuringSecureDataTransferTest(), this test verifies that
+   * NioSslEngine.close() succeeds in the presence of KeyUpdate messages
+   * (i.e. updating send-side cryptographic keys). This test is important because
+   * NioSslEngine.close() involves some TLS handshaking.
+   *
+   * This test verifies, primarily, the behavior of NioSslEngine#close(SocketChannel).
+   */
+  @Test
+  public void keyUpdateDuringSocketCloseHandshakeTest() {
+    clientServerTest(
+        (final SocketChannel channel,
+            final NioSslEngine filter,
+            final ByteBuffer peerNetData) -> {
+          handshakeTLS(channel, filter, peerNetData, "Client:");
+          /*
+           * Leave send-side SSLEngine in a state where it will generate a KeyUpdate
+           * TLS message during (but not before) NioSslEngine.close().
+           */
+          send(ENCRYPTED_BYTES_LIMIT, filter, channel, 0);
+          return true;
+        },
+        (final SocketChannel channel,
+            final NioSslEngine filter,
+            final ByteBuffer peerNetData) -> {
+          handshakeTLS(channel, filter, peerNetData, "Server:");
+          receive(ENCRYPTED_BYTES_LIMIT, filter, channel, peerNetData);
+          /*
+           * No need to validate the received data since our purpose is only to verify that
+           * NioSslEngine.close() succeeds cleanly.
+           */
+          return true;
+        });
+  }
+
+  private static SSLEngine createSSLEngine(final String peerHost, final boolean useClientMode,
+      final SSLContext sslContext) {
+    final SSLEngine engine = sslContext.createSSLEngine(peerHost, 10001);
+    engine.setEnabledProtocols(new String[] {TLS_PROTOCOL});
+    engine.setEnabledCipherSuites(new String[] {TLS_CIPHER_SUITE});
+    engine.setUseClientMode(useClientMode);
+    return engine;
+  }
+
+  private void clientServerTest(final PeerAction clientAction, final PeerAction serverAction) {
+    final ExecutorService executorService = Executors.newFixedThreadPool(2);
+
+    final CompletableFuture<SocketAddress> boundAddress = new CompletableFuture<>();
+
+    final CountDownLatch serversWaiting = new CountDownLatch(1);
+
+    final CompletableFuture<Boolean> serverHandshakeFuture =
+        supplyAsync(
+            () -> server(boundAddress, packetBufferSize,
+                serverAction, serversWaiting, serverEngine),
+            executorService);
+
+    final CompletableFuture<Boolean> clientHandshakeFuture =
+        supplyAsync(
+            () -> client(boundAddress, packetBufferSize,
+                clientAction, serversWaiting, clientEngine),
+            executorService);
+
+    CompletableFuture.allOf(serverHandshakeFuture, clientHandshakeFuture)
+        .join();
+  }
+
+  /*
+   * An action taken on a client or server after the SocketChannel has been established.
+   */
+  private interface PeerAction {
+    boolean apply(final SocketChannel acceptedChannel,
+        final NioSslEngine filter,
+        final ByteBuffer peerNetData) throws IOException;
+  }
+
+  private static boolean client(
+      final CompletableFuture<SocketAddress> boundAddress,
+      final int packetBufferSize,
+      final PeerAction peerAction,
+      final CountDownLatch serversWaiting,
+      final SSLEngine engine) {
+    try {
+      try (final SocketChannel connectedChannel = SocketChannel.open()) {
+        connectedChannel.connect(boundAddress.get());
+        final ByteBuffer peerNetData =
+            ByteBuffer.allocateDirect(packetBufferSize);
+
+        final NioSslEngine filter = new NioSslEngine(engine, bufferPool);
+
+        final boolean result =
+            peerAction.apply(connectedChannel, filter, peerNetData);
+
+        serversWaiting.await(); // wait for last server to give up before closing our socket
+
+        filter.close(connectedChannel);
+
+        return result;
+      }
+    } catch (IOException | InterruptedException | ExecutionException e) {
+      printException("In client:", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static boolean server(
+      final CompletableFuture<SocketAddress> boundAddress,
+      final int packetBufferSize,
+      final PeerAction peerAction,
+      final CountDownLatch serversWaiting,
+      final SSLEngine engine) {
+    try (final ServerSocketChannel boundChannel = ServerSocketChannel.open()) {
+      final InetSocketAddress bindAddress =
+          new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+      boundChannel.bind(bindAddress);
+      boundAddress.complete(boundChannel.getLocalAddress());
+      try (final SocketChannel acceptedChannel = boundChannel.accept()) {
+        final ByteBuffer peerNetData =
+            ByteBuffer.allocateDirect(packetBufferSize);
+
+        final NioSslEngine filter = new NioSslEngine(engine, bufferPool);
+
+        final boolean result =
+            peerAction.apply(acceptedChannel, filter, peerNetData);
+
+        filter.close(acceptedChannel);
+
+        return result;
+      }
+    } catch (IOException e) {
+      printException("In server:", e);
+      throw new RuntimeException(e);
+    } finally {
+      serversWaiting.countDown();
+    }
+  }
+
+  private static void printException(final String context, final Exception e) {
+    System.out.println(context + "\n");
+    e.printStackTrace();
+  }
+
+  private static Properties createKeystoreAndTruststore()
+      throws GeneralSecurityException, IOException {
+    final CertificateMaterial ca = new CertificateBuilder()
+        .commonName("Test CA")
+        .isCA()
+        .generate();
+
+    CertificateMaterial serverCertificate = new CertificateBuilder()
+        .commonName("server")
+        .issuedBy(ca)
+        .generate();
+
+    final CertStores serverStore = CertStores.serverStore();
+    serverStore.withCertificate("server", serverCertificate);
+    serverStore.trust("ca", ca);
+    return serverStore.propertiesWith("all", true, false);
+  }
+
+  /**
+   * @param filter is a newly constructed and initialized object; it has not been used
+   *        for handshakes previously.
+   * @param peerNetData on entry: don't care about read/write state or contents
+   */
+  private static boolean handshakeTLS(final SocketChannel channel,
+      final NioSslEngine filter,
+      final ByteBuffer peerNetData,
+      final String context) throws IOException {
+    final boolean blocking = channel.isBlocking();
+    try {
+      channel.configureBlocking(false);
+      final boolean result =
+          filter.handshake(channel, 6_000, peerNetData);
+      System.out.println(
+          String.format(
+              "%s TLS settings after successful handshake: Protocol: %s, Cipher Suite: %s",
+              context,
+              filter.engine.getSession().getProtocol(),
+              filter.engine.getSession().getCipherSuite()));
+      return result;
+    } finally {
+      channel.configureBlocking(blocking);
+    }
+  }
+
+  /**
+   * This method is trying to do what Connection.readMessages() and processInputBuffer() do
+   * together.
+   *
+   * Note well: peerNetData may contain content on entry to this method. Also the filter's
+   * buffers (e.g. the buffer returned by unwrap) may already contain data from previous
+   * calls to unwrap().
+   *
+   * @param peerNetData will be in write mode on entry and may already contain content
+   */
+  private static byte[] receive(
+      final int bytesToReceive,
+      final NioFilter filter,
+      final SocketChannel channel,
+      final ByteBuffer peerNetData) throws IOException {
+    final byte[] received = new byte[bytesToReceive];
+    // peerNetData in write mode
+    peerNetData.flip();
+    // peerNetData in read mode
+    int pos = 0;
+    while (pos < bytesToReceive) {
+      /*
+       * On first iteration unwrap() is called before the channel is read. This is necessary
+       * since a previous call to this method (receive()) could have left data in the filter
+       * and there might not be any more data coming on the channel (ever).
+       *
+       * If no data was already held in the filter's buffer, and peerNetData was empty
+       * before calling unwrap() then the buffer returned by unwrap will be empty. But before
+       * we start the second loop iteration, we'll read (from the channel into peerNetData).
+       *
+       * The filter's unwrap() method takes peerNetData in read mode and when the method returns
+       * the buffer is in write mode (ready for us to add data to it if needed).
+       */
+      try (final ByteBufferSharing appDataSharing = filter.unwrap(peerNetData)) {
+        // peerNetData in write mode
+        final ByteBuffer appData = appDataSharing.getBuffer();
+        // appData in write mode
+        appData.flip();
+        // appData in read mode
+        if (appData.hasRemaining()) {
+          final int newBytes = Math.min(appData.remaining(), received.length - pos);
+          assert pos + newBytes <= received.length;
+          appData.get(received, pos, newBytes);
+          pos += newBytes;
+        } else {
+          channel.read(peerNetData);
+        }
+        peerNetData.flip();
+        // peerNetData in read mode ready for filter unwrap() call
+        appData.compact();
+        // appData in write mode ready for filter unwrap() call
+      }
+    }
+    peerNetData.compact();
+    // peerNetData in write mode
+    return received;
+  }
+
+  /*
+   * This method is trying to do what Connection.writeFully() does.
+   */
+  private static void send(
+      final int bytesToSend,
+      final NioFilter filter,
+      final SocketChannel channel,
+      final int startingValue)
+      throws IOException {
+    // if we wanted to send more than one buffer-full we could add an outer loop
+    final ByteBuffer appData = ByteBuffer.allocateDirect(bytesToSend);
+    for (int i = 0; i < bytesToSend; i++) {
+      appData.put((byte) (i + startingValue));
+    }
+    appData.flip();
+    try (final ByteBufferSharing netDataSharing = filter.wrap(appData)) {
+      final ByteBuffer netData = netDataSharing.getBuffer();
+      while (netData.remaining() > 0) {
+        channel.write(netData);
+      }
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 7831444200..09d99bb0a7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -18,7 +18,7 @@ import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_TASK;
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
 import static javax.net.ssl.SSLEngineResult.Status.BUFFER_OVERFLOW;
-import static javax.net.ssl.SSLEngineResult.Status.OK;
+import static javax.net.ssl.SSLEngineResult.Status.CLOSED;
 import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_RECEIVER;
 import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_SENDER;
 
@@ -40,6 +40,7 @@ import javax.net.ssl.SSLSession;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.GemFireIOException;
+import org.apache.geode.annotations.Immutable;
 import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.internal.net.BufferPool.BufferType;
 import org.apache.geode.internal.net.ByteBufferVendor.OpenAttemptTimedOut;
@@ -51,6 +52,8 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
  * Its use should be confined to one thread or should be protected by external synchronization.
  */
 public class NioSslEngine implements NioFilter {
+  @Immutable
+  private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new byte[0]);
   private static final Logger logger = LogService.getLogger();
 
   private final BufferPool bufferPool;
@@ -229,7 +232,8 @@ public class NioSslEngine implements NioFilter {
   }
 
   @Override
-  public ByteBufferSharing wrap(ByteBuffer appData) throws IOException {
+  public ByteBufferSharing wrap(ByteBuffer appData)
+      throws IOException {
     try (final ByteBufferSharing outputSharing = outputBufferVendor.open()) {
 
       ByteBuffer myNetData = outputSharing.getBuffer();
@@ -237,24 +241,21 @@ public class NioSslEngine implements NioFilter {
       myNetData.clear();
 
       while (appData.hasRemaining()) {
-        // ensure we have lots of capacity since encrypted data might
-        // be larger than the app data
-        int remaining = myNetData.capacity() - myNetData.position();
-
-        if (remaining < (appData.remaining() * 2)) {
-          int newCapacity = expandedCapacity(appData, myNetData);
-          myNetData = outputSharing.expandWriteBufferIfNeeded(newCapacity);
+        final SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
+        switch (wrapResult.getStatus()) {
+          case BUFFER_OVERFLOW:
+            final int newCapacity =
+                myNetData.position() + engine.getSession().getPacketBufferSize();
+            myNetData = outputSharing.expandWriteBufferIfNeeded(newCapacity);
+            break;
+          case BUFFER_UNDERFLOW:
+          case CLOSED:
+            throw new SSLException("Error encrypting data: " + wrapResult);
         }
 
-        SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
-
         if (wrapResult.getHandshakeStatus() == NEED_TASK) {
           handleBlockingTasks();
         }
-
-        if (wrapResult.getStatus() != OK) {
-          throw new SSLException("Error encrypting data: " + wrapResult);
-        }
       }
 
       myNetData.flip();
@@ -373,6 +374,7 @@ public class NioSslEngine implements NioFilter {
 
   @Override
   public synchronized void close(SocketChannel socketChannel) {
+
     if (closed) {
       return;
     }
@@ -381,19 +383,25 @@ public class NioSslEngine implements NioFilter {
     try (final ByteBufferSharing outputSharing = outputBufferVendor.open(1, TimeUnit.MINUTES)) {
       final ByteBuffer myNetData = outputSharing.getBuffer();
 
-      if (!engine.isOutboundDone()) {
-        ByteBuffer empty = ByteBuffer.wrap(new byte[0]);
-        engine.closeOutbound();
+      engine.closeOutbound();
+
+      SSLEngineResult result = null;
+
+      while (!engine.isOutboundDone()) {
 
         // clear the buffer to receive a CLOSE message from the SSLEngine
         myNetData.clear();
 
         // Get close message
-        SSLEngineResult result = engine.wrap(empty, myNetData);
-
-        if (result.getStatus() != SSLEngineResult.Status.CLOSED) {
-          throw new SSLHandshakeException(
-              "Error closing SSL session.  Status=" + result.getStatus());
+        result = engine.wrap(EMPTY_BYTE_BUFFER, myNetData);
+
+        /*
+         * We would have liked to make this one of the while() conditions but
+         * if status is CLOSED we'll get a "Broken pipe" exception in the next write()
+         * so it needs to be handled here.
+         */
+        if (result.getStatus() == CLOSED) {
+          break;
         }
 
         // Send close message to peer
@@ -402,6 +410,10 @@ public class NioSslEngine implements NioFilter {
           socketChannel.write(myNetData);
         }
       }
+      if (result != null && result.getStatus() != CLOSED) {
+        throw new SSLHandshakeException(
+            "Error closing SSL session.  Status=" + result.getStatus());
+      }
     } catch (ClosedChannelException e) {
       // we can't send a close message if the channel is closed
     } catch (IOException e) {
@@ -416,18 +428,13 @@ public class NioSslEngine implements NioFilter {
     }
   }
 
-  private int expandedCapacity(ByteBuffer sourceBuffer, ByteBuffer targetBuffer) {
-    return Math.max(targetBuffer.position() + sourceBuffer.remaining() * 2,
-        targetBuffer.capacity() * 2);
-  }
-
   @VisibleForTesting
-  public ByteBufferVendor getOutputBufferVendorForTestingOnly() throws IOException {
+  public ByteBufferVendor getOutputBufferVendorForTestingOnly() {
     return outputBufferVendor;
   }
 
   @VisibleForTesting
-  public ByteBufferVendor getInputBufferVendorForTestingOnly() throws IOException {
+  public ByteBufferVendor getInputBufferVendorForTestingOnly() {
     return inputBufferVendor;
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 82e3b3da9e..03f3e77ab5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -1801,7 +1801,7 @@ public class Connection implements Runnable {
    * checks to see if an exception should not be logged: i.e., "forcibly closed", "reset by peer",
    * or "connection reset"
    */
-  private static boolean isIgnorableIOException(Exception e) {
+  private static boolean isIgnorableIOException(IOException e) {
     if (e instanceof ClosedChannelException) {
       return true;
     }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index d6b9aa6485..7c72d6780b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.SocketException;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SocketChannel;
@@ -190,7 +191,7 @@ public class NioSslEngineTest {
   }
 
   @Test
-  public void wrap() throws Exception {
+  public void engineWrapCausesResizeThenSucceeds() throws Exception {
     try (final ByteBufferSharing outputSharing =
         nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
 
@@ -206,6 +207,7 @@ public class NioSslEngineTest {
       // buffer
       TestSSLEngine testEngine = new TestSSLEngine();
       testEngine.addReturnResult(
+          new SSLEngineResult(BUFFER_OVERFLOW, NEED_TASK, 0, 0),
           new SSLEngineResult(OK, NEED_TASK, appData.remaining(), appData.remaining()));
       spyNioSslEngine.engine = testEngine;
 
@@ -217,12 +219,12 @@ public class NioSslEngineTest {
         appData.flip();
         assertThat(wrappedBuffer).isEqualTo(appData);
       }
-      verify(spyNioSslEngine, times(1)).handleBlockingTasks();
+      verify(spyNioSslEngine, times(2)).handleBlockingTasks();
     }
   }
 
   @Test
-  public void wrapFails() throws IOException {
+  public void engineWrapCausesResizeThenCloses() throws IOException {
     try (final ByteBufferSharing outputSharing =
         nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
       // make the application data too big to fit into the engine's encryption buffer
@@ -237,10 +239,12 @@ public class NioSslEngineTest {
       // buffer
       TestSSLEngine testEngine = new TestSSLEngine();
       testEngine.addReturnResult(
+          new SSLEngineResult(BUFFER_OVERFLOW, NEED_TASK, 0, 0),
           new SSLEngineResult(CLOSED, NEED_TASK, appData.remaining(), appData.remaining()));
       spyNioSslEngine.engine = testEngine;
 
-      assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)).isInstanceOf(SSLException.class)
+      assertThatThrownBy(() -> spyNioSslEngine.wrap(appData))
+          .isInstanceOf(SSLException.class)
           .hasMessageContaining("Error encrypting data");
     }
   }
@@ -370,7 +374,7 @@ public class NioSslEngineTest {
     when(mockChannel.socket()).thenReturn(mockSocket);
     when(mockSocket.isClosed()).thenReturn(false);
 
-    when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
+    when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE).thenReturn(Boolean.TRUE);
     when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
         new SSLEngineResult(CLOSED, FINISHED, 0, 0));
     nioSslEngine.close(mockChannel);
@@ -384,13 +388,13 @@ public class NioSslEngineTest {
   }
 
   @Test
-  public void closeWhenUnwrapError() throws Exception {
+  public void closeWhenWrapError() throws Exception {
     SocketChannel mockChannel = mock(SocketChannel.class);
     Socket mockSocket = mock(Socket.class);
     when(mockChannel.socket()).thenReturn(mockSocket);
     when(mockSocket.isClosed()).thenReturn(true);
 
-    when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
+    when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE).thenReturn(Boolean.TRUE);
     when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
         new SSLEngineResult(BUFFER_OVERFLOW, FINISHED, 0, 0));
     assertThatThrownBy(() -> nioSslEngine.close(mockChannel)).isInstanceOf(GemFireIOException.class)
@@ -405,14 +409,13 @@ public class NioSslEngineTest {
     when(mockChannel.socket()).thenReturn(mockSocket);
     when(mockSocket.isClosed()).thenReturn(true);
 
-    when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
     when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenAnswer((x) -> {
       try (final ByteBufferSharing outputSharing =
           nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
         // give the NioSslEngine something to write on its socket channel, simulating a TLS close
         // message
         outputSharing.getBuffer().put("Goodbye cruel world".getBytes());
-        return new SSLEngineResult(CLOSED, FINISHED, 0, 0);
+        return new SSLEngineResult(OK, NEED_UNWRAP, 0, 0);
       }
     });
     when(mockChannel.write(any(ByteBuffer.class))).thenThrow(new ClosedChannelException());
@@ -605,10 +608,19 @@ public class NioSslEngineTest {
 
     @Override
     public SSLEngineResult wrap(ByteBuffer[] sources, int i, int i1, ByteBuffer destination) {
-      for (ByteBuffer source : sources) {
+      assertThat(sources.length)
+          .as("test unexpectedly tried to wrap with multiple sources")
+          .isEqualTo(1);
+      final ByteBuffer source = sources[0];
+      final SSLEngineResult nextResult = nextResult();
+      try {
         destination.put(source);
+      } catch (final BufferOverflowException e) {
+        assertThat(BUFFER_OVERFLOW)
+            .as("got unexpected buffer overflow")
+            .isEqualTo(nextResult.getStatus());
       }
-      return nextResult();
+      return nextResult;
     }
 
     @Override
@@ -672,7 +684,9 @@ public class NioSslEngineTest {
 
     @Override
     public SSLSession getSession() {
-      return null;
+      final SSLSession session = mock(SSLSession.class);
+      when(session.getPacketBufferSize()).thenReturn(16 * 1024);
+      return session;
     }
 
     @Override