You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2021/06/17 15:19:12 UTC

[geode] branch support/1.12 updated (70f6955 -> 1a8eb5a)

This is an automated email from the ASF dual-hosted git repository.

burcham pushed a change to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git.


    from 70f6955  GEODE-9289: Configuration compatibile with pre-1.12.0 versions. (#6592)
     new bba4242  GEODE-9141: (1 of 2) rename ByteBufferSharingImpl to ByteBuferVendor
     new 1a8eb5a  GEODE-9141: (2 of 2) Handle in-buffer concurrency * Connection uses a ByteBufferVendor to mediate access to inputBuffer * Prevent return to pool before socket closer is finished

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 ...LSocketHostNameVerificationIntegrationTest.java |   6 +-
 .../internal/net/SSLSocketIntegrationTest.java     |   3 +-
 .../apache/geode/codeAnalysis/excludedClasses.txt  |   2 +-
 .../geode/internal/net/ByteBufferSharing.java      |  15 +
 .../geode/internal/net/ByteBufferSharingNoOp.java  |   7 +-
 ...ufferSharingImpl.java => ByteBufferVendor.java} | 144 ++++++---
 .../apache/geode/internal/net/NioSslEngine.java    |  56 ++--
 .../apache/geode/internal/net/SocketCreator.java   |   9 +-
 .../org/apache/geode/internal/tcp/Connection.java  | 334 +++++++++++----------
 .../internal/net/ByteBufferConcurrencyTest.java    |  16 +-
 ...ringImplTest.java => ByteBufferVendorTest.java} |  40 +--
 .../geode/internal/net/NioSslEngineTest.java       |  41 +--
 .../apache/geode/internal/tcp/ConnectionTest.java  |   1 +
 13 files changed, 375 insertions(+), 299 deletions(-)
 rename geode-core/src/main/java/org/apache/geode/internal/net/{ByteBufferSharingImpl.java => ByteBufferVendor.java} (53%)
 rename geode-core/src/test/java/org/apache/geode/internal/net/{ByteBufferSharingImplTest.java => ByteBufferVendorTest.java} (84%)

[geode] 02/02: GEODE-9141: (2 of 2) Handle in-buffer concurrency * Connection uses a ByteBufferVendor to mediate access to inputBuffer * Prevent return to pool before socket closer is finished

Posted by bu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 1a8eb5aec580eb75871060793ea65d62f5f2d959
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Sat Apr 17 09:12:13 2021 -0700

    GEODE-9141: (2 of 2) Handle in-buffer concurrency
    * Connection uses a ByteBufferVendor to mediate access to inputBuffer
    * Prevent return to pool before socket closer is finished
    
    (cherry picked from commit 9d0d4d1d33794d0f6a21c3bcae71e965cbbd7fbd)
    (cherry picked from commit 9e8b3972fcf449eed4d41c254cf3f553e517eaa1)
    (cherry picked from commit c4730deed48bb4513bd04486d4e8c09cdd3bb5a9)
---
 ...LSocketHostNameVerificationIntegrationTest.java |   6 +-
 .../internal/net/SSLSocketIntegrationTest.java     |   3 +-
 .../apache/geode/codeAnalysis/excludedClasses.txt  |   2 +-
 .../geode/internal/net/ByteBufferSharing.java      |  15 +
 .../geode/internal/net/ByteBufferSharingNoOp.java  |   5 +
 .../geode/internal/net/ByteBufferVendor.java       | 144 ++++++---
 .../apache/geode/internal/net/NioSslEngine.java    |  50 ++-
 .../apache/geode/internal/net/SocketCreator.java   |   9 +-
 .../org/apache/geode/internal/tcp/Connection.java  | 334 +++++++++++----------
 .../geode/internal/net/ByteBufferVendorTest.java   |  36 ++-
 .../geode/internal/net/NioSslEngineTest.java       |  41 +--
 .../apache/geode/internal/tcp/ConnectionTest.java  |   1 +
 12 files changed, 361 insertions(+), 285 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
index a70f3b1..e86bfea 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
@@ -103,6 +103,9 @@ public class SSLSocketHostNameVerificationIntegrationTest {
 
   @Before
   public void setUp() throws Exception {
+
+    SocketCreatorFactory.close(); // to clear socket creators made in previous tests
+
     IgnoredException.addIgnoredException("javax.net.ssl.SSLException: Read timed out");
 
     this.localHost = InetAddress.getLoopbackAddress();
@@ -172,7 +175,7 @@ public class SSLSocketHostNameVerificationIntegrationTest {
 
     try {
       this.socketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(),
-          sslEngine, 0, true,
+          sslEngine, 0,
           ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize()),
           new BufferPool(mock(DMStats.class)));
 
@@ -205,7 +208,6 @@ public class SSLSocketHostNameVerificationIntegrationTest {
             sc.handshakeSSLSocketChannel(socket.getChannel(),
                 sslEngine,
                 timeoutMillis,
-                false,
                 ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize()),
                 new BufferPool(mock(DMStats.class)));
       } catch (Throwable throwable) {
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
index e7ac191..13e9d5b 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
@@ -217,7 +217,7 @@ public class SSLSocketIntegrationTest {
     clientSocket = clientChannel.socket();
     NioSslEngine engine =
         clusterSocketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(),
-            clusterSocketCreator.createSSLEngine("localhost", 1234, true), 0, true,
+            clusterSocketCreator.createSSLEngine("localhost", 1234, true), 0,
             ByteBuffer.allocate(65535), new BufferPool(mock(DMStats.class)));
     clientChannel.configureBlocking(true);
 
@@ -267,7 +267,6 @@ public class SSLSocketIntegrationTest {
             sc.handshakeSSLSocketChannel(socket.getChannel(), sc.createSSLEngine("localhost", 1234,
                 false),
                 timeoutMillis,
-                false,
                 ByteBuffer.allocate(65535),
                 new BufferPool(mock(DMStats.class)));
 
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index af3bd1e..cd1af3a 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -104,4 +104,4 @@ org/apache/geode/cache/query/internal/xml/ElementType
 org/apache/geode/cache/query/internal/xml/ElementType$1
 org/apache/geode/cache/query/internal/xml/ElementType$2
 org/apache/geode/cache/query/internal/xml/ElementType$3
-org/apache/geode/internal/net/ByteBufferSharingImpl$OpenAttemptTimedOut
\ No newline at end of file
+org/apache/geode/internal/net/ByteBufferVendor$OpenAttemptTimedOut
\ No newline at end of file
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
index cdfa897..c8a94ce 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
@@ -42,12 +42,27 @@ public interface ByteBufferSharing extends AutoCloseable {
    *
    * Subsequent calls to {@link #getBuffer()} will return that new buffer too.
    *
+   * This variant is for use when the buffer is being written to.
+   *
    * @return the same buffer or a different (bigger) buffer
    * @throws IOException if the buffer is no longer accessible
    */
   ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException;
 
   /**
+   * Expand the buffer if needed. This may return a different object so be sure to pay attention to
+   * the return value if you need access to the potentially-expanded buffer.
+   *
+   * Subsequent calls to {@link #getBuffer()} will return that new buffer too.
+   *
+   * This variant is for use when the buffer is being read from.
+   *
+   * @return the same buffer or a different (bigger) buffer
+   * @throws IOException if the buffer is no longer accessible
+   */
+  ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException;
+
+  /**
    * Override {@link AutoCloseable#close()} without throws clause since we don't need one.
    */
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
index 4a8bc49..4f36e5b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
@@ -48,5 +48,10 @@ class ByteBufferSharingNoOp implements ByteBufferSharing {
   }
 
   @Override
+  public ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException {
+    throw new UnsupportedOperationException("Can't expand buffer when using NioPlainEngine");
+  }
+
+  @Override
   public void close() {}
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
index 4933247..1dc74f0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.net;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -27,49 +28,49 @@ import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.internal.net.BufferPool.BufferType;
 
 /**
- * An {@link AutoCloseable} meant to be acquired in a try-with-resources statement. The resource (a
- * {@link ByteBuffer}) is available (for reading and modification) in the scope of the
+ * Produces (via {@link #open()}) a {@link ByteBufferSharing} meant to be used only within a
+ * try-with-resources block. The resource controls access to a secondary resource
+ * (via {@link ByteBufferSharing#getBuffer()}) within the scope of try-with-resources.
+ * Neither the object returned by {@link #open()}, nor the object returned by invoking
+ * {@link ByteBufferSharing#getBuffer()} on that object may be used outside the scope of
  * try-with-resources.
  */
-class ByteBufferVendor implements ByteBufferSharing {
+public class ByteBufferVendor {
 
   static class OpenAttemptTimedOut extends Exception {
   }
 
-  private final Lock lock;
-  private final AtomicBoolean isDestructed;
-  // mutable because in general our ByteBuffer may need to be resized (grown or compacted)
-  private volatile ByteBuffer buffer;
-  private final BufferType bufferType;
-  private final AtomicInteger counter;
-  private final BufferPool bufferPool;
+  private interface ByteBufferSharingInternal extends ByteBufferSharing {
+    void releaseBuffer();
+  }
 
-  /**
-   * This constructor is for use only by the owner of the shared resource (a {@link ByteBuffer}).
+  private final Lock lock = new ReentrantLock();
+  private final AtomicBoolean isDestructed = new AtomicBoolean(false);
+  private final AtomicInteger counter = new AtomicInteger(1);
+  // the object referenced by sharing is guarded by lock
+  private final ByteBufferSharingInternal sharing;
+
+  /*
+   * These constructors are for use only by the owner of the shared resource.
    *
    * A resource owner must invoke {@link #open()} once for each reference that escapes (is passed
    * to an external object or is returned to an external caller.)
    *
-   * This constructor acquires no lock. The reference count will be 1 after this constructor
+   * Constructors acquire no locks. The reference count will be 1 after a constructor
    * completes.
    */
-  ByteBufferVendor(final ByteBuffer buffer, final BufferType bufferType,
-                   final BufferPool bufferPool) {
-    this.buffer = buffer;
-    this.bufferType = bufferType;
-    this.bufferPool = bufferPool;
-    lock = new ReentrantLock();
-    counter = new AtomicInteger(1);
-    isDestructed = new AtomicBoolean(false);
-  }
 
   /**
-   * The destructor. Called by the resource owner to undo the work of the constructor.
+   * When you have a ByteBuffer available before construction, use this constructor.
+   *
+   * @param bufferArg is the ByteBuffer
+   * @param bufferType needed for freeing the buffer later
+   * @param bufferPool needed for freeing the buffer later
    */
-  void destruct() {
-    if (isDestructed.compareAndSet(false, true)) {
-      dropReference();
-    }
+  public ByteBufferVendor(final ByteBuffer bufferArg,
+      final BufferType bufferType,
+      final BufferPool bufferPool) {
+    sharing = new ByteBufferSharingInternalImpl(bufferArg, bufferType, bufferPool);
   }
 
   /**
@@ -78,18 +79,19 @@ class ByteBufferVendor implements ByteBufferSharing {
    *
    * Resource owners call this method as the last thing before returning a reference to the caller.
    * That caller binds that reference to a variable in a try-with-resources statement and relies on
-   * the AutoCloseable protocol to invoke {@link #close()} on the object at the end of the block.
+   * the AutoCloseable protocol to invoke {@link AutoCloseable#close()} on the object at
+   * the end of the block.
    */
-  ByteBufferSharing open() throws IOException {
+  public ByteBufferSharing open() throws IOException {
     lock.lock();
     addReferenceAfterLock();
-    return this;
+    return sharing;
   }
 
   /**
    * This variant throws {@link OpenAttemptTimedOut} if it can't acquire the lock in time.
    */
-  ByteBufferSharing open(final long time, final TimeUnit unit)
+  public ByteBufferSharing open(final long time, final TimeUnit unit)
       throws OpenAttemptTimedOut, IOException {
     try {
       if (!lock.tryLock(time, unit)) {
@@ -100,24 +102,25 @@ class ByteBufferVendor implements ByteBufferSharing {
       throw new OpenAttemptTimedOut();
     }
     addReferenceAfterLock();
-    return this;
+    return sharing;
   }
 
-  @Override
-  public ByteBuffer getBuffer() throws IOException {
-    if (isDestructed.get()) {
-      throwClosed();
+  /**
+   * The destructor. Called by the resource owner to undo the work of the constructor.
+   */
+  public void destruct() {
+    if (isDestructed.compareAndSet(false, true)) {
+      dropReference();
     }
-    return buffer;
   }
 
-  @Override
-  public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException {
-    return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity);
+  private void exposingResource() throws IOException {
+    if (isDestructed.get()) {
+      throwClosed();
+    }
   }
 
-  @Override
-  public void close() {
+  private void close() {
     /*
      * We are counting on our ReentrantLock throwing an exception if the current thread
      * does not hold the lock. In that case dropReference() will not be called. This
@@ -142,16 +145,11 @@ class ByteBufferVendor implements ByteBufferSharing {
   private int dropReference() {
     final int usages = counter.decrementAndGet();
     if (usages == 0) {
-      bufferPool.releaseBuffer(bufferType, buffer);
+      sharing.releaseBuffer();
     }
     return usages;
   }
 
-  @VisibleForTesting
-  public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) {
-    buffer = newBufferForTesting;
-  }
-
   private void addReferenceAfterLock() throws IOException {
     try {
       addReference();
@@ -165,4 +163,54 @@ class ByteBufferVendor implements ByteBufferSharing {
     throw new IOException("NioSslEngine has been closed");
   }
 
+  private class ByteBufferSharingInternalImpl implements ByteBufferSharingInternal {
+
+    /*
+     * mutable because in general our ByteBuffer may need to be resized (grown or compacted)
+     * no concurrency concerns since instances of this class are guarded by ByteBufferVendor.lock
+     */
+    private ByteBuffer buffer;
+    private final BufferType bufferType;
+    private final BufferPool bufferPool;
+
+    public ByteBufferSharingInternalImpl(final ByteBuffer buffer,
+        final BufferType bufferType,
+        final BufferPool bufferPool) {
+      Objects.requireNonNull(buffer);
+      this.buffer = buffer;
+      this.bufferType = bufferType;
+      this.bufferPool = bufferPool;
+    }
+
+    @Override
+    public ByteBuffer getBuffer() throws IOException {
+      exposingResource();
+      return buffer;
+    }
+
+    @Override
+    public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException {
+      return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity);
+    }
+
+    @Override
+    public ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException {
+      return buffer = bufferPool.expandReadBufferIfNeeded(bufferType, getBuffer(), newCapacity);
+    }
+
+    @Override
+    public void close() {
+      ByteBufferVendor.this.close();
+    }
+
+    @Override
+    public void releaseBuffer() {
+      bufferPool.releaseBuffer(bufferType, buffer);
+    }
+  }
+
+  @VisibleForTesting
+  public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) {
+    ((ByteBufferSharingInternalImpl) sharing).buffer = newBufferForTesting;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index fc91a31..4c603a0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -62,12 +62,12 @@ public class NioSslEngine implements NioFilter {
   /**
    * holds bytes wrapped by the SSLEngine; a.k.a. myNetData
    */
-  private final ByteBufferVendor outputSharing;
+  private final ByteBufferVendor outputBufferVendor;
 
   /**
    * holds the last unwrapped data from a peer; a.k.a. peerAppData
    */
-  private final ByteBufferVendor inputSharing;
+  private final ByteBufferVendor inputBufferVendor;
 
   NioSslEngine(SSLEngine engine, BufferPool bufferPool) {
     SSLSession session = engine.getSession();
@@ -76,10 +76,10 @@ public class NioSslEngine implements NioFilter {
     closed = false;
     this.engine = engine;
     this.bufferPool = bufferPool;
-    outputSharing =
+    outputBufferVendor =
         new ByteBufferVendor(bufferPool.acquireDirectSenderBuffer(packetBufferSize),
             TRACKED_SENDER, bufferPool);
-    inputSharing =
+    inputBufferVendor =
         new ByteBufferVendor(bufferPool.acquireNonDirectReceiveBuffer(appBufferSize),
             TRACKED_RECEIVER, bufferPool);
   }
@@ -98,7 +98,7 @@ public class NioSslEngine implements NioFilter {
           peerNetData.capacity(), engine.getSession().getPacketBufferSize()));
     }
 
-    ByteBuffer handshakeBuffer = peerNetData;
+    final ByteBuffer handshakeBuffer = peerNetData;
     handshakeBuffer.clear();
 
     ByteBuffer myAppData = ByteBuffer.wrap(new byte[0]);
@@ -135,7 +135,7 @@ public class NioSslEngine implements NioFilter {
 
       switch (status) {
         case NEED_UNWRAP:
-          try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
+          try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
             final ByteBuffer peerAppData = inputSharing.getBuffer();
 
             // Receive handshaking data from peer
@@ -162,7 +162,7 @@ public class NioSslEngine implements NioFilter {
           }
 
         case NEED_WRAP:
-          try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
+          try (final ByteBufferSharing outputSharing = outputBufferVendor.open()) {
             final ByteBuffer myNetData = outputSharing.getBuffer();
 
             // Empty the local network packet buffer.
@@ -231,7 +231,7 @@ public class NioSslEngine implements NioFilter {
 
   @Override
   public ByteBufferSharing wrap(ByteBuffer appData) throws IOException {
-    try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
+    try (final ByteBufferSharing outputSharing = outputBufferVendor.open()) {
 
       ByteBuffer myNetData = outputSharing.getBuffer();
 
@@ -260,13 +260,13 @@ public class NioSslEngine implements NioFilter {
 
       myNetData.flip();
 
-      return shareOutputBuffer();
+      return outputBufferVendor.open();
     }
   }
 
   @Override
   public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException {
-    try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
+    try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
 
       ByteBuffer peerAppData = inputSharing.getBuffer();
 
@@ -292,7 +292,7 @@ public class NioSslEngine implements NioFilter {
             // partial data - need to read more. When this happens the SSLEngine will not have
             // changed the buffer position
             wrappedBuffer.compact();
-            return shareInputBuffer();
+            return inputBufferVendor.open();
           case OK:
             break;
           default:// if there is data in the decrypted buffer return it. Otherwise signal that we're
@@ -305,7 +305,7 @@ public class NioSslEngine implements NioFilter {
         }
       }
       wrappedBuffer.clear();
-      return shareInputBuffer();
+      return inputBufferVendor.open();
     }
   }
 
@@ -325,7 +325,7 @@ public class NioSslEngine implements NioFilter {
   @Override
   public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes,
       ByteBuffer wrappedBuffer) throws IOException {
-    try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
+    try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
 
       ByteBuffer peerAppData = inputSharing.getBuffer();
 
@@ -355,13 +355,13 @@ public class NioSslEngine implements NioFilter {
           }
         }
       }
-      return shareInputBuffer();
+      return inputBufferVendor.open();
     }
   }
 
   @Override
   public ByteBufferSharing getUnwrappedBuffer() throws IOException {
-    return shareInputBuffer();
+    return inputBufferVendor.open();
   }
 
   @Override
@@ -377,8 +377,8 @@ public class NioSslEngine implements NioFilter {
       return;
     }
     closed = true;
-    inputSharing.destruct();
-    try (final ByteBufferSharing outputSharing = shareOutputBuffer(1, TimeUnit.MINUTES)) {
+    inputBufferVendor.destruct();
+    try (final ByteBufferSharing outputSharing = outputBufferVendor.open(1, TimeUnit.MINUTES)) {
       final ByteBuffer myNetData = outputSharing.getBuffer();
 
       if (!engine.isOutboundDone()) {
@@ -412,7 +412,7 @@ public class NioSslEngine implements NioFilter {
         engine.closeOutbound();
       }
     } finally {
-      outputSharing.destruct();
+      outputBufferVendor.destruct();
     }
   }
 
@@ -422,16 +422,12 @@ public class NioSslEngine implements NioFilter {
   }
 
   @VisibleForTesting
-  public ByteBufferSharing shareOutputBuffer() throws IOException {
-    return outputSharing.open();
+  public ByteBufferVendor getOutputBufferVendorForTestingOnly() throws IOException {
+    return outputBufferVendor;
   }
 
-  private ByteBufferSharing shareOutputBuffer(final long time, final TimeUnit unit)
-      throws OpenAttemptTimedOut, IOException {
-    return outputSharing.open(time, unit);
-  }
-
-  public ByteBufferSharing shareInputBuffer() throws IOException {
-    return inputSharing.open();
+  @VisibleForTesting
+  public ByteBufferVendor getInputBufferVendorForTestingOnly() throws IOException {
+    return inputBufferVendor;
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
index a232fca..a31bbb2 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
@@ -803,21 +803,16 @@ public class SocketCreator extends TcpSocketCreatorImpl {
    * @param socketChannel the socket's NIO channel
    * @param engine the sslEngine (see createSSLEngine)
    * @param timeout handshake timeout in milliseconds. No timeout if <= 0
-   * @param clientSocket set to true if you initiated the connect(), false if you accepted it
    * @param peerNetBuffer the buffer to use in reading data fron socketChannel. This should also be
    *        used in subsequent I/O operations
    * @return The SSLEngine to be used in processing data for sending/receiving from the channel
    */
-  public NioSslEngine handshakeSSLSocketChannel(SocketChannel socketChannel, SSLEngine engine,
+  public NioSslEngine handshakeSSLSocketChannel(SocketChannel socketChannel,
+      SSLEngine engine,
       int timeout,
-      boolean clientSocket,
       ByteBuffer peerNetBuffer,
       BufferPool bufferPool)
       throws IOException {
-    engine.setUseClientMode(clientSocket);
-    if (!clientSocket) {
-      engine.setNeedClientAuth(sslConfig.isRequireAuth());
-    }
     while (!socketChannel.finishConnect()) {
       try {
         Thread.sleep(50);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index f5a1886..107aa9f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -18,6 +18,7 @@ import static java.lang.Boolean.FALSE;
 import static java.lang.ThreadLocal.withInitial;
 import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
 import static org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
+import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_RECEIVER;
 import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX;
 
 import java.io.DataInput;
@@ -79,6 +80,7 @@ import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.SystemTimer.SystemTimerTask;
 import org.apache.geode.internal.net.BufferPool;
 import org.apache.geode.internal.net.ByteBufferSharing;
+import org.apache.geode.internal.net.ByteBufferVendor;
 import org.apache.geode.internal.net.NioFilter;
 import org.apache.geode.internal.net.NioPlainEngine;
 import org.apache.geode.internal.net.SocketCreator;
@@ -117,7 +119,7 @@ public class Connection implements Runnable {
    * Small buffer used for send socket buffer on receiver connections and receive buffer on sender
    * connections.
    */
-  static final int SMALL_BUFFER_SIZE =
+  public static final int SMALL_BUFFER_SIZE =
       Integer.getInteger(GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096);
 
   /**
@@ -309,11 +311,14 @@ public class Connection implements Runnable {
   /** name of thread that we're currently performing an operation in (may be null) */
   private String ackThreadName;
 
-  /** the buffer used for message receipt */
-  private ByteBuffer inputBuffer;
-
-  /** Lock used to protect the input buffer */
-  public final Object inputBufferLock = new Object();
+  /*
+   * This object mediates access to the input ByteBuffer and ensures its return to
+   * pool after last use. This reference couldn't be final since it is initialized
+   * in createIoFilter() not in the constructors. It had to be initialized there
+   * because in general we have to construct an SSLEngine before we know the buffer
+   * size and createIoFilter() is where we create that object.
+   */
+  private ByteBufferVendor inputBufferVendor;
 
   /** the length of the next message to be dispatched */
   private int messageLength;
@@ -889,27 +894,28 @@ public class Connection implements Runnable {
     waitForAddressCompletion();
 
     InternalDistributedMember myAddr = owner.getConduit().getMemberId();
-    final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE);
-    /*
-     * Note a byte of zero is always written because old products serialized a member id with always
-     * sends an ip address. My reading of the ip-address specs indicated that the first byte of a
-     * valid address would never be 0.
-     */
-    connectHandshake.writeByte(0);
-    connectHandshake.writeByte(HANDSHAKE_VERSION);
-    // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
-    InternalDataSerializer.invokeToData(myAddr, connectHandshake);
-    connectHandshake.writeBoolean(sharedResource);
-    connectHandshake.writeBoolean(preserveOrder);
-    connectHandshake.writeLong(uniqueId);
-    // write the product version ordinal
-    Version.CURRENT.writeOrdinal(connectHandshake, true);
-    connectHandshake.writeInt(dominoCount.get() + 1);
-    // this writes the sending member + thread name that is stored in senderName
-    // on the receiver to show the cause of reader thread creation
-    connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, OperationExecutors.STANDARD_EXECUTOR,
-        MsgIdGenerator.NO_MSG_ID);
-    writeFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
+    try (final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE)) {
+      /*
+       * Note a byte of zero is always written because old products serialized a member id with
+       * always sends an ip address. My reading of the ip-address specs indicated that the first
+       * byte of a valid address would never be 0.
+       */
+      connectHandshake.writeByte(0);
+      connectHandshake.writeByte(HANDSHAKE_VERSION);
+      // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
+      InternalDataSerializer.invokeToData(myAddr, connectHandshake);
+      connectHandshake.writeBoolean(sharedResource);
+      connectHandshake.writeBoolean(preserveOrder);
+      connectHandshake.writeLong(uniqueId);
+      // write the product version ordinal
+      Version.CURRENT.writeOrdinal(connectHandshake, true);
+      connectHandshake.writeInt(dominoCount.get() + 1);
+      // this writes the sending member + thread name that is stored in senderName
+      // on the receiver to show the cause of reader thread creation
+      connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, OperationExecutors.STANDARD_EXECUTOR,
+          MsgIdGenerator.NO_MSG_ID);
+      writeFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
+    }
   }
 
   /**
@@ -1353,7 +1359,7 @@ public class Connection implements Runnable {
         if (!isReceiver && !hasResidualReaderThread()) {
           // receivers release the input buffer when exiting run(). Senders use the
           // inputBuffer for reading direct-reply responses
-          releaseInputBuffer();
+          inputBufferVendor.destruct();
         }
         lengthSet = false;
       }
@@ -1494,7 +1500,7 @@ public class Connection implements Runnable {
         }
       }
 
-      releaseInputBuffer();
+      inputBufferVendor.destruct();
 
       // make sure that if the reader thread exits we notify a thread waiting for the handshake.
       notifyHandshakeWaiter(false);
@@ -1506,16 +1512,6 @@ public class Connection implements Runnable {
     }
   }
 
-  private void releaseInputBuffer() {
-    synchronized (inputBufferLock) {
-      ByteBuffer tmp = inputBuffer;
-      if (tmp != null) {
-        inputBuffer = null;
-        getBufferPool().releaseReceiveBuffer(tmp);
-      }
-    }
-  }
-
   BufferPool getBufferPool() {
     return owner.getBufferPool();
   }
@@ -1537,9 +1533,6 @@ public class Connection implements Runnable {
   }
 
   private void readMessages() {
-    if (closing.get()) {
-      return;
-    }
     // take a snapshot of uniqueId to detect reconnect attempts
     SocketChannel channel;
     try {
@@ -1585,8 +1578,6 @@ public class Connection implements Runnable {
     // we should not change the state of the connection if we are a handshake reader thread
     // as there is a race between this thread and the application thread doing direct ack
     boolean handshakeHasBeenRead = false;
-    // if we're using SSL/TLS the input buffer may already have data to process
-    boolean skipInitialRead = getInputBuffer().position() > 0;
     try {
       for (boolean isInitialRead = true;;) {
         if (stopped) {
@@ -1606,8 +1597,9 @@ public class Connection implements Runnable {
           break;
         }
 
-        try {
-          ByteBuffer buff = getInputBuffer();
+        try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
+          ByteBuffer buff = inputSharing.getBuffer();
+
           synchronized (stateLock) {
             connectionState = STATE_READING;
           }
@@ -1616,6 +1608,8 @@ public class Connection implements Runnable {
             amountRead = channel.read(buff);
           } else {
             isInitialRead = false;
+            // if we're using SSL/TLS the input buffer may already have data to process
+            final boolean skipInitialRead = buff.position() > 0;
             if (!skipInitialRead) {
               amountRead = channel.read(buff);
             } else {
@@ -1680,11 +1674,11 @@ public class Connection implements Runnable {
         } catch (IOException e) {
           // "Socket closed" check needed for Solaris jdk 1.4.2_08
           if (!isSocketClosed() && !"Socket closed".equalsIgnoreCase(e.getMessage())) {
-            if (logger.isDebugEnabled() && !isIgnorableIOException(e)) {
-              logger.debug("{} io exception for {}", p2pReaderName(), this, e);
+            if (logger.isInfoEnabled() && !isIgnorableIOException(e)) {
+              logger.info("{} io exception for {}", p2pReaderName(), this, e);
             }
-            if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) {
-              if (logger.isDebugEnabled()) {
+            if (logger.isDebugEnabled()) {
+              if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) {
                 logger.debug(
                     "{} received unexpected WSACancelBlockingCall exception, which may result in a hang",
                     p2pReaderName());
@@ -1728,28 +1722,55 @@ public class Connection implements Runnable {
   private void createIoFilter(SocketChannel channel, boolean clientSocket) throws IOException {
     if (getConduit().useSSL() && channel != null) {
       InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress();
+      String hostName;
+      if (remoteAddr != null) {
+        hostName = remoteAddr.getHostName();
+      } else {
+        hostName = SocketCreator.getHostName(address.getAddress());
+      }
       SSLEngine engine =
-          getConduit().getSocketCreator().createSSLEngine(address.getHostName(), address.getPort(),
-              clientSocket);
+          getConduit().getSocketCreator().createSSLEngine(hostName,
+              address.getPort(), clientSocket);
+
+      final int packetBufferSize = engine.getSession().getPacketBufferSize();
+
+      inputBufferVendor =
+          new ByteBufferVendor(
+              getBufferPool().acquireDirectReceiveBuffer(packetBufferSize),
+              TRACKED_RECEIVER,
+              getBufferPool());
 
-      int packetBufferSize = engine.getSession().getPacketBufferSize();
-      if (inputBuffer == null || inputBuffer.capacity() < packetBufferSize) {
-        // TLS has a minimum input buffer size constraint
-        if (inputBuffer != null) {
-          getBufferPool().releaseReceiveBuffer(inputBuffer);
-        }
-        inputBuffer = getBufferPool().acquireDirectReceiveBuffer(packetBufferSize);
-      }
       if (channel.socket().getReceiveBufferSize() < packetBufferSize) {
         channel.socket().setReceiveBufferSize(packetBufferSize);
       }
       if (channel.socket().getSendBufferSize() < packetBufferSize) {
         channel.socket().setSendBufferSize(packetBufferSize);
       }
-      ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine,
-          getConduit().idleConnectionTimeout, clientSocket, inputBuffer,
-          getBufferPool());
+      try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
+        final ByteBuffer inputBuffer = inputSharing.getBuffer();
+        /*
+         * It's ok to share the inputBuffer with handshakeSSLSocketChannel() since that method
+         * accesses the referenced buffer for the handshake which completes before returning
+         * control here. The NioSslEngine retains no reference to the buffer.
+         */
+        ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine,
+            getConduit().idleConnectionTimeout, inputBuffer,
+            getBufferPool());
+      }
     } else {
+      final int allocSize;
+      if (recvBufferSize == -1) {
+        allocSize = owner.getConduit().tcpBufferSize;
+      } else {
+        allocSize = recvBufferSize;
+      }
+
+      inputBufferVendor =
+          new ByteBufferVendor(
+              getBufferPool().acquireDirectReceiveBuffer(allocSize),
+              TRACKED_RECEIVER,
+              getBufferPool());
+
       ioFilter = new NioPlainEngine(getBufferPool());
     }
   }
@@ -1781,9 +1802,13 @@ public class Connection implements Runnable {
     }
 
     msg = msg.toLowerCase();
-    return msg.contains("forcibly closed")
-        || msg.contains("reset by peer")
-        || msg.contains("connection reset");
+
+    if (e instanceof SSLException && msg.contains("status = closed")) {
+      return true; // engine has been closed - this is normal
+    }
+
+    return (msg.contains("forcibly closed") || msg.contains("reset by peer")
+        || msg.contains("connection reset") || msg.contains("socket is closed"));
   }
 
   private static boolean validMsgType(int msgType) {
@@ -2627,20 +2652,6 @@ public class Connection implements Runnable {
   }
 
   /**
-   * gets the buffer for receiving message length bytes
-   */
-  private ByteBuffer getInputBuffer() {
-    if (inputBuffer == null) {
-      int allocSize = recvBufferSize;
-      if (allocSize == -1) {
-        allocSize = owner.getConduit().tcpBufferSize;
-      }
-      inputBuffer = getBufferPool().acquireDirectReceiveBuffer(allocSize);
-    }
-    return inputBuffer;
-  }
-
-  /**
    * @throws SocketTimeoutException if wait expires.
    * @throws ConnectionException if ack is not received
    */
@@ -2747,72 +2758,93 @@ public class Connection implements Runnable {
    * deserialized and passed to TCPConduit for further processing
    */
   private void processInputBuffer() throws ConnectionException, IOException {
-    inputBuffer.flip();
+    try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
+      // can't be final because in some cases we expand the buffer (resulting in a new object)
+      ByteBuffer inputBuffer = inputSharing.getBuffer();
+      inputBuffer.flip();
 
-    try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) {
-      final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer();
+      try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) {
+        final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer();
 
-      peerDataBuffer.flip();
+        peerDataBuffer.flip();
 
-      boolean done = false;
+        boolean done = false;
 
-      while (!done && connected) {
-        owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
-        int remaining = peerDataBuffer.remaining();
-        if (lengthSet || remaining >= MSG_HEADER_BYTES) {
-          if (!lengthSet) {
-            if (readMessageHeader(peerDataBuffer)) {
-              break;
+        while (!done && connected) {
+          owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+          int remaining = peerDataBuffer.remaining();
+          if (lengthSet || remaining >= MSG_HEADER_BYTES) {
+            if (!lengthSet) {
+              if (readMessageHeader(peerDataBuffer)) {
+                break;
+              }
             }
-          }
-          if (remaining >= messageLength + MSG_HEADER_BYTES) {
-            lengthSet = false;
-            peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES);
-            // don't trust the message deserialization to leave the position in
-            // the correct spot. Some of the serialization uses buffered
-            // streams that can leave the position at the wrong spot
-            int startPos = peerDataBuffer.position();
-            int oldLimit = peerDataBuffer.limit();
-            peerDataBuffer.limit(startPos + messageLength);
-
-            if (handshakeRead) {
-              try {
-                readMessage(peerDataBuffer);
-              } catch (SerializationException e) {
-                logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit);
-                throw e;
+            if (remaining >= messageLength + MSG_HEADER_BYTES) {
+              lengthSet = false;
+              peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES);
+              // don't trust the message deserialization to leave the position in
+              // the correct spot. Some of the serialization uses buffered
+              // streams that can leave the position at the wrong spot
+              int startPos = peerDataBuffer.position();
+              int oldLimit = peerDataBuffer.limit();
+              peerDataBuffer.limit(startPos + messageLength);
+
+              if (handshakeRead) {
+                try {
+                  readMessage(peerDataBuffer);
+                } catch (SerializationException e) {
+                  logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit);
+                  throw e;
+                }
+              } else {
+                try (ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
+                    DataInputStream dis = new DataInputStream(bbis)) {
+                  if (!isReceiver) {
+                    // we read the handshake and then stop processing since we don't want
+                    // to process the input buffer anymore in a handshake thread
+                    readHandshakeForSender(dis, peerDataBuffer);
+                    return;
+                  }
+                  if (readHandshakeForReceiver(dis)) {
+                    ioFilter.doneReading(peerDataBuffer);
+                    return;
+                  }
+                }
               }
-            } else {
-              ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
-              DataInputStream dis = new DataInputStream(bbis);
-              if (!isReceiver) {
-                // we read the handshake and then stop processing since we don't want
-                // to process the input buffer anymore in a handshake thread
-                readHandshakeForSender(dis, peerDataBuffer);
-                return;
+              if (!connected) {
+                continue;
               }
-              if (readHandshakeForReceiver(dis)) {
+              accessed();
+              peerDataBuffer.limit(oldLimit);
+              peerDataBuffer.position(startPos + messageLength);
+            } else {
+              done = true;
+              if (getConduit().useSSL()) {
                 ioFilter.doneReading(peerDataBuffer);
-                return;
+              } else {
+                // compact or resize the buffer
+                final int oldBufferSize = inputBuffer.capacity();
+                final int allocSize = messageLength + MSG_HEADER_BYTES;
+                if (oldBufferSize < allocSize) {
+                  // need a bigger buffer
+                  logger.info(
+                      "Allocating larger network read buffer, new size is {} old size was {}.",
+                      allocSize, oldBufferSize);
+                  inputBuffer = inputSharing.expandReadBufferIfNeeded(allocSize);
+                } else {
+                  if (inputBuffer.position() != 0) {
+                    inputBuffer.compact();
+                  } else {
+                    inputBuffer.position(inputBuffer.limit());
+                    inputBuffer.limit(inputBuffer.capacity());
+                  }
+                }
               }
             }
-            if (!connected) {
-              continue;
-            }
-            accessed();
-            peerDataBuffer.limit(oldLimit);
-            peerDataBuffer.position(startPos + messageLength);
           } else {
+            ioFilter.doneReading(peerDataBuffer);
             done = true;
-            if (getConduit().useSSL()) {
-              ioFilter.doneReading(peerDataBuffer);
-            } else {
-              compactOrResizeBuffer(messageLength);
-            }
           }
-        } else {
-          ioFilter.doneReading(peerDataBuffer);
-          done = true;
         }
       }
     }
@@ -2947,10 +2979,9 @@ public class Connection implements Runnable {
   private void readMessage(ByteBuffer peerDataBuffer) {
     if (messageType == NORMAL_MSG_TYPE) {
       owner.getConduit().getStats().incMessagesBeingReceived(true, messageLength);
-      ByteBufferInputStream bbis =
+      try (ByteBufferInputStream bbis =
           remoteVersion == null ? new ByteBufferInputStream(peerDataBuffer)
-              : new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion);
-      try {
+              : new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion)) {
         ReplyProcessor21.initMessageRPId();
         // add serialization stats
         long startSer = owner.getConduit().getStats().startMsgDeserialization();
@@ -3203,34 +3234,8 @@ public class Connection implements Runnable {
   private void setThreadName(int dominoNumber) {
     Thread.currentThread().setName(THREAD_KIND_IDENTIFIER + " for " + remoteAddr + " "
         + (sharedResource ? "" : "un") + "shared" + " " + (preserveOrder ? "" : "un")
-        + "ordered" + " uid=" + uniqueId + (dominoNumber > 0 ? " dom #" + dominoNumber : "")
-        + " port=" + socket.getPort());
-  }
-
-  private void compactOrResizeBuffer(int messageLength) {
-    final int oldBufferSize = inputBuffer.capacity();
-    int allocSize = messageLength + MSG_HEADER_BYTES;
-    if (oldBufferSize < allocSize) {
-      // need a bigger buffer
-      logger.info("Allocating larger network read buffer, new size is {} old size was {}.",
-          allocSize, oldBufferSize);
-      ByteBuffer oldBuffer = inputBuffer;
-      inputBuffer = getBufferPool().acquireDirectReceiveBuffer(allocSize);
-
-      if (oldBuffer != null) {
-        int oldByteCount = oldBuffer.remaining();
-        inputBuffer.put(oldBuffer);
-        inputBuffer.position(oldByteCount);
-        getBufferPool().releaseReceiveBuffer(oldBuffer);
-      }
-    } else {
-      if (inputBuffer.position() != 0) {
-        inputBuffer.compact();
-      } else {
-        inputBuffer.position(inputBuffer.limit());
-        inputBuffer.limit(inputBuffer.capacity());
-      }
-    }
+        + "ordered sender uid=" + uniqueId + (dominoNumber > 0 ? " dom #" + dominoNumber : "")
+        + " local port=" + socket.getLocalPort() + " remote port=" + socket.getPort());
   }
 
   private boolean dispatchMessage(DistributionMessage msg, int bytesRead, boolean directAck)
@@ -3315,6 +3320,7 @@ public class Connection implements Runnable {
    * socket is properly closed at this end. When that is the case isResidualReaderThread
    * will return true.
    */
+  @VisibleForTesting
   public boolean hasResidualReaderThread() {
     return hasResidualReaderThread;
   }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
index 359d8aa..fe4f08b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
@@ -32,11 +32,11 @@ import org.junit.Test;
 public class ByteBufferVendorTest {
 
   @FunctionalInterface
-  private static interface Foo {
+  private interface Foo {
     void run() throws IOException;
   }
 
-  private ByteBufferVendor sharing;
+  private ByteBufferVendor sharingVendor;
   private BufferPool poolMock;
   private CountDownLatch clientHasOpenedResource;
   private CountDownLatch clientMayComplete;
@@ -44,7 +44,7 @@ public class ByteBufferVendorTest {
   @Before
   public void before() {
     poolMock = mock(BufferPool.class);
-    sharing =
+    sharingVendor =
         new ByteBufferVendor(mock(ByteBuffer.class), BufferPool.BufferType.TRACKED_SENDER,
             poolMock);
     clientHasOpenedResource = new CountDownLatch(1);
@@ -54,7 +54,7 @@ public class ByteBufferVendorTest {
   @Test
   public void balancedCloseOwnerIsLastReferenceHolder() throws InterruptedException {
     resourceOwnerIsLastReferenceHolder("client with balanced close calls", () -> {
-      try (final ByteBufferSharing _unused = sharing.open()) {
+      try (final ByteBufferSharing _unused = sharingVendor.open()) {
       }
     });
   }
@@ -62,7 +62,7 @@ public class ByteBufferVendorTest {
   @Test
   public void extraCloseOwnerIsLastReferenceHolder() throws InterruptedException {
     resourceOwnerIsLastReferenceHolder("client with extra close calls", () -> {
-      final ByteBufferSharing sharing2 = sharing.open();
+      final ByteBufferSharing sharing2 = sharingVendor.open();
       sharing2.close();
       verify(poolMock, times(0)).releaseBuffer(any(), any());
       assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
@@ -73,7 +73,7 @@ public class ByteBufferVendorTest {
   @Test
   public void balancedCloseClientIsLastReferenceHolder() throws InterruptedException {
     clientIsLastReferenceHolder("client with balanced close calls", () -> {
-      try (final ByteBufferSharing _unused = sharing.open()) {
+      try (final ByteBufferSharing _unused = sharingVendor.open()) {
         clientHasOpenedResource.countDown();
         blockClient();
       }
@@ -83,36 +83,42 @@ public class ByteBufferVendorTest {
   @Test
   public void extraCloseClientIsLastReferenceHolder() throws InterruptedException {
     clientIsLastReferenceHolder("client with extra close calls", () -> {
-      final ByteBufferSharing sharing2 = sharing.open();
+      final ByteBufferSharing sharing2 = sharingVendor.open();
       clientHasOpenedResource.countDown();
       blockClient();
       sharing2.close();
       verify(poolMock, times(1)).releaseBuffer(any(), any());
       assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
-      System.out.println("here");
     });
   }
 
   @Test
   public void extraCloseDoesNotPrematurelyReturnBufferToPool() throws IOException {
-    final ByteBufferSharing sharing2 = sharing.open();
+    final ByteBufferSharing sharing2 = sharingVendor.open();
     sharing2.close();
     assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
     verify(poolMock, times(0)).releaseBuffer(any(), any());
-    sharing.destruct();
+    sharingVendor.destruct();
     verify(poolMock, times(1)).releaseBuffer(any(), any());
   }
 
   @Test
   public void extraCloseDoesNotDecrementRefCount() throws IOException {
-    final ByteBufferSharing sharing2 = sharing.open();
+    final ByteBufferSharing sharing2 = sharingVendor.open();
     sharing2.close();
     assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
-    final ByteBufferSharing sharing3 = this.sharing.open();
-    sharing.destruct();
+    final ByteBufferSharing sharing3 = this.sharingVendor.open();
+    sharingVendor.destruct();
     verify(poolMock, times(0)).releaseBuffer(any(), any());
   }
 
+  @Test
+  public void destructIsIdempotent() {
+    sharingVendor.destruct();
+    sharingVendor.destruct();
+    verify(poolMock, times(1)).releaseBuffer(any(), any());
+  }
+
   private void resourceOwnerIsLastReferenceHolder(final String name, final Foo client)
       throws InterruptedException {
     /*
@@ -128,7 +134,7 @@ public class ByteBufferVendorTest {
 
     verify(poolMock, times(0)).releaseBuffer(any(), any());
 
-    sharing.destruct();
+    sharingVendor.destruct();
 
     verify(poolMock, times(1)).releaseBuffer(any(), any());
   }
@@ -147,7 +153,7 @@ public class ByteBufferVendorTest {
 
     clientHasOpenedResource.await();
 
-    sharing.destruct();
+    sharingVendor.destruct();
 
     verify(poolMock, times(0)).releaseBuffer(any(), any());
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index 62a858c..d6b9aa6 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -90,7 +90,8 @@ public class NioSslEngineTest {
 
   @Test
   public void engineUsesDirectBuffers() throws IOException {
-    try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
+    try (final ByteBufferSharing outputSharing =
+        nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
       assertThat(outputSharing.getBuffer().isDirect()).isTrue();
     }
   }
@@ -190,7 +191,8 @@ public class NioSslEngineTest {
 
   @Test
   public void wrap() throws Exception {
-    try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
+    try (final ByteBufferSharing outputSharing =
+        nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
 
       // make the application data too big to fit into the engine's encryption buffer
       ByteBuffer appData =
@@ -221,7 +223,8 @@ public class NioSslEngineTest {
 
   @Test
   public void wrapFails() throws IOException {
-    try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
+    try (final ByteBufferSharing outputSharing =
+        nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
       // make the application data too big to fit into the engine's encryption buffer
       ByteBuffer appData =
           ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100);
@@ -244,7 +247,8 @@ public class NioSslEngineTest {
 
   @Test
   public void unwrapWithBufferOverflow() throws Exception {
-    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+    try (final ByteBufferSharing inputSharing =
+        nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
       // make the application data too big to fit into the engine's encryption buffer
       final ByteBuffer peerAppData = inputSharing.getBuffer();
 
@@ -284,7 +288,8 @@ public class NioSslEngineTest {
 
   @Test
   public void unwrapWithBufferUnderflow() throws Exception {
-    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+    try (final ByteBufferSharing inputSharing =
+        nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
       ByteBuffer wrappedData =
           ByteBuffer.allocate(inputSharing.getBuffer().capacity());
       byte[] netBytes = new byte[wrappedData.capacity() / 2];
@@ -309,7 +314,8 @@ public class NioSslEngineTest {
 
   @Test
   public void unwrapWithDecryptionError() throws IOException {
-    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+    try (final ByteBufferSharing inputSharing =
+        nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
       // make the application data too big to fit into the engine's encryption buffer
       ByteBuffer wrappedData =
           ByteBuffer.allocate(inputSharing.getBuffer().capacity());
@@ -368,10 +374,10 @@ public class NioSslEngineTest {
     when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
         new SSLEngineResult(CLOSED, FINISHED, 0, 0));
     nioSslEngine.close(mockChannel);
-    assertThatThrownBy(() -> nioSslEngine.shareOutputBuffer().getBuffer())
+    assertThatThrownBy(() -> nioSslEngine.getOutputBufferVendorForTestingOnly().open().getBuffer())
         .isInstanceOf(IOException.class)
         .hasMessageContaining("NioSslEngine has been closed");
-    assertThatThrownBy(() -> nioSslEngine.shareInputBuffer().getBuffer())
+    assertThatThrownBy(() -> nioSslEngine.getInputBufferVendorForTestingOnly().open().getBuffer())
         .isInstanceOf(IOException.class)
         .hasMessageContaining("NioSslEngine has been closed");
     nioSslEngine.close(mockChannel);
@@ -401,7 +407,8 @@ public class NioSslEngineTest {
 
     when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
     when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenAnswer((x) -> {
-      try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
+      try (final ByteBufferSharing outputSharing =
+          nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
         // give the NioSslEngine something to write on its socket channel, simulating a TLS close
         // message
         outputSharing.getBuffer().put("Goodbye cruel world".getBytes());
@@ -437,7 +444,8 @@ public class NioSslEngineTest {
     ByteBuffer wrappedBuffer = ByteBuffer.allocate(1000);
     SocketChannel mockChannel = mock(SocketChannel.class);
 
-    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+    try (final ByteBufferSharing inputSharing =
+        nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
       // force a compaction by making the decoded buffer appear near to being full
       ByteBuffer unwrappedBuffer = inputSharing.getBuffer();
       unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead);
@@ -487,10 +495,7 @@ public class NioSslEngineTest {
     ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
     unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
 
-    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
-      final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) inputSharing;
-      inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
-    }
+    nioSslEngine.getInputBufferVendorForTestingOnly().setBufferForTestingOnly(unwrappedBuffer);
 
     // simulate some socket reads
     when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
@@ -518,7 +523,8 @@ public class NioSslEngineTest {
       assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
       // The initial available space in the unwrapped buffer should have doubled
       int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes;
-      try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+      try (final ByteBufferSharing inputSharing =
+          nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
         assertThat(inputSharing.getBuffer().capacity())
             .isEqualTo(2 * initialFreeSpace + preexistingBytes);
       }
@@ -544,10 +550,7 @@ public class NioSslEngineTest {
     // force buffer expansion by making a small decoded buffer appear near to being full
     ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
     unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
-    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
-      final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) inputSharing;
-      inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
-    }
+    nioSslEngine.getInputBufferVendorForTestingOnly().setBufferForTestingOnly(unwrappedBuffer);
 
     // simulate some socket reads
     when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
index c064afb..40f1ed3 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
@@ -103,6 +103,7 @@ public class ConnectionTest {
     TCPConduit tcpConduit = mock(TCPConduit.class);
 
     when(connectionTable.getConduit()).thenReturn(tcpConduit);
+    when(connectionTable.getBufferPool()).thenReturn(mock(BufferPool.class));
     when(distributionConfig.getMemberTimeout()).thenReturn(100);
     when(tcpConduit.getSocketId()).thenReturn(new InetSocketAddress(getLocalHost(), 12345));
 

[geode] 01/02: GEODE-9141: (1 of 2) rename ByteBufferSharingImpl to ByteBufferVendor

Posted by bu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git

commit bba424244b8011daf4e7d1285334ad91ce4497fe
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Sat Apr 17 13:15:53 2021 -0700

    GEODE-9141: (1 of 2) rename ByteBufferSharingImpl to ByteBufferVendor
    
    (cherry picked from commit 38a3540583a1d0a402b026ee0d33ae4b0a2907d3)
    (cherry picked from commit e0fa01dd9ec9c61504d517e77d1620f8e7975b73)
---
 .../apache/geode/internal/net/ByteBufferSharingNoOp.java |  2 +-
 ...{ByteBufferSharingImpl.java => ByteBufferVendor.java} |  6 +++---
 .../java/org/apache/geode/internal/net/NioSslEngine.java | 10 +++++-----
 .../geode/internal/net/ByteBufferConcurrencyTest.java    | 16 ++++++++--------
 ...ferSharingImplTest.java => ByteBufferVendorTest.java} |  6 +++---
 .../org/apache/geode/internal/net/NioSslEngineTest.java  |  4 ++--
 6 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
index bd707e3..4a8bc49 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
@@ -27,7 +27,7 @@ import java.nio.ByteBuffer;
  * meant for use with the {@link NioPlainEngine} only, since that engine keeps no buffers and so,
  * needs no reference counting on buffers, nor any synchronization around access to buffers.
  *
- * See also {@link ByteBufferSharingImpl}
+ * See also {@link ByteBufferVendor}
  */
 class ByteBufferSharingNoOp implements ByteBufferSharing {
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
similarity index 96%
rename from geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java
rename to geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
index b083d09..4933247 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
@@ -31,7 +31,7 @@ import org.apache.geode.internal.net.BufferPool.BufferType;
  * {@link ByteBuffer}) is available (for reading and modification) in the scope of the
  * try-with-resources.
  */
-class ByteBufferSharingImpl implements ByteBufferSharing {
+class ByteBufferVendor implements ByteBufferSharing {
 
   static class OpenAttemptTimedOut extends Exception {
   }
@@ -53,8 +53,8 @@ class ByteBufferSharingImpl implements ByteBufferSharing {
    * This constructor acquires no lock. The reference count will be 1 after this constructor
    * completes.
    */
-  ByteBufferSharingImpl(final ByteBuffer buffer, final BufferType bufferType,
-      final BufferPool bufferPool) {
+  ByteBufferVendor(final ByteBuffer buffer, final BufferType bufferType,
+                   final BufferPool bufferPool) {
     this.buffer = buffer;
     this.bufferType = bufferType;
     this.bufferPool = bufferPool;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 8969ecc..fc91a31 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -42,7 +42,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.GemFireIOException;
 import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.internal.net.BufferPool.BufferType;
-import org.apache.geode.internal.net.ByteBufferSharingImpl.OpenAttemptTimedOut;
+import org.apache.geode.internal.net.ByteBufferVendor.OpenAttemptTimedOut;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
 
@@ -62,12 +62,12 @@ public class NioSslEngine implements NioFilter {
   /**
    * holds bytes wrapped by the SSLEngine; a.k.a. myNetData
    */
-  private final ByteBufferSharingImpl outputSharing;
+  private final ByteBufferVendor outputSharing;
 
   /**
    * holds the last unwrapped data from a peer; a.k.a. peerAppData
    */
-  private final ByteBufferSharingImpl inputSharing;
+  private final ByteBufferVendor inputSharing;
 
   NioSslEngine(SSLEngine engine, BufferPool bufferPool) {
     SSLSession session = engine.getSession();
@@ -77,10 +77,10 @@ public class NioSslEngine implements NioFilter {
     this.engine = engine;
     this.bufferPool = bufferPool;
     outputSharing =
-        new ByteBufferSharingImpl(bufferPool.acquireDirectSenderBuffer(packetBufferSize),
+        new ByteBufferVendor(bufferPool.acquireDirectSenderBuffer(packetBufferSize),
             TRACKED_SENDER, bufferPool);
     inputSharing =
-        new ByteBufferSharingImpl(bufferPool.acquireNonDirectReceiveBuffer(appBufferSize),
+        new ByteBufferVendor(bufferPool.acquireNonDirectReceiveBuffer(appBufferSize),
             TRACKED_RECEIVER, bufferPool);
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferConcurrencyTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferConcurrencyTest.java
index 7c1f383..bf95963 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferConcurrencyTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferConcurrencyTest.java
@@ -47,8 +47,8 @@ public class ByteBufferConcurrencyTest {
       throws Exception {
     poolMock = mock(BufferPool.class);
     ByteBuffer someBuffer = ByteBuffer.allocate(1);
-    ByteBufferSharingImpl sharing =
-        new ByteBufferSharingImpl(someBuffer, BufferPool.BufferType.TRACKED_SENDER,
+    ByteBufferVendor sharing =
+        new ByteBufferVendor(someBuffer, BufferPool.BufferType.TRACKED_SENDER,
             poolMock);
     executor.inParallel(() -> {
       sharing.destruct();
@@ -78,8 +78,8 @@ public class ByteBufferConcurrencyTest {
     }).when(poolMock).releaseBuffer(any(), any());
 
     ByteBuffer someBuffer = ByteBuffer.allocate(1);
-    ByteBufferSharingImpl sharing =
-        new ByteBufferSharingImpl(someBuffer, BufferPool.BufferType.TRACKED_SENDER,
+    ByteBufferVendor sharing =
+        new ByteBufferVendor(someBuffer, BufferPool.BufferType.TRACKED_SENDER,
             poolMock);
 
     executor.inParallel(() -> {
@@ -109,8 +109,8 @@ public class ByteBufferConcurrencyTest {
       throws Exception {
     poolMock = mock(BufferPool.class);
     ByteBuffer someBuffer = ByteBuffer.allocate(1);
-    ByteBufferSharingImpl sharing =
-        new ByteBufferSharingImpl(someBuffer, BufferPool.BufferType.TRACKED_SENDER,
+    ByteBufferVendor sharing =
+        new ByteBufferVendor(someBuffer, BufferPool.BufferType.TRACKED_SENDER,
             poolMock);
 
     final AtomicBoolean inUse = new AtomicBoolean(false);
@@ -134,8 +134,8 @@ public class ByteBufferConcurrencyTest {
       throws Exception {
     poolMock = mock(BufferPool.class);
     ByteBuffer someBuffer = ByteBuffer.allocate(1);
-    ByteBufferSharingImpl sharing =
-        new ByteBufferSharingImpl(someBuffer, BufferPool.BufferType.TRACKED_SENDER,
+    ByteBufferVendor sharing =
+        new ByteBufferVendor(someBuffer, BufferPool.BufferType.TRACKED_SENDER,
             poolMock);
 
     final AtomicBoolean inUse = new AtomicBoolean(false);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
similarity index 97%
rename from geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java
rename to geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
index d863076..359d8aa 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
@@ -29,14 +29,14 @@ import java.util.concurrent.CountDownLatch;
 import org.junit.Before;
 import org.junit.Test;
 
-public class ByteBufferSharingImplTest {
+public class ByteBufferVendorTest {
 
   @FunctionalInterface
   private static interface Foo {
     void run() throws IOException;
   }
 
-  private ByteBufferSharingImpl sharing;
+  private ByteBufferVendor sharing;
   private BufferPool poolMock;
   private CountDownLatch clientHasOpenedResource;
   private CountDownLatch clientMayComplete;
@@ -45,7 +45,7 @@ public class ByteBufferSharingImplTest {
   public void before() {
     poolMock = mock(BufferPool.class);
     sharing =
-        new ByteBufferSharingImpl(mock(ByteBuffer.class), BufferPool.BufferType.TRACKED_SENDER,
+        new ByteBufferVendor(mock(ByteBuffer.class), BufferPool.BufferType.TRACKED_SENDER,
             poolMock);
     clientHasOpenedResource = new CountDownLatch(1);
     clientMayComplete = new CountDownLatch(1);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index dfd7b90..62a858c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -488,7 +488,7 @@ public class NioSslEngineTest {
     unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
 
     try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
-      final ByteBufferSharingImpl inputSharingImpl = (ByteBufferSharingImpl) inputSharing;
+      final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) inputSharing;
       inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
     }
 
@@ -545,7 +545,7 @@ public class NioSslEngineTest {
     ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
     unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
     try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
-      final ByteBufferSharingImpl inputSharingImpl = (ByteBufferSharingImpl) inputSharing;
+      final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) inputSharing;
       inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
     }