You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2022/04/27 21:05:36 UTC

[geode] branch support/1.12 updated (1319ca0d66 -> e961ed9090)

This is an automated email from the ASF dual-hosted git repository.

burcham pushed a change to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git


    from 1319ca0d66 GEODE-10229: GII image should fill disk region RVV's exceptions. (#7602)
     new b4c1cf8b3d GEODE-10122: P2P Messaging Handles TLS KeyUpdate Message  (#7449) (#7615)
     new e961ed9090 GEODE-10122: fix quorm loss in test

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../internal/P2PMessagingConcurrencyDUnitTest.java |   2 +-
 ...P2pMessagingSslTlsKeyUpdateDistributedTest.java | 377 ++++++++++++++++
 .../tcp/ConnectionCloseSSLTLSDUnitTest.java        |   1 -
 .../internal/net/NioSslEngineKeyUpdateTest.java    | 497 +++++++++++++++++++++
 .../apache/geode/internal/net/NioSslEngine.java    |  67 +--
 .../org/apache/geode/internal/tcp/Connection.java  |   2 +-
 .../geode/internal/net/NioSslEngineTest.java       |  38 +-
 7 files changed, 939 insertions(+), 45 deletions(-)
 create mode 100644 geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java
 create mode 100644 geode-core/src/integrationTest/java/org/apache/geode/internal/net/NioSslEngineKeyUpdateTest.java


[geode] 01/02: GEODE-10122: P2P Messaging Handles TLS KeyUpdate Message (#7449) (#7615)

Posted by bu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git

commit b4c1cf8b3de150dc5e052f89608bee5da0222657
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Tue Apr 26 14:54:03 2022 -0700

    GEODE-10122: P2P Messaging Handles TLS KeyUpdate Message  (#7449) (#7615)
    
    * Key expiration works for TLSv1.3 and GCM-based ciphers
    * TLS KeyUpdate messages are processed correctly
    * Removed dependencies on: Mockito 4, JUnit 5, GeodeParamsRunner
    * Removed dependency on DistributedBlackboard rule
    (cherry picked from commit d2535394a82ac5faf10f004f4e3c15f756f7b177)
    (cherry picked from commit 07c08e95025ff955c9b361db4b97902ce722be81)
---
 .../internal/P2PMessagingConcurrencyDUnitTest.java |   2 +-
 ...P2pMessagingSslTlsKeyUpdateDistributedTest.java | 367 +++++++++++++++
 .../tcp/ConnectionCloseSSLTLSDUnitTest.java        |   1 -
 .../internal/net/NioSslEngineKeyUpdateTest.java    | 497 +++++++++++++++++++++
 .../apache/geode/internal/net/NioSslEngine.java    |  67 +--
 .../org/apache/geode/internal/tcp/Connection.java  |   2 +-
 .../geode/internal/net/NioSslEngineTest.java       |  38 +-
 7 files changed, 929 insertions(+), 45 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
index 4c8677c43f..ad975886c7 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java
@@ -167,7 +167,6 @@ public class P2PMessagingConcurrencyDUnitTest {
       bytesTransferredAdder = new LongAdder();
 
       final ClusterDistributionManager cdm = getCDM();
-      final Random random = new Random(RANDOM_SEED);
       final AtomicInteger nextSenderId = new AtomicInteger();
 
       /*
@@ -194,6 +193,7 @@ public class P2PMessagingConcurrencyDUnitTest {
           throw new RuntimeException("doSending failed", e);
         }
         final int firstMessageId = senderId * SENDER_COUNT;
+        final Random random = new Random(RANDOM_SEED);
         for (int messageId = firstMessageId; messageId < firstMessageId
             + MESSAGES_PER_SENDER; messageId++) {
           final TestMessage msg = new TestMessage(receiverMember, random, messageId);
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java
new file mode 100644
index 0000000000..3a887d5521
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.distributed.internal;
+
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_CIPHERS;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_PROTOCOLS;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.security.GeneralSecurityException;
+import java.security.Security;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.LongAdder;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.ssl.CertStores;
+import org.apache.geode.cache.ssl.CertificateBuilder;
+import org.apache.geode.cache.ssl.CertificateMaterial;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.test.dunit.SerializableRunnableIF;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.MembershipTest;
+import org.apache.geode.test.version.VersionManager;
+
+/**
+ * In TLSv1.3, when a GCM-based cipher is used, there is a limit on the number
+ * of bytes that may be encoded with a key. When that limit is reached, a TLS
+ * KeyUpdate message is generated (by the SSLEngine). That message causes a new
+ * key to be negotiated between the peer SSLEngines.
+ *
+ * This test arranges for a low encryption byte limit to be set for the sending member
+ * and then for the receiving member. With the low byte limit configured, the test
+ * sends P2P messages via TLS and verifies request-reply message processing.
+ */
+@Category({MembershipTest.class})
+@RunWith(JUnitParamsRunner.class)
+public class P2pMessagingSslTlsKeyUpdateDistributedTest {
+
+  private static final String TLS_PROTOCOL = "TLSv1.3";
+  private static final String TLS_CIPHER_SUITE = "TLS_AES_256_GCM_SHA384";
+
+  private static final int ENCRYPTED_BYTES_LIMIT = 64 * 1024;
+
+  private static final int MESSAGE_SIZE = 1024;
+
+  /*
+   * How many messages will be generated? We generate enough to cause KeyUpdate
+   * to be generated, and then we generate many more beyond that. Even with buggy
+   * wrap/unwrap logic, the retries in DirectChannel.sendToMany() and the transparent
+   * connection reestablishment in ConnectionTable can mask those bugs. So to reliably
+   * fail in the presence of bugs we need to generate lots of extra messages.
+   */
+  private static final int MESSAGES_PER_SENDER = ENCRYPTED_BYTES_LIMIT / MESSAGE_SIZE + 2000;
+
+  {
+    assertThat(MESSAGE_SIZE * MESSAGES_PER_SENDER > 10 * ENCRYPTED_BYTES_LIMIT);
+  }
+
+  public static final int MAX_REPLY_WAIT_MILLIS = 1_000;
+
+  private static Properties geodeConfigurationProperties;
+
+  @Rule
+  public final ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
+
+  private MemberVM sender;
+  private MemberVM receiver;
+
+  @After
+  public void afterEach() {
+    clusterStartupRule.getVM(0).bounceForcibly();
+    clusterStartupRule.getVM(1).bounceForcibly();
+    clusterStartupRule.getVM(2).bounceForcibly();
+  }
+
+  /*
+   * bytes sent on sender JVM, bytes received on receiver JVM
+   * (not used in test JVM)
+   */
+  private static LongAdder bytesTransferredAdder;
+
+  // in receiver JVM only
+  private static LongAdder repliesGeneratedAdder;
+
+  // in sender JVM only
+  private static LongAdder repliesReceivedAdder;
+
+
+  private void configureJVMsAndStartClusterMembers(
+      final long locatorEncryptedBytesLimit,
+      final long senderEncryptedBytesLimit,
+      final long receiverEncryptedBytesLimit)
+      throws GeneralSecurityException, IOException {
+
+    clusterStartupRule.getVM(0).invoke(
+        setSecurityProperties(locatorEncryptedBytesLimit));
+    clusterStartupRule.getVM(1).invoke(
+        setSecurityProperties(senderEncryptedBytesLimit));
+    clusterStartupRule.getVM(2).invoke(
+        setSecurityProperties(receiverEncryptedBytesLimit));
+
+    final Properties senderConfiguration = geodeConfigurationProperties();
+    final Properties receiverConfiguration = geodeConfigurationProperties();
+
+    final MemberVM locator =
+        clusterStartupRule.startLocatorVM(0, 0, VersionManager.CURRENT_VERSION,
+            x -> x.withProperties(senderConfiguration).withConnectionToLocator()
+                .withoutClusterConfigurationService().withoutManagementRestService());
+
+    sender = clusterStartupRule.startServerVM(1, senderConfiguration, locator.getPort());
+    receiver = clusterStartupRule.startServerVM(2, receiverConfiguration, locator.getPort());
+  }
+
+  private @NotNull SerializableRunnableIF setSecurityProperties(final long encryptedBytesLimit) {
+    return () -> {
+      Security.setProperty("jdk.tls.keyLimits",
+          "AES/GCM/NoPadding KeyUpdate " + encryptedBytesLimit);
+
+      final Class<?> sslCipher = Class.forName("sun.security.ssl.SSLCipher");
+      final Field cipherLimits = sslCipher.getDeclaredField("cipherLimits");
+      cipherLimits.setAccessible(true);
+      assertThat((Map<String, Long>) cipherLimits.get(null)).containsEntry(
+          "AES/GCM/NOPADDING:KEYUPDATE",
+          encryptedBytesLimit);
+    };
+  }
+
+  @Test
+  @Parameters({
+      "137438953472, 65536, 137438953472",
+      "137438953472, 137438953472, 65536",
+  })
+  public void testP2PMessagingWithKeyUpdate(
+      final long locatorEncryptedBytesLimit,
+      final long senderEncryptedBytesLimit,
+      final long receiverEncryptedBytesLimit)
+      throws GeneralSecurityException, IOException {
+
+    configureJVMsAndStartClusterMembers(locatorEncryptedBytesLimit, senderEncryptedBytesLimit,
+        receiverEncryptedBytesLimit);
+
+    final InternalDistributedMember receiverMember =
+        receiver.invoke("get receiving member id", () -> {
+
+          bytesTransferredAdder = new LongAdder();
+          repliesGeneratedAdder = new LongAdder();
+
+          final ClusterDistributionManager cdm = getCDM();
+          final InternalDistributedMember localMember = cdm.getDistribution().getLocalMember();
+          return localMember;
+        });
+
+    // by returning a value from the invoked lambda we make invocation synchronous
+    final Boolean sendingComplete =
+        sender.invoke("message sending and reply counting", () -> {
+
+          bytesTransferredAdder = new LongAdder();
+          repliesReceivedAdder = new LongAdder();
+
+          final ClusterDistributionManager cdm = getCDM();
+
+          int failedRecipientCount = 0;
+          int droppedRepliesCount = 0;
+
+          final ReplyProcessor21[] replyProcessors = new ReplyProcessor21[MESSAGES_PER_SENDER];
+
+          // this loop sends request messages
+          for (int messageId = 0; messageId < MESSAGES_PER_SENDER; messageId++) {
+
+            final ReplyProcessor21 replyProcessor = new ReplyProcessor21(cdm, receiverMember);
+            replyProcessors[messageId] = replyProcessor;
+            final TestMessage msg = new TestMessage(messageId, receiverMember,
+                replyProcessor.getProcessorId());
+
+            final Set<InternalDistributedMember> failedRecipients = cdm.putOutgoing(msg);
+            if (failedRecipients == null) {
+              bytesTransferredAdder.add(MESSAGE_SIZE);
+            } else {
+              failedRecipientCount += failedRecipients.size();
+            }
+          }
+
+          // this loop counts reply arrivals
+          for (int messageId = 0; messageId < MESSAGES_PER_SENDER; messageId++) {
+            final ReplyProcessor21 replyProcessor = replyProcessors[messageId];
+            final boolean receivedReply =
+                replyProcessor.waitForRepliesUninterruptibly(MAX_REPLY_WAIT_MILLIS);
+            if (receivedReply) {
+              repliesReceivedAdder.increment();
+            } else {
+              droppedRepliesCount += 1;
+            }
+          }
+
+          assertThat((long) failedRecipientCount).as("message delivery failed N times").isZero();
+          assertThat((long) droppedRepliesCount).as("some replies were dropped").isZero();
+          return true;
+        });
+
+    // at this point, sender is done sending
+    final long bytesSent = getByteCount(sender);
+
+    await().untilAsserted(
+        () -> {
+          assertThat(getRepliesGenerated()).isEqualTo(MESSAGES_PER_SENDER);
+          assertThat(getRepliesReceived()).isEqualTo(MESSAGES_PER_SENDER);
+          assertThat(getByteCount(receiver))
+              .as("bytes received != bytes sent")
+              .isEqualTo(bytesSent);
+        });
+  }
+
+  private long getRepliesGenerated() {
+    return receiver.invoke(() -> repliesGeneratedAdder.sum());
+  }
+
+  private long getRepliesReceived() {
+    return sender.invoke(() -> repliesReceivedAdder.sum());
+  }
+
+  private long getByteCount(final MemberVM member) {
+    return member.invoke(() -> bytesTransferredAdder.sum());
+  }
+
+  private static ClusterDistributionManager getCDM() {
+    return (ClusterDistributionManager) ((InternalCache) CacheFactory.getAnyInstance())
+        .getDistributionManager();
+  }
+
+  private static class TestMessage extends DistributionMessage {
+    private volatile int messageId;
+    private volatile int replyProcessorId;
+    private volatile int length;
+
+    TestMessage(final int messageId,
+        final InternalDistributedMember receiver,
+        final int replyProcessorId) {
+      setRecipient(receiver);
+      this.messageId = messageId;
+      this.replyProcessorId = replyProcessorId;
+    }
+
+    // necessary for deserialization
+    public TestMessage() {
+      messageId = 0;
+      replyProcessorId = 0;
+    }
+
+    @Override
+    public int getProcessorType() {
+      return OperationExecutors.STANDARD_EXECUTOR;
+    }
+
+    @Override
+    protected void process(final ClusterDistributionManager dm) {
+
+      // In case bugs cause fromData to be called more times than this method,
+      // we don't count the bytes as "transferred" until we're in this method.
+      bytesTransferredAdder.add(length);
+
+      final ReplyMessage replyMsg = new ReplyMessage();
+      replyMsg.setRecipient(getSender());
+      replyMsg.setProcessorId(replyProcessorId);
+      replyMsg.setReturnValue("howdy!");
+      dm.putOutgoing(replyMsg);
+      repliesGeneratedAdder.increment();
+    }
+
+    @Override
+    public void toData(final DataOutput out, final SerializationContext context)
+        throws IOException {
+      super.toData(out, context);
+
+      out.writeInt(messageId);
+      out.writeInt(replyProcessorId);
+
+      final ThreadLocalRandom random = ThreadLocalRandom.current();
+      final int length = MESSAGE_SIZE;
+
+      out.writeInt(length);
+
+      final byte[] payload = new byte[length];
+      random.nextBytes(payload);
+
+      out.write(payload);
+    }
+
+    @Override
+    public void fromData(final DataInput in, final DeserializationContext context)
+        throws IOException, ClassNotFoundException {
+      super.fromData(in, context);
+
+      messageId = in.readInt();
+      replyProcessorId = in.readInt();
+
+      length = in.readInt();
+
+      final byte[] payload = new byte[length];
+
+      in.readFully(payload);
+    }
+
+    @Override
+    public int getDSFID() {
+      return NO_FIXED_ID; // for testing only!
+    }
+  }
+
+  private static @NotNull Properties geodeConfigurationProperties()
+      throws GeneralSecurityException, IOException {
+    // subsequent calls must return the same value so members agree on credentials
+    if (geodeConfigurationProperties == null) {
+      final CertificateMaterial ca = new CertificateBuilder()
+          .commonName("Test CA")
+          .isCA()
+          .generate();
+
+      final CertificateMaterial serverCertificate = new CertificateBuilder()
+          .commonName("member")
+          .issuedBy(ca)
+          .generate();
+
+      final CertStores memberStore = new CertStores("member");
+      memberStore.withCertificate("member", serverCertificate);
+      memberStore.trust("ca", ca);
+      // we want to exercise the ByteBufferSharing code paths; we don't care about client auth etc
+      final Properties props = memberStore.propertiesWith("all", false, false);
+      props.setProperty(SSL_PROTOCOLS, TLS_PROTOCOL);
+      props.setProperty(SSL_CIPHERS, TLS_CIPHER_SUITE);
+      geodeConfigurationProperties = props;
+    }
+    return geodeConfigurationProperties;
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java
index 586cd53ebd..c3043493d8 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java
@@ -138,7 +138,6 @@ public class ConnectionCloseSSLTLSDUnitTest implements Serializable {
                         } catch (TimeoutException | InterruptedException e) {
                           fail("message observus interruptus");
                         }
-                        logger.info("BGB: got before process message: " + message);
                       });
                     }
                   };
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/NioSslEngineKeyUpdateTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/NioSslEngineKeyUpdateTest.java
new file mode 100644
index 0000000000..57e361ada3
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/NioSslEngineKeyUpdateTest.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.net;
+
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.security.GeneralSecurityException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.Security;
+import java.security.UnrecoverableKeyException;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.geode.cache.ssl.CertStores;
+import org.apache.geode.cache.ssl.CertificateBuilder;
+import org.apache.geode.cache.ssl.CertificateMaterial;
+import org.apache.geode.distributed.internal.DMStats;
+
+/**
+ * In TLSv1.3, when a GCM-based cipher is used, there is a limit on the number
+ * of bytes that may be encoded with a key. When that limit is reached, a TLS
+ * KeyUpdate message is generated (by the SSLEngine). That message causes a new
+ * key to be negotiated between the peer SSLEngines.
+ *
+ * Geode's {@link NioSslEngine} class (subclass of {@link NioFilter}) encapsulates
+ * Java's {@link SSLEngine}. Geode's MsgStreamer, Connection, and ConnectionTable
+ * classes interact with NioSslEngine to accomplish peer-to-peer (P2P) messaging.
+ *
+ * This test constructs a pair of SSLEngine's and wraps them in NioSslEngine.
+ * Rather than relying on MsgStreamer, Connection, or ConnectionTable classes
+ * (which themselves have a lot of dependencies on other classes), this test
+ * implements simplified logic for driving the sending and receiving of data,
+ * i.e. calling NioSslEngine.wrap() and NioSslEngine.unwrap(). See
+ * {@link #send(int, NioFilter, SocketChannel, int)} and
+ * {@link #receive(int, NioFilter, SocketChannel, ByteBuffer)}.
+ *
+ * The {@link #keyUpdateDuringSecureDataTransferTest()} arranges for the encrypted bytes limit
+ * to be reached very quickly (see {@link #ENCRYPTED_BYTES_LIMIT}).
+ * The test verifies data transfer continues correctly, after the limit is reached.
+ * This indirectly verifies that the KeyUpdate protocol initiated by the sending
+ * SSLEngine is correctly handled by all the components involved.
+ */
+public class NioSslEngineKeyUpdateTest {
+
+  private static final String TLS_PROTOCOL = "TLSv1.3";
+  private static final String TLS_CIPHER_SUITE = "TLS_AES_256_GCM_SHA384";
+
+  // number of bytes the GCM cipher can encrypt before initiating a KeyUpdate
+  private static final int ENCRYPTED_BYTES_LIMIT = 1;
+
+  {
+    Security.setProperty("jdk.tls.keyLimits",
+        "AES/GCM/NoPadding KeyUpdate " + ENCRYPTED_BYTES_LIMIT);
+  }
+
+  private static BufferPool bufferPool;
+  private static SSLContext sslContext;
+  private static KeyStore keystore;
+  private static char[] keystorePassword;
+  private static KeyStore truststore;
+
+  private SSLEngine clientEngine;
+  private SSLEngine serverEngine;
+  private int packetBufferSize;
+
+  @BeforeClass
+  public static void beforeClass() throws GeneralSecurityException, IOException {
+    DMStats mockStats = mock(DMStats.class);
+    bufferPool = new BufferPool(mockStats);
+
+    final Properties securityProperties = createKeystoreAndTruststore();
+
+    keystore = KeyStore.getInstance("JKS");
+    keystorePassword = securityProperties.getProperty(SSL_KEYSTORE_PASSWORD).toCharArray();
+    keystore.load(new FileInputStream(securityProperties.getProperty(SSL_KEYSTORE)),
+        keystorePassword);
+
+    truststore = KeyStore.getInstance("JKS");
+    final char[] truststorePassword =
+        securityProperties.getProperty(SSL_TRUSTSTORE_PASSWORD).toCharArray();
+    truststore.load(new FileInputStream(securityProperties.getProperty(SSL_TRUSTSTORE)),
+        truststorePassword);
+  }
+
+  @Before
+  public void before() throws NoSuchAlgorithmException, UnrecoverableKeyException,
+      KeyStoreException, KeyManagementException {
+    final KeyManagerFactory kmf = KeyManagerFactory.getInstance("PKIX");
+    kmf.init(keystore, keystorePassword);
+    final TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX");
+    tmf.init(truststore);
+
+    sslContext = SSLContext.getInstance("TLS");
+    final SecureRandom random = new SecureRandom(new byte[] {1, 2, 3});
+    sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), random);
+
+    final SSLParameters defaultParameters = sslContext.getDefaultSSLParameters();
+    final String[] protocols = defaultParameters.getProtocols();
+    final String[] cipherSuites = defaultParameters.getCipherSuites();
+    System.out.println(
+        String.format("TLS settings (default) before handshake: Protocols: %s, Cipher Suites: %s",
+            Arrays.toString(protocols), Arrays.toString(cipherSuites)));
+
+    clientEngine = createSSLEngine("server-host", true, sslContext);
+    packetBufferSize = clientEngine.getSession().getPacketBufferSize();
+
+    serverEngine = createSSLEngine("client-host", false, sslContext);
+  }
+
+  /*
+   * Verify initial handshake succeeds in the presence of KeyUpdate messages
+   * (i.e. updating send-side cryptographic keys).
+   *
+   * This test verifies, primarily, the behavior of
+   * NioSslEngine.handshake(SocketChannel, int, ByteBuffer)
+   */
+  @Test
+  public void keyUpdateDuringInitialHandshakeTest() {
+    clientServerTest(
+        (channel, filter, peerNetData) -> {
+          handshakeTLS(channel, filter, peerNetData, "Client:");
+          return true;
+        },
+        (channel, filter, peerNetData1) -> {
+          handshakeTLS(channel, filter, peerNetData1,
+              "Server:");
+          return true;
+        });
+  }
+
+  /*
+   * Building on keyUpdateDuringInitialHandshakeTest(), this test verifies that
+   * after the handshake succeeds, subsequent data transfer succeeds in the presence
+   * of KeyUpdate messages (i.e. updating send-side cryptographic keys).
+   *
+   * This test verifies, primarily, the behavior of NioSslEngine#wrap(ByteBuffer).
+   */
+  @Test
+  public void keyUpdateDuringSecureDataTransferTest() {
+    clientServerTest(
+        (final SocketChannel channel,
+            final NioSslEngine filter,
+            final ByteBuffer peerNetData) -> {
+          handshakeTLS(channel, filter, peerNetData, "Client:");
+          /*
+           * In order to verify that KeyUpdate is properly handled in NioSslEngine.wrap()
+           * we must arrange for the KeyUpdate (generation and processing) to occur after
+           * the initial handshake and before the handshaking during NioSslEngine.close().
+           *
+           * If we call send() only once, regardless of the number of bytes wrapped (even
+           * if it exceeds the encryption byte limit set in jdk.tls.keyLimits, the status
+           * result from SSLEngine.wrap() will be OK. We will fail to encounter the situation
+           * where it is e.g. BUFFER_OVERFLOW.
+           *
+           * By calling send() with bytesToSend >= the limit, we can be sure that the
+           * subsequent call to send() will trigger the KeyUpdate and will require proper
+           * handling of that situation in NioSslEngine.wrap().
+           */
+          send(ENCRYPTED_BYTES_LIMIT, filter, channel, 0);
+          send(ENCRYPTED_BYTES_LIMIT, filter, channel, ENCRYPTED_BYTES_LIMIT);
+
+          return true;
+        },
+        (final SocketChannel channel,
+            final NioSslEngine filter,
+            final ByteBuffer peerNetData) -> {
+          handshakeTLS(channel, filter, peerNetData, "Server:");
+          /*
+           * Call receive() twice (like we did for send()) just to test that receive() is
+           * leaving buffers in the correct readable/writable state when it returns.
+           */
+          for (int i = 0; i < 2; i++) {
+            final byte[] received =
+                receive(ENCRYPTED_BYTES_LIMIT, filter, channel, peerNetData);
+            assertThat(received).hasSize(ENCRYPTED_BYTES_LIMIT);
+            for (int j = i * ENCRYPTED_BYTES_LIMIT; j < (i + 1)
+                * ENCRYPTED_BYTES_LIMIT; j++) {
+              assertThat(received[j % received.length]).isEqualTo((byte) j);
+            }
+          }
+          return true;
+        });
+  }
+
+  /*
+   * Building on keyUpdateDuringSecureDataTransferTest(), this test verifies that
+   * NioSslEngine.close() succeeds in the presence of KeyUpdate messages
+   * (i.e. updating send-side cryptographic keys). This test is important because
+   * NioSslEngine.close() involves some TLS handshaking.
+   *
+   * This test verifies, primarily, the behavior of NioSslEngine#close(SocketChannel).
+   */
+  @Test
+  public void keyUpdateDuringSocketCloseHandshakeTest() {
+    clientServerTest(
+        (final SocketChannel channel,
+            final NioSslEngine filter,
+            final ByteBuffer peerNetData) -> {
+          handshakeTLS(channel, filter, peerNetData, "Client:");
+          /*
+           * Leave send-side SSLEngine in a state where it will generate a KeyUpdate
+           * TLS message during (but not before) NioSslEngine.close().
+           */
+          send(ENCRYPTED_BYTES_LIMIT, filter, channel, 0);
+          return true;
+        },
+        (final SocketChannel channel,
+            final NioSslEngine filter,
+            final ByteBuffer peerNetData) -> {
+          handshakeTLS(channel, filter, peerNetData, "Server:");
+          receive(ENCRYPTED_BYTES_LIMIT, filter, channel, peerNetData);
+          /*
+           * No need to validate the received data since our purpose is only to verify that
+           * NioSslEngine.close() succeeds cleanly.
+           */
+          return true;
+        });
+  }
+
+  private static SSLEngine createSSLEngine(final String peerHost, final boolean useClientMode,
+      final SSLContext sslContext) {
+    final SSLEngine engine = sslContext.createSSLEngine(peerHost, 10001);
+    engine.setEnabledProtocols(new String[] {TLS_PROTOCOL});
+    engine.setEnabledCipherSuites(new String[] {TLS_CIPHER_SUITE});
+    engine.setUseClientMode(useClientMode);
+    return engine;
+  }
+
+  private void clientServerTest(final PeerAction clientAction, final PeerAction serverAction) {
+    final ExecutorService executorService = Executors.newFixedThreadPool(2);
+
+    final CompletableFuture<SocketAddress> boundAddress = new CompletableFuture<>();
+
+    final CountDownLatch serversWaiting = new CountDownLatch(1);
+
+    final CompletableFuture<Boolean> serverHandshakeFuture =
+        supplyAsync(
+            () -> server(boundAddress, packetBufferSize,
+                serverAction, serversWaiting, serverEngine),
+            executorService);
+
+    final CompletableFuture<Boolean> clientHandshakeFuture =
+        supplyAsync(
+            () -> client(boundAddress, packetBufferSize,
+                clientAction, serversWaiting, clientEngine),
+            executorService);
+
+    CompletableFuture.allOf(serverHandshakeFuture, clientHandshakeFuture)
+        .join();
+  }
+
+  /*
+   * An action taken on a client or server after the SocketChannel has been established.
+   */
+  private interface PeerAction {
+    boolean apply(final SocketChannel acceptedChannel,
+        final NioSslEngine filter,
+        final ByteBuffer peerNetData) throws IOException;
+  }
+
+  private static boolean client(
+      final CompletableFuture<SocketAddress> boundAddress,
+      final int packetBufferSize,
+      final PeerAction peerAction,
+      final CountDownLatch serversWaiting,
+      final SSLEngine engine) {
+    try {
+      try (final SocketChannel connectedChannel = SocketChannel.open()) {
+        connectedChannel.connect(boundAddress.get());
+        final ByteBuffer peerNetData =
+            ByteBuffer.allocateDirect(packetBufferSize);
+
+        final NioSslEngine filter = new NioSslEngine(engine, bufferPool);
+
+        final boolean result =
+            peerAction.apply(connectedChannel, filter, peerNetData);
+
+        serversWaiting.await(); // wait for last server to give up before closing our socket
+
+        filter.close(connectedChannel);
+
+        return result;
+      }
+    } catch (IOException | InterruptedException | ExecutionException e) {
+      printException("In client:", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static boolean server(
+      final CompletableFuture<SocketAddress> boundAddress,
+      final int packetBufferSize,
+      final PeerAction peerAction,
+      final CountDownLatch serversWaiting,
+      final SSLEngine engine) {
+    try (final ServerSocketChannel boundChannel = ServerSocketChannel.open()) {
+      final InetSocketAddress bindAddress =
+          new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+      boundChannel.bind(bindAddress);
+      boundAddress.complete(boundChannel.getLocalAddress());
+      try (final SocketChannel acceptedChannel = boundChannel.accept()) {
+        final ByteBuffer peerNetData =
+            ByteBuffer.allocateDirect(packetBufferSize);
+
+        final NioSslEngine filter = new NioSslEngine(engine, bufferPool);
+
+        final boolean result =
+            peerAction.apply(acceptedChannel, filter, peerNetData);
+
+        filter.close(acceptedChannel);
+
+        return result;
+      }
+    } catch (IOException e) {
+      printException("In server:", e);
+      throw new RuntimeException(e);
+    } finally {
+      serversWaiting.countDown();
+    }
+  }
+
+  private static void printException(final String context, final Exception e) {
+    System.out.println(context + "\n");
+    e.printStackTrace();
+  }
+
+  private static Properties createKeystoreAndTruststore()
+      throws GeneralSecurityException, IOException {
+    final CertificateMaterial ca = new CertificateBuilder()
+        .commonName("Test CA")
+        .isCA()
+        .generate();
+
+    CertificateMaterial serverCertificate = new CertificateBuilder()
+        .commonName("server")
+        .issuedBy(ca)
+        .generate();
+
+    final CertStores serverStore = CertStores.serverStore();
+    serverStore.withCertificate("server", serverCertificate);
+    serverStore.trust("ca", ca);
+    return serverStore.propertiesWith("all", true, false);
+  }
+
+  /**
+   * @param filter is a newly constructed and intitialized object; it has not been used
+   *        for handshakes previously.
+   * @param peerNetData on entry: don't care about read/write state or contents
+   */
+  private static boolean handshakeTLS(final SocketChannel channel,
+      final NioSslEngine filter,
+      final ByteBuffer peerNetData,
+      final String context) throws IOException {
+    final boolean blocking = channel.isBlocking();
+    try {
+      channel.configureBlocking(false);
+      final boolean result =
+          filter.handshake(channel, 6_000, peerNetData);
+      System.out.println(
+          String.format(
+              "%s TLS settings after successful handshake: Protocol: %s, Cipher Suite: %s",
+              context,
+              filter.engine.getSession().getProtocol(),
+              filter.engine.getSession().getCipherSuite()));
+      return result;
+    } finally {
+      channel.configureBlocking(blocking);
+    }
+  }
+
+  /**
+   * This method is trying to do what Connection readMessages() and processInputBuffer() do
+   * together.
+   *
+   * Note well: peerNetData may contain content on entry to this method. Also the filter's
+   * buffers (e.g. the buffer returned by unwrap) may already contain data from previous
+   * calls to unwrap().
+   *
+   * @param peerNetData will be in write mode on entry and may already contain content
+   */
+  private static byte[] receive(
+      final int bytesToReceive,
+      final NioFilter filter,
+      final SocketChannel channel,
+      final ByteBuffer peerNetData) throws IOException {
+    final byte[] received = new byte[bytesToReceive];
+    // peerNetData in write mode
+    peerNetData.flip();
+    // peerNetData in read mode
+    int pos = 0;
+    while (pos < bytesToReceive) {
+      /*
+       * On first iteration unwrap() is called before the channel is read. This is necessary
+       * since a previous call to this method (receive()) could have left data in the filter
+       * and there might not be any more data coming on the channel (ever).
+       *
+       * If no data was already held in the filter's buffer, and peerNetData was empty
+       * before calling unwrap() then the buffer returned by unwrap will be empty. But before
+       * we start the second loop iteration, we'll read (from the channel into peerNetData).
+       *
+       * The filter's unwrap() method takes peerNetData in read mode and when the method returns
+       * the buffer is in write mode (ready for us to add data to it if needed).
+       */
+      try (final ByteBufferSharing appDataSharing = filter.unwrap(peerNetData)) {
+        // peerNetData in write mode
+        final ByteBuffer appData = appDataSharing.getBuffer();
+        // appData in write mode
+        appData.flip();
+        // appData in read mode
+        if (appData.hasRemaining()) {
+          final int newBytes = Math.min(appData.remaining(), received.length - pos);
+          assert pos + newBytes <= received.length;
+          appData.get(received, pos, newBytes);
+          pos += newBytes;
+        } else {
+          channel.read(peerNetData);
+        }
+        peerNetData.flip();
+        // peerNetData in read mode ready for filter unwrap() call
+        appData.compact();
+        // appData in write mode ready for filter unwrap() call
+      }
+    }
+    peerNetData.compact();
+    // peerNetData in write mode
+    return received;
+  }
+
+  /*
+   * This method is trying to do what Connection.writeFully() does.
+   */
+  private static void send(
+      final int bytesToSend,
+      final NioFilter filter,
+      final SocketChannel channel,
+      final int startingValue)
+      throws IOException {
+    // if we wanted to send more than one buffer-full we could add an outer loop
+    final ByteBuffer appData = ByteBuffer.allocateDirect(bytesToSend);
+    for (int i = 0; i < bytesToSend; i++) {
+      appData.put((byte) (i + startingValue));
+    }
+    appData.flip();
+    try (final ByteBufferSharing netDataSharing = filter.wrap(appData)) {
+      final ByteBuffer netData = netDataSharing.getBuffer();
+      while (netData.remaining() > 0) {
+        channel.write(netData);
+      }
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 1bae2522f7..ae2cfeb5b4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -18,7 +18,7 @@ import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_TASK;
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
 import static javax.net.ssl.SSLEngineResult.Status.BUFFER_OVERFLOW;
-import static javax.net.ssl.SSLEngineResult.Status.OK;
+import static javax.net.ssl.SSLEngineResult.Status.CLOSED;
 import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_RECEIVER;
 import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_SENDER;
 
@@ -40,6 +40,7 @@ import javax.net.ssl.SSLSession;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.GemFireIOException;
+import org.apache.geode.annotations.Immutable;
 import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.internal.net.BufferPool.BufferType;
 import org.apache.geode.internal.net.ByteBufferVendor.OpenAttemptTimedOut;
@@ -51,6 +52,8 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
  * Its use should be confined to one thread or should be protected by external synchronization.
  */
 public class NioSslEngine implements NioFilter {
+  @Immutable
+  private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new byte[0]);
   private static final Logger logger = LogService.getLogger();
 
   private final BufferPool bufferPool;
@@ -229,7 +232,8 @@ public class NioSslEngine implements NioFilter {
   }
 
   @Override
-  public ByteBufferSharing wrap(ByteBuffer appData) throws IOException {
+  public ByteBufferSharing wrap(ByteBuffer appData)
+      throws IOException {
     try (final ByteBufferSharing outputSharing = outputBufferVendor.open()) {
 
       ByteBuffer myNetData = outputSharing.getBuffer();
@@ -237,24 +241,21 @@ public class NioSslEngine implements NioFilter {
       myNetData.clear();
 
       while (appData.hasRemaining()) {
-        // ensure we have lots of capacity since encrypted data might
-        // be larger than the app data
-        int remaining = myNetData.capacity() - myNetData.position();
-
-        if (remaining < (appData.remaining() * 2)) {
-          int newCapacity = expandedCapacity(appData, myNetData);
-          myNetData = outputSharing.expandWriteBufferIfNeeded(newCapacity);
+        final SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
+        switch (wrapResult.getStatus()) {
+          case BUFFER_OVERFLOW:
+            final int newCapacity =
+                myNetData.position() + engine.getSession().getPacketBufferSize();
+            myNetData = outputSharing.expandWriteBufferIfNeeded(newCapacity);
+            break;
+          case BUFFER_UNDERFLOW:
+          case CLOSED:
+            throw new SSLException("Error encrypting data: " + wrapResult);
         }
 
-        SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
-
         if (wrapResult.getHandshakeStatus() == NEED_TASK) {
           handleBlockingTasks();
         }
-
-        if (wrapResult.getStatus() != OK) {
-          throw new SSLException("Error encrypting data: " + wrapResult);
-        }
       }
 
       myNetData.flip();
@@ -372,6 +373,7 @@ public class NioSslEngine implements NioFilter {
 
   @Override
   public synchronized void close(SocketChannel socketChannel) {
+
     if (closed) {
       return;
     }
@@ -380,19 +382,25 @@ public class NioSslEngine implements NioFilter {
     try (final ByteBufferSharing outputSharing = outputBufferVendor.open(1, TimeUnit.MINUTES)) {
       final ByteBuffer myNetData = outputSharing.getBuffer();
 
-      if (!engine.isOutboundDone()) {
-        ByteBuffer empty = ByteBuffer.wrap(new byte[0]);
-        engine.closeOutbound();
+      engine.closeOutbound();
+
+      SSLEngineResult result = null;
+
+      while (!engine.isOutboundDone()) {
 
         // clear the buffer to receive a CLOSE message from the SSLEngine
         myNetData.clear();
 
         // Get close message
-        SSLEngineResult result = engine.wrap(empty, myNetData);
-
-        if (result.getStatus() != SSLEngineResult.Status.CLOSED) {
-          throw new SSLHandshakeException(
-              "Error closing SSL session.  Status=" + result.getStatus());
+        result = engine.wrap(EMPTY_BYTE_BUFFER, myNetData);
+
+        /*
+         * We would have liked to make this one of the while() conditions but
+         * if status is CLOSED we'll get a "Broken pipe" exception in the next write()
+         * so it needs to be handled here.
+         */
+        if (result.getStatus() == CLOSED) {
+          break;
         }
 
         // Send close message to peer
@@ -401,6 +409,10 @@ public class NioSslEngine implements NioFilter {
           socketChannel.write(myNetData);
         }
       }
+      if (result != null && result.getStatus() != CLOSED) {
+        throw new SSLHandshakeException(
+            "Error closing SSL session.  Status=" + result.getStatus());
+      }
     } catch (ClosedChannelException e) {
       // we can't send a close message if the channel is closed
     } catch (IOException e) {
@@ -415,18 +427,13 @@ public class NioSslEngine implements NioFilter {
     }
   }
 
-  private int expandedCapacity(ByteBuffer sourceBuffer, ByteBuffer targetBuffer) {
-    return Math.max(targetBuffer.position() + sourceBuffer.remaining() * 2,
-        targetBuffer.capacity() * 2);
-  }
-
   @VisibleForTesting
-  public ByteBufferVendor getOutputBufferVendorForTestingOnly() throws IOException {
+  public ByteBufferVendor getOutputBufferVendorForTestingOnly() {
     return outputBufferVendor;
   }
 
   @VisibleForTesting
-  public ByteBufferVendor getInputBufferVendorForTestingOnly() throws IOException {
+  public ByteBufferVendor getInputBufferVendorForTestingOnly() {
     return inputBufferVendor;
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 476fc24f67..c13ba5c5c8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -1791,7 +1791,7 @@ public class Connection implements Runnable {
    * checks to see if an exception should not be logged: i.e., "forcibly closed", "reset by peer",
    * or "connection reset"
    */
-  private static boolean isIgnorableIOException(Exception e) {
+  private static boolean isIgnorableIOException(IOException e) {
     if (e instanceof ClosedChannelException) {
       return true;
     }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index d6b9aa6485..7c72d6780b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.SocketException;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SocketChannel;
@@ -190,7 +191,7 @@ public class NioSslEngineTest {
   }
 
   @Test
-  public void wrap() throws Exception {
+  public void engineWrapCausesResizeThenSucceeds() throws Exception {
     try (final ByteBufferSharing outputSharing =
         nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
 
@@ -206,6 +207,7 @@ public class NioSslEngineTest {
       // buffer
       TestSSLEngine testEngine = new TestSSLEngine();
       testEngine.addReturnResult(
+          new SSLEngineResult(BUFFER_OVERFLOW, NEED_TASK, 0, 0),
           new SSLEngineResult(OK, NEED_TASK, appData.remaining(), appData.remaining()));
       spyNioSslEngine.engine = testEngine;
 
@@ -217,12 +219,12 @@ public class NioSslEngineTest {
         appData.flip();
         assertThat(wrappedBuffer).isEqualTo(appData);
       }
-      verify(spyNioSslEngine, times(1)).handleBlockingTasks();
+      verify(spyNioSslEngine, times(2)).handleBlockingTasks();
     }
   }
 
   @Test
-  public void wrapFails() throws IOException {
+  public void engineWrapCausesResizeThenCloses() throws IOException {
     try (final ByteBufferSharing outputSharing =
         nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
       // make the application data too big to fit into the engine's encryption buffer
@@ -237,10 +239,12 @@ public class NioSslEngineTest {
       // buffer
       TestSSLEngine testEngine = new TestSSLEngine();
       testEngine.addReturnResult(
+          new SSLEngineResult(BUFFER_OVERFLOW, NEED_TASK, 0, 0),
           new SSLEngineResult(CLOSED, NEED_TASK, appData.remaining(), appData.remaining()));
       spyNioSslEngine.engine = testEngine;
 
-      assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)).isInstanceOf(SSLException.class)
+      assertThatThrownBy(() -> spyNioSslEngine.wrap(appData))
+          .isInstanceOf(SSLException.class)
           .hasMessageContaining("Error encrypting data");
     }
   }
@@ -370,7 +374,7 @@ public class NioSslEngineTest {
     when(mockChannel.socket()).thenReturn(mockSocket);
     when(mockSocket.isClosed()).thenReturn(false);
 
-    when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
+    when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE).thenReturn(Boolean.TRUE);
     when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
         new SSLEngineResult(CLOSED, FINISHED, 0, 0));
     nioSslEngine.close(mockChannel);
@@ -384,13 +388,13 @@ public class NioSslEngineTest {
   }
 
   @Test
-  public void closeWhenUnwrapError() throws Exception {
+  public void closeWhenWrapError() throws Exception {
     SocketChannel mockChannel = mock(SocketChannel.class);
     Socket mockSocket = mock(Socket.class);
     when(mockChannel.socket()).thenReturn(mockSocket);
     when(mockSocket.isClosed()).thenReturn(true);
 
-    when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
+    when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE).thenReturn(Boolean.TRUE);
     when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
         new SSLEngineResult(BUFFER_OVERFLOW, FINISHED, 0, 0));
     assertThatThrownBy(() -> nioSslEngine.close(mockChannel)).isInstanceOf(GemFireIOException.class)
@@ -405,14 +409,13 @@ public class NioSslEngineTest {
     when(mockChannel.socket()).thenReturn(mockSocket);
     when(mockSocket.isClosed()).thenReturn(true);
 
-    when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
     when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenAnswer((x) -> {
       try (final ByteBufferSharing outputSharing =
           nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
         // give the NioSslEngine something to write on its socket channel, simulating a TLS close
         // message
         outputSharing.getBuffer().put("Goodbye cruel world".getBytes());
-        return new SSLEngineResult(CLOSED, FINISHED, 0, 0);
+        return new SSLEngineResult(OK, NEED_UNWRAP, 0, 0);
       }
     });
     when(mockChannel.write(any(ByteBuffer.class))).thenThrow(new ClosedChannelException());
@@ -605,10 +608,19 @@ public class NioSslEngineTest {
 
     @Override
     public SSLEngineResult wrap(ByteBuffer[] sources, int i, int i1, ByteBuffer destination) {
-      for (ByteBuffer source : sources) {
+      assertThat(sources.length)
+          .as("test unexpectedly tried to wrap with multiple sources")
+          .isEqualTo(1);
+      final ByteBuffer source = sources[0];
+      final SSLEngineResult nextResult = nextResult();
+      try {
         destination.put(source);
+      } catch (final BufferOverflowException e) {
+        assertThat(BUFFER_OVERFLOW)
+            .as("got unexpected buffer overflow")
+            .isEqualTo(nextResult.getStatus());
       }
-      return nextResult();
+      return nextResult;
     }
 
     @Override
@@ -672,7 +684,9 @@ public class NioSslEngineTest {
 
     @Override
     public SSLSession getSession() {
-      return null;
+      final SSLSession session = mock(SSLSession.class);
+      when(session.getPacketBufferSize()).thenReturn(16 * 1024);
+      return session;
     }
 
     @Override


[geode] 02/02: GEODE-10122: fix quorm loss in test

Posted by bu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git

commit e961ed9090b6c5ce7922d4a54f50ff9339f2fbfb
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Wed Apr 27 11:47:59 2022 -0700

    GEODE-10122: fix quorm loss in test
    
    (cherry picked from commit cedff28ffb826c627894902bff4f4c6f27a1e514)
---
 .../internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java   | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java
index 3a887d5521..a364cbbae8 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2pMessagingSslTlsKeyUpdateDistributedTest.java
@@ -101,6 +101,16 @@ public class P2pMessagingSslTlsKeyUpdateDistributedTest {
 
   @After
   public void afterEach() {
+    /*
+     * Disconnect DSs before killing JVMs.
+     */
+    clusterStartupRule.getVM(2).invoke(
+        () -> ClusterStartupRule.getCache().close());
+    clusterStartupRule.getVM(1).invoke(
+        () -> ClusterStartupRule.getCache().close());
+    clusterStartupRule.getVM(0).invoke(
+        () -> ClusterStartupRule.getCache().close());
+
     clusterStartupRule.getVM(0).bounceForcibly();
     clusterStartupRule.getVM(1).bounceForcibly();
     clusterStartupRule.getVM(2).bounceForcibly();