You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ec...@apache.org on 2021/06/17 15:41:19 UTC

[geode] 01/02: Revert "GEODE-9141: (2 of 2) Handle in-buffer concurrency"

This is an automated email from the ASF dual-hosted git repository.

echobravo pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c13a62199a3720c69c142b0dcd2f20130ac3b1a4
Author: Ernest Burghardt <eb...@pivotal.io>
AuthorDate: Thu Jun 17 10:39:47 2021 -0500

    Revert "GEODE-9141: (2 of 2) Handle in-buffer concurrency"
    
    This reverts commit 1a8eb5aec580eb75871060793ea65d62f5f2d959.
---
 ...LSocketHostNameVerificationIntegrationTest.java |   6 +-
 .../internal/net/SSLSocketIntegrationTest.java     |   3 +-
 .../apache/geode/codeAnalysis/excludedClasses.txt  |   2 +-
 .../geode/internal/net/ByteBufferSharing.java      |  15 -
 .../geode/internal/net/ByteBufferSharingNoOp.java  |   5 -
 .../geode/internal/net/ByteBufferVendor.java       | 144 +++------
 .../apache/geode/internal/net/NioSslEngine.java    |  50 +--
 .../apache/geode/internal/net/SocketCreator.java   |   9 +-
 .../org/apache/geode/internal/tcp/Connection.java  | 334 ++++++++++-----------
 .../geode/internal/net/ByteBufferVendorTest.java   |  36 +--
 .../geode/internal/net/NioSslEngineTest.java       |  41 ++-
 .../apache/geode/internal/tcp/ConnectionTest.java  |   1 -
 12 files changed, 285 insertions(+), 361 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
index e86bfea..a70f3b1 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
@@ -103,9 +103,6 @@ public class SSLSocketHostNameVerificationIntegrationTest {
 
   @Before
   public void setUp() throws Exception {
-
-    SocketCreatorFactory.close(); // to clear socket creators made in previous tests
-
     IgnoredException.addIgnoredException("javax.net.ssl.SSLException: Read timed out");
 
     this.localHost = InetAddress.getLoopbackAddress();
@@ -175,7 +172,7 @@ public class SSLSocketHostNameVerificationIntegrationTest {
 
     try {
       this.socketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(),
-          sslEngine, 0,
+          sslEngine, 0, true,
           ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize()),
           new BufferPool(mock(DMStats.class)));
 
@@ -208,6 +205,7 @@ public class SSLSocketHostNameVerificationIntegrationTest {
             sc.handshakeSSLSocketChannel(socket.getChannel(),
                 sslEngine,
                 timeoutMillis,
+                false,
                 ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize()),
                 new BufferPool(mock(DMStats.class)));
       } catch (Throwable throwable) {
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
index 13e9d5b..e7ac191 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
@@ -217,7 +217,7 @@ public class SSLSocketIntegrationTest {
     clientSocket = clientChannel.socket();
     NioSslEngine engine =
         clusterSocketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(),
-            clusterSocketCreator.createSSLEngine("localhost", 1234, true), 0,
+            clusterSocketCreator.createSSLEngine("localhost", 1234, true), 0, true,
             ByteBuffer.allocate(65535), new BufferPool(mock(DMStats.class)));
     clientChannel.configureBlocking(true);
 
@@ -267,6 +267,7 @@ public class SSLSocketIntegrationTest {
             sc.handshakeSSLSocketChannel(socket.getChannel(), sc.createSSLEngine("localhost", 1234,
                 false),
                 timeoutMillis,
+                false,
                 ByteBuffer.allocate(65535),
                 new BufferPool(mock(DMStats.class)));
 
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index cd1af3a..af3bd1e 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -104,4 +104,4 @@ org/apache/geode/cache/query/internal/xml/ElementType
 org/apache/geode/cache/query/internal/xml/ElementType$1
 org/apache/geode/cache/query/internal/xml/ElementType$2
 org/apache/geode/cache/query/internal/xml/ElementType$3
-org/apache/geode/internal/net/ByteBufferVendor$OpenAttemptTimedOut
\ No newline at end of file
+org/apache/geode/internal/net/ByteBufferSharingImpl$OpenAttemptTimedOut
\ No newline at end of file
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
index c8a94ce..cdfa897 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
@@ -42,27 +42,12 @@ public interface ByteBufferSharing extends AutoCloseable {
    *
    * Subsequent calls to {@link #getBuffer()} will return that new buffer too.
    *
-   * This variant is for use when the buffer is being written to.
-   *
    * @return the same buffer or a different (bigger) buffer
    * @throws IOException if the buffer is no longer accessible
    */
   ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException;
 
   /**
-   * Expand the buffer if needed. This may return a different object so be sure to pay attention to
-   * the return value if you need access to the potentially- expanded buffer.
-   *
-   * Subsequent calls to {@link #getBuffer()} will return that new buffer too.
-   *
-   * This variant is for use when the buffer is being read from.
-   *
-   * @return the same buffer or a different (bigger) buffer
-   * @throws IOException if the buffer is no longer accessible
-   */
-  ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException;
-
-  /**
    * Override {@link AutoCloseable#close()} without throws clause since we don't need one.
    */
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
index 4f36e5b..4a8bc49 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
@@ -48,10 +48,5 @@ class ByteBufferSharingNoOp implements ByteBufferSharing {
   }
 
   @Override
-  public ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException {
-    throw new UnsupportedOperationException("Can't expand buffer when using NioPlainEngine");
-  }
-
-  @Override
   public void close() {}
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
index 1dc74f0..4933247 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
@@ -17,7 +17,6 @@ package org.apache.geode.internal.net;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -28,49 +27,49 @@ import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.internal.net.BufferPool.BufferType;
 
 /**
- * Produces (via {@link #open()}) an {@link ByteBufferSharing} meant to used only within a
- * try-with-resources block. The resource controls access to a secondary resource
- * (via {@link ByteBufferSharing#getBuffer()}) within the scope of try-with-resources.
- * Neither the object returned by {@link #open()}, nor the object returned by invoking
- * {@link ByteBufferSharing#getBuffer()} on that object may be used outside the scope of
+ * An {@link AutoCloseable} meant to be acquired in a try-with-resources statement. The resource (a
+ * {@link ByteBuffer}) is available (for reading and modification) in the scope of the
  * try-with-resources.
  */
-public class ByteBufferVendor {
+class ByteBufferVendor implements ByteBufferSharing {
 
   static class OpenAttemptTimedOut extends Exception {
   }
 
-  private interface ByteBufferSharingInternal extends ByteBufferSharing {
-    void releaseBuffer();
-  }
-
-  private final Lock lock = new ReentrantLock();
-  private final AtomicBoolean isDestructed = new AtomicBoolean(false);
-  private final AtomicInteger counter = new AtomicInteger(1);
-  // the object referenced by sharing is guarded by lock
-  private final ByteBufferSharingInternal sharing;
+  private final Lock lock;
+  private final AtomicBoolean isDestructed;
+  // mutable because in general our ByteBuffer may need to be resized (grown or compacted)
+  private volatile ByteBuffer buffer;
+  private final BufferType bufferType;
+  private final AtomicInteger counter;
+  private final BufferPool bufferPool;
 
-  /*
-   * These constructors are for use only by the owner of the shared resource.
+  /**
+   * This constructor is for use only by the owner of the shared resource (a {@link ByteBuffer}).
    *
    * A resource owner must invoke {@link #open()} once for each reference that escapes (is passed
    * to an external object or is returned to an external caller.)
    *
-   * Constructors acquire no locks. The reference count will be 1 after a constructor
+   * This constructor acquires no lock. The reference count will be 1 after this constructor
    * completes.
    */
+  ByteBufferVendor(final ByteBuffer buffer, final BufferType bufferType,
+                   final BufferPool bufferPool) {
+    this.buffer = buffer;
+    this.bufferType = bufferType;
+    this.bufferPool = bufferPool;
+    lock = new ReentrantLock();
+    counter = new AtomicInteger(1);
+    isDestructed = new AtomicBoolean(false);
+  }
 
   /**
-   * When you have a ByteBuffer available before construction, use this constructor.
-   *
-   * @param bufferArg is the ByteBuffer
-   * @param bufferType needed for freeing the buffer later
-   * @param bufferPool needed for freeing the buffer later
+   * The destructor. Called by the resource owner to undo the work of the constructor.
    */
-  public ByteBufferVendor(final ByteBuffer bufferArg,
-      final BufferType bufferType,
-      final BufferPool bufferPool) {
-    sharing = new ByteBufferSharingInternalImpl(bufferArg, bufferType, bufferPool);
+  void destruct() {
+    if (isDestructed.compareAndSet(false, true)) {
+      dropReference();
+    }
   }
 
   /**
@@ -79,19 +78,18 @@ public class ByteBufferVendor {
    *
    * Resource owners call this method as the last thing before returning a reference to the caller.
    * That caller binds that reference to a variable in a try-with-resources statement and relies on
-   * the AutoCloseable protocol to invoke {@link AutoCloseable#close()} on the object at
-   * the end of the block.
+   * the AutoCloseable protocol to invoke {@link #close()} on the object at the end of the block.
    */
-  public ByteBufferSharing open() throws IOException {
+  ByteBufferSharing open() throws IOException {
     lock.lock();
     addReferenceAfterLock();
-    return sharing;
+    return this;
   }
 
   /**
    * This variant throws {@link OpenAttemptTimedOut} if it can't acquire the lock in time.
    */
-  public ByteBufferSharing open(final long time, final TimeUnit unit)
+  ByteBufferSharing open(final long time, final TimeUnit unit)
       throws OpenAttemptTimedOut, IOException {
     try {
       if (!lock.tryLock(time, unit)) {
@@ -102,25 +100,24 @@ public class ByteBufferVendor {
       throw new OpenAttemptTimedOut();
     }
     addReferenceAfterLock();
-    return sharing;
-  }
-
-  /**
-   * The destructor. Called by the resource owner to undo the work of the constructor.
-   */
-  public void destruct() {
-    if (isDestructed.compareAndSet(false, true)) {
-      dropReference();
-    }
+    return this;
   }
 
-  private void exposingResource() throws IOException {
+  @Override
+  public ByteBuffer getBuffer() throws IOException {
     if (isDestructed.get()) {
       throwClosed();
     }
+    return buffer;
   }
 
-  private void close() {
+  @Override
+  public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException {
+    return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity);
+  }
+
+  @Override
+  public void close() {
     /*
      * We are counting on our ReentrantLock throwing an exception if the current thread
      * does not hold the lock. In that case dropReference() will not be called. This
@@ -145,11 +142,16 @@ public class ByteBufferVendor {
   private int dropReference() {
     final int usages = counter.decrementAndGet();
     if (usages == 0) {
-      sharing.releaseBuffer();
+      bufferPool.releaseBuffer(bufferType, buffer);
     }
     return usages;
   }
 
+  @VisibleForTesting
+  public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) {
+    buffer = newBufferForTesting;
+  }
+
   private void addReferenceAfterLock() throws IOException {
     try {
       addReference();
@@ -163,54 +165,4 @@ public class ByteBufferVendor {
     throw new IOException("NioSslEngine has been closed");
   }
 
-  private class ByteBufferSharingInternalImpl implements ByteBufferSharingInternal {
-
-    /*
-     * mutable because in general our ByteBuffer may need to be resized (grown or compacted)
-     * no concurrency concerns since ByteBufferSharingNotNull is guarded by ByteBufferVendor.lock
-     */
-    private ByteBuffer buffer;
-    private final BufferType bufferType;
-    private final BufferPool bufferPool;
-
-    public ByteBufferSharingInternalImpl(final ByteBuffer buffer,
-        final BufferType bufferType,
-        final BufferPool bufferPool) {
-      Objects.requireNonNull(buffer);
-      this.buffer = buffer;
-      this.bufferType = bufferType;
-      this.bufferPool = bufferPool;
-    }
-
-    @Override
-    public ByteBuffer getBuffer() throws IOException {
-      exposingResource();
-      return buffer;
-    }
-
-    @Override
-    public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException {
-      return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity);
-    }
-
-    @Override
-    public ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException {
-      return buffer = bufferPool.expandReadBufferIfNeeded(bufferType, getBuffer(), newCapacity);
-    }
-
-    @Override
-    public void close() {
-      ByteBufferVendor.this.close();
-    }
-
-    @Override
-    public void releaseBuffer() {
-      bufferPool.releaseBuffer(bufferType, buffer);
-    }
-  }
-
-  @VisibleForTesting
-  public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) {
-    ((ByteBufferSharingInternalImpl) sharing).buffer = newBufferForTesting;
-  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 4c603a0..fc91a31 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -62,12 +62,12 @@ public class NioSslEngine implements NioFilter {
   /**
    * holds bytes wrapped by the SSLEngine; a.k.a. myNetData
    */
-  private final ByteBufferVendor outputBufferVendor;
+  private final ByteBufferVendor outputSharing;
 
   /**
    * holds the last unwrapped data from a peer; a.k.a. peerAppData
    */
-  private final ByteBufferVendor inputBufferVendor;
+  private final ByteBufferVendor inputSharing;
 
   NioSslEngine(SSLEngine engine, BufferPool bufferPool) {
     SSLSession session = engine.getSession();
@@ -76,10 +76,10 @@ public class NioSslEngine implements NioFilter {
     closed = false;
     this.engine = engine;
     this.bufferPool = bufferPool;
-    outputBufferVendor =
+    outputSharing =
         new ByteBufferVendor(bufferPool.acquireDirectSenderBuffer(packetBufferSize),
             TRACKED_SENDER, bufferPool);
-    inputBufferVendor =
+    inputSharing =
         new ByteBufferVendor(bufferPool.acquireNonDirectReceiveBuffer(appBufferSize),
             TRACKED_RECEIVER, bufferPool);
   }
@@ -98,7 +98,7 @@ public class NioSslEngine implements NioFilter {
           peerNetData.capacity(), engine.getSession().getPacketBufferSize()));
     }
 
-    final ByteBuffer handshakeBuffer = peerNetData;
+    ByteBuffer handshakeBuffer = peerNetData;
     handshakeBuffer.clear();
 
     ByteBuffer myAppData = ByteBuffer.wrap(new byte[0]);
@@ -135,7 +135,7 @@ public class NioSslEngine implements NioFilter {
 
       switch (status) {
         case NEED_UNWRAP:
-          try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
+          try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
             final ByteBuffer peerAppData = inputSharing.getBuffer();
 
             // Receive handshaking data from peer
@@ -162,7 +162,7 @@ public class NioSslEngine implements NioFilter {
           }
 
         case NEED_WRAP:
-          try (final ByteBufferSharing outputSharing = outputBufferVendor.open()) {
+          try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
             final ByteBuffer myNetData = outputSharing.getBuffer();
 
             // Empty the local network packet buffer.
@@ -231,7 +231,7 @@ public class NioSslEngine implements NioFilter {
 
   @Override
   public ByteBufferSharing wrap(ByteBuffer appData) throws IOException {
-    try (final ByteBufferSharing outputSharing = outputBufferVendor.open()) {
+    try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
 
       ByteBuffer myNetData = outputSharing.getBuffer();
 
@@ -260,13 +260,13 @@ public class NioSslEngine implements NioFilter {
 
       myNetData.flip();
 
-      return outputBufferVendor.open();
+      return shareOutputBuffer();
     }
   }
 
   @Override
   public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException {
-    try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
+    try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
 
       ByteBuffer peerAppData = inputSharing.getBuffer();
 
@@ -292,7 +292,7 @@ public class NioSslEngine implements NioFilter {
             // partial data - need to read more. When this happens the SSLEngine will not have
             // changed the buffer position
             wrappedBuffer.compact();
-            return inputBufferVendor.open();
+            return shareInputBuffer();
           case OK:
             break;
           default:// if there is data in the decrypted buffer return it. Otherwise signal that we're
@@ -305,7 +305,7 @@ public class NioSslEngine implements NioFilter {
         }
       }
       wrappedBuffer.clear();
-      return inputBufferVendor.open();
+      return shareInputBuffer();
     }
   }
 
@@ -325,7 +325,7 @@ public class NioSslEngine implements NioFilter {
   @Override
   public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes,
       ByteBuffer wrappedBuffer) throws IOException {
-    try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
+    try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
 
       ByteBuffer peerAppData = inputSharing.getBuffer();
 
@@ -355,13 +355,13 @@ public class NioSslEngine implements NioFilter {
           }
         }
       }
-      return inputBufferVendor.open();
+      return shareInputBuffer();
     }
   }
 
   @Override
   public ByteBufferSharing getUnwrappedBuffer() throws IOException {
-    return inputBufferVendor.open();
+    return shareInputBuffer();
   }
 
   @Override
@@ -377,8 +377,8 @@ public class NioSslEngine implements NioFilter {
       return;
     }
     closed = true;
-    inputBufferVendor.destruct();
-    try (final ByteBufferSharing outputSharing = outputBufferVendor.open(1, TimeUnit.MINUTES)) {
+    inputSharing.destruct();
+    try (final ByteBufferSharing outputSharing = shareOutputBuffer(1, TimeUnit.MINUTES)) {
       final ByteBuffer myNetData = outputSharing.getBuffer();
 
       if (!engine.isOutboundDone()) {
@@ -412,7 +412,7 @@ public class NioSslEngine implements NioFilter {
         engine.closeOutbound();
       }
     } finally {
-      outputBufferVendor.destruct();
+      outputSharing.destruct();
     }
   }
 
@@ -422,12 +422,16 @@ public class NioSslEngine implements NioFilter {
   }
 
   @VisibleForTesting
-  public ByteBufferVendor getOutputBufferVendorForTestingOnly() throws IOException {
-    return outputBufferVendor;
+  public ByteBufferSharing shareOutputBuffer() throws IOException {
+    return outputSharing.open();
   }
 
-  @VisibleForTesting
-  public ByteBufferVendor getInputBufferVendorForTestingOnly() throws IOException {
-    return inputBufferVendor;
+  private ByteBufferSharing shareOutputBuffer(final long time, final TimeUnit unit)
+      throws OpenAttemptTimedOut, IOException {
+    return outputSharing.open(time, unit);
+  }
+
+  public ByteBufferSharing shareInputBuffer() throws IOException {
+    return inputSharing.open();
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
index a31bbb2..a232fca 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
@@ -803,16 +803,21 @@ public class SocketCreator extends TcpSocketCreatorImpl {
    * @param socketChannel the socket's NIO channel
    * @param engine the sslEngine (see createSSLEngine)
    * @param timeout handshake timeout in milliseconds. No timeout if <= 0
+   * @param clientSocket set to true if you initiated the connect(), false if you accepted it
    * @param peerNetBuffer the buffer to use in reading data fron socketChannel. This should also be
    *        used in subsequent I/O operations
    * @return The SSLEngine to be used in processing data for sending/receiving from the channel
    */
-  public NioSslEngine handshakeSSLSocketChannel(SocketChannel socketChannel,
-      SSLEngine engine,
+  public NioSslEngine handshakeSSLSocketChannel(SocketChannel socketChannel, SSLEngine engine,
       int timeout,
+      boolean clientSocket,
       ByteBuffer peerNetBuffer,
       BufferPool bufferPool)
       throws IOException {
+    engine.setUseClientMode(clientSocket);
+    if (!clientSocket) {
+      engine.setNeedClientAuth(sslConfig.isRequireAuth());
+    }
     while (!socketChannel.finishConnect()) {
       try {
         Thread.sleep(50);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 107aa9f..f5a1886 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -18,7 +18,6 @@ import static java.lang.Boolean.FALSE;
 import static java.lang.ThreadLocal.withInitial;
 import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
 import static org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
-import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_RECEIVER;
 import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX;
 
 import java.io.DataInput;
@@ -80,7 +79,6 @@ import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.SystemTimer.SystemTimerTask;
 import org.apache.geode.internal.net.BufferPool;
 import org.apache.geode.internal.net.ByteBufferSharing;
-import org.apache.geode.internal.net.ByteBufferVendor;
 import org.apache.geode.internal.net.NioFilter;
 import org.apache.geode.internal.net.NioPlainEngine;
 import org.apache.geode.internal.net.SocketCreator;
@@ -119,7 +117,7 @@ public class Connection implements Runnable {
    * Small buffer used for send socket buffer on receiver connections and receive buffer on sender
    * connections.
    */
-  public static final int SMALL_BUFFER_SIZE =
+  static final int SMALL_BUFFER_SIZE =
       Integer.getInteger(GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096);
 
   /**
@@ -311,14 +309,11 @@ public class Connection implements Runnable {
   /** name of thread that we're currently performing an operation in (may be null) */
   private String ackThreadName;
 
-  /*
-   * This object mediates access to the input ByteBuffer and ensures its return to
-   * pool after last use. This reference couldn't be final since it is initialized
-   * in createIoFilter() not in the constructors. It had to be initialized there
-   * because in general we have to construct an SSLEngine before we know the buffer
-   * size and createIoFilter() is where we create that object.
-   */
-  private ByteBufferVendor inputBufferVendor;
+  /** the buffer used for message receipt */
+  private ByteBuffer inputBuffer;
+
+  /** Lock used to protect the input buffer */
+  public final Object inputBufferLock = new Object();
 
   /** the length of the next message to be dispatched */
   private int messageLength;
@@ -894,28 +889,27 @@ public class Connection implements Runnable {
     waitForAddressCompletion();
 
     InternalDistributedMember myAddr = owner.getConduit().getMemberId();
-    try (final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE)) {
-      /*
-       * Note a byte of zero is always written because old products serialized a member id with
-       * always sends an ip address. My reading of the ip-address specs indicated that the first
-       * byte of a valid address would never be 0.
-       */
-      connectHandshake.writeByte(0);
-      connectHandshake.writeByte(HANDSHAKE_VERSION);
-      // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
-      InternalDataSerializer.invokeToData(myAddr, connectHandshake);
-      connectHandshake.writeBoolean(sharedResource);
-      connectHandshake.writeBoolean(preserveOrder);
-      connectHandshake.writeLong(uniqueId);
-      // write the product version ordinal
-      Version.CURRENT.writeOrdinal(connectHandshake, true);
-      connectHandshake.writeInt(dominoCount.get() + 1);
-      // this writes the sending member + thread name that is stored in senderName
-      // on the receiver to show the cause of reader thread creation
-      connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, OperationExecutors.STANDARD_EXECUTOR,
-          MsgIdGenerator.NO_MSG_ID);
-      writeFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
-    }
+    final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE);
+    /*
+     * Note a byte of zero is always written because old products serialized a member id with always
+     * sends an ip address. My reading of the ip-address specs indicated that the first byte of a
+     * valid address would never be 0.
+     */
+    connectHandshake.writeByte(0);
+    connectHandshake.writeByte(HANDSHAKE_VERSION);
+    // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
+    InternalDataSerializer.invokeToData(myAddr, connectHandshake);
+    connectHandshake.writeBoolean(sharedResource);
+    connectHandshake.writeBoolean(preserveOrder);
+    connectHandshake.writeLong(uniqueId);
+    // write the product version ordinal
+    Version.CURRENT.writeOrdinal(connectHandshake, true);
+    connectHandshake.writeInt(dominoCount.get() + 1);
+    // this writes the sending member + thread name that is stored in senderName
+    // on the receiver to show the cause of reader thread creation
+    connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, OperationExecutors.STANDARD_EXECUTOR,
+        MsgIdGenerator.NO_MSG_ID);
+    writeFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
   }
 
   /**
@@ -1359,7 +1353,7 @@ public class Connection implements Runnable {
         if (!isReceiver && !hasResidualReaderThread()) {
           // receivers release the input buffer when exiting run(). Senders use the
           // inputBuffer for reading direct-reply responses
-          inputBufferVendor.destruct();
+          releaseInputBuffer();
         }
         lengthSet = false;
       }
@@ -1500,7 +1494,7 @@ public class Connection implements Runnable {
         }
       }
 
-      inputBufferVendor.destruct();
+      releaseInputBuffer();
 
       // make sure that if the reader thread exits we notify a thread waiting for the handshake.
       notifyHandshakeWaiter(false);
@@ -1512,6 +1506,16 @@ public class Connection implements Runnable {
     }
   }
 
+  private void releaseInputBuffer() {
+    synchronized (inputBufferLock) {
+      ByteBuffer tmp = inputBuffer;
+      if (tmp != null) {
+        inputBuffer = null;
+        getBufferPool().releaseReceiveBuffer(tmp);
+      }
+    }
+  }
+
   BufferPool getBufferPool() {
     return owner.getBufferPool();
   }
@@ -1533,6 +1537,9 @@ public class Connection implements Runnable {
   }
 
   private void readMessages() {
+    if (closing.get()) {
+      return;
+    }
     // take a snapshot of uniqueId to detect reconnect attempts
     SocketChannel channel;
     try {
@@ -1578,6 +1585,8 @@ public class Connection implements Runnable {
     // we should not change the state of the connection if we are a handshake reader thread
     // as there is a race between this thread and the application thread doing direct ack
     boolean handshakeHasBeenRead = false;
+    // if we're using SSL/TLS the input buffer may already have data to process
+    boolean skipInitialRead = getInputBuffer().position() > 0;
     try {
       for (boolean isInitialRead = true;;) {
         if (stopped) {
@@ -1597,9 +1606,8 @@ public class Connection implements Runnable {
           break;
         }
 
-        try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
-          ByteBuffer buff = inputSharing.getBuffer();
-
+        try {
+          ByteBuffer buff = getInputBuffer();
           synchronized (stateLock) {
             connectionState = STATE_READING;
           }
@@ -1608,8 +1616,6 @@ public class Connection implements Runnable {
             amountRead = channel.read(buff);
           } else {
             isInitialRead = false;
-            // if we're using SSL/TLS the input buffer may already have data to process
-            final boolean skipInitialRead = buff.position() > 0;
             if (!skipInitialRead) {
               amountRead = channel.read(buff);
             } else {
@@ -1674,11 +1680,11 @@ public class Connection implements Runnable {
         } catch (IOException e) {
           // "Socket closed" check needed for Solaris jdk 1.4.2_08
           if (!isSocketClosed() && !"Socket closed".equalsIgnoreCase(e.getMessage())) {
-            if (logger.isInfoEnabled() && !isIgnorableIOException(e)) {
-              logger.info("{} io exception for {}", p2pReaderName(), this, e);
+            if (logger.isDebugEnabled() && !isIgnorableIOException(e)) {
+              logger.debug("{} io exception for {}", p2pReaderName(), this, e);
             }
-            if (logger.isDebugEnabled()) {
-              if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) {
+            if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) {
+              if (logger.isDebugEnabled()) {
                 logger.debug(
                     "{} received unexpected WSACancelBlockingCall exception, which may result in a hang",
                     p2pReaderName());
@@ -1722,55 +1728,28 @@ public class Connection implements Runnable {
   private void createIoFilter(SocketChannel channel, boolean clientSocket) throws IOException {
     if (getConduit().useSSL() && channel != null) {
       InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress();
-      String hostName;
-      if (remoteAddr != null) {
-        hostName = remoteAddr.getHostName();
-      } else {
-        hostName = SocketCreator.getHostName(address.getAddress());
-      }
       SSLEngine engine =
-          getConduit().getSocketCreator().createSSLEngine(hostName,
-              address.getPort(), clientSocket);
-
-      final int packetBufferSize = engine.getSession().getPacketBufferSize();
-
-      inputBufferVendor =
-          new ByteBufferVendor(
-              getBufferPool().acquireDirectReceiveBuffer(packetBufferSize),
-              TRACKED_RECEIVER,
-              getBufferPool());
+          getConduit().getSocketCreator().createSSLEngine(address.getHostName(), address.getPort(),
+              clientSocket);
 
+      int packetBufferSize = engine.getSession().getPacketBufferSize();
+      if (inputBuffer == null || inputBuffer.capacity() < packetBufferSize) {
+        // TLS has a minimum input buffer size constraint
+        if (inputBuffer != null) {
+          getBufferPool().releaseReceiveBuffer(inputBuffer);
+        }
+        inputBuffer = getBufferPool().acquireDirectReceiveBuffer(packetBufferSize);
+      }
       if (channel.socket().getReceiveBufferSize() < packetBufferSize) {
         channel.socket().setReceiveBufferSize(packetBufferSize);
       }
       if (channel.socket().getSendBufferSize() < packetBufferSize) {
         channel.socket().setSendBufferSize(packetBufferSize);
       }
-      try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
-        final ByteBuffer inputBuffer = inputSharing.getBuffer();
-        /*
-         * It's ok to share the inputBuffer with handshakeSSLSocketChannel() since that method
-         * accesses the referenced buffer for the handshake which completes before returning
-         * control here. The NioSslEngine retains no reference to the buffer.
-         */
-        ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine,
-            getConduit().idleConnectionTimeout, inputBuffer,
-            getBufferPool());
-      }
+      ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine,
+          getConduit().idleConnectionTimeout, clientSocket, inputBuffer,
+          getBufferPool());
     } else {
-      final int allocSize;
-      if (recvBufferSize == -1) {
-        allocSize = owner.getConduit().tcpBufferSize;
-      } else {
-        allocSize = recvBufferSize;
-      }
-
-      inputBufferVendor =
-          new ByteBufferVendor(
-              getBufferPool().acquireDirectReceiveBuffer(allocSize),
-              TRACKED_RECEIVER,
-              getBufferPool());
-
       ioFilter = new NioPlainEngine(getBufferPool());
     }
   }
@@ -1802,13 +1781,9 @@ public class Connection implements Runnable {
     }
 
     msg = msg.toLowerCase();
-
-    if (e instanceof SSLException && msg.contains("status = closed")) {
-      return true; // engine has been closed - this is normal
-    }
-
-    return (msg.contains("forcibly closed") || msg.contains("reset by peer")
-        || msg.contains("connection reset") || msg.contains("socket is closed"));
+    return msg.contains("forcibly closed")
+        || msg.contains("reset by peer")
+        || msg.contains("connection reset");
   }
 
   private static boolean validMsgType(int msgType) {
@@ -2652,6 +2627,20 @@ public class Connection implements Runnable {
   }
 
   /**
+   * gets the buffer for receiving message length bytes
+   */
+  private ByteBuffer getInputBuffer() {
+    if (inputBuffer == null) {
+      int allocSize = recvBufferSize;
+      if (allocSize == -1) {
+        allocSize = owner.getConduit().tcpBufferSize;
+      }
+      inputBuffer = getBufferPool().acquireDirectReceiveBuffer(allocSize);
+    }
+    return inputBuffer;
+  }
+
+  /**
    * @throws SocketTimeoutException if wait expires.
    * @throws ConnectionException if ack is not received
    */
@@ -2758,93 +2747,72 @@ public class Connection implements Runnable {
    * deserialized and passed to TCPConduit for further processing
    */
   private void processInputBuffer() throws ConnectionException, IOException {
-    try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
-      // can't be final because in some cases we expand the buffer (resulting in a new object)
-      ByteBuffer inputBuffer = inputSharing.getBuffer();
-      inputBuffer.flip();
+    inputBuffer.flip();
 
-      try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) {
-        final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer();
+    try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) {
+      final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer();
 
-        peerDataBuffer.flip();
+      peerDataBuffer.flip();
 
-        boolean done = false;
+      boolean done = false;
 
-        while (!done && connected) {
-          owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
-          int remaining = peerDataBuffer.remaining();
-          if (lengthSet || remaining >= MSG_HEADER_BYTES) {
-            if (!lengthSet) {
-              if (readMessageHeader(peerDataBuffer)) {
-                break;
-              }
+      while (!done && connected) {
+        owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+        int remaining = peerDataBuffer.remaining();
+        if (lengthSet || remaining >= MSG_HEADER_BYTES) {
+          if (!lengthSet) {
+            if (readMessageHeader(peerDataBuffer)) {
+              break;
             }
-            if (remaining >= messageLength + MSG_HEADER_BYTES) {
-              lengthSet = false;
-              peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES);
-              // don't trust the message deserialization to leave the position in
-              // the correct spot. Some of the serialization uses buffered
-              // streams that can leave the position at the wrong spot
-              int startPos = peerDataBuffer.position();
-              int oldLimit = peerDataBuffer.limit();
-              peerDataBuffer.limit(startPos + messageLength);
-
-              if (handshakeRead) {
-                try {
-                  readMessage(peerDataBuffer);
-                } catch (SerializationException e) {
-                  logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit);
-                  throw e;
-                }
-              } else {
-                try (ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
-                    DataInputStream dis = new DataInputStream(bbis)) {
-                  if (!isReceiver) {
-                    // we read the handshake and then stop processing since we don't want
-                    // to process the input buffer anymore in a handshake thread
-                    readHandshakeForSender(dis, peerDataBuffer);
-                    return;
-                  }
-                  if (readHandshakeForReceiver(dis)) {
-                    ioFilter.doneReading(peerDataBuffer);
-                    return;
-                  }
-                }
-              }
-              if (!connected) {
-                continue;
+          }
+          if (remaining >= messageLength + MSG_HEADER_BYTES) {
+            lengthSet = false;
+            peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES);
+            // don't trust the message deserialization to leave the position in
+            // the correct spot. Some of the serialization uses buffered
+            // streams that can leave the position at the wrong spot
+            int startPos = peerDataBuffer.position();
+            int oldLimit = peerDataBuffer.limit();
+            peerDataBuffer.limit(startPos + messageLength);
+
+            if (handshakeRead) {
+              try {
+                readMessage(peerDataBuffer);
+              } catch (SerializationException e) {
+                logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit);
+                throw e;
               }
-              accessed();
-              peerDataBuffer.limit(oldLimit);
-              peerDataBuffer.position(startPos + messageLength);
             } else {
-              done = true;
-              if (getConduit().useSSL()) {
+              ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
+              DataInputStream dis = new DataInputStream(bbis);
+              if (!isReceiver) {
+                // we read the handshake and then stop processing since we don't want
+                // to process the input buffer anymore in a handshake thread
+                readHandshakeForSender(dis, peerDataBuffer);
+                return;
+              }
+              if (readHandshakeForReceiver(dis)) {
                 ioFilter.doneReading(peerDataBuffer);
-              } else {
-                // compact or resize the buffer
-                final int oldBufferSize = inputBuffer.capacity();
-                final int allocSize = messageLength + MSG_HEADER_BYTES;
-                if (oldBufferSize < allocSize) {
-                  // need a bigger buffer
-                  logger.info(
-                      "Allocating larger network read buffer, new size is {} old size was {}.",
-                      allocSize, oldBufferSize);
-                  inputBuffer = inputSharing.expandReadBufferIfNeeded(allocSize);
-                } else {
-                  if (inputBuffer.position() != 0) {
-                    inputBuffer.compact();
-                  } else {
-                    inputBuffer.position(inputBuffer.limit());
-                    inputBuffer.limit(inputBuffer.capacity());
-                  }
-                }
+                return;
               }
             }
+            if (!connected) {
+              continue;
+            }
+            accessed();
+            peerDataBuffer.limit(oldLimit);
+            peerDataBuffer.position(startPos + messageLength);
           } else {
-            ioFilter.doneReading(peerDataBuffer);
             done = true;
+            if (getConduit().useSSL()) {
+              ioFilter.doneReading(peerDataBuffer);
+            } else {
+              compactOrResizeBuffer(messageLength);
+            }
           }
+        } else {
+          ioFilter.doneReading(peerDataBuffer);
+          done = true;
         }
       }
     }
@@ -2979,9 +2947,10 @@ public class Connection implements Runnable {
   private void readMessage(ByteBuffer peerDataBuffer) {
     if (messageType == NORMAL_MSG_TYPE) {
       owner.getConduit().getStats().incMessagesBeingReceived(true, messageLength);
-      try (ByteBufferInputStream bbis =
+      ByteBufferInputStream bbis =
           remoteVersion == null ? new ByteBufferInputStream(peerDataBuffer)
-              : new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion)) {
+              : new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion);
+      try {
         ReplyProcessor21.initMessageRPId();
         // add serialization stats
         long startSer = owner.getConduit().getStats().startMsgDeserialization();
@@ -3234,8 +3203,34 @@ public class Connection implements Runnable {
   private void setThreadName(int dominoNumber) {
     Thread.currentThread().setName(THREAD_KIND_IDENTIFIER + " for " + remoteAddr + " "
         + (sharedResource ? "" : "un") + "shared" + " " + (preserveOrder ? "" : "un")
-        + "ordered sender uid=" + uniqueId + (dominoNumber > 0 ? " dom #" + dominoNumber : "")
-        + " local port=" + socket.getLocalPort() + " remote port=" + socket.getPort());
+        + "ordered" + " uid=" + uniqueId + (dominoNumber > 0 ? " dom #" + dominoNumber : "")
+        + " port=" + socket.getPort());
+  }
+
+  private void compactOrResizeBuffer(int messageLength) {
+    final int oldBufferSize = inputBuffer.capacity();
+    int allocSize = messageLength + MSG_HEADER_BYTES;
+    if (oldBufferSize < allocSize) {
+      // need a bigger buffer
+      logger.info("Allocating larger network read buffer, new size is {} old size was {}.",
+          allocSize, oldBufferSize);
+      ByteBuffer oldBuffer = inputBuffer;
+      inputBuffer = getBufferPool().acquireDirectReceiveBuffer(allocSize);
+
+      if (oldBuffer != null) {
+        int oldByteCount = oldBuffer.remaining();
+        inputBuffer.put(oldBuffer);
+        inputBuffer.position(oldByteCount);
+        getBufferPool().releaseReceiveBuffer(oldBuffer);
+      }
+    } else {
+      if (inputBuffer.position() != 0) {
+        inputBuffer.compact();
+      } else {
+        inputBuffer.position(inputBuffer.limit());
+        inputBuffer.limit(inputBuffer.capacity());
+      }
+    }
   }
 
   private boolean dispatchMessage(DistributionMessage msg, int bytesRead, boolean directAck)
@@ -3320,7 +3315,6 @@ public class Connection implements Runnable {
    * socket is properly closed at this end. When that is the case isResidualReaderThread
    * will return true.
    */
-  @VisibleForTesting
   public boolean hasResidualReaderThread() {
     return hasResidualReaderThread;
   }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
index fe4f08b..359d8aa 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
@@ -32,11 +32,11 @@ import org.junit.Test;
 public class ByteBufferVendorTest {
 
   @FunctionalInterface
-  private interface Foo {
+  private static interface Foo {
     void run() throws IOException;
   }
 
-  private ByteBufferVendor sharingVendor;
+  private ByteBufferVendor sharing;
   private BufferPool poolMock;
   private CountDownLatch clientHasOpenedResource;
   private CountDownLatch clientMayComplete;
@@ -44,7 +44,7 @@ public class ByteBufferVendorTest {
   @Before
   public void before() {
     poolMock = mock(BufferPool.class);
-    sharingVendor =
+    sharing =
         new ByteBufferVendor(mock(ByteBuffer.class), BufferPool.BufferType.TRACKED_SENDER,
             poolMock);
     clientHasOpenedResource = new CountDownLatch(1);
@@ -54,7 +54,7 @@ public class ByteBufferVendorTest {
   @Test
   public void balancedCloseOwnerIsLastReferenceHolder() throws InterruptedException {
     resourceOwnerIsLastReferenceHolder("client with balanced close calls", () -> {
-      try (final ByteBufferSharing _unused = sharingVendor.open()) {
+      try (final ByteBufferSharing _unused = sharing.open()) {
       }
     });
   }
@@ -62,7 +62,7 @@ public class ByteBufferVendorTest {
   @Test
   public void extraCloseOwnerIsLastReferenceHolder() throws InterruptedException {
     resourceOwnerIsLastReferenceHolder("client with extra close calls", () -> {
-      final ByteBufferSharing sharing2 = sharingVendor.open();
+      final ByteBufferSharing sharing2 = sharing.open();
       sharing2.close();
       verify(poolMock, times(0)).releaseBuffer(any(), any());
       assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
@@ -73,7 +73,7 @@ public class ByteBufferVendorTest {
   @Test
   public void balancedCloseClientIsLastReferenceHolder() throws InterruptedException {
     clientIsLastReferenceHolder("client with balanced close calls", () -> {
-      try (final ByteBufferSharing _unused = sharingVendor.open()) {
+      try (final ByteBufferSharing _unused = sharing.open()) {
         clientHasOpenedResource.countDown();
         blockClient();
       }
@@ -83,42 +83,36 @@ public class ByteBufferVendorTest {
   @Test
   public void extraCloseClientIsLastReferenceHolder() throws InterruptedException {
     clientIsLastReferenceHolder("client with extra close calls", () -> {
-      final ByteBufferSharing sharing2 = sharingVendor.open();
+      final ByteBufferSharing sharing2 = sharing.open();
       clientHasOpenedResource.countDown();
       blockClient();
       sharing2.close();
       verify(poolMock, times(1)).releaseBuffer(any(), any());
       assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
+      System.out.println("here");
     });
   }
 
   @Test
   public void extraCloseDoesNotPrematurelyReturnBufferToPool() throws IOException {
-    final ByteBufferSharing sharing2 = sharingVendor.open();
+    final ByteBufferSharing sharing2 = sharing.open();
     sharing2.close();
     assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
     verify(poolMock, times(0)).releaseBuffer(any(), any());
-    sharingVendor.destruct();
+    sharing.destruct();
     verify(poolMock, times(1)).releaseBuffer(any(), any());
   }
 
   @Test
   public void extraCloseDoesNotDecrementRefCount() throws IOException {
-    final ByteBufferSharing sharing2 = sharingVendor.open();
+    final ByteBufferSharing sharing2 = sharing.open();
     sharing2.close();
     assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
-    final ByteBufferSharing sharing3 = this.sharingVendor.open();
-    sharingVendor.destruct();
+    final ByteBufferSharing sharing3 = this.sharing.open();
+    sharing.destruct();
     verify(poolMock, times(0)).releaseBuffer(any(), any());
   }
 
-  @Test
-  public void destructIsIdempotent() {
-    sharingVendor.destruct();
-    sharingVendor.destruct();
-    verify(poolMock, times(1)).releaseBuffer(any(), any());
-  }
-
   private void resourceOwnerIsLastReferenceHolder(final String name, final Foo client)
       throws InterruptedException {
     /*
@@ -134,7 +128,7 @@ public class ByteBufferVendorTest {
 
     verify(poolMock, times(0)).releaseBuffer(any(), any());
 
-    sharingVendor.destruct();
+    sharing.destruct();
 
     verify(poolMock, times(1)).releaseBuffer(any(), any());
   }
@@ -153,7 +147,7 @@ public class ByteBufferVendorTest {
 
     clientHasOpenedResource.await();
 
-    sharingVendor.destruct();
+    sharing.destruct();
 
     verify(poolMock, times(0)).releaseBuffer(any(), any());
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index d6b9aa6..62a858c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -90,8 +90,7 @@ public class NioSslEngineTest {
 
   @Test
   public void engineUsesDirectBuffers() throws IOException {
-    try (final ByteBufferSharing outputSharing =
-        nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
+    try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
       assertThat(outputSharing.getBuffer().isDirect()).isTrue();
     }
   }
@@ -191,8 +190,7 @@ public class NioSslEngineTest {
 
   @Test
   public void wrap() throws Exception {
-    try (final ByteBufferSharing outputSharing =
-        nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
+    try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
 
       // make the application data too big to fit into the engine's encryption buffer
       ByteBuffer appData =
@@ -223,8 +221,7 @@ public class NioSslEngineTest {
 
   @Test
   public void wrapFails() throws IOException {
-    try (final ByteBufferSharing outputSharing =
-        nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
+    try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
       // make the application data too big to fit into the engine's encryption buffer
       ByteBuffer appData =
           ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100);
@@ -247,8 +244,7 @@ public class NioSslEngineTest {
 
   @Test
   public void unwrapWithBufferOverflow() throws Exception {
-    try (final ByteBufferSharing inputSharing =
-        nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
       // make the application data too big to fit into the engine's encryption buffer
       final ByteBuffer peerAppData = inputSharing.getBuffer();
 
@@ -288,8 +284,7 @@ public class NioSslEngineTest {
 
   @Test
   public void unwrapWithBufferUnderflow() throws Exception {
-    try (final ByteBufferSharing inputSharing =
-        nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
       ByteBuffer wrappedData =
           ByteBuffer.allocate(inputSharing.getBuffer().capacity());
       byte[] netBytes = new byte[wrappedData.capacity() / 2];
@@ -314,8 +309,7 @@ public class NioSslEngineTest {
 
   @Test
   public void unwrapWithDecryptionError() throws IOException {
-    try (final ByteBufferSharing inputSharing =
-        nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
       // make the application data too big to fit into the engine's encryption buffer
       ByteBuffer wrappedData =
           ByteBuffer.allocate(inputSharing.getBuffer().capacity());
@@ -374,10 +368,10 @@ public class NioSslEngineTest {
     when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
         new SSLEngineResult(CLOSED, FINISHED, 0, 0));
     nioSslEngine.close(mockChannel);
-    assertThatThrownBy(() -> nioSslEngine.getOutputBufferVendorForTestingOnly().open().getBuffer())
+    assertThatThrownBy(() -> nioSslEngine.shareOutputBuffer().getBuffer())
         .isInstanceOf(IOException.class)
         .hasMessageContaining("NioSslEngine has been closed");
-    assertThatThrownBy(() -> nioSslEngine.getInputBufferVendorForTestingOnly().open().getBuffer())
+    assertThatThrownBy(() -> nioSslEngine.shareInputBuffer().getBuffer())
         .isInstanceOf(IOException.class)
         .hasMessageContaining("NioSslEngine has been closed");
     nioSslEngine.close(mockChannel);
@@ -407,8 +401,7 @@ public class NioSslEngineTest {
 
     when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
     when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenAnswer((x) -> {
-      try (final ByteBufferSharing outputSharing =
-          nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
+      try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
         // give the NioSslEngine something to write on its socket channel, simulating a TLS close
         // message
         outputSharing.getBuffer().put("Goodbye cruel world".getBytes());
@@ -444,8 +437,7 @@ public class NioSslEngineTest {
     ByteBuffer wrappedBuffer = ByteBuffer.allocate(1000);
     SocketChannel mockChannel = mock(SocketChannel.class);
 
-    try (final ByteBufferSharing inputSharing =
-        nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
       // force a compaction by making the decoded buffer appear near to being full
       ByteBuffer unwrappedBuffer = inputSharing.getBuffer();
       unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead);
@@ -495,7 +487,10 @@ public class NioSslEngineTest {
     ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
     unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
 
-    nioSslEngine.getInputBufferVendorForTestingOnly().setBufferForTestingOnly(unwrappedBuffer);
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+      final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) inputSharing;
+      inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
+    }
 
     // simulate some socket reads
     when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
@@ -523,8 +518,7 @@ public class NioSslEngineTest {
       assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
       // The initial available space in the unwrapped buffer should have doubled
       int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes;
-      try (final ByteBufferSharing inputSharing =
-          nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
+      try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
         assertThat(inputSharing.getBuffer().capacity())
             .isEqualTo(2 * initialFreeSpace + preexistingBytes);
       }
@@ -550,7 +544,10 @@ public class NioSslEngineTest {
     // force buffer expansion by making a small decoded buffer appear near to being full
     ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
     unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
-    nioSslEngine.getInputBufferVendorForTestingOnly().setBufferForTestingOnly(unwrappedBuffer);
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+      final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) inputSharing;
+      inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
+    }
 
     // simulate some socket reads
     when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
index 40f1ed3..c064afb 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
@@ -103,7 +103,6 @@ public class ConnectionTest {
     TCPConduit tcpConduit = mock(TCPConduit.class);
 
     when(connectionTable.getConduit()).thenReturn(tcpConduit);
-    when(connectionTable.getBufferPool()).thenReturn(mock(BufferPool.class));
     when(distributionConfig.getMemberTimeout()).thenReturn(100);
     when(tcpConduit.getSocketId()).thenReturn(new InetSocketAddress(getLocalHost(), 12345));