You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ec...@apache.org on 2021/06/17 15:41:18 UTC

[geode] branch support/1.12 updated (1a8eb5a -> f25ec03)

This is an automated email from the ASF dual-hosted git repository.

echobravo pushed a change to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git.


    from 1a8eb5a  GEODE-9141: (2 of 2) Handle in-buffer concurrency * Connection uses a ByteBufferVendor to mediate access to inputBuffer * Prevent return to pool before socket closer is finished
     new c13a621  Revert "GEODE-9141: (2 of 2) Handle in-buffer concurrency"
     new f25ec03  Revert "GEODE-9141: (1 of 2) rename ByteBufferSharingImpl to ByteBuferVendor"

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 ...LSocketHostNameVerificationIntegrationTest.java |   6 +-
 .../internal/net/SSLSocketIntegrationTest.java     |   3 +-
 .../apache/geode/codeAnalysis/excludedClasses.txt  |   2 +-
 .../geode/internal/net/ByteBufferSharing.java      |  15 -
 ...ufferVendor.java => ByteBufferSharingImpl.java} | 144 +++------
 .../geode/internal/net/ByteBufferSharingNoOp.java  |   7 +-
 .../apache/geode/internal/net/NioSslEngine.java    |  56 ++--
 .../apache/geode/internal/net/SocketCreator.java   |   9 +-
 .../org/apache/geode/internal/tcp/Connection.java  | 334 ++++++++++-----------
 .../internal/net/ByteBufferConcurrencyTest.java    |  16 +-
 ...dorTest.java => ByteBufferSharingImplTest.java} |  40 ++-
 .../geode/internal/net/NioSslEngineTest.java       |  41 ++-
 .../apache/geode/internal/tcp/ConnectionTest.java  |   1 -
 13 files changed, 299 insertions(+), 375 deletions(-)
 rename geode-core/src/main/java/org/apache/geode/internal/net/{ByteBufferVendor.java => ByteBufferSharingImpl.java} (53%)
 rename geode-core/src/test/java/org/apache/geode/internal/net/{ByteBufferVendorTest.java => ByteBufferSharingImplTest.java} (84%)

[geode] 01/02: Revert "GEODE-9141: (2 of 2) Handle in-buffer concurrency"

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echobravo pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c13a62199a3720c69c142b0dcd2f20130ac3b1a4
Author: Ernest Burghardt <eb...@pivotal.io>
AuthorDate: Thu Jun 17 10:39:47 2021 -0500

    Revert "GEODE-9141: (2 of 2) Handle in-buffer concurrency"
    
    This reverts commit 1a8eb5aec580eb75871060793ea65d62f5f2d959.
---
 ...LSocketHostNameVerificationIntegrationTest.java |   6 +-
 .../internal/net/SSLSocketIntegrationTest.java     |   3 +-
 .../apache/geode/codeAnalysis/excludedClasses.txt  |   2 +-
 .../geode/internal/net/ByteBufferSharing.java      |  15 -
 .../geode/internal/net/ByteBufferSharingNoOp.java  |   5 -
 .../geode/internal/net/ByteBufferVendor.java       | 144 +++------
 .../apache/geode/internal/net/NioSslEngine.java    |  50 +--
 .../apache/geode/internal/net/SocketCreator.java   |   9 +-
 .../org/apache/geode/internal/tcp/Connection.java  | 334 ++++++++++-----------
 .../geode/internal/net/ByteBufferVendorTest.java   |  36 +--
 .../geode/internal/net/NioSslEngineTest.java       |  41 ++-
 .../apache/geode/internal/tcp/ConnectionTest.java  |   1 -
 12 files changed, 285 insertions(+), 361 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
index e86bfea..a70f3b1 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
@@ -103,9 +103,6 @@ public class SSLSocketHostNameVerificationIntegrationTest {
 
   @Before
   public void setUp() throws Exception {
-
-    SocketCreatorFactory.close(); // to clear socket creators made in previous tests
-
     IgnoredException.addIgnoredException("javax.net.ssl.SSLException: Read timed out");
 
     this.localHost = InetAddress.getLoopbackAddress();
@@ -175,7 +172,7 @@ public class SSLSocketHostNameVerificationIntegrationTest {
 
     try {
       this.socketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(),
-          sslEngine, 0,
+          sslEngine, 0, true,
           ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize()),
           new BufferPool(mock(DMStats.class)));
 
@@ -208,6 +205,7 @@ public class SSLSocketHostNameVerificationIntegrationTest {
             sc.handshakeSSLSocketChannel(socket.getChannel(),
                 sslEngine,
                 timeoutMillis,
+                false,
                 ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize()),
                 new BufferPool(mock(DMStats.class)));
       } catch (Throwable throwable) {
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
index 13e9d5b..e7ac191 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
@@ -217,7 +217,7 @@ public class SSLSocketIntegrationTest {
     clientSocket = clientChannel.socket();
     NioSslEngine engine =
         clusterSocketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(),
-            clusterSocketCreator.createSSLEngine("localhost", 1234, true), 0,
+            clusterSocketCreator.createSSLEngine("localhost", 1234, true), 0, true,
             ByteBuffer.allocate(65535), new BufferPool(mock(DMStats.class)));
     clientChannel.configureBlocking(true);
 
@@ -267,6 +267,7 @@ public class SSLSocketIntegrationTest {
             sc.handshakeSSLSocketChannel(socket.getChannel(), sc.createSSLEngine("localhost", 1234,
                 false),
                 timeoutMillis,
+                false,
                 ByteBuffer.allocate(65535),
                 new BufferPool(mock(DMStats.class)));
 
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index cd1af3a..af3bd1e 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -104,4 +104,4 @@ org/apache/geode/cache/query/internal/xml/ElementType
 org/apache/geode/cache/query/internal/xml/ElementType$1
 org/apache/geode/cache/query/internal/xml/ElementType$2
 org/apache/geode/cache/query/internal/xml/ElementType$3
-org/apache/geode/internal/net/ByteBufferVendor$OpenAttemptTimedOut
\ No newline at end of file
+org/apache/geode/internal/net/ByteBufferSharingImpl$OpenAttemptTimedOut
\ No newline at end of file
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
index c8a94ce..cdfa897 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
@@ -42,27 +42,12 @@ public interface ByteBufferSharing extends AutoCloseable {
    *
    * Subsequent calls to {@link #getBuffer()} will return that new buffer too.
    *
-   * This variant is for use when the buffer is being written to.
-   *
    * @return the same buffer or a different (bigger) buffer
    * @throws IOException if the buffer is no longer accessible
    */
   ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException;
 
   /**
-   * Expand the buffer if needed. This may return a different object so be sure to pay attention to
-   * the return value if you need access to the potentially- expanded buffer.
-   *
-   * Subsequent calls to {@link #getBuffer()} will return that new buffer too.
-   *
-   * This variant is for use when the buffer is being read from.
-   *
-   * @return the same buffer or a different (bigger) buffer
-   * @throws IOException if the buffer is no longer accessible
-   */
-  ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException;
-
-  /**
    * Override {@link AutoCloseable#close()} without throws clause since we don't need one.
    */
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
index 4f36e5b..4a8bc49 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
@@ -48,10 +48,5 @@ class ByteBufferSharingNoOp implements ByteBufferSharing {
   }
 
   @Override
-  public ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException {
-    throw new UnsupportedOperationException("Can't expand buffer when using NioPlainEngine");
-  }
-
-  @Override
   public void close() {}
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
index 1dc74f0..4933247 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
@@ -17,7 +17,6 @@ package org.apache.geode.internal.net;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -28,49 +27,49 @@ import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.internal.net.BufferPool.BufferType;
 
 /**
- * Produces (via {@link #open()}) an {@link ByteBufferSharing} meant to used only within a
- * try-with-resources block. The resource controls access to a secondary resource
- * (via {@link ByteBufferSharing#getBuffer()}) within the scope of try-with-resources.
- * Neither the object returned by {@link #open()}, nor the object returned by invoking
- * {@link ByteBufferSharing#getBuffer()} on that object may be used outside the scope of
+ * An {@link AutoCloseable} meant to be acquired in a try-with-resources statement. The resource (a
+ * {@link ByteBuffer}) is available (for reading and modification) in the scope of the
  * try-with-resources.
  */
-public class ByteBufferVendor {
+class ByteBufferVendor implements ByteBufferSharing {
 
   static class OpenAttemptTimedOut extends Exception {
   }
 
-  private interface ByteBufferSharingInternal extends ByteBufferSharing {
-    void releaseBuffer();
-  }
-
-  private final Lock lock = new ReentrantLock();
-  private final AtomicBoolean isDestructed = new AtomicBoolean(false);
-  private final AtomicInteger counter = new AtomicInteger(1);
-  // the object referenced by sharing is guarded by lock
-  private final ByteBufferSharingInternal sharing;
+  private final Lock lock;
+  private final AtomicBoolean isDestructed;
+  // mutable because in general our ByteBuffer may need to be resized (grown or compacted)
+  private volatile ByteBuffer buffer;
+  private final BufferType bufferType;
+  private final AtomicInteger counter;
+  private final BufferPool bufferPool;
 
-  /*
-   * These constructors are for use only by the owner of the shared resource.
+  /**
+   * This constructor is for use only by the owner of the shared resource (a {@link ByteBuffer}).
    *
    * A resource owner must invoke {@link #open()} once for each reference that escapes (is passed
    * to an external object or is returned to an external caller.)
    *
-   * Constructors acquire no locks. The reference count will be 1 after a constructor
+   * This constructor acquires no lock. The reference count will be 1 after this constructor
    * completes.
    */
+  ByteBufferVendor(final ByteBuffer buffer, final BufferType bufferType,
+                   final BufferPool bufferPool) {
+    this.buffer = buffer;
+    this.bufferType = bufferType;
+    this.bufferPool = bufferPool;
+    lock = new ReentrantLock();
+    counter = new AtomicInteger(1);
+    isDestructed = new AtomicBoolean(false);
+  }
 
   /**
-   * When you have a ByteBuffer available before construction, use this constructor.
-   *
-   * @param bufferArg is the ByteBuffer
-   * @param bufferType needed for freeing the buffer later
-   * @param bufferPool needed for freeing the buffer later
+   * The destructor. Called by the resource owner to undo the work of the constructor.
    */
-  public ByteBufferVendor(final ByteBuffer bufferArg,
-      final BufferType bufferType,
-      final BufferPool bufferPool) {
-    sharing = new ByteBufferSharingInternalImpl(bufferArg, bufferType, bufferPool);
+  void destruct() {
+    if (isDestructed.compareAndSet(false, true)) {
+      dropReference();
+    }
   }
 
   /**
@@ -79,19 +78,18 @@ public class ByteBufferVendor {
    *
    * Resource owners call this method as the last thing before returning a reference to the caller.
    * That caller binds that reference to a variable in a try-with-resources statement and relies on
-   * the AutoCloseable protocol to invoke {@link AutoCloseable#close()} on the object at
-   * the end of the block.
+   * the AutoCloseable protocol to invoke {@link #close()} on the object at the end of the block.
    */
-  public ByteBufferSharing open() throws IOException {
+  ByteBufferSharing open() throws IOException {
     lock.lock();
     addReferenceAfterLock();
-    return sharing;
+    return this;
   }
 
   /**
    * This variant throws {@link OpenAttemptTimedOut} if it can't acquire the lock in time.
    */
-  public ByteBufferSharing open(final long time, final TimeUnit unit)
+  ByteBufferSharing open(final long time, final TimeUnit unit)
       throws OpenAttemptTimedOut, IOException {
     try {
       if (!lock.tryLock(time, unit)) {
@@ -102,25 +100,24 @@ public class ByteBufferVendor {
       throw new OpenAttemptTimedOut();
     }
     addReferenceAfterLock();
-    return sharing;
-  }
-
-  /**
-   * The destructor. Called by the resource owner to undo the work of the constructor.
-   */
-  public void destruct() {
-    if (isDestructed.compareAndSet(false, true)) {
-      dropReference();
-    }
+    return this;
   }
 
-  private void exposingResource() throws IOException {
+  @Override
+  public ByteBuffer getBuffer() throws IOException {
     if (isDestructed.get()) {
       throwClosed();
     }
+    return buffer;
   }
 
-  private void close() {
+  @Override
+  public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException {
+    return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity);
+  }
+
+  @Override
+  public void close() {
     /*
      * We are counting on our ReentrantLock throwing an exception if the current thread
      * does not hold the lock. In that case dropReference() will not be called. This
@@ -145,11 +142,16 @@ public class ByteBufferVendor {
   private int dropReference() {
     final int usages = counter.decrementAndGet();
     if (usages == 0) {
-      sharing.releaseBuffer();
+      bufferPool.releaseBuffer(bufferType, buffer);
     }
     return usages;
   }
 
+  @VisibleForTesting
+  public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) {
+    buffer = newBufferForTesting;
+  }
+
   private void addReferenceAfterLock() throws IOException {
     try {
       addReference();
@@ -163,54 +165,4 @@ public class ByteBufferVendor {
     throw new IOException("NioSslEngine has been closed");
   }
 
-  private class ByteBufferSharingInternalImpl implements ByteBufferSharingInternal {
-
-    /*
-     * mutable because in general our ByteBuffer may need to be resized (grown or compacted)
-     * no concurrency concerns since ByteBufferSharingNotNull is guarded by ByteBufferVendor.lock
-     */
-    private ByteBuffer buffer;
-    private final BufferType bufferType;
-    private final BufferPool bufferPool;
-
-    public ByteBufferSharingInternalImpl(final ByteBuffer buffer,
-        final BufferType bufferType,
-        final BufferPool bufferPool) {
-      Objects.requireNonNull(buffer);
-      this.buffer = buffer;
-      this.bufferType = bufferType;
-      this.bufferPool = bufferPool;
-    }
-
-    @Override
-    public ByteBuffer getBuffer() throws IOException {
-      exposingResource();
-      return buffer;
-    }
-
-    @Override
-    public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException {
-      return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity);
-    }
-
-    @Override
-    public ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException {
-      return buffer = bufferPool.expandReadBufferIfNeeded(bufferType, getBuffer(), newCapacity);
-    }
-
-    @Override
-    public void close() {
-      ByteBufferVendor.this.close();
-    }
-
-    @Override
-    public void releaseBuffer() {
-      bufferPool.releaseBuffer(bufferType, buffer);
-    }
-  }
-
-  @VisibleForTesting
-  public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) {
-    ((ByteBufferSharingInternalImpl) sharing).buffer = newBufferForTesting;
-  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 4c603a0..fc91a31 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -62,12 +62,12 @@ public class NioSslEngine implements NioFilter {
   /**
    * holds bytes wrapped by the SSLEngine; a.k.a. myNetData
    */
-  private final ByteBufferVendor outputBufferVendor;
+  private final ByteBufferVendor outputSharing;
 
   /**
    * holds the last unwrapped data from a peer; a.k.a. peerAppData
    */
-  private final ByteBufferVendor inputBufferVendor;
+  private final ByteBufferVendor inputSharing;
 
   NioSslEngine(SSLEngine engine, BufferPool bufferPool) {
     SSLSession session = engine.getSession();
@@ -76,10 +76,10 @@ public class NioSslEngine implements NioFilter {
     closed = false;
     this.engine = engine;
     this.bufferPool = bufferPool;
-    outputBufferVendor =
+    outputSharing =
         new ByteBufferVendor(bufferPool.acquireDirectSenderBuffer(packetBufferSize),
             TRACKED_SENDER, bufferPool);
-    inputBufferVendor =
+    inputSharing =
         new ByteBufferVendor(bufferPool.acquireNonDirectReceiveBuffer(appBufferSize),
             TRACKED_RECEIVER, bufferPool);
   }
@@ -98,7 +98,7 @@ public class NioSslEngine implements NioFilter {
           peerNetData.capacity(), engine.getSession().getPacketBufferSize()));
     }
 
-    final ByteBuffer handshakeBuffer = peerNetData;
+    ByteBuffer handshakeBuffer = peerNetData;
     handshakeBuffer.clear();
 
     ByteBuffer myAppData = ByteBuffer.wrap(new byte[0]);
@@ -135,7 +135,7 @@ public class NioSslEngine implements NioFilter {
 
       switch (status) {
         case NEED_UNWRAP:
-          try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
+          try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
             final ByteBuffer peerAppData = inputSharing.getBuffer();
 
             // Receive handshaking data from peer
@@ -162,7 +162,7 @@ public class NioSslEngine implements NioFilter {
           }
 
         case NEED_WRAP:
-          try (final ByteBufferSharing outputSharing = outputBufferVendor.open()) {
+          try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
             final ByteBuffer myNetData = outputSharing.getBuffer();
 
             // Empty the local network packet buffer.
@@ -231,7 +231,7 @@ public class NioSslEngine implements NioFilter {
 
   @Override
   public ByteBufferSharing wrap(ByteBuffer appData) throws IOException {
-    try (final ByteBufferSharing outputSharing = outputBufferVendor.open()) {
+    try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
 
       ByteBuffer myNetData = outputSharing.getBuffer();
 
@@ -260,13 +260,13 @@ public class NioSslEngine implements NioFilter {
 
       myNetData.flip();
 
-      return outputBufferVendor.open();
+      return shareOutputBuffer();
     }
   }
 
   @Override
   public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException {
-    try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
+    try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
 
       ByteBuffer peerAppData = inputSharing.getBuffer();
 
@@ -292,7 +292,7 @@ public class NioSslEngine implements NioFilter {
             // partial data - need to read more. When this happens the SSLEngine will not have
             // changed the buffer position
             wrappedBuffer.compact();
-            return inputBufferVendor.open();
+            return shareInputBuffer();
           case OK:
             break;
           default:// if there is data in the decrypted buffer return it. Otherwise signal that we're
@@ -305,7 +305,7 @@ public class NioSslEngine implements NioFilter {
         }
       }
       wrappedBuffer.clear();
-      return inputBufferVendor.open();
+      return shareInputBuffer();
     }
   }
 
@@ -325,7 +325,7 @@ public class NioSslEngine implements NioFilter {
   @Override
   public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes,
       ByteBuffer wrappedBuffer) throws IOException {
-    try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
+    try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
 
       ByteBuffer peerAppData = inputSharing.getBuffer();
 
@@ -355,13 +355,13 @@ public class NioSslEngine implements NioFilter {
           }
         }
       }
-      return inputBufferVendor.open();
+      return shareInputBuffer();
     }
   }
 
   @Override
   public ByteBufferSharing getUnwrappedBuffer() throws IOException {
-    return inputBufferVendor.open();
+    return shareInputBuffer();
   }
 
   @Override
@@ -377,8 +377,8 @@ public class NioSslEngine implements NioFilter {
       return;
     }
     closed = true;
-    inputBufferVendor.destruct();
-    try (final ByteBufferSharing outputSharing = outputBufferVendor.open(1, TimeUnit.MINUTES)) {
+    inputSharing.destruct();
+    try (final ByteBufferSharing outputSharing = shareOutputBuffer(1, TimeUnit.MINUTES)) {
       final ByteBuffer myNetData = outputSharing.getBuffer();
 
       if (!engine.isOutboundDone()) {
@@ -412,7 +412,7 @@ public class NioSslEngine implements NioFilter {
         engine.closeOutbound();
       }
     } finally {
-      outputBufferVendor.destruct();
+      outputSharing.destruct();
     }
   }
 
@@ -422,12 +422,16 @@ public class NioSslEngine implements NioFilter {
   }
 
   @VisibleForTesting
-  public ByteBufferVendor getOutputBufferVendorForTestingOnly() throws IOException {
-    return outputBufferVendor;
+  public ByteBufferSharing shareOutputBuffer() throws IOException {
+    return outputSharing.open();
   }
 
-  @VisibleForTesting
-  public ByteBufferVendor getInputBufferVendorForTestingOnly() throws IOException {
-    return inputBufferVendor;
+  private ByteBufferSharing shareOutputBuffer(final long time, final TimeUnit unit)
+      throws OpenAttemptTimedOut, IOException {
+    return outputSharing.open(time, unit);
+  }
+
+  public ByteBufferSharing shareInputBuffer() throws IOException {
+    return inputSharing.open();
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
index a31bbb2..a232fca 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
@@ -803,16 +803,21 @@ public class SocketCreator extends TcpSocketCreatorImpl {
    * @param socketChannel the socket's NIO channel
    * @param engine the sslEngine (see createSSLEngine)
    * @param timeout handshake timeout in milliseconds. No timeout if <= 0
+   * @param clientSocket set to true if you initiated the connect(), false if you accepted it
    * @param peerNetBuffer the buffer to use in reading data fron socketChannel. This should also be
    *        used in subsequent I/O operations
    * @return The SSLEngine to be used in processing data for sending/receiving from the channel
    */
-  public NioSslEngine handshakeSSLSocketChannel(SocketChannel socketChannel,
-      SSLEngine engine,
+  public NioSslEngine handshakeSSLSocketChannel(SocketChannel socketChannel, SSLEngine engine,
       int timeout,
+      boolean clientSocket,
       ByteBuffer peerNetBuffer,
       BufferPool bufferPool)
       throws IOException {
+    engine.setUseClientMode(clientSocket);
+    if (!clientSocket) {
+      engine.setNeedClientAuth(sslConfig.isRequireAuth());
+    }
     while (!socketChannel.finishConnect()) {
       try {
         Thread.sleep(50);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 107aa9f..f5a1886 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -18,7 +18,6 @@ import static java.lang.Boolean.FALSE;
 import static java.lang.ThreadLocal.withInitial;
 import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
 import static org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
-import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_RECEIVER;
 import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX;
 
 import java.io.DataInput;
@@ -80,7 +79,6 @@ import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.SystemTimer.SystemTimerTask;
 import org.apache.geode.internal.net.BufferPool;
 import org.apache.geode.internal.net.ByteBufferSharing;
-import org.apache.geode.internal.net.ByteBufferVendor;
 import org.apache.geode.internal.net.NioFilter;
 import org.apache.geode.internal.net.NioPlainEngine;
 import org.apache.geode.internal.net.SocketCreator;
@@ -119,7 +117,7 @@ public class Connection implements Runnable {
    * Small buffer used for send socket buffer on receiver connections and receive buffer on sender
    * connections.
    */
-  public static final int SMALL_BUFFER_SIZE =
+  static final int SMALL_BUFFER_SIZE =
       Integer.getInteger(GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096);
 
   /**
@@ -311,14 +309,11 @@ public class Connection implements Runnable {
   /** name of thread that we're currently performing an operation in (may be null) */
   private String ackThreadName;
 
-  /*
-   * This object mediates access to the input ByteBuffer and ensures its return to
-   * pool after last use. This reference couldn't be final since it is initialized
-   * in createIoFilter() not in the constructors. It had to be initialized there
-   * because in general we have to construct an SSLEngine before we know the buffer
-   * size and createIoFilter() is where we create that object.
-   */
-  private ByteBufferVendor inputBufferVendor;
+  /** the buffer used for message receipt */
+  private ByteBuffer inputBuffer;
+
+  /** Lock used to protect the input buffer */
+  public final Object inputBufferLock = new Object();
 
   /** the length of the next message to be dispatched */
   private int messageLength;
@@ -894,28 +889,27 @@ public class Connection implements Runnable {
     waitForAddressCompletion();
 
     InternalDistributedMember myAddr = owner.getConduit().getMemberId();
-    try (final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE)) {
-      /*
-       * Note a byte of zero is always written because old products serialized a member id with
-       * always sends an ip address. My reading of the ip-address specs indicated that the first
-       * byte of a valid address would never be 0.
-       */
-      connectHandshake.writeByte(0);
-      connectHandshake.writeByte(HANDSHAKE_VERSION);
-      // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
-      InternalDataSerializer.invokeToData(myAddr, connectHandshake);
-      connectHandshake.writeBoolean(sharedResource);
-      connectHandshake.writeBoolean(preserveOrder);
-      connectHandshake.writeLong(uniqueId);
-      // write the product version ordinal
-      Version.CURRENT.writeOrdinal(connectHandshake, true);
-      connectHandshake.writeInt(dominoCount.get() + 1);
-      // this writes the sending member + thread name that is stored in senderName
-      // on the receiver to show the cause of reader thread creation
-      connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, OperationExecutors.STANDARD_EXECUTOR,
-          MsgIdGenerator.NO_MSG_ID);
-      writeFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
-    }
+    final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE);
+    /*
+     * Note a byte of zero is always written because old products serialized a member id with always
+     * sends an ip address. My reading of the ip-address specs indicated that the first byte of a
+     * valid address would never be 0.
+     */
+    connectHandshake.writeByte(0);
+    connectHandshake.writeByte(HANDSHAKE_VERSION);
+    // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
+    InternalDataSerializer.invokeToData(myAddr, connectHandshake);
+    connectHandshake.writeBoolean(sharedResource);
+    connectHandshake.writeBoolean(preserveOrder);
+    connectHandshake.writeLong(uniqueId);
+    // write the product version ordinal
+    Version.CURRENT.writeOrdinal(connectHandshake, true);
+    connectHandshake.writeInt(dominoCount.get() + 1);
+    // this writes the sending member + thread name that is stored in senderName
+    // on the receiver to show the cause of reader thread creation
+    connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, OperationExecutors.STANDARD_EXECUTOR,
+        MsgIdGenerator.NO_MSG_ID);
+    writeFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
   }
 
   /**
@@ -1359,7 +1353,7 @@ public class Connection implements Runnable {
         if (!isReceiver && !hasResidualReaderThread()) {
           // receivers release the input buffer when exiting run(). Senders use the
           // inputBuffer for reading direct-reply responses
-          inputBufferVendor.destruct();
+          releaseInputBuffer();
         }
         lengthSet = false;
       }
@@ -1500,7 +1494,7 @@ public class Connection implements Runnable {
         }
       }
 
-      inputBufferVendor.destruct();
+      releaseInputBuffer();
 
       // make sure that if the reader thread exits we notify a thread waiting for the handshake.
       notifyHandshakeWaiter(false);
@@ -1512,6 +1506,16 @@ public class Connection implements Runnable {
     }
   }
 
+  private void releaseInputBuffer() {
+    synchronized (inputBufferLock) {
+      ByteBuffer tmp = inputBuffer;
+      if (tmp != null) {
+        inputBuffer = null;
+        getBufferPool().releaseReceiveBuffer(tmp);
+      }
+    }
+  }
+
   BufferPool getBufferPool() {
     return owner.getBufferPool();
   }
@@ -1533,6 +1537,9 @@ public class Connection implements Runnable {
   }
 
   private void readMessages() {
+    if (closing.get()) {
+      return;
+    }
     // take a snapshot of uniqueId to detect reconnect attempts
     SocketChannel channel;
     try {
@@ -1578,6 +1585,8 @@ public class Connection implements Runnable {
     // we should not change the state of the connection if we are a handshake reader thread
     // as there is a race between this thread and the application thread doing direct ack
     boolean handshakeHasBeenRead = false;
+    // if we're using SSL/TLS the input buffer may already have data to process
+    boolean skipInitialRead = getInputBuffer().position() > 0;
     try {
       for (boolean isInitialRead = true;;) {
         if (stopped) {
@@ -1597,9 +1606,8 @@ public class Connection implements Runnable {
           break;
         }
 
-        try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
-          ByteBuffer buff = inputSharing.getBuffer();
-
+        try {
+          ByteBuffer buff = getInputBuffer();
           synchronized (stateLock) {
             connectionState = STATE_READING;
           }
@@ -1608,8 +1616,6 @@ public class Connection implements Runnable {
             amountRead = channel.read(buff);
           } else {
             isInitialRead = false;
-            // if we're using SSL/TLS the input buffer may already have data to process
-            final boolean skipInitialRead = buff.position() > 0;
             if (!skipInitialRead) {
               amountRead = channel.read(buff);
             } else {
@@ -1674,11 +1680,11 @@ public class Connection implements Runnable {
         } catch (IOException e) {
           // "Socket closed" check needed for Solaris jdk 1.4.2_08
           if (!isSocketClosed() && !"Socket closed".equalsIgnoreCase(e.getMessage())) {
-            if (logger.isInfoEnabled() && !isIgnorableIOException(e)) {
-              logger.info("{} io exception for {}", p2pReaderName(), this, e);
+            if (logger.isDebugEnabled() && !isIgnorableIOException(e)) {
+              logger.debug("{} io exception for {}", p2pReaderName(), this, e);
             }
-            if (logger.isDebugEnabled()) {
-              if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) {
+            if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) {
+              if (logger.isDebugEnabled()) {
                 logger.debug(
                     "{} received unexpected WSACancelBlockingCall exception, which may result in a hang",
                     p2pReaderName());
@@ -1722,55 +1728,28 @@ public class Connection implements Runnable {
   private void createIoFilter(SocketChannel channel, boolean clientSocket) throws IOException {
     if (getConduit().useSSL() && channel != null) {
       InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress();
-      String hostName;
-      if (remoteAddr != null) {
-        hostName = remoteAddr.getHostName();
-      } else {
-        hostName = SocketCreator.getHostName(address.getAddress());
-      }
       SSLEngine engine =
-          getConduit().getSocketCreator().createSSLEngine(hostName,
-              address.getPort(), clientSocket);
-
-      final int packetBufferSize = engine.getSession().getPacketBufferSize();
-
-      inputBufferVendor =
-          new ByteBufferVendor(
-              getBufferPool().acquireDirectReceiveBuffer(packetBufferSize),
-              TRACKED_RECEIVER,
-              getBufferPool());
+          getConduit().getSocketCreator().createSSLEngine(address.getHostName(), address.getPort(),
+              clientSocket);
 
+      int packetBufferSize = engine.getSession().getPacketBufferSize();
+      if (inputBuffer == null || inputBuffer.capacity() < packetBufferSize) {
+        // TLS has a minimum input buffer size constraint
+        if (inputBuffer != null) {
+          getBufferPool().releaseReceiveBuffer(inputBuffer);
+        }
+        inputBuffer = getBufferPool().acquireDirectReceiveBuffer(packetBufferSize);
+      }
       if (channel.socket().getReceiveBufferSize() < packetBufferSize) {
         channel.socket().setReceiveBufferSize(packetBufferSize);
       }
       if (channel.socket().getSendBufferSize() < packetBufferSize) {
         channel.socket().setSendBufferSize(packetBufferSize);
       }
-      try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
-        final ByteBuffer inputBuffer = inputSharing.getBuffer();
-        /*
-         * It's ok to share the inputBuffer with handshakeSSLSocketChannel() since that method
-         * accesses the referenced buffer for the handshake which completes before returning
-         * control here. The NioSslEngine retains no reference to the buffer.
-         */
-        ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine,
-            getConduit().idleConnectionTimeout, inputBuffer,
-            getBufferPool());
-      }
+      ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine,
+          getConduit().idleConnectionTimeout, clientSocket, inputBuffer,
+          getBufferPool());
     } else {
-      final int allocSize;
-      if (recvBufferSize == -1) {
-        allocSize = owner.getConduit().tcpBufferSize;
-      } else {
-        allocSize = recvBufferSize;
-      }
-
-      inputBufferVendor =
-          new ByteBufferVendor(
-              getBufferPool().acquireDirectReceiveBuffer(allocSize),
-              TRACKED_RECEIVER,
-              getBufferPool());
-
       ioFilter = new NioPlainEngine(getBufferPool());
     }
   }
@@ -1802,13 +1781,9 @@ public class Connection implements Runnable {
     }
 
     msg = msg.toLowerCase();
-
-    if (e instanceof SSLException && msg.contains("status = closed")) {
-      return true; // engine has been closed - this is normal
-    }
-
-    return (msg.contains("forcibly closed") || msg.contains("reset by peer")
-        || msg.contains("connection reset") || msg.contains("socket is closed"));
+    return msg.contains("forcibly closed")
+        || msg.contains("reset by peer")
+        || msg.contains("connection reset");
   }
 
   private static boolean validMsgType(int msgType) {
@@ -2652,6 +2627,20 @@ public class Connection implements Runnable {
   }
 
   /**
+   * gets the buffer for receiving message length bytes
+   */
+  private ByteBuffer getInputBuffer() {
+    if (inputBuffer == null) {
+      int allocSize = recvBufferSize;
+      if (allocSize == -1) {
+        allocSize = owner.getConduit().tcpBufferSize;
+      }
+      inputBuffer = getBufferPool().acquireDirectReceiveBuffer(allocSize);
+    }
+    return inputBuffer;
+  }
+
+  /**
    * @throws SocketTimeoutException if wait expires.
    * @throws ConnectionException if ack is not received
    */
@@ -2758,93 +2747,72 @@ public class Connection implements Runnable {
    * deserialized and passed to TCPConduit for further processing
    */
   private void processInputBuffer() throws ConnectionException, IOException {
-    try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
-      // can't be final because in some cases we expand the buffer (resulting in a new object)
-      ByteBuffer inputBuffer = inputSharing.getBuffer();
-      inputBuffer.flip();
+    inputBuffer.flip();
 
-      try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) {
-        final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer();
+    try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) {
+      final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer();
 
-        peerDataBuffer.flip();
+      peerDataBuffer.flip();
 
-        boolean done = false;
+      boolean done = false;
 
-        while (!done && connected) {
-          owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
-          int remaining = peerDataBuffer.remaining();
-          if (lengthSet || remaining >= MSG_HEADER_BYTES) {
-            if (!lengthSet) {
-              if (readMessageHeader(peerDataBuffer)) {
-                break;
-              }
+      while (!done && connected) {
+        owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+        int remaining = peerDataBuffer.remaining();
+        if (lengthSet || remaining >= MSG_HEADER_BYTES) {
+          if (!lengthSet) {
+            if (readMessageHeader(peerDataBuffer)) {
+              break;
             }
-            if (remaining >= messageLength + MSG_HEADER_BYTES) {
-              lengthSet = false;
-              peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES);
-              // don't trust the message deserialization to leave the position in
-              // the correct spot. Some of the serialization uses buffered
-              // streams that can leave the position at the wrong spot
-              int startPos = peerDataBuffer.position();
-              int oldLimit = peerDataBuffer.limit();
-              peerDataBuffer.limit(startPos + messageLength);
-
-              if (handshakeRead) {
-                try {
-                  readMessage(peerDataBuffer);
-                } catch (SerializationException e) {
-                  logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit);
-                  throw e;
-                }
-              } else {
-                try (ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
-                    DataInputStream dis = new DataInputStream(bbis)) {
-                  if (!isReceiver) {
-                    // we read the handshake and then stop processing since we don't want
-                    // to process the input buffer anymore in a handshake thread
-                    readHandshakeForSender(dis, peerDataBuffer);
-                    return;
-                  }
-                  if (readHandshakeForReceiver(dis)) {
-                    ioFilter.doneReading(peerDataBuffer);
-                    return;
-                  }
-                }
-              }
-              if (!connected) {
-                continue;
+          }
+          if (remaining >= messageLength + MSG_HEADER_BYTES) {
+            lengthSet = false;
+            peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES);
+            // don't trust the message deserialization to leave the position in
+            // the correct spot. Some of the serialization uses buffered
+            // streams that can leave the position at the wrong spot
+            int startPos = peerDataBuffer.position();
+            int oldLimit = peerDataBuffer.limit();
+            peerDataBuffer.limit(startPos + messageLength);
+
+            if (handshakeRead) {
+              try {
+                readMessage(peerDataBuffer);
+              } catch (SerializationException e) {
+                logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit);
+                throw e;
               }
-              accessed();
-              peerDataBuffer.limit(oldLimit);
-              peerDataBuffer.position(startPos + messageLength);
             } else {
-              done = true;
-              if (getConduit().useSSL()) {
+              ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
+              DataInputStream dis = new DataInputStream(bbis);
+              if (!isReceiver) {
+                // we read the handshake and then stop processing since we don't want
+                // to process the input buffer anymore in a handshake thread
+                readHandshakeForSender(dis, peerDataBuffer);
+                return;
+              }
+              if (readHandshakeForReceiver(dis)) {
                 ioFilter.doneReading(peerDataBuffer);
-              } else {
-                // compact or resize the buffer
-                final int oldBufferSize = inputBuffer.capacity();
-                final int allocSize = messageLength + MSG_HEADER_BYTES;
-                if (oldBufferSize < allocSize) {
-                  // need a bigger buffer
-                  logger.info(
-                      "Allocating larger network read buffer, new size is {} old size was {}.",
-                      allocSize, oldBufferSize);
-                  inputBuffer = inputSharing.expandReadBufferIfNeeded(allocSize);
-                } else {
-                  if (inputBuffer.position() != 0) {
-                    inputBuffer.compact();
-                  } else {
-                    inputBuffer.position(inputBuffer.limit());
-                    inputBuffer.limit(inputBuffer.capacity());
-                  }
-                }
+                return;
               }
             }
+            if (!connected) {
+              continue;
+            }
+            accessed();
+            peerDataBuffer.limit(oldLimit);
+            peerDataBuffer.position(startPos + messageLength);
           } else {
-            ioFilter.doneReading(peerDataBuffer);
             done = true;
+            if (getConduit().useSSL()) {
+              ioFilter.doneReading(peerDataBuffer);
+            } else {
+              compactOrResizeBuffer(messageLength);
+            }
           }
+        } else {
+          ioFilter.doneReading(peerDataBuffer);
+          done = true;
         }
       }
     }
@@ -2979,9 +2947,10 @@ public class Connection implements Runnable {
   private void readMessage(ByteBuffer peerDataBuffer) {
     if (messageType == NORMAL_MSG_TYPE) {
       owner.getConduit().getStats().incMessagesBeingReceived(true, messageLength);
-      try (ByteBufferInputStream bbis =
+      ByteBufferInputStream bbis =
           remoteVersion == null ? new ByteBufferInputStream(peerDataBuffer)
-              : new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion)) {
+              : new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion);
+      try {
         ReplyProcessor21.initMessageRPId();
         // add serialization stats
         long startSer = owner.getConduit().getStats().startMsgDeserialization();
@@ -3234,8 +3203,34 @@ public class Connection implements Runnable {
   private void setThreadName(int dominoNumber) {
     Thread.currentThread().setName(THREAD_KIND_IDENTIFIER + " for " + remoteAddr + " "
         + (sharedResource ? "" : "un") + "shared" + " " + (preserveOrder ? "" : "un")
-        + "ordered sender uid=" + uniqueId + (dominoNumber > 0 ? " dom #" + dominoNumber : "")
-        + " local port=" + socket.getLocalPort() + " remote port=" + socket.getPort());
+        + "ordered" + " uid=" + uniqueId + (dominoNumber > 0 ? " dom #" + dominoNumber : "")
+        + " port=" + socket.getPort());
+  }
+
+  private void compactOrResizeBuffer(int messageLength) {
+    final int oldBufferSize = inputBuffer.capacity();
+    int allocSize = messageLength + MSG_HEADER_BYTES;
+    if (oldBufferSize < allocSize) {
+      // need a bigger buffer
+      logger.info("Allocating larger network read buffer, new size is {} old size was {}.",
+          allocSize, oldBufferSize);
+      ByteBuffer oldBuffer = inputBuffer;
+      inputBuffer = getBufferPool().acquireDirectReceiveBuffer(allocSize);
+
+      if (oldBuffer != null) {
+        int oldByteCount = oldBuffer.remaining();
+        inputBuffer.put(oldBuffer);
+        inputBuffer.position(oldByteCount);
+        getBufferPool().releaseReceiveBuffer(oldBuffer);
+      }
+    } else {
+      if (inputBuffer.position() != 0) {
+        inputBuffer.compact();
+      } else {
+        inputBuffer.position(inputBuffer.limit());
+        inputBuffer.limit(inputBuffer.capacity());
+      }
+    }
   }
 
   private boolean dispatchMessage(DistributionMessage msg, int bytesRead, boolean directAck)
@@ -3320,7 +3315,6 @@ public class Connection implements Runnable {
    * socket is properly closed at this end. When that is the case isResidualReaderThread
    * will return true.
    */
-  @VisibleForTesting
   public boolean hasResidualReaderThread() {
     return hasResidualReaderThread;
   }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
index fe4f08b..359d8aa 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
@@ -32,11 +32,11 @@ import org.junit.Test;
 public class ByteBufferVendorTest {
 
   @FunctionalInterface
-  private interface Foo {
+  private static interface Foo {
     void run() throws IOException;
   }
 
-  private ByteBufferVendor sharingVendor;
+  private ByteBufferVendor sharing;
   private BufferPool poolMock;
   private CountDownLatch clientHasOpenedResource;
   private CountDownLatch clientMayComplete;
@@ -44,7 +44,7 @@ public class ByteBufferVendorTest {
   @Before
   public void before() {
     poolMock = mock(BufferPool.class);
-    sharingVendor =
+    sharing =
         new ByteBufferVendor(mock(ByteBuffer.class), BufferPool.BufferType.TRACKED_SENDER,
             poolMock);
     clientHasOpenedResource = new CountDownLatch(1);
@@ -54,7 +54,7 @@ public class ByteBufferVendorTest {
   @Test
   public void balancedCloseOwnerIsLastReferenceHolder() throws InterruptedException {
     resourceOwnerIsLastReferenceHolder("client with balanced close calls", () -> {
-      try (final ByteBufferSharing _unused = sharingVendor.open()) {
+      try (final ByteBufferSharing _unused = sharing.open()) {
       }
     });
   }
@@ -62,7 +62,7 @@ public class ByteBufferVendorTest {
   @Test
   public void extraCloseOwnerIsLastReferenceHolder() throws InterruptedException {
     resourceOwnerIsLastReferenceHolder("client with extra close calls", () -> {
-      final ByteBufferSharing sharing2 = sharingVendor.open();
+      final ByteBufferSharing sharing2 = sharing.open();
       sharing2.close();
       verify(poolMock, times(0)).releaseBuffer(any(), any());
       assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
@@ -73,7 +73,7 @@ public class ByteBufferVendorTest {
   @Test
   public void balancedCloseClientIsLastReferenceHolder() throws InterruptedException {
     clientIsLastReferenceHolder("client with balanced close calls", () -> {
-      try (final ByteBufferSharing _unused = sharingVendor.open()) {
+      try (final ByteBufferSharing _unused = sharing.open()) {
         clientHasOpenedResource.countDown();
         blockClient();
       }
@@ -83,42 +83,36 @@ public class ByteBufferVendorTest {
   @Test
   public void extraCloseClientIsLastReferenceHolder() throws InterruptedException {
     clientIsLastReferenceHolder("client with extra close calls", () -> {
-      final ByteBufferSharing sharing2 = sharingVendor.open();
+      final ByteBufferSharing sharing2 = sharing.open();
       clientHasOpenedResource.countDown();
       blockClient();
       sharing2.close();
       verify(poolMock, times(1)).releaseBuffer(any(), any());
       assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
+      System.out.println("here");
     });
   }
 
   @Test
   public void extraCloseDoesNotPrematurelyReturnBufferToPool() throws IOException {
-    final ByteBufferSharing sharing2 = sharingVendor.open();
+    final ByteBufferSharing sharing2 = sharing.open();
     sharing2.close();
     assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
     verify(poolMock, times(0)).releaseBuffer(any(), any());
-    sharingVendor.destruct();
+    sharing.destruct();
     verify(poolMock, times(1)).releaseBuffer(any(), any());
   }
 
   @Test
   public void extraCloseDoesNotDecrementRefCount() throws IOException {
-    final ByteBufferSharing sharing2 = sharingVendor.open();
+    final ByteBufferSharing sharing2 = sharing.open();
     sharing2.close();
     assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
-    final ByteBufferSharing sharing3 = this.sharingVendor.open();
-    sharingVendor.destruct();
+    final ByteBufferSharing sharing3 = this.sharing.open();
+    sharing.destruct();
     verify(poolMock, times(0)).releaseBuffer(any(), any());
   }
 
-  @Test
-  public void destructIsIdempotent() {
-    sharingVendor.destruct();
-    sharingVendor.destruct();
-    verify(poolMock, times(1)).releaseBuffer(any(), any());
-  }
-
   private void resourceOwnerIsLastReferenceHolder(final String name, final Foo client)
       throws InterruptedException {
     /*
@@ -134,7 +128,7 @@ public class ByteBufferVendorTest {
 
     verify(poolMock, times(0)).releaseBuffer(any(), any());
 
-    sharingVendor.destruct();
+    sharing.destruct();
 
     verify(poolMock, times(1)).releaseBuffer(any(), any());
   }
@@ -153,7 +147,7 @@ public class ByteBufferVendorTest {
 
     clientHasOpenedResource.await();
 
-    sharingVendor.destruct();
+    sharing.destruct();
 
     verify(poolMock, times(0)).releaseBuffer(any(), any());
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index d6b9aa6..62a858c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -90,8 +90,7 @@ public class NioSslEngineTest {
 
   @Test
   public void engineUsesDirectBuffers() throws IOException {
-    try (final ByteBufferSharing outputSharing =
-        nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
+    try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
       assertThat(outputSharing.getBuffer().isDirect()).isTrue();
     }
   }
@@ -191,8 +190,7 @@ public class NioSslEngineTest {
 
   @Test
   public void wrap() throws Exception {
-    try (final ByteBufferSharing outputSharing =
-        nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
+    try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
 
       // make the application data too big to fit into the engine's encryption buffer
       ByteBuffer appData =
@@ -223,8 +221,7 @@ public class NioSslEngineTest {
 
   @Test
   public void wrapFails() throws IOException {
-    try (final ByteBufferSharing outputSharing =
-        nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
+    try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
       // make the application data too big to fit into the engine's encryption buffer
       ByteBuffer appData =
           ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100);
@@ -247,8 +244,7 @@ public class NioSslEngineTest {
 
   @Test
   public void unwrapWithBufferOverflow() throws Exception {
-    try (final ByteBufferSharing inputSharing =
-        nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
       // make the application data too big to fit into the engine's encryption buffer
       final ByteBuffer peerAppData = inputSharing.getBuffer();
 
@@ -288,8 +284,7 @@ public class NioSslEngineTest {
 
   @Test
   public void unwrapWithBufferUnderflow() throws Exception {
-    try (final ByteBufferSharing inputSharing =
-        nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
       ByteBuffer wrappedData =
           ByteBuffer.allocate(inputSharing.getBuffer().capacity());
       byte[] netBytes = new byte[wrappedData.capacity() / 2];
@@ -314,8 +309,7 @@ public class NioSslEngineTest {
 
   @Test
   public void unwrapWithDecryptionError() throws IOException {
-    try (final ByteBufferSharing inputSharing =
-        nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
       // make the application data too big to fit into the engine's encryption buffer
       ByteBuffer wrappedData =
           ByteBuffer.allocate(inputSharing.getBuffer().capacity());
@@ -374,10 +368,10 @@ public class NioSslEngineTest {
     when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
         new SSLEngineResult(CLOSED, FINISHED, 0, 0));
     nioSslEngine.close(mockChannel);
-    assertThatThrownBy(() -> nioSslEngine.getOutputBufferVendorForTestingOnly().open().getBuffer())
+    assertThatThrownBy(() -> nioSslEngine.shareOutputBuffer().getBuffer())
         .isInstanceOf(IOException.class)
         .hasMessageContaining("NioSslEngine has been closed");
-    assertThatThrownBy(() -> nioSslEngine.getInputBufferVendorForTestingOnly().open().getBuffer())
+    assertThatThrownBy(() -> nioSslEngine.shareInputBuffer().getBuffer())
         .isInstanceOf(IOException.class)
         .hasMessageContaining("NioSslEngine has been closed");
     nioSslEngine.close(mockChannel);
@@ -407,8 +401,7 @@ public class NioSslEngineTest {
 
     when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
     when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenAnswer((x) -> {
-      try (final ByteBufferSharing outputSharing =
-          nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
+      try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
         // give the NioSslEngine something to write on its socket channel, simulating a TLS close
         // message
         outputSharing.getBuffer().put("Goodbye cruel world".getBytes());
@@ -444,8 +437,7 @@ public class NioSslEngineTest {
     ByteBuffer wrappedBuffer = ByteBuffer.allocate(1000);
     SocketChannel mockChannel = mock(SocketChannel.class);
 
-    try (final ByteBufferSharing inputSharing =
-        nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
       // force a compaction by making the decoded buffer appear near to being full
       ByteBuffer unwrappedBuffer = inputSharing.getBuffer();
       unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead);
@@ -495,7 +487,10 @@ public class NioSslEngineTest {
     ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
     unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
 
-    nioSslEngine.getInputBufferVendorForTestingOnly().setBufferForTestingOnly(unwrappedBuffer);
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+      final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) inputSharing;
+      inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
+    }
 
     // simulate some socket reads
     when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
@@ -523,8 +518,7 @@ public class NioSslEngineTest {
       assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
       // The initial available space in the unwrapped buffer should have doubled
       int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes;
-      try (final ByteBufferSharing inputSharing =
-          nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
+      try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
         assertThat(inputSharing.getBuffer().capacity())
             .isEqualTo(2 * initialFreeSpace + preexistingBytes);
       }
@@ -550,7 +544,10 @@ public class NioSslEngineTest {
     // force buffer expansion by making a small decoded buffer appear near to being full
     ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
     unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
-    nioSslEngine.getInputBufferVendorForTestingOnly().setBufferForTestingOnly(unwrappedBuffer);
+    try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+      final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) inputSharing;
+      inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
+    }
 
     // simulate some socket reads
     when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
index 40f1ed3..c064afb 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
@@ -103,7 +103,6 @@ public class ConnectionTest {
     TCPConduit tcpConduit = mock(TCPConduit.class);
 
     when(connectionTable.getConduit()).thenReturn(tcpConduit);
-    when(connectionTable.getBufferPool()).thenReturn(mock(BufferPool.class));
     when(distributionConfig.getMemberTimeout()).thenReturn(100);
     when(tcpConduit.getSocketId()).thenReturn(new InetSocketAddress(getLocalHost(), 12345));
 

[geode] 02/02: Revert "GEODE-9141: (1 of 2) rename ByteBufferSharingImpl to ByteBuferVendor"

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echobravo pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git

commit f25ec03371d8287df3b404884fc751bd8c27b38b
Author: Ernest Burghardt <eb...@pivotal.io>
AuthorDate: Thu Jun 17 10:40:14 2021 -0500

    Revert "GEODE-9141: (1 of 2) rename ByteBufferSharingImpl to ByteBuferVendor"
    
    This reverts commit bba424244b8011daf4e7d1285334ad91ce4497fe.
---
 ...{ByteBufferVendor.java => ByteBufferSharingImpl.java} |  6 +++---
 .../apache/geode/internal/net/ByteBufferSharingNoOp.java |  2 +-
 .../java/org/apache/geode/internal/net/NioSslEngine.java | 10 +++++-----
 .../geode/internal/net/ByteBufferConcurrencyTest.java    | 16 ++++++++--------
 ...ferVendorTest.java => ByteBufferSharingImplTest.java} |  6 +++---
 .../org/apache/geode/internal/net/NioSslEngineTest.java  |  4 ++--
 6 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java
similarity index 96%
rename from geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
rename to geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java
index 4933247..b083d09 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java
@@ -31,7 +31,7 @@ import org.apache.geode.internal.net.BufferPool.BufferType;
  * {@link ByteBuffer}) is available (for reading and modification) in the scope of the
  * try-with-resources.
  */
-class ByteBufferVendor implements ByteBufferSharing {
+class ByteBufferSharingImpl implements ByteBufferSharing {
 
   static class OpenAttemptTimedOut extends Exception {
   }
@@ -53,8 +53,8 @@ class ByteBufferVendor implements ByteBufferSharing {
    * This constructor acquires no lock. The reference count will be 1 after this constructor
    * completes.
    */
-  ByteBufferVendor(final ByteBuffer buffer, final BufferType bufferType,
-                   final BufferPool bufferPool) {
+  ByteBufferSharingImpl(final ByteBuffer buffer, final BufferType bufferType,
+      final BufferPool bufferPool) {
     this.buffer = buffer;
     this.bufferType = bufferType;
     this.bufferPool = bufferPool;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
index 4a8bc49..bd707e3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
@@ -27,7 +27,7 @@ import java.nio.ByteBuffer;
  * meant for use with the {@link NioPlainEngine} only, since that engine keeps no buffers and so,
  * needs no reference counting on buffers, nor any synchronization around access to buffers.
  *
- * See also {@link ByteBufferVendor}
+ * See also {@link ByteBufferSharingImpl}
  */
 class ByteBufferSharingNoOp implements ByteBufferSharing {
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index fc91a31..8969ecc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -42,7 +42,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.GemFireIOException;
 import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.internal.net.BufferPool.BufferType;
-import org.apache.geode.internal.net.ByteBufferVendor.OpenAttemptTimedOut;
+import org.apache.geode.internal.net.ByteBufferSharingImpl.OpenAttemptTimedOut;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
 
@@ -62,12 +62,12 @@ public class NioSslEngine implements NioFilter {
   /**
    * holds bytes wrapped by the SSLEngine; a.k.a. myNetData
    */
-  private final ByteBufferVendor outputSharing;
+  private final ByteBufferSharingImpl outputSharing;
 
   /**
    * holds the last unwrapped data from a peer; a.k.a. peerAppData
    */
-  private final ByteBufferVendor inputSharing;
+  private final ByteBufferSharingImpl inputSharing;
 
   NioSslEngine(SSLEngine engine, BufferPool bufferPool) {
     SSLSession session = engine.getSession();
@@ -77,10 +77,10 @@ public class NioSslEngine implements NioFilter {
     this.engine = engine;
     this.bufferPool = bufferPool;
     outputSharing =
-        new ByteBufferVendor(bufferPool.acquireDirectSenderBuffer(packetBufferSize),
+        new ByteBufferSharingImpl(bufferPool.acquireDirectSenderBuffer(packetBufferSize),
             TRACKED_SENDER, bufferPool);
     inputSharing =
-        new ByteBufferVendor(bufferPool.acquireNonDirectReceiveBuffer(appBufferSize),
+        new ByteBufferSharingImpl(bufferPool.acquireNonDirectReceiveBuffer(appBufferSize),
             TRACKED_RECEIVER, bufferPool);
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferConcurrencyTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferConcurrencyTest.java
index bf95963..7c1f383 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferConcurrencyTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferConcurrencyTest.java
@@ -47,8 +47,8 @@ public class ByteBufferConcurrencyTest {
       throws Exception {
     poolMock = mock(BufferPool.class);
     ByteBuffer someBuffer = ByteBuffer.allocate(1);
-    ByteBufferVendor sharing =
-        new ByteBufferVendor(someBuffer, BufferPool.BufferType.TRACKED_SENDER,
+    ByteBufferSharingImpl sharing =
+        new ByteBufferSharingImpl(someBuffer, BufferPool.BufferType.TRACKED_SENDER,
             poolMock);
     executor.inParallel(() -> {
       sharing.destruct();
@@ -78,8 +78,8 @@ public class ByteBufferConcurrencyTest {
     }).when(poolMock).releaseBuffer(any(), any());
 
     ByteBuffer someBuffer = ByteBuffer.allocate(1);
-    ByteBufferVendor sharing =
-        new ByteBufferVendor(someBuffer, BufferPool.BufferType.TRACKED_SENDER,
+    ByteBufferSharingImpl sharing =
+        new ByteBufferSharingImpl(someBuffer, BufferPool.BufferType.TRACKED_SENDER,
             poolMock);
 
     executor.inParallel(() -> {
@@ -109,8 +109,8 @@ public class ByteBufferConcurrencyTest {
       throws Exception {
     poolMock = mock(BufferPool.class);
     ByteBuffer someBuffer = ByteBuffer.allocate(1);
-    ByteBufferVendor sharing =
-        new ByteBufferVendor(someBuffer, BufferPool.BufferType.TRACKED_SENDER,
+    ByteBufferSharingImpl sharing =
+        new ByteBufferSharingImpl(someBuffer, BufferPool.BufferType.TRACKED_SENDER,
             poolMock);
 
     final AtomicBoolean inUse = new AtomicBoolean(false);
@@ -134,8 +134,8 @@ public class ByteBufferConcurrencyTest {
       throws Exception {
     poolMock = mock(BufferPool.class);
     ByteBuffer someBuffer = ByteBuffer.allocate(1);
-    ByteBufferVendor sharing =
-        new ByteBufferVendor(someBuffer, BufferPool.BufferType.TRACKED_SENDER,
+    ByteBufferSharingImpl sharing =
+        new ByteBufferSharingImpl(someBuffer, BufferPool.BufferType.TRACKED_SENDER,
             poolMock);
 
     final AtomicBoolean inUse = new AtomicBoolean(false);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java
similarity index 97%
rename from geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
rename to geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java
index 359d8aa..d863076 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java
@@ -29,14 +29,14 @@ import java.util.concurrent.CountDownLatch;
 import org.junit.Before;
 import org.junit.Test;
 
-public class ByteBufferVendorTest {
+public class ByteBufferSharingImplTest {
 
   @FunctionalInterface
   private static interface Foo {
     void run() throws IOException;
   }
 
-  private ByteBufferVendor sharing;
+  private ByteBufferSharingImpl sharing;
   private BufferPool poolMock;
   private CountDownLatch clientHasOpenedResource;
   private CountDownLatch clientMayComplete;
@@ -45,7 +45,7 @@ public class ByteBufferVendorTest {
   public void before() {
     poolMock = mock(BufferPool.class);
     sharing =
-        new ByteBufferVendor(mock(ByteBuffer.class), BufferPool.BufferType.TRACKED_SENDER,
+        new ByteBufferSharingImpl(mock(ByteBuffer.class), BufferPool.BufferType.TRACKED_SENDER,
             poolMock);
     clientHasOpenedResource = new CountDownLatch(1);
     clientMayComplete = new CountDownLatch(1);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index 62a858c..dfd7b90 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -488,7 +488,7 @@ public class NioSslEngineTest {
     unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
 
     try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
-      final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) inputSharing;
+      final ByteBufferSharingImpl inputSharingImpl = (ByteBufferSharingImpl) inputSharing;
       inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
     }
 
@@ -545,7 +545,7 @@ public class NioSslEngineTest {
     ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
     unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
     try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
-      final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) inputSharing;
+      final ByteBufferSharingImpl inputSharingImpl = (ByteBufferSharingImpl) inputSharing;
       inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
     }