You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2020/10/30 17:28:59 UTC

[geode] 01/01: GEODE-8652: NioSslEngine.close() Bypasses Locks (#5666)

This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch backport-1-12-GEODE-8652-and-friends
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 1395c6afe4d943bae87665f580501e5df64de111
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Thu Oct 29 16:38:25 2020 -0700

    GEODE-8652: NioSslEngine.close() Bypasses Locks (#5666)
    
    - NioSslEngine.close() proceeds even if readers (or writers) are
      operating on its ByteBuffers, allowing Connection.close() to close
      its socket and proceed.
    
    - NioSslEngine.close() needed a lock only on the output buffer, so
      we split what was a single lock into two. Also instead of using
      synchronized we use a ReentrantLock so we can
      call tryLock() and time out if needed in NioSslEngine.close().
    
    - Since readers/writers may hold locks on these input/output buffers
      when NioSslEngine.close() is called, a reference count is maintained
      and the buffers are returned to the pool only when the last user
      is done.
    
    - To manage the locking and reference counting a new AutoCloseable
      ByteBufferSharing interface is introduced with a trivial
      implementation: ByteBufferSharingNoOp and a real implementation:
      ByteBufferSharingImpl.
    
    Co-authored-by: Bill Burcham <bi...@gmail.com>
    Co-authored-by: Darrel Schneider <ds...@pivotal.io>
    Co-authored-by: Ernie Burghardt <bu...@vmware.com>
    (cherry picked from commit 08e9e9673d0ed05555a3d74c6d16e706817cab09)
---
 .../tcp/ConnectionCloseSSLTLSDUnitTest.java        | 235 ++++++++++++
 .../org/apache/geode/internal/tcp/server.keystore  | Bin 0 -> 1256 bytes
 ...LSocketHostNameVerificationIntegrationTest.java |   4 +-
 .../internal/net/SSLSocketIntegrationTest.java     |  57 +--
 .../apache/geode/codeAnalysis/excludedClasses.txt  |   1 +
 .../geode/internal/net/ByteBufferSharing.java      |  55 +++
 .../geode/internal/net/ByteBufferSharingImpl.java  | 148 ++++++++
 .../geode/internal/net/ByteBufferSharingNoOp.java  |  52 +++
 .../org/apache/geode/internal/net/NioFilter.java   |  69 ++--
 .../apache/geode/internal/net/NioPlainEngine.java  |  27 +-
 .../apache/geode/internal/net/NioSslEngine.java    | 357 +++++++++---------
 .../org/apache/geode/internal/tcp/Connection.java  |  34 +-
 .../org/apache/geode/internal/tcp/MsgReader.java   |  15 +-
 .../internal/net/ByteBufferSharingImplTest.java    | 163 +++++++++
 .../geode/internal/net/NioPlainEngineTest.java     |  47 ++-
 .../geode/internal/net/NioSslEngineTest.java       | 397 +++++++++++----------
 16 files changed, 1199 insertions(+), 462 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java
new file mode 100644
index 0000000..586cd53
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.tcp;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.geode.distributed.ConfigurationProperties.CONSERVE_SOCKETS;
+import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.NAME;
+import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_BUFFER_SIZE;
+import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_LEASE_TIME;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_ENABLED_COMPONENTS;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_PROTOCOLS;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
+import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase.getBlackboard;
+import static org.apache.geode.test.util.ResourceUtils.createTempFileFromResource;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Fail.fail;
+
+import java.io.File;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.DistributedSystemDisconnectedException;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave;
+import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+
+/**
+ * It would be nice if this test didn't need to use the cache since the test's purpose is to test
+ * that the {@link Connection} class can be closed while readers and writers hold locks on its
+ * internal TLS {@link ByteBuffer}s.
+ *
+ * But this test does use the cache (region) because doing so lets us use existing cache messaging
+ * and the DistributionMessageObserver (observer) hooks.
+ *
+ * see also ClusterCommunicationsDUnitTest
+ */
+public class ConnectionCloseSSLTLSDUnitTest implements Serializable {
+
+  private static final int SMALL_BUFFER_SIZE = 8000;
+  private static final String UPDATE_ENTERED_GATE = "connectionCloseDUnitTest.regionUpdateEntered";
+  private static final String SUSPEND_UPDATE_GATE = "connectionCloseDUnitTest.suspendRegionUpdate";
+  private static final String regionName = "connectionCloseDUnitTestRegion";
+  private static final Logger logger = LogService.getLogger();
+
+  private static Cache cache;
+
+  @Rule
+  public DistributedRule distributedRule =
+      DistributedRule.builder().withVMCount(3).build();
+
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
+
+  private VM locator;
+  private VM sender;
+  private VM receiver;
+
+  @Before
+  public void before() {
+    locator = getVM(0);
+    sender = getVM(1);
+    receiver = getVM(2);
+  }
+
+  @After
+  public void after() {
+    receiver.invoke(() -> {
+      DistributionMessageObserver.setInstance(null);
+    });
+  }
+
+  @Test
+  public void connectionWithHungReaderIsCloseableAndUnhangsReader()
+      throws InterruptedException, TimeoutException {
+
+    getBlackboard().initBlackboard();
+
+    final int locatorPort = createLocator(locator);
+    createCacheAndRegion(sender, locatorPort);
+    createCacheAndRegion(receiver, locatorPort);
+
+    receiver
+        .invoke("set up DistributionMessageObserver to 'hang' sender's put (on receiver)",
+            () -> {
+              final DistributionMessageObserver observer =
+                  new DistributionMessageObserver() {
+
+                    @Override
+                    public void beforeProcessMessage(final ClusterDistributionManager dm,
+                        final DistributionMessage message) {
+                      guardMessageProcessingHook(message, () -> {
+                        try {
+                          getBlackboard().signalGate(UPDATE_ENTERED_GATE);
+                          getBlackboard().waitForGate(SUSPEND_UPDATE_GATE, 5, MINUTES);
+                        } catch (TimeoutException | InterruptedException e) {
+                          fail("message observus interruptus");
+                        }
+                        logger.info("BGB: got before process message: " + message);
+                      });
+                    }
+                  };
+              DistributionMessageObserver.setInstance(observer);
+            });
+
+    final AsyncInvocation<Object> putInvocation = sender.invokeAsync("try a put", () -> {
+      final Region<Object, Object> region = cache.getRegion(regionName);
+      // test is going to close the cache while we are waiting for our ack
+      assertThatThrownBy(() -> {
+        region.put("hello", "world");
+      }).isInstanceOf(DistributedSystemDisconnectedException.class);
+    });
+
+    // wait until our message observer is blocked
+    getBlackboard().waitForGate(UPDATE_ENTERED_GATE, 5, MINUTES);
+
+    // at this point our put() is blocked waiting for a direct ack
+    assertThat(putInvocation.isAlive()).as("put is waiting for remote region to ack").isTrue();
+
+    /*
+     * Now close the cache. The point of calling it is to test that we don't block while trying
+     * to close connections. Cache.close() calls DistributedSystem.disconnect() which in turn
+     * closes all the connections (and their sockets.) We want the sockets to close because that'll
+     * cause our hung put() to see a DistributedSystemDisconnectedException.
+     */
+    sender.invoke("", () -> cache.close());
+
+    // wait for put task to complete: with an exception, that is!
+    putInvocation.get();
+
+    // un-stick our message observer
+    getBlackboard().signalGate(SUSPEND_UPDATE_GATE);
+  }
+
+  private void guardMessageProcessingHook(final DistributionMessage message,
+      final Runnable runnable) {
+    if (message instanceof UpdateMessage) {
+      final UpdateMessage updateMessage = (UpdateMessage) message;
+      if (updateMessage.getRegionPath().equals("/" + regionName)) {
+        runnable.run();
+      }
+    }
+  }
+
+  private int createLocator(VM memberVM) {
+    return memberVM.invoke("create locator", () -> {
+      // if you need to debug SSL communications use this property:
+      // System.setProperty("javax.net.debug", "all");
+      System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true");
+      return Locator.startLocatorAndDS(0, new File(""), getDistributedSystemProperties())
+          .getPort();
+    });
+  }
+
+  private void createCacheAndRegion(VM memberVM, int locatorPort) {
+    memberVM.invoke("start cache and create region", () -> {
+      cache = createCache(locatorPort);
+      cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+    });
+  }
+
+  private Cache createCache(int locatorPort) {
+    // if you need to debug SSL communications use this property:
+    // System.setProperty("javax.net.debug", "all");
+    Properties properties = getDistributedSystemProperties();
+    properties.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
+    return new CacheFactory(properties).create();
+  }
+
+  private Properties getDistributedSystemProperties() {
+    Properties properties = new Properties();
+    properties.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
+    properties.setProperty(USE_CLUSTER_CONFIGURATION, "false");
+    properties.setProperty(NAME, "vm" + VM.getCurrentVMNum());
+    properties.setProperty(CONSERVE_SOCKETS, "false"); // we are testing direct ack
+    properties.setProperty(SOCKET_LEASE_TIME, "10000");
+    properties.setProperty(SOCKET_BUFFER_SIZE, "" + SMALL_BUFFER_SIZE);
+
+    properties.setProperty(SSL_ENABLED_COMPONENTS, "cluster,locator");
+    properties
+        .setProperty(SSL_KEYSTORE, createTempFileFromResource(getClass(), "server.keystore")
+            .getAbsolutePath());
+    properties.setProperty(SSL_TRUSTSTORE,
+        createTempFileFromResource(getClass(), "server.keystore")
+            .getAbsolutePath());
+    properties.setProperty(SSL_PROTOCOLS, "TLSv1.2");
+    properties.setProperty(SSL_KEYSTORE_PASSWORD, "password");
+    properties.setProperty(SSL_TRUSTSTORE_PASSWORD, "password");
+    properties.setProperty(SSL_REQUIRE_AUTHENTICATION, "true");
+    return properties;
+  }
+
+}
diff --git a/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore b/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore
new file mode 100644
index 0000000..8b5305f
Binary files /dev/null and b/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore differ
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
index 5483457..91e5f55 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
@@ -215,7 +215,9 @@ public class SSLSocketHostNameVerificationIntegrationTest {
           final NioSslEngine nioSslEngine = engine;
           engine.close(socket.getChannel());
           assertThatThrownBy(() -> {
-            nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]));
+            try (final ByteBufferSharing unused =
+                nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]))) {
+            }
           })
               .isInstanceOf(IOException.class);
         }
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
index 4e6747b..7a3759b 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
@@ -244,11 +244,13 @@ public class SSLSocketIntegrationTest {
     ByteBuffer buffer = bbos.getContentBuffer();
     System.out.println(
         "client buffer position is " + buffer.position() + " and limit is " + buffer.limit());
-    ByteBuffer wrappedBuffer = engine.wrap(buffer);
-    System.out.println("client wrapped buffer position is " + wrappedBuffer.position()
-        + " and limit is " + wrappedBuffer.limit());
-    int bytesWritten = clientChannel.write(wrappedBuffer);
-    System.out.println("client bytes written is " + bytesWritten);
+    try (final ByteBufferSharing outputSharing = engine.wrap(buffer)) {
+      ByteBuffer wrappedBuffer = outputSharing.getBuffer();
+      System.out.println("client wrapped buffer position is " + wrappedBuffer.position()
+          + " and limit is " + wrappedBuffer.limit());
+      int bytesWritten = clientChannel.write(wrappedBuffer);
+      System.out.println("client bytes written is " + bytesWritten);
+    }
   }
 
   private Thread startServerNIO(final ServerSocket serverSocket, int timeoutMillis)
@@ -279,7 +281,9 @@ public class SSLSocketIntegrationTest {
           final NioSslEngine nioSslEngine = engine;
           engine.close(socket.getChannel());
           assertThatThrownBy(() -> {
-            nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]));
+            try (final ByteBufferSharing unused =
+                nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]))) {
+            }
           })
               .isInstanceOf(IOException.class);
         }
@@ -293,24 +297,35 @@ public class SSLSocketIntegrationTest {
   private void readMessageFromNIOSSLClient(Socket socket, ByteBuffer buffer, NioSslEngine engine)
       throws IOException {
 
-    ByteBuffer unwrapped = engine.getUnwrappedBuffer(buffer);
-    // if we already have unencrypted data skip unwrapping
-    if (unwrapped.position() == 0) {
-      int bytesRead;
-      // if we already have encrypted data skip reading from the socket
-      if (buffer.position() == 0) {
-        bytesRead = socket.getChannel().read(buffer);
-        buffer.flip();
+    try (final ByteBufferSharing sharedBuffer = engine.getUnwrappedBuffer()) {
+      final ByteBuffer unwrapped = sharedBuffer.getBuffer();
+      // if we already have unencrypted data skip unwrapping
+      if (unwrapped.position() == 0) {
+        int bytesRead;
+        // if we already have encrypted data skip reading from the socket
+        if (buffer.position() == 0) {
+          bytesRead = socket.getChannel().read(buffer);
+          buffer.flip();
+        } else {
+          bytesRead = buffer.remaining();
+        }
+        System.out.println("server bytes read is " + bytesRead + ": buffer position is "
+            + buffer.position() + " and limit is " + buffer.limit());
+        try (final ByteBufferSharing sharedBuffer2 = engine.unwrap(buffer)) {
+          final ByteBuffer unwrapped2 = sharedBuffer2.getBuffer();
+
+          unwrapped2.flip();
+          System.out.println("server unwrapped buffer position is " + unwrapped2.position()
+              + " and limit is " + unwrapped2.limit());
+          finishReadMessageFromNIOSSLClient(unwrapped2);
+        }
       } else {
-        bytesRead = buffer.remaining();
+        finishReadMessageFromNIOSSLClient(unwrapped);
       }
-      System.out.println("server bytes read is " + bytesRead + ": buffer position is "
-          + buffer.position() + " and limit is " + buffer.limit());
-      unwrapped = engine.unwrap(buffer);
-      unwrapped.flip();
-      System.out.println("server unwrapped buffer position is " + unwrapped.position()
-          + " and limit is " + unwrapped.limit());
     }
+  }
+
+  private void finishReadMessageFromNIOSSLClient(final ByteBuffer unwrapped) throws IOException {
     ByteBufferInputStream bbis = new ByteBufferInputStream(unwrapped);
     DataInputStream dis = new DataInputStream(bbis);
     String welcome = dis.readUTF();
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index 0de147d..af3bd1e 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -104,3 +104,4 @@ org/apache/geode/cache/query/internal/xml/ElementType
 org/apache/geode/cache/query/internal/xml/ElementType$1
 org/apache/geode/cache/query/internal/xml/ElementType$2
 org/apache/geode/cache/query/internal/xml/ElementType$3
+org/apache/geode/internal/net/ByteBufferSharingImpl$OpenAttemptTimedOut
\ No newline at end of file
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
new file mode 100644
index 0000000..cdfa897
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.net;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+
+/**
+ * When a {@link ByteBufferSharing} is acquired in a try-with-resources the buffer is available (for
+ * reading and modification) within the scope of that try block.
+ *
+ * Releases managed ByteBuffer back to pool after last reference is dropped.
+ */
+public interface ByteBufferSharing extends AutoCloseable {
+
+  /**
+   * Call this method only within a try-with-resources in which this {@link ByteBufferSharing} was
+   * acquired. Retain the reference only within the scope of that try-with-resources.
+   *
+   * @return the buffer: manipulable only within the scope of the try-with-resources
+   * @throws IOException if the buffer is no longer accessible
+   */
+  ByteBuffer getBuffer() throws IOException;
+
+  /**
+   * Expand the buffer if needed. This may return a different object so be sure to pay attention to
+   * the return value if you need access to the potentially-expanded buffer.
+   *
+   * Subsequent calls to {@link #getBuffer()} will return that new buffer too.
+   *
+   * @return the same buffer or a different (bigger) buffer
+   * @throws IOException if the buffer is no longer accessible
+   */
+  ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException;
+
+  /**
+   * Override {@link AutoCloseable#close()} without throws clause since we don't need one.
+   */
+  @Override
+  void close();
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java
new file mode 100644
index 0000000..e9a941e
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.net;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.internal.net.BufferPool.BufferType;
+
+/**
+ * An {@link AutoCloseable} meant to be acquired in a try-with-resources statement. The resource (a
+ * {@link ByteBuffer}) is available (for reading and modification) in the scope of the
+ * try-with-resources.
+ */
+class ByteBufferSharingImpl implements ByteBufferSharing {
+
+  static class OpenAttemptTimedOut extends Exception {
+  }
+
+  private final Lock lock;
+  private final AtomicBoolean isClosed;
+  // mutable because in general our ByteBuffer may need to be resized (grown or compacted)
+  private ByteBuffer buffer;
+  private final BufferType bufferType;
+  private final AtomicInteger counter;
+  private final BufferPool bufferPool;
+
+  /**
+   * This constructor is for use only by the owner of the shared resource (a {@link ByteBuffer}).
+   *
+   * A resource owner must invoke {@link #open()} once for each reference that escapes (is passed
+   * to an external object or is returned to an external caller.)
+   *
+   * This constructor acquires no lock. The reference count will be 1 after this constructor
+   * completes.
+   */
+  ByteBufferSharingImpl(final ByteBuffer buffer, final BufferType bufferType,
+      final BufferPool bufferPool) {
+    this.buffer = buffer;
+    this.bufferType = bufferType;
+    this.bufferPool = bufferPool;
+    lock = new ReentrantLock();
+    counter = new AtomicInteger(1);
+    isClosed = new AtomicBoolean(false);
+  }
+
+  /**
+   * The destructor. Called by the resource owner to undo the work of the constructor.
+   */
+  void destruct() {
+    if (isClosed.compareAndSet(false, true)) {
+      dropReference();
+    }
+  }
+
+  /**
+   * This method is for use only by the owner of the shared resource. It's used for handing out
+   * references to the shared resource. So it does reference counting and also acquires a lock.
+   *
+   * Resource owners call this method as the last thing before returning a reference to the caller.
+   * That caller binds that reference to a variable in a try-with-resources statement and relies on
+   * the AutoCloseable protocol to invoke {@link #close()} on the object at the end of the block.
+   */
+  ByteBufferSharing open() {
+    lock.lock();
+    addReference();
+    return this;
+  }
+
+  /**
+   * This variant throws {@link OpenAttemptTimedOut} if it can't acquire the lock in time.
+   */
+  ByteBufferSharing open(final long time, final TimeUnit unit) throws OpenAttemptTimedOut {
+    try {
+      if (!lock.tryLock(time, unit)) {
+        throw new OpenAttemptTimedOut();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new OpenAttemptTimedOut();
+    }
+    addReference();
+    return this;
+  }
+
+  @Override
+  public ByteBuffer getBuffer() throws IOException {
+    if (isClosed.get()) {
+      throw new IOException("NioSslEngine has been closed");
+    } else {
+      return buffer;
+    }
+  }
+
+  @Override
+  public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException {
+    return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity);
+  }
+
+  @Override
+  public void close() {
+    /*
+     * We are counting on our ReentrantLock throwing an exception if the current thread
+     * does not hold the lock. In that case dropReference() will not be called. This
+     * prevents ill-behaved clients (clients that call close() too many times) from
+     * corrupting our reference count.
+     */
+    lock.unlock();
+    dropReference();
+  }
+
+  private int addReference() {
+    return counter.incrementAndGet();
+  }
+
+  private int dropReference() {
+    final int usages = counter.decrementAndGet();
+    if (usages == 0) {
+      bufferPool.releaseBuffer(bufferType, buffer);
+    }
+    return usages;
+  }
+
+  @VisibleForTesting
+  public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) {
+    buffer = newBufferForTesting;
+  }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
new file mode 100644
index 0000000..bd707e3
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.net;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * An {@link AutoCloseable} meant to be acquired in a try-with-resources statement. The resource (a
+ * {@link ByteBuffer}) is available (for reading and modification) in the scope of the
+ * try-with-resources.
+ *
+ * This implementation is a "no-op". It performs no actual locking and no reference counting. It's
+ * meant for use with the {@link NioPlainEngine} only, since that engine keeps no buffers and so
+ * needs no reference counting on buffers, nor any synchronization around access to buffers.
+ *
+ * See also {@link ByteBufferSharingImpl}
+ */
+class ByteBufferSharingNoOp implements ByteBufferSharing {
+
+  private final ByteBuffer buffer;
+
+  ByteBufferSharingNoOp(final ByteBuffer buffer) {
+    this.buffer = buffer;
+  }
+
+  @Override
+  public ByteBuffer getBuffer() {
+    return buffer;
+  }
+
+  @Override
+  public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException {
+    throw new UnsupportedOperationException("Can't expand buffer when using NioPlainEngine");
+  }
+
+  @Override
+  public void close() {}
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
index 9c437ad..eb53f0e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
@@ -19,47 +19,53 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 
 /**
- * Prior to transmitting a buffer or processing a received buffer
- * a NioFilter should be called to wrap (transmit) or unwrap (received)
- * the buffer in case SSL is being used.<br>
- * Implementations of this class may not be thread-safe in regard to
- * the buffers their methods return. These may be internal state that,
- * if used concurrently by multiple threads could cause corruption.
- * Appropriate external synchronization must be used in order to provide
- * thread-safety. Do this by invoking getSynchObject() and synchronizing on
- * the returned object while using the buffer.
+ * Prior to transmitting a buffer or processing a received buffer a NioFilter should be called to
+ * wrap (transmit) or unwrap (received) the buffer in case SSL is being used.<br>
+ * Implementations of this class may not be thread-safe in regard to the buffers their methods
+ * return. These may be internal state that, if used concurrently by multiple threads, could cause
+ * corruption. Methods that expose such a buffer return a {@link ByteBufferSharing}: acquire it in
+ * a try-with-resources statement, use the buffer only inside that statement, and let close()
+ * release it.
  */
 public interface NioFilter {
 
   /**
    * wrap bytes for transmission to another process
+   *
+   * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is
+   * to call this method in a try-with-resources statement.
    */
-  ByteBuffer wrap(ByteBuffer buffer) throws IOException;
+  ByteBufferSharing wrap(ByteBuffer buffer) throws IOException;
 
   /**
-   * unwrap bytes received from another process. The unwrapped
-   * buffer should be flipped before reading. When done reading invoke
-   * doneReading() to reset for future read ops
+   * unwrap bytes received from another process. The unwrapped buffer should be flipped before
+   * reading. When done reading invoke doneReading() to reset for future read ops
+   *
+   * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is
+   * to call this method in a try-with-resources statement.
    */
-  ByteBuffer unwrap(ByteBuffer wrappedBuffer) throws IOException;
+  ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException;
 
   /**
-   * ensure that the wrapped buffer has enough room to read the given amount of data.
-   * This must be invoked before readAtLeast. A new buffer may be returned by this method.
+   * ensure that the wrapped buffer has enough room to read the given amount of data. This must be
+   * invoked before readAtLeast. A new buffer may be returned by this method.
    */
   ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
       BufferPool.BufferType bufferType);
 
   /**
-   * read at least the indicated amount of bytes from the given
-   * socket. The buffer position will be ready for reading
-   * the data when this method returns. Note: you must invoke ensureWrappedCapacity
-   * with the given amount prior to each invocation of this method.
+   * read at least the indicated amount of bytes from the given socket. The buffer position will be
+   * ready for reading the data when this method returns. Note: you must invoke
+   * ensureWrappedCapacity with the given amount prior to each invocation of this method.
    * <br>
    * wrappedBuffer = filter.ensureWrappedCapacity(amount, wrappedBuffer, etc.);<br>
-   * unwrappedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.)
+   * sharedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.);<br>
+   * unwrappedBuffer = sharedBuffer.getBuffer()
+   *
+   * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is
+   * to call this method in a try-with-resources statement.
    */
-  ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer)
+  ByteBufferSharing readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer)
       throws IOException;
 
   /**
@@ -81,28 +87,19 @@ public interface NioFilter {
     }
   }
 
-  default boolean isClosed() {
-    return false;
-  }
-
   /**
    * invoke this method when you are done using the NioFilter
-   *
    */
   default void close(SocketChannel socketChannel) {
     // nothing by default
   }
 
   /**
-   * returns the unwrapped byte buffer associated with the given wrapped buffer.
+   * Returns the sharing object for the {@link NioFilter}'s unwrapped buffer, if one exists.
+   *
+   * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is
+   * to call this method in a try-with-resources statement.
    */
-  ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer);
+  ByteBufferSharing getUnwrappedBuffer();
 
-  /**
-   * returns an object to be used in synchronizing on the use of buffers returned by
-   * a NioFilter.
-   */
-  default Object getSynchObject() {
-    return this;
-  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
index 3ebce38..8b5df96 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 
+import org.apache.geode.annotations.internal.MakeImmutable;
 import org.apache.geode.internal.Assert;
 
 /**
@@ -27,6 +28,12 @@ import org.apache.geode.internal.Assert;
  * secure communications.
  */
 public class NioPlainEngine implements NioFilter {
+
+  // this variable requires the MakeImmutable annotation but the buffer is empty and
+  // not really modifiable
+  @MakeImmutable
+  private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+
   private final BufferPool bufferPool;
 
   int lastReadPosition;
@@ -38,14 +45,14 @@ public class NioPlainEngine implements NioFilter {
   }
 
   @Override
-  public ByteBuffer wrap(ByteBuffer buffer) {
-    return buffer;
+  public ByteBufferSharing wrap(ByteBuffer buffer) {
+    return shareBuffer(buffer);
   }
 
   @Override
-  public ByteBuffer unwrap(ByteBuffer wrappedBuffer) {
+  public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) {
     wrappedBuffer.position(wrappedBuffer.limit());
-    return wrappedBuffer;
+    return shareBuffer(wrappedBuffer);
   }
 
   @Override
@@ -82,7 +89,7 @@ public class NioPlainEngine implements NioFilter {
   }
 
   @Override
-  public ByteBuffer readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer)
+  public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer)
       throws IOException {
     ByteBuffer buffer = wrappedBuffer;
 
@@ -108,7 +115,7 @@ public class NioPlainEngine implements NioFilter {
     buffer.position(lastProcessedPosition);
     lastProcessedPosition += bytes;
 
-    return buffer;
+    return shareBuffer(buffer);
   }
 
   public void doneReading(ByteBuffer unwrappedBuffer) {
@@ -121,8 +128,12 @@ public class NioPlainEngine implements NioFilter {
   }
 
   @Override
-  public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) {
-    return wrappedBuffer;
+  public ByteBufferSharing getUnwrappedBuffer() {
+    return shareBuffer(EMPTY_BUFFER);
+  }
+
+  private ByteBufferSharingNoOp shareBuffer(final ByteBuffer wrappedBuffer) {
+    return new ByteBufferSharingNoOp(wrappedBuffer);
   }
 
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 2398b35..7e642ce 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -40,48 +40,48 @@ import javax.net.ssl.SSLSession;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.GemFireIOException;
-import org.apache.geode.annotations.internal.MakeImmutable;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.internal.net.BufferPool.BufferType;
+import org.apache.geode.internal.net.ByteBufferSharingImpl.OpenAttemptTimedOut;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
 
 /**
- * NioSslEngine uses an SSLEngine to bind SSL logic to a data source. This class is not thread
- * safe. Its use should be confined to one thread or should be protected by external
- * synchronization.
+ * NioSslEngine uses an SSLEngine to bind SSL logic to a data source. This class is not thread safe.
+ * Its use should be confined to one thread or should be protected by external synchronization.
  */
 public class NioSslEngine implements NioFilter {
   private static final Logger logger = LogService.getLogger();
 
-  // this variable requires the MakeImmutable annotation but the buffer is empty and
-  // not really modifiable
-  @MakeImmutable
-  private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
-
   private final BufferPool bufferPool;
 
-  private volatile boolean closed;
+  private boolean closed;
 
   SSLEngine engine;
 
   /**
-   * myNetData holds bytes wrapped by the SSLEngine
+   * holds bytes wrapped by the SSLEngine; a.k.a. myNetData
    */
-  ByteBuffer myNetData;
+  private final ByteBufferSharingImpl outputSharing;
 
   /**
-   * peerAppData holds the last unwrapped data from a peer
+   * holds the last unwrapped data from a peer; a.k.a. peerAppData
    */
-  ByteBuffer peerAppData;
+  private final ByteBufferSharingImpl inputSharing;
 
   NioSslEngine(SSLEngine engine, BufferPool bufferPool) {
     SSLSession session = engine.getSession();
     int appBufferSize = session.getApplicationBufferSize();
     int packetBufferSize = engine.getSession().getPacketBufferSize();
+    closed = false;
     this.engine = engine;
     this.bufferPool = bufferPool;
-    this.myNetData = bufferPool.acquireDirectSenderBuffer(packetBufferSize);
-    this.peerAppData = bufferPool.acquireNonDirectReceiveBuffer(appBufferSize);
+    outputSharing =
+        new ByteBufferSharingImpl(bufferPool.acquireDirectSenderBuffer(packetBufferSize),
+            TRACKED_SENDER, bufferPool);
+    inputSharing =
+        new ByteBufferSharingImpl(bufferPool.acquireNonDirectReceiveBuffer(appBufferSize),
+            TRACKED_RECEIVER, bufferPool);
   }
 
   /**
@@ -135,57 +135,65 @@ public class NioSslEngine implements NioFilter {
 
       switch (status) {
         case NEED_UNWRAP:
-          // Receive handshaking data from peer
-          int dataRead = socketChannel.read(handshakeBuffer);
-
-          // Process incoming handshaking data
-          handshakeBuffer.flip();
-          engineResult = engine.unwrap(handshakeBuffer, peerAppData);
-          handshakeBuffer.compact();
-          status = engineResult.getHandshakeStatus();
-
-          // if we're not finished, there's nothing to process and no data was read let's hang out
-          // for a little
-          if (peerAppData.remaining() == 0 && dataRead == 0 && status == NEED_UNWRAP) {
-            Thread.sleep(10);
-          }
+          try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
+            final ByteBuffer peerAppData = inputSharing.getBuffer();
+
+            // Receive handshaking data from peer
+            int dataRead = socketChannel.read(handshakeBuffer);
+
+            // Process incoming handshaking data
+            handshakeBuffer.flip();
+
+
+            engineResult = engine.unwrap(handshakeBuffer, peerAppData);
+            handshakeBuffer.compact();
+            status = engineResult.getHandshakeStatus();
+
+            // if we're not finished, there's nothing to process and no data was read let's hang out
+            // for a little
+            if (peerAppData.remaining() == 0 && dataRead == 0 && status == NEED_UNWRAP) {
+              Thread.sleep(10);
+            }
 
-          if (engineResult.getStatus() == BUFFER_OVERFLOW) {
-            peerAppData =
-                expandWriteBuffer(TRACKED_RECEIVER, peerAppData, peerAppData.capacity() * 2);
+            if (engineResult.getStatus() == BUFFER_OVERFLOW) {
+              inputSharing.expandWriteBufferIfNeeded(peerAppData.capacity() * 2);
+            }
+            break;
           }
-          break;
 
         case NEED_WRAP:
-          // Empty the local network packet buffer.
-          myNetData.clear();
-
-          // Generate handshaking data
-          engineResult = engine.wrap(myAppData, myNetData);
-          status = engineResult.getHandshakeStatus();
-
-          // Check status
-          switch (engineResult.getStatus()) {
-            case BUFFER_OVERFLOW:
-              myNetData =
-                  expandWriteBuffer(TRACKED_SENDER, myNetData,
-                      myNetData.capacity() * 2);
-              break;
-            case OK:
-              myNetData.flip();
-              // Send the handshaking data to peer
-              while (myNetData.hasRemaining()) {
-                socketChannel.write(myNetData);
-              }
-              break;
-            case CLOSED:
-              break;
-            default:
-              logger.info("handshake terminated with illegal state due to {}", status);
-              throw new IllegalStateException(
-                  "Unknown SSLEngineResult status: " + engineResult.getStatus());
+          try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
+            final ByteBuffer myNetData = outputSharing.getBuffer();
+
+            // Empty the local network packet buffer.
+            myNetData.clear();
+
+            // Generate handshaking data
+            engineResult = engine.wrap(myAppData, myNetData);
+            status = engineResult.getHandshakeStatus();
+
+            // Check status
+            switch (engineResult.getStatus()) {
+              case BUFFER_OVERFLOW:
+                // no need to assign return value because we will never reference it
+                outputSharing.expandWriteBufferIfNeeded(myNetData.capacity() * 2);
+                break;
+              case OK:
+                myNetData.flip();
+                // Send the handshaking data to peer
+                while (myNetData.hasRemaining()) {
+                  socketChannel.write(myNetData);
+                }
+                break;
+              case CLOSED:
+                break;
+              default:
+                logger.info("handshake terminated with illegal state due to {}", status);
+                throw new IllegalStateException(
+                    "Unknown SSLEngineResult status: " + engineResult.getStatus());
+            }
+            break;
           }
-          break;
         case NEED_TASK:
           // Handle blocking tasks
           handleBlockingTasks();
@@ -213,17 +221,6 @@ public class NioSslEngine implements NioFilter {
     return true;
   }
 
-  ByteBuffer expandWriteBuffer(BufferType type, ByteBuffer existing,
-      int desiredCapacity) {
-    return bufferPool.expandWriteBufferIfNeeded(type, existing, desiredCapacity);
-  }
-
-  void checkClosed() throws IOException {
-    if (closed) {
-      throw new IOException("NioSslEngine has been closed");
-    }
-  }
-
   void handleBlockingTasks() {
     Runnable task;
     while ((task = engine.getDelegatedTask()) != null) {
@@ -233,72 +230,77 @@ public class NioSslEngine implements NioFilter {
   }
 
   @Override
-  public synchronized ByteBuffer wrap(ByteBuffer appData) throws IOException {
-    checkClosed();
+  public ByteBufferSharing wrap(ByteBuffer appData) throws IOException {
+    try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
 
-    myNetData.clear();
+      ByteBuffer myNetData = outputSharing.getBuffer();
 
-    while (appData.hasRemaining()) {
-      // ensure we have lots of capacity since encrypted data might
-      // be larger than the app data
-      int remaining = myNetData.capacity() - myNetData.position();
+      myNetData.clear();
 
-      if (remaining < (appData.remaining() * 2)) {
-        int newCapacity = expandedCapacity(appData, myNetData);
-        myNetData = expandWriteBuffer(TRACKED_SENDER, myNetData, newCapacity);
-      }
+      while (appData.hasRemaining()) {
+        // ensure we have lots of capacity since encrypted data might
+        // be larger than the app data
+        int remaining = myNetData.capacity() - myNetData.position();
 
-      SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
+        if (remaining < (appData.remaining() * 2)) {
+          int newCapacity = expandedCapacity(appData, myNetData);
+          myNetData = outputSharing.expandWriteBufferIfNeeded(newCapacity);
+        }
 
-      if (wrapResult.getHandshakeStatus() == NEED_TASK) {
-        handleBlockingTasks();
-      }
+        SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
 
-      if (wrapResult.getStatus() != OK) {
-        throw new SSLException("Error encrypting data: " + wrapResult);
+        if (wrapResult.getHandshakeStatus() == NEED_TASK) {
+          handleBlockingTasks();
+        }
+
+        if (wrapResult.getStatus() != OK) {
+          throw new SSLException("Error encrypting data: " + wrapResult);
+        }
       }
-    }
 
-    myNetData.flip();
+      myNetData.flip();
 
-    return myNetData;
+      return shareOutputBuffer();
+    }
   }
 
   @Override
-  public synchronized ByteBuffer unwrap(ByteBuffer wrappedBuffer) throws IOException {
-    checkClosed();
-
-    // note that we do not clear peerAppData as it may hold a partial
-    // message. TcpConduit, for instance, uses message chunking to
-    // transmit large payloads and we may have read a partial chunk
-    // during the previous unwrap
-
-    peerAppData.limit(peerAppData.capacity());
-    while (wrappedBuffer.hasRemaining()) {
-      SSLEngineResult unwrapResult = engine.unwrap(wrappedBuffer, peerAppData);
-      switch (unwrapResult.getStatus()) {
-        case BUFFER_OVERFLOW:
-          // buffer overflow expand and try again - double the available decryption space
-          int newCapacity =
-              (peerAppData.capacity() - peerAppData.position()) * 2 + peerAppData.position();
-          newCapacity = Math.max(newCapacity, peerAppData.capacity() / 2 * 3);
-          peerAppData =
-              bufferPool.expandWriteBufferIfNeeded(TRACKED_RECEIVER, peerAppData, newCapacity);
-          peerAppData.limit(peerAppData.capacity());
-          break;
-        case BUFFER_UNDERFLOW:
-          // partial data - need to read more. When this happens the SSLEngine will not have
-          // changed the buffer position
-          wrappedBuffer.compact();
-          return peerAppData;
-        case OK:
-          break;
-        default:
-          throw new SSLException("Error decrypting data: " + unwrapResult);
+  public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException {
+    try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
+
+      ByteBuffer peerAppData = inputSharing.getBuffer();
+
+      // note that we do not clear peerAppData as it may hold a partial
+      // message. TcpConduit, for instance, uses message chunking to
+      // transmit large payloads and we may have read a partial chunk
+      // during the previous unwrap
+
+      peerAppData.limit(peerAppData.capacity());
+      while (wrappedBuffer.hasRemaining()) {
+        SSLEngineResult unwrapResult = engine.unwrap(wrappedBuffer, peerAppData);
+        switch (unwrapResult.getStatus()) {
+          case BUFFER_OVERFLOW:
+            // buffer overflow expand and try again - double the available decryption space
+            int newCapacity =
+                (peerAppData.capacity() - peerAppData.position()) * 2 + peerAppData.position();
+            newCapacity = Math.max(newCapacity, peerAppData.capacity() / 2 * 3);
+            peerAppData = inputSharing.expandWriteBufferIfNeeded(newCapacity);
+            peerAppData.limit(peerAppData.capacity());
+            break;
+          case BUFFER_UNDERFLOW:
+            // partial data - need to read more. When this happens the SSLEngine will not have
+            // changed the buffer position
+            wrappedBuffer.compact();
+            return shareInputBuffer();
+          case OK:
+            break;
+          default:
+            throw new SSLException("Error decrypting data: " + unwrapResult);
+        }
       }
+      wrappedBuffer.clear();
+      return shareInputBuffer();
     }
-    wrappedBuffer.clear();
-    return peerAppData;
   }
 
   @Override
@@ -315,50 +317,45 @@ public class NioSslEngine implements NioFilter {
   }
 
   @Override
-  public ByteBuffer readAtLeast(SocketChannel channel, int bytes,
+  public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes,
       ByteBuffer wrappedBuffer) throws IOException {
-    if (peerAppData.capacity() > bytes) {
-      // we already have a buffer that's big enough
-      if (peerAppData.capacity() - peerAppData.position() < bytes) {
-        peerAppData.compact();
-        peerAppData.flip();
-      }
-    }
+    try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
 
-    while (peerAppData.remaining() < bytes) {
-      wrappedBuffer.limit(wrappedBuffer.capacity());
-      int amountRead = channel.read(wrappedBuffer);
-      if (amountRead < 0) {
-        throw new EOFException();
+      ByteBuffer peerAppData = inputSharing.getBuffer();
+
+      if (peerAppData.capacity() > bytes) {
+        // we already have a buffer that's big enough
+        if (peerAppData.capacity() - peerAppData.position() < bytes) {
+          peerAppData.compact();
+          peerAppData.flip();
+        }
       }
-      if (amountRead > 0) {
-        wrappedBuffer.flip();
-        // prep the decoded buffer for writing
-        peerAppData.compact();
-        peerAppData = unwrap(wrappedBuffer);
-        // done writing to the decoded buffer - prep it for reading again
-        peerAppData.flip();
+
+      while (peerAppData.remaining() < bytes) {
+        wrappedBuffer.limit(wrappedBuffer.capacity());
+        int amountRead = channel.read(wrappedBuffer);
+        if (amountRead < 0) {
+          throw new EOFException();
+        }
+        if (amountRead > 0) {
+          wrappedBuffer.flip();
+          // prep the decoded buffer for writing
+          peerAppData.compact();
+          try (final ByteBufferSharing inputSharing2 = unwrap(wrappedBuffer)) {
+            // done writing to the decoded buffer - prep it for reading again
+            final ByteBuffer peerAppDataNew = inputSharing2.getBuffer();
+            peerAppDataNew.flip();
+            peerAppData = peerAppDataNew; // loop needs new reference!
+          }
+        }
       }
+      return shareInputBuffer();
     }
-    return peerAppData;
   }
 
   @Override
-  public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) {
-    return peerAppData;
-  }
-
-  /**
-   * ensures that the unwrapped buffer associated with the given wrapped buffer has
-   * sufficient capacity for the given amount of bytes. This may compact the
-   * buffer or it may return a new buffer.
-   */
-  public ByteBuffer ensureUnwrappedCapacity(int amount) {
-    // for TTLS the app-data buffers do not need to be tracked direct-buffers since we
-    // do not use them for I/O operations
-    peerAppData =
-        bufferPool.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, amount);
-    return peerAppData;
+  public ByteBufferSharing getUnwrappedBuffer() {
+    return shareInputBuffer();
   }
 
   @Override
@@ -369,16 +366,14 @@ public class NioSslEngine implements NioFilter {
   }
 
   @Override
-  public synchronized boolean isClosed() {
-    return closed;
-  }
-
-  @Override
-  public void close(SocketChannel socketChannel) {
+  public synchronized void close(SocketChannel socketChannel) {
     if (closed) {
       return;
     }
-    try {
+    closed = true;
+    inputSharing.destruct();
+    try (final ByteBufferSharing outputSharing = shareOutputBuffer(1, TimeUnit.MINUTES)) {
+      final ByteBuffer myNetData = outputSharing.getBuffer();
 
       if (!engine.isOutboundDone()) {
         ByteBuffer empty = ByteBuffer.wrap(new byte[0]);
@@ -405,14 +400,13 @@ public class NioSslEngine implements NioFilter {
       // we can't send a close message if the channel is closed
     } catch (IOException e) {
       throw new GemFireIOException("exception closing SSL session", e);
+    } catch (final OpenAttemptTimedOut _unused) {
+      logger.info(String.format("Couldn't get output lock in time, eliding TLS close message"));
+      if (!engine.isOutboundDone()) {
+        engine.closeOutbound();
+      }
     } finally {
-      ByteBuffer netData = myNetData;
-      ByteBuffer appData = peerAppData;
-      myNetData = null;
-      peerAppData = EMPTY_BUFFER;
-      bufferPool.releaseBuffer(TRACKED_SENDER, netData);
-      bufferPool.releaseBuffer(TRACKED_RECEIVER, appData);
-      this.closed = true;
+      outputSharing.destruct();
     }
   }
 
@@ -421,4 +415,17 @@ public class NioSslEngine implements NioFilter {
         targetBuffer.capacity() * 2);
   }
 
+  @VisibleForTesting
+  public ByteBufferSharing shareOutputBuffer() {
+    return outputSharing.open();
+  }
+
+  private ByteBufferSharing shareOutputBuffer(final long time, final TimeUnit unit)
+      throws OpenAttemptTimedOut {
+    return outputSharing.open(time, unit);
+  }
+
+  public ByteBufferSharing shareInputBuffer() {
+    return inputSharing.open();
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index d93b75c..95e1953 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -78,6 +78,7 @@ import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.SystemTimer.SystemTimerTask;
 import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.ByteBufferSharing;
 import org.apache.geode.internal.net.NioFilter;
 import org.apache.geode.internal.net.NioPlainEngine;
 import org.apache.geode.internal.net.SocketCreator;
@@ -789,11 +790,12 @@ public class Connection implements Runnable {
   @VisibleForTesting
   void clearSSLInputBuffer() {
     if (getConduit().useSSL() && ioFilter != null) {
-      synchronized (ioFilter.getSynchObject()) {
-        if (!ioFilter.isClosed()) {
-          // clear out any remaining handshake bytes
-          ByteBuffer buffer = ioFilter.getUnwrappedBuffer(inputBuffer);
-          buffer.position(0).limit(0);
+      try (final ByteBufferSharing sharedBuffer = ioFilter.getUnwrappedBuffer()) {
+        // clear out any remaining handshake bytes
+        try {
+          sharedBuffer.getBuffer().position(0).limit(0);
+        } catch (IOException e) {
+          // means the NioFilter was already closed
         }
       }
     }
@@ -2403,8 +2405,9 @@ public class Connection implements Runnable {
         long queueTimeoutTarget = now + asyncQueueTimeout;
         channel.configureBlocking(false);
         try {
-          synchronized (ioFilter.getSynchObject()) {
-            ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
+          try (final ByteBufferSharing outputSharing = ioFilter.wrap(buffer)) {
+            final ByteBuffer wrappedBuffer = outputSharing.getBuffer();
+
             int waitTime = 1;
             do {
               owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
@@ -2557,9 +2560,9 @@ public class Connection implements Runnable {
           }
           // fall through
         }
-        // synchronize on the ioFilter while using its network buffer
-        synchronized (ioFilter.getSynchObject()) {
-          ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
+        try (final ByteBufferSharing outputSharing = ioFilter.wrap(buffer)) {
+          final ByteBuffer wrappedBuffer = outputSharing.getBuffer();
+
           while (wrappedBuffer.remaining() > 0) {
             int amtWritten = 0;
             long start = stats.startSocketWrite(true);
@@ -2611,10 +2614,12 @@ public class Connection implements Runnable {
     final Version version = getRemoteVersion();
     try {
       msgReader = new MsgReader(this, ioFilter, version);
+
       ReplyMessage msg;
       int len;
 
-      synchronized (ioFilter.getSynchObject()) {
+      // (we have to lock here to protect between reading header and message body)
+      try (final ByteBufferSharing _unused = ioFilter.getUnwrappedBuffer()) {
         Header header = msgReader.readHeader();
 
         if (header.getMessageType() == NORMAL_MSG_TYPE) {
@@ -2631,7 +2636,7 @@ public class Connection implements Runnable {
           releaseMsgDestreamer(header.getMessageId(), destreamer);
           len = destreamer.size();
         }
-      } // sync
+      }
       // I'd really just like to call dispatchMessage here. However,
       // that call goes through a bunch of checks that knock about
       // 10% of the performance. Since this direct-ack stuff is all
@@ -2698,8 +2703,9 @@ public class Connection implements Runnable {
   private void processInputBuffer() throws ConnectionException, IOException {
     inputBuffer.flip();
 
-    synchronized (ioFilter.getSynchObject()) {
-      ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer);
+    try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) {
+      final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer();
+
       peerDataBuffer.flip();
 
       boolean done = false;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index 396ece2..503e48b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -26,6 +26,7 @@ import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.ByteBufferSharing;
 import org.apache.geode.internal.net.NioFilter;
 import org.apache.geode.internal.serialization.Version;
 import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -54,8 +55,8 @@ public class MsgReader {
   }
 
   Header readHeader() throws IOException {
-    synchronized (ioFilter.getSynchObject()) {
-      ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
+    try (final ByteBufferSharing sharedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES)) {
+      ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer();
 
       Assert.assertTrue(unwrappedBuffer.remaining() >= Connection.MSG_HEADER_BYTES);
 
@@ -89,8 +90,8 @@ public class MsgReader {
    */
   DistributionMessage readMessage(Header header)
       throws IOException, ClassNotFoundException {
-    synchronized (ioFilter.getSynchObject()) {
-      ByteBuffer nioInputBuffer = readAtLeast(header.messageLength);
+    try (final ByteBufferSharing sharedBuffer = readAtLeast(header.messageLength)) {
+      ByteBuffer nioInputBuffer = sharedBuffer.getBuffer();
       Assert.assertTrue(nioInputBuffer.remaining() >= header.messageLength);
       this.getStats().incMessagesBeingReceived(true, header.messageLength);
       long startSer = this.getStats().startMsgDeserialization();
@@ -112,8 +113,8 @@ public class MsgReader {
 
   void readChunk(Header header, MsgDestreamer md)
       throws IOException {
-    synchronized (ioFilter.getSynchObject()) {
-      ByteBuffer unwrappedBuffer = readAtLeast(header.messageLength);
+    try (final ByteBufferSharing sharedBuffer = readAtLeast(header.messageLength)) {
+      ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer();
       this.getStats().incMessagesBeingReceived(md.size() == 0, header.messageLength);
       md.addChunk(unwrappedBuffer, header.messageLength);
       // show that the bytes have been consumed by adjusting the buffer's position
@@ -123,7 +124,7 @@ public class MsgReader {
 
 
 
-  private ByteBuffer readAtLeast(int bytes) throws IOException {
+  private ByteBufferSharing readAtLeast(int bytes) throws IOException {
     peerNetData = ioFilter.ensureWrappedCapacity(bytes, peerNetData,
         BufferPool.BufferType.TRACKED_RECEIVER);
     return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java
new file mode 100644
index 0000000..bb5a75f
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.net;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class ByteBufferSharingImplTest {
+
+  private ByteBufferSharingImpl sharing;
+  private BufferPool poolMock;
+  private CountDownLatch clientHasOpenedResource;
+  private CountDownLatch clientMayComplete;
+
+  @Before
+  public void before() {
+    poolMock = mock(BufferPool.class);
+    sharing =
+        new ByteBufferSharingImpl(mock(ByteBuffer.class), BufferPool.BufferType.TRACKED_SENDER,
+            poolMock);
+    clientHasOpenedResource = new CountDownLatch(1);
+    clientMayComplete = new CountDownLatch(1);
+  }
+
+  @Test
+  public void balancedCloseOwnerIsLastReferenceHolder() throws InterruptedException {
+    resourceOwnerIsLastReferenceHolder("client with balanced close calls", () -> {
+      try (final ByteBufferSharing _unused = sharing.open()) {
+      }
+    });
+  }
+
+  @Test
+  public void extraCloseOwnerIsLastReferenceHolder() throws InterruptedException {
+    resourceOwnerIsLastReferenceHolder("client with extra close calls", () -> {
+      final ByteBufferSharing sharing2 = sharing.open();
+      sharing2.close();
+      verify(poolMock, times(0)).releaseBuffer(any(), any());
+      assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
+      verify(poolMock, times(0)).releaseBuffer(any(), any());
+    });
+  }
+
+  @Test
+  public void balancedCloseClientIsLastReferenceHolder() throws InterruptedException {
+    clientIsLastReferenceHolder("client with balanced close calls", () -> {
+      try (final ByteBufferSharing _unused = sharing.open()) {
+        clientHasOpenedResource.countDown();
+        blockClient();
+      }
+    });
+  }
+
+  @Test
+  public void extraCloseClientIsLastReferenceHolder() throws InterruptedException {
+    clientIsLastReferenceHolder("client with extra close calls", () -> {
+      final ByteBufferSharing sharing2 = sharing.open();
+      clientHasOpenedResource.countDown();
+      blockClient();
+      sharing2.close();
+      verify(poolMock, times(1)).releaseBuffer(any(), any());
+      assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
+      System.out.println("here");
+    });
+  }
+
+  @Test
+  public void extraCloseDoesNotPrematurelyReturnBufferToPool() {
+    final ByteBufferSharing sharing2 = sharing.open();
+    sharing2.close();
+    assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
+    verify(poolMock, times(0)).releaseBuffer(any(), any());
+    sharing.destruct();
+    verify(poolMock, times(1)).releaseBuffer(any(), any());
+  }
+
+  @Test
+  public void extraCloseDoesNotDecrementRefCount() {
+    final ByteBufferSharing sharing2 = sharing.open();
+    sharing2.close();
+    assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
+    final ByteBufferSharing sharing3 = this.sharing.open();
+    sharing.destruct();
+    verify(poolMock, times(0)).releaseBuffer(any(), any());
+  }
+
+  private void resourceOwnerIsLastReferenceHolder(final String name, final Runnable client)
+      throws InterruptedException {
+    /*
+     * Thread.currentThread() is playing the role of the (ByteBuffer) resource owner
+     */
+
+    /*
+     * clientThread thread is playing the role of the client (of the resource owner)
+     */
+    final Thread clientThread = new Thread(client, name);
+    clientThread.start();
+    clientThread.join();
+
+    verify(poolMock, times(0)).releaseBuffer(any(), any());
+
+    sharing.destruct();
+
+    verify(poolMock, times(1)).releaseBuffer(any(), any());
+  }
+
+  private void clientIsLastReferenceHolder(final String name, final Runnable client)
+      throws InterruptedException {
+    /*
+     * Thread.currentThread() is playing the role of the (ByteBuffer) resource owner
+     */
+
+    /*
+     * clientThread thread is playing the role of the client (of the resource owner)
+     */
+    final Thread clientThread = new Thread(client, name);
+    clientThread.start();
+
+    clientHasOpenedResource.await();
+
+    sharing.destruct();
+
+    verify(poolMock, times(0)).releaseBuffer(any(), any());
+
+    clientMayComplete.countDown(); // let client finish
+
+    clientThread.join();
+
+    verify(poolMock, times(1)).releaseBuffer(any(), any());
+  }
+
+  private void blockClient() {
+    try {
+      clientMayComplete.await();
+    } catch (InterruptedException e) {
+      fail("test client thread interrupted: " + e);
+    }
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
index 166c2a5..bbf7a3b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
@@ -50,7 +50,8 @@ public class NioPlainEngineTest {
   public void unwrap() {
     ByteBuffer buffer = ByteBuffer.allocate(100);
     buffer.position(0).limit(buffer.capacity());
-    nioEngine.unwrap(buffer);
+    try (final ByteBufferSharing unused = nioEngine.unwrap(buffer)) {
+    }
     assertThat(buffer.position()).isEqualTo(buffer.limit());
   }
 
@@ -116,23 +117,29 @@ public class NioPlainEngineTest {
 
     nioEngine.lastReadPosition = 10;
 
-    ByteBuffer data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
-    verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
-    assertThat(data.position()).isEqualTo(0);
-    assertThat(data.limit()).isEqualTo(amountToRead);
-    assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 3 + preexistingBytes);
-    assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead);
-
-    data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
-    verify(mockChannel, times(5)).read(any(ByteBuffer.class));
-    // at end of last readAtLeast data
-    assertThat(data.position()).isEqualTo(amountToRead);
-    // we read amountToRead bytes
-    assertThat(data.limit()).isEqualTo(amountToRead * 2);
-    // we did 2 more reads from the network
-    assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 5 + preexistingBytes);
-    // the next read will start at the end of consumed data
-    assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead * 2);
+    try (final ByteBufferSharing sharedBuffer =
+        nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+      ByteBuffer data = sharedBuffer.getBuffer();
+      verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
+      assertThat(data.position()).isEqualTo(0);
+      assertThat(data.limit()).isEqualTo(amountToRead);
+      assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 3 + preexistingBytes);
+      assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead);
+    }
+
+    try (final ByteBufferSharing sharedBuffer =
+        nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+      final ByteBuffer data = sharedBuffer.getBuffer();
+      verify(mockChannel, times(5)).read(any(ByteBuffer.class));
+      // at end of last readAtLeast data
+      assertThat(data.position()).isEqualTo(amountToRead);
+      // we read amountToRead bytes
+      assertThat(data.limit()).isEqualTo(amountToRead * 2);
+      // we did 2 more reads from the network
+      assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 5 + preexistingBytes);
+      // the next read will start at the end of consumed data
+      assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead * 2);
+    }
 
   }
 
@@ -147,7 +154,9 @@ public class NioPlainEngineTest {
 
     nioEngine.lastReadPosition = 10;
 
-    nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
+    try (final ByteBufferSharing unused =
+        nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+    }
   }
 
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index aef1672..e9b01cf 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -62,13 +62,14 @@ import org.apache.geode.test.junit.categories.MembershipTest;
 
 @Category({MembershipTest.class})
 public class NioSslEngineTest {
-  private static final int netBufferSize = 10000;
-  private static final int appBufferSize = 20000;
+  private static final int netBufferSize = 4096;
+  private static final int appBufferSize = 32768;
 
   private SSLEngine mockEngine;
   private DMStats mockStats;
   private NioSslEngine nioSslEngine;
   private NioSslEngine spyNioSslEngine;
+  private BufferPool spyBufferPool;
 
   @Before
   public void setUp() throws Exception {
@@ -81,11 +82,20 @@ public class NioSslEngineTest {
 
     mockStats = mock(DMStats.class);
 
-    nioSslEngine = new NioSslEngine(mockEngine, new BufferPool(mockStats));
+    final BufferPool bufferPool = new BufferPool(mockStats);
+    spyBufferPool = spy(bufferPool);
+    nioSslEngine = new NioSslEngine(mockEngine, spyBufferPool);
     spyNioSslEngine = spy(nioSslEngine);
   }
 
   @Test
+  public void engineUsesDirectBuffers() throws IOException {
+    try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
+      assertThat(outputSharing.getBuffer().isDirect()).isTrue();
+    }
+  }
+
+  @Test
   public void handshake() throws Exception {
     SocketChannel mockChannel = mock(SocketChannel.class);
     when(mockChannel.read(any(ByteBuffer.class))).thenReturn(100, 100, 100, 0);
@@ -114,7 +124,7 @@ public class NioSslEngineTest {
     verify(mockEngine, atLeast(2)).getHandshakeStatus();
     verify(mockEngine, times(3)).wrap(any(ByteBuffer.class), any(ByteBuffer.class));
     verify(mockEngine, times(3)).unwrap(any(ByteBuffer.class), any(ByteBuffer.class));
-    verify(spyNioSslEngine, times(2)).expandWriteBuffer(any(BufferPool.BufferType.class),
+    verify(spyBufferPool, times(2)).expandWriteBufferIfNeeded(any(BufferPool.BufferType.class),
         any(ByteBuffer.class), any(Integer.class));
     verify(spyNioSslEngine, times(1)).handleBlockingTasks();
     verify(mockChannel, times(3)).read(any(ByteBuffer.class));
@@ -178,148 +188,148 @@ public class NioSslEngineTest {
         .hasMessageContaining("SSL Handshake terminated with status");
   }
 
-
-  @Test
-  public void checkClosed() throws Exception {
-    nioSslEngine.checkClosed();
-  }
-
-  @Test(expected = IOException.class)
-  public void checkClosedThrows() throws Exception {
-    when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
-        new SSLEngineResult(CLOSED, FINISHED, 0, 100));
-    nioSslEngine.close(mock(SocketChannel.class));
-    nioSslEngine.checkClosed();
-  }
-
-  @Test
-  public void synchObjectIsSelf() {
-    // for thread-safety the synchronization object given to outside entities
-    // must be the the engine itself. This allows external manipulation or
-    // use of the engine's buffers to be protected in the same way as its synchronized
-    // methods
-    assertThat(nioSslEngine.getSynchObject()).isSameAs(nioSslEngine);
-  }
-
   @Test
   public void wrap() throws Exception {
-    // make the application data too big to fit into the engine's encryption buffer
-    ByteBuffer appData = ByteBuffer.allocate(nioSslEngine.myNetData.capacity() + 100);
-    byte[] appBytes = new byte[appData.capacity()];
-    Arrays.fill(appBytes, (byte) 0x1F);
-    appData.put(appBytes);
-    appData.flip();
-
-    // create an engine that will transfer bytes from the application buffer to the encrypted buffer
-    TestSSLEngine testEngine = new TestSSLEngine();
-    testEngine.addReturnResult(
-        new SSLEngineResult(OK, NEED_TASK, appData.remaining(), appData.remaining()));
-    spyNioSslEngine.engine = testEngine;
-
-    ByteBuffer wrappedBuffer = spyNioSslEngine.wrap(appData);
-
-    verify(spyNioSslEngine, times(1)).expandWriteBuffer(any(BufferPool.BufferType.class),
-        any(ByteBuffer.class), any(Integer.class));
-    appData.flip();
-    assertThat(wrappedBuffer).isEqualTo(appData);
-    verify(spyNioSslEngine, times(1)).handleBlockingTasks();
+    try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
+
+      // make the application data too big to fit into the engine's encryption buffer
+      ByteBuffer appData =
+          ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100);
+      byte[] appBytes = new byte[appData.capacity()];
+      Arrays.fill(appBytes, (byte) 0x1F);
+      appData.put(appBytes);
+      appData.flip();
+
+      // create an engine that will transfer bytes from the application buffer to the encrypted
+      // buffer
+      TestSSLEngine testEngine = new TestSSLEngine();
+      testEngine.addReturnResult(
+          new SSLEngineResult(OK, NEED_TASK, appData.remaining(), appData.remaining()));
+      spyNioSslEngine.engine = testEngine;
+
+      try (final ByteBufferSharing outputSharing2 = spyNioSslEngine.wrap(appData)) {
+        ByteBuffer wrappedBuffer = outputSharing2.getBuffer();
+
+        verify(spyBufferPool, times(1)).expandWriteBufferIfNeeded(any(BufferPool.BufferType.class),
+            any(ByteBuffer.class), any(Integer.class));
+        appData.flip();
+        assertThat(wrappedBuffer).isEqualTo(appData);
+      }
+      verify(spyNioSslEngine, times(1)).handleBlockingTasks();
+    }
   }
 
   @Test
-  public void wrapFails() {
-    // make the application data too big to fit into the engine's encryption buffer
-    ByteBuffer appData = ByteBuffer.allocate(nioSslEngine.myNetData.capacity() + 100);
-    byte[] appBytes = new byte[appData.capacity()];
-    Arrays.fill(appBytes, (byte) 0x1F);
-    appData.put(appBytes);
-    appData.flip();
-
-    // create an engine that will transfer bytes from the application buffer to the encrypted buffer
-    TestSSLEngine testEngine = new TestSSLEngine();
-    testEngine.addReturnResult(
-        new SSLEngineResult(CLOSED, NEED_TASK, appData.remaining(), appData.remaining()));
-    spyNioSslEngine.engine = testEngine;
-
-    assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)).isInstanceOf(SSLException.class)
-        .hasMessageContaining("Error encrypting data");
+  public void wrapFails() throws IOException {
+    try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
+      // make the application data too big to fit into the engine's encryption buffer
+      ByteBuffer appData =
+          ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100);
+      byte[] appBytes = new byte[appData.capacity()];
+      Arrays.fill(appBytes, (byte) 0x1F);
+      appData.put(appBytes);
+      appData.flip();
+
+      // create an engine that will transfer bytes from the application buffer to the encrypted
+      // buffer
+      TestSSLEngine testEngine = new TestSSLEngine();
+      testEngine.addReturnResult(
+          new SSLEngineResult(CLOSED, NEED_TASK, appData.remaining(), appData.remaining()));
+      spyNioSslEngine.engine = testEngine;
+
+      assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)).isInstanceOf(SSLException.class)
+          .hasMessageContaining("Error encrypting data");
+    }
   }
 
   @Test
   public void unwrapWithBufferOverflow() throws Exception {
-    // make the application data too big to fit into the engine's encryption buffer
-    int originalPeerAppDataCapacity = nioSslEngine.peerAppData.capacity();
-    int originalPeerAppDataPosition = originalPeerAppDataCapacity / 2;
-    nioSslEngine.peerAppData.position(originalPeerAppDataPosition);
-    ByteBuffer wrappedData = ByteBuffer.allocate(originalPeerAppDataCapacity + 100);
-    byte[] netBytes = new byte[wrappedData.capacity()];
-    Arrays.fill(netBytes, (byte) 0x1F);
-    wrappedData.put(netBytes);
-    wrappedData.flip();
-
-    // create an engine that will transfer bytes from the application buffer to the encrypted buffer
-    TestSSLEngine testEngine = new TestSSLEngine();
-    spyNioSslEngine.engine = testEngine;
-
-    testEngine.addReturnResult(
-        new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // results in 30,000 byte buffer
-        new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 50,000 bytes
-        new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 90,000 bytes
-        new SSLEngineResult(OK, FINISHED, netBytes.length, netBytes.length));
-
-    int expectedCapacity = 2 * originalPeerAppDataCapacity - originalPeerAppDataPosition;
-    expectedCapacity =
-        2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition;
-    expectedCapacity =
-        2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition;
-    ByteBuffer unwrappedBuffer = spyNioSslEngine.unwrap(wrappedData);
-    unwrappedBuffer.flip();
-    assertThat(unwrappedBuffer.capacity()).isEqualTo(expectedCapacity);
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+      // make the application data too big to fit into the engine's encryption buffer
+      final ByteBuffer peerAppData = inputSharing.getBuffer();
+
+      int originalPeerAppDataCapacity = peerAppData.capacity();
+      int originalPeerAppDataPosition = originalPeerAppDataCapacity / 2;
+      peerAppData.position(originalPeerAppDataPosition);
+      ByteBuffer wrappedData = ByteBuffer.allocate(originalPeerAppDataCapacity + 100);
+      byte[] netBytes = new byte[wrappedData.capacity()];
+      Arrays.fill(netBytes, (byte) 0x1F);
+      wrappedData.put(netBytes);
+      wrappedData.flip();
+
+      // create an engine that will transfer bytes from the application buffer to the encrypted
+      // buffer
+      TestSSLEngine testEngine = new TestSSLEngine();
+      spyNioSslEngine.engine = testEngine;
+
+      testEngine.addReturnResult(
+          new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // results in 30,000 byte buffer
+          new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 50,000 bytes
+          new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 90,000 bytes
+          new SSLEngineResult(OK, FINISHED, netBytes.length, netBytes.length));
+
+      int expectedCapacity = 2 * originalPeerAppDataCapacity - originalPeerAppDataPosition;
+      expectedCapacity =
+          2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition;
+      expectedCapacity =
+          2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition;
+      try (final ByteBufferSharing sharedBuffer = spyNioSslEngine.unwrap(wrappedData)) {
+        ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer();
+        unwrappedBuffer.flip();
+        assertThat(unwrappedBuffer.capacity()).isEqualTo(expectedCapacity);
+      }
+    }
   }
 
 
   @Test
   public void unwrapWithBufferUnderflow() throws Exception {
-    ByteBuffer wrappedData = ByteBuffer.allocate(nioSslEngine.peerAppData.capacity());
-    byte[] netBytes = new byte[wrappedData.capacity() / 2];
-    Arrays.fill(netBytes, (byte) 0x1F);
-    wrappedData.put(netBytes);
-    wrappedData.flip();
-
-    // create an engine that will transfer bytes from the application buffer to the encrypted buffer
-    TestSSLEngine testEngine = new TestSSLEngine();
-    testEngine.addReturnResult(new SSLEngineResult(BUFFER_UNDERFLOW, NEED_TASK, 0, 0));
-    spyNioSslEngine.engine = testEngine;
-
-    ByteBuffer unwrappedBuffer = spyNioSslEngine.unwrap(wrappedData);
-    unwrappedBuffer.flip();
-    assertThat(unwrappedBuffer.remaining()).isEqualTo(0);
-    assertThat(wrappedData.position()).isEqualTo(netBytes.length);
-  }
-
-  @Test
-  public void unwrapWithDecryptionError() {
-    // make the application data too big to fit into the engine's encryption buffer
-    ByteBuffer wrappedData = ByteBuffer.allocate(nioSslEngine.peerAppData.capacity());
-    byte[] netBytes = new byte[wrappedData.capacity() / 2];
-    Arrays.fill(netBytes, (byte) 0x1F);
-    wrappedData.put(netBytes);
-    wrappedData.flip();
-
-    // create an engine that will transfer bytes from the application buffer to the encrypted buffer
-    TestSSLEngine testEngine = new TestSSLEngine();
-    testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0));
-    spyNioSslEngine.engine = testEngine;
-
-    assertThatThrownBy(() -> spyNioSslEngine.unwrap(wrappedData)).isInstanceOf(SSLException.class)
-        .hasMessageContaining("Error decrypting data");
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+      ByteBuffer wrappedData =
+          ByteBuffer.allocate(inputSharing.getBuffer().capacity());
+      byte[] netBytes = new byte[wrappedData.capacity() / 2];
+      Arrays.fill(netBytes, (byte) 0x1F);
+      wrappedData.put(netBytes);
+      wrappedData.flip();
+
+      // create an engine that will transfer bytes from the application buffer to the encrypted
+      // buffer
+      TestSSLEngine testEngine = new TestSSLEngine();
+      testEngine.addReturnResult(new SSLEngineResult(BUFFER_UNDERFLOW, NEED_TASK, 0, 0));
+      spyNioSslEngine.engine = testEngine;
+
+      try (final ByteBufferSharing sharedBuffer = spyNioSslEngine.unwrap(wrappedData)) {
+        ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer();
+        unwrappedBuffer.flip();
+        assertThat(unwrappedBuffer.remaining()).isEqualTo(0);
+      }
+      assertThat(wrappedData.position()).isEqualTo(netBytes.length);
+    }
   }
 
   @Test
-  public void ensureUnwrappedCapacity() {
-    ByteBuffer wrappedBuffer = ByteBuffer.allocate(netBufferSize);
-    int requestedCapacity = nioSslEngine.getUnwrappedBuffer(wrappedBuffer).capacity() * 2;
-    ByteBuffer unwrappedBuffer = nioSslEngine.ensureUnwrappedCapacity(requestedCapacity);
-    assertThat(unwrappedBuffer.capacity()).isGreaterThanOrEqualTo(requestedCapacity);
+  public void unwrapWithDecryptionError() throws IOException {
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+      // make the application data too big to fit into the engine's encryption buffer
+      ByteBuffer wrappedData =
+          ByteBuffer.allocate(inputSharing.getBuffer().capacity());
+      byte[] netBytes = new byte[wrappedData.capacity() / 2];
+      Arrays.fill(netBytes, (byte) 0x1F);
+      wrappedData.put(netBytes);
+      wrappedData.flip();
+
+      // create an engine that will transfer bytes from the application buffer to the encrypted
+      // buffer
+      TestSSLEngine testEngine = new TestSSLEngine();
+      testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0));
+      spyNioSslEngine.engine = testEngine;
+
+      assertThatThrownBy(() -> {
+        try (final ByteBufferSharing unused = spyNioSslEngine.unwrap(wrappedData)) {
+        }
+      }).isInstanceOf(SSLException.class)
+          .hasMessageContaining("Error decrypting data");
+    }
   }
 
   @Test
@@ -333,7 +343,11 @@ public class NioSslEngineTest {
     when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
         new SSLEngineResult(CLOSED, FINISHED, 0, 0));
     nioSslEngine.close(mockChannel);
-    assertThatThrownBy(() -> nioSslEngine.checkClosed()).isInstanceOf(IOException.class)
+    assertThatThrownBy(() -> nioSslEngine.shareOutputBuffer().getBuffer())
+        .isInstanceOf(IOException.class)
+        .hasMessageContaining("NioSslEngine has been closed");
+    assertThatThrownBy(() -> nioSslEngine.shareInputBuffer().getBuffer())
+        .isInstanceOf(IOException.class)
         .hasMessageContaining("NioSslEngine has been closed");
     nioSslEngine.close(mockChannel);
   }
@@ -362,10 +376,12 @@ public class NioSslEngineTest {
 
     when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
     when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenAnswer((x) -> {
-      // give the NioSslEngine something to write on its socket channel, simulating a TLS close
-      // message
-      nioSslEngine.myNetData.put("Goodbye cruel world".getBytes());
-      return new SSLEngineResult(CLOSED, FINISHED, 0, 0);
+      try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
+        // give the NioSslEngine something to write on its socket channel, simulating a TLS close
+        // message
+        outputSharing.getBuffer().put("Goodbye cruel world".getBytes());
+        return new SSLEngineResult(CLOSED, FINISHED, 0, 0);
+      }
     });
     when(mockChannel.write(any(ByteBuffer.class))).thenThrow(new ClosedChannelException());
     nioSslEngine.close(mockChannel);
@@ -396,37 +412,42 @@ public class NioSslEngineTest {
     ByteBuffer wrappedBuffer = ByteBuffer.allocate(1000);
     SocketChannel mockChannel = mock(SocketChannel.class);
 
-    // force a compaction by making the decoded buffer appear near to being full
-    ByteBuffer unwrappedBuffer = nioSslEngine.peerAppData;
-    unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead);
-    unwrappedBuffer.limit(unwrappedBuffer.position() + preexistingBytes);
-
-    // simulate some socket reads
-    when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
-      @Override
-      public Integer answer(InvocationOnMock invocation) throws Throwable {
-        ByteBuffer buffer = invocation.getArgument(0);
-        buffer.position(buffer.position() + individualRead);
-        return individualRead;
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+      // force a compaction by making the decoded buffer appear near to being full
+      ByteBuffer unwrappedBuffer = inputSharing.getBuffer();
+      unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead);
+      unwrappedBuffer.limit(unwrappedBuffer.position() + preexistingBytes);
+
+      // simulate some socket reads
+      when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
+        @Override
+        public Integer answer(InvocationOnMock invocation) throws Throwable {
+          ByteBuffer buffer = invocation.getArgument(0);
+          buffer.position(buffer.position() + individualRead);
+          return individualRead;
+        }
+      });
+
+      TestSSLEngine testSSLEngine = new TestSSLEngine();
+      testSSLEngine.addReturnResult(new SSLEngineResult(OK, NEED_UNWRAP, 0, 0));
+      nioSslEngine.engine = testSSLEngine;
+
+      try (final ByteBufferSharing sharedBuffer =
+          nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+        ByteBuffer data = sharedBuffer.getBuffer();
+        verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
+        assertThat(data.position()).isEqualTo(0);
+        assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
       }
-    });
-
-    TestSSLEngine testSSLEngine = new TestSSLEngine();
-    testSSLEngine.addReturnResult(new SSLEngineResult(OK, NEED_UNWRAP, 0, 0));
-    nioSslEngine.engine = testSSLEngine;
-
-    ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
-    verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
-    assertThat(data.position()).isEqualTo(0);
-    assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
+    }
   }
 
 
   /**
-   * This tests the case where a message header has been read and part of a message has been
-   * read, but the decoded buffer is too small to hold all of the message. In this case
-   * the readAtLeast method will have to expand the capacity of the decoded buffer and return
-   * the new, expanded, buffer as the method result.
+   * This tests the case where a message header has been read and part of a message has been read,
+   * but the decoded buffer is too small to hold all of the message. In this case the readAtLeast
+   * method will have to expand the capacity of the decoded buffer and return the new, expanded,
+   * buffer as the method result.
    */
   @Test
   public void readAtLeastUsingSmallAppBuffer() throws Exception {
@@ -440,7 +461,11 @@ public class NioSslEngineTest {
     int initialUnwrappedBufferSize = 100;
     ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
     unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
-    nioSslEngine.peerAppData = unwrappedBuffer;
+
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+      final ByteBufferSharingImpl inputSharingImpl = (ByteBufferSharingImpl) inputSharing;
+      inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
+    }
 
     // simulate some socket reads
     when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
@@ -460,22 +485,26 @@ public class NioSslEngineTest {
         new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); // 130 + 60 bytes = 190
     nioSslEngine.engine = testSSLEngine;
 
-    ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
-    verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
-    assertThat(data.position()).isEqualTo(0);
-    assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
-    // The initial available space in the unwrapped buffer should have doubled
-    int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes;
-    assertThat(nioSslEngine.peerAppData.capacity())
-        .isEqualTo(2 * initialFreeSpace + preexistingBytes);
+    try (final ByteBufferSharing sharedBuffer =
+        nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+      ByteBuffer data = sharedBuffer.getBuffer();
+      verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
+      assertThat(data.position()).isEqualTo(0);
+      assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
+      // The initial available space in the unwrapped buffer should have doubled
+      int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes;
+      try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+        assertThat(inputSharing.getBuffer().capacity())
+            .isEqualTo(2 * initialFreeSpace + preexistingBytes);
+      }
+    }
   }
 
 
   /**
-   * This tests the case where a message header has been read and part of a message has been
-   * read, but the decoded buffer is too small to hold all of the message. In this case
-   * the buffer is completely full and should only take one overflow response to resolve
-   * the problem.
+   * This tests the case where a message header has been read and part of a message has been read,
+   * but the decoded buffer is too small to hold all of the message. In this case the buffer is
+   * completely full and should only take one overflow response to resolve the problem.
    */
   @Test
   public void readAtLeastUsingSmallAppBufferAtWriteLimit() throws Exception {
@@ -490,7 +519,10 @@ public class NioSslEngineTest {
     // force buffer expansion by making a small decoded buffer appear near to being full
     ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
     unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
-    nioSslEngine.peerAppData = unwrappedBuffer;
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+      final ByteBufferSharingImpl inputSharingImpl = (ByteBufferSharingImpl) inputSharing;
+      inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
+    }
 
     // simulate some socket reads
     when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
@@ -510,11 +542,14 @@ public class NioSslEngineTest {
         new SSLEngineResult(OK, NEED_UNWRAP, 0, 0));
     nioSslEngine.engine = testSSLEngine;
 
-    ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
-    verify(mockChannel, times(1)).read(isA(ByteBuffer.class));
-    assertThat(data.position()).isEqualTo(0);
-    assertThat(data.limit())
-        .isEqualTo(individualRead * testSSLEngine.getNumberOfUnwraps() + preexistingBytes);
+    try (final ByteBufferSharing sharedBuffer =
+        nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+      ByteBuffer data = sharedBuffer.getBuffer();
+      verify(mockChannel, times(1)).read(isA(ByteBuffer.class));
+      assertThat(data.position()).isEqualTo(0);
+      assertThat(data.limit())
+          .isEqualTo(individualRead * testSSLEngine.getNumberOfUnwraps() + preexistingBytes);
+    }
   }
 
 
@@ -652,8 +687,8 @@ public class NioSslEngineTest {
     }
 
     /**
-     * add an engine operation result to be returned by wrap or unwrap.
-     * Like Mockito's thenReturn(), the last return result will repeat forever
+     * add an engine operation result to be returned by wrap or unwrap. Like Mockito's thenReturn(),
+     * the last return result will repeat forever
      */
     void addReturnResult(SSLEngineResult... sslEngineResult) {
       for (SSLEngineResult result : sslEngineResult) {