You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/05/17 23:00:00 UTC

[geode] 01/01: GEODE-6733 Remove mutable static org.apache.geode.internal.net.Buffers.buffersQueue

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-6733
in repository https://gitbox.apache.org/repos/asf/geode.git

commit a1b95388c76522be199ebb50d8b455e0e6411f3c
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri May 17 15:57:56 2019 -0700

    GEODE-6733 Remove mutable static org.apache.geode.internal.net.Buffers.buffersQueue
    
    Converted static Buffers class to be a non-static buffer pool.
---
 .../internal/net/SSLSocketIntegrationTest.java     |  4 +-
 .../distributed/internal/DistributionStats.java    |  2 -
 .../distributed/internal/direct/DirectChannel.java |  3 +-
 .../internal/net/{Buffers.java => BufferPool.java} | 75 ++++++++++------------
 .../org/apache/geode/internal/net/NioFilter.java   |  8 +--
 .../apache/geode/internal/net/NioPlainEngine.java  | 22 +++----
 .../apache/geode/internal/net/NioSslEngine.java    | 48 +++++++-------
 .../apache/geode/internal/net/SocketCreator.java   |  5 +-
 .../org/apache/geode/internal/tcp/Connection.java  | 70 ++++++--------------
 .../apache/geode/internal/tcp/ConnectionTable.java |  8 +++
 .../geode/internal/tcp/DirectReplySender.java      |  3 +-
 .../apache/geode/internal/tcp/MsgOutputStream.java |  4 +-
 .../org/apache/geode/internal/tcp/MsgReader.java   |  8 +--
 .../org/apache/geode/internal/tcp/MsgStreamer.java | 31 +++++----
 .../org/apache/geode/internal/tcp/TCPConduit.java  |  5 ++
 .../geode/internal/tcp/VersionedMsgStreamer.java   |  5 +-
 .../net/{BuffersTest.java => BufferPoolTest.java}  | 26 +++++---
 .../geode/internal/net/NioPlainEngineTest.java     | 20 +++---
 .../geode/internal/net/NioSslEngineTest.java       | 18 +++---
 .../geode/internal/tcp/ConnectionJUnitTest.java    |  5 +-
 20 files changed, 173 insertions(+), 197 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
index 8e27671..5a09285 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
@@ -216,7 +216,7 @@ public class SSLSocketIntegrationTest {
     NioSslEngine engine =
         clusterSocketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(),
             clusterSocketCreator.createSSLEngine("localhost", 1234), 0, true,
-            ByteBuffer.allocate(65535), mock(DMStats.class));
+            ByteBuffer.allocate(65535), new BufferPool(mock(DMStats.class)));
     clientChannel.configureBlocking(true);
 
     // transmit expected string from Client to Server
@@ -264,7 +264,7 @@ public class SSLSocketIntegrationTest {
                 timeoutMillis,
                 false,
                 ByteBuffer.allocate(500),
-                mock(DMStats.class));
+                new BufferPool(mock(DMStats.class)));
 
         readMessageFromNIOSSLClient(socket, buffer, engine);
         readMessageFromNIOSSLClient(socket, buffer, engine);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
index 91c47e2..13a77fb 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
@@ -25,7 +25,6 @@ import org.apache.geode.annotations.Immutable;
 import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.internal.NanoTimer;
 import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.net.Buffers;
 import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl;
 import org.apache.geode.internal.util.Breadcrumbs;
 
@@ -954,7 +953,6 @@ public class DistributionStats implements DMStats {
     // this.replyWaitHistogram = new HistogramStats("ReplyWait", "nanoseconds", f,
     // new long[] {100000, 200000, 300000, 400000, 500000, 600000, 700000, 800000, 900000, 1000000},
     // false);
-    Buffers.initBufferStats(this);
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index 7d6d046..ecf37f2 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -373,7 +373,8 @@ public class DirectChannel {
           DMStats stats = getDMStats();
           List<?> sentCons; // used for cons we sent to this time
 
-          final BaseMsgStreamer ms = MsgStreamer.create(cons, msg, directReply, stats);
+          final BaseMsgStreamer ms =
+              MsgStreamer.create(cons, msg, directReply, stats, getConduit().getBufferPool());
           try {
             startTime = 0;
             if (ackTimeout > 0) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
similarity index 73%
rename from geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java
rename to geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
index c77803d..d796ed6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
@@ -19,11 +19,12 @@ import java.nio.ByteBuffer;
 import java.util.IdentityHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.internal.Assert;
 
-public class Buffers {
+public class BufferPool {
+  private final DMStats stats;
+
   /**
    * Buffers may be acquired from the Buffers pool
    * or they may be allocated using Buffer.allocate(). This enum is used
@@ -34,11 +35,15 @@ public class Buffers {
     UNTRACKED, TRACKED_SENDER, TRACKED_RECEIVER
   }
 
+
+  public BufferPool(DMStats stats) {
+    this.stats = stats;
+  }
+
   /**
    * A list of soft references to byte buffers.
    */
-  @MakeNotStatic
-  private static final ConcurrentLinkedQueue<BBSoftReference> bufferQueue =
+  private final ConcurrentLinkedQueue<BBSoftReference> bufferQueue =
       new ConcurrentLinkedQueue<>();
 
   /**
@@ -51,15 +56,15 @@ public class Buffers {
    *
    * @return a byte buffer to be used for sending on this connection.
    */
-  public static ByteBuffer acquireSenderBuffer(int size, DMStats stats) {
-    return acquireBuffer(size, stats, true);
+  public ByteBuffer acquireSenderBuffer(int size) {
+    return acquireBuffer(size, true);
   }
 
-  public static ByteBuffer acquireReceiveBuffer(int size, DMStats stats) {
-    return acquireBuffer(size, stats, false);
+  public ByteBuffer acquireReceiveBuffer(int size) {
+    return acquireBuffer(size, false);
   }
 
-  private static ByteBuffer acquireBuffer(int size, DMStats stats, boolean send) {
+  private ByteBuffer acquireBuffer(int size, boolean send) {
     ByteBuffer result;
     if (useDirectBuffers) {
       IdentityHashMap<BBSoftReference, BBSoftReference> alreadySeen = null; // keys are used like a
@@ -109,19 +114,19 @@ public class Buffers {
     return result;
   }
 
-  public static void releaseSenderBuffer(ByteBuffer bb, DMStats stats) {
-    releaseBuffer(bb, stats, true);
+  public void releaseSenderBuffer(ByteBuffer bb) {
+    releaseBuffer(bb, true);
   }
 
-  public static void releaseReceiveBuffer(ByteBuffer bb, DMStats stats) {
-    releaseBuffer(bb, stats, false);
+  public void releaseReceiveBuffer(ByteBuffer bb) {
+    releaseBuffer(bb, false);
   }
 
   /**
    * expand a buffer that's currently being read from
    */
-  static ByteBuffer expandReadBufferIfNeeded(BufferType type, ByteBuffer existing,
-      int desiredCapacity, DMStats stats) {
+  ByteBuffer expandReadBufferIfNeeded(BufferType type, ByteBuffer existing,
+      int desiredCapacity) {
     if (existing.capacity() >= desiredCapacity) {
       if (existing.position() > 0) {
         existing.compact();
@@ -129,51 +134,51 @@ public class Buffers {
       }
       return existing;
     }
-    ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity, stats);
+    ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity);
     newBuffer.clear();
     newBuffer.put(existing);
     newBuffer.flip();
-    releaseBuffer(type, existing, stats);
+    releaseBuffer(type, existing);
     return newBuffer;
   }
 
   /**
    * expand a buffer that's currently being written to
    */
-  static ByteBuffer expandWriteBufferIfNeeded(BufferType type, ByteBuffer existing,
-      int desiredCapacity, DMStats stats) {
+  ByteBuffer expandWriteBufferIfNeeded(BufferType type, ByteBuffer existing,
+      int desiredCapacity) {
     if (existing.capacity() >= desiredCapacity) {
       return existing;
     }
-    ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity, stats);
+    ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity);
     newBuffer.clear();
     existing.flip();
     newBuffer.put(existing);
-    releaseBuffer(type, existing, stats);
+    releaseBuffer(type, existing);
     return newBuffer;
   }
 
-  static ByteBuffer acquireBuffer(Buffers.BufferType type, int capacity, DMStats stats) {
+  ByteBuffer acquireBuffer(BufferPool.BufferType type, int capacity) {
     switch (type) {
       case UNTRACKED:
         return ByteBuffer.allocate(capacity);
       case TRACKED_SENDER:
-        return Buffers.acquireSenderBuffer(capacity, stats);
+        return acquireSenderBuffer(capacity);
       case TRACKED_RECEIVER:
-        return Buffers.acquireReceiveBuffer(capacity, stats);
+        return acquireReceiveBuffer(capacity);
     }
     throw new IllegalArgumentException("Unexpected buffer type " + type.toString());
   }
 
-  static void releaseBuffer(Buffers.BufferType type, ByteBuffer buffer, DMStats stats) {
+  void releaseBuffer(BufferPool.BufferType type, ByteBuffer buffer) {
     switch (type) {
       case UNTRACKED:
         return;
       case TRACKED_SENDER:
-        Buffers.releaseSenderBuffer(buffer, stats);
+        releaseSenderBuffer(buffer);
         return;
       case TRACKED_RECEIVER:
-        Buffers.releaseReceiveBuffer(buffer, stats);
+        releaseReceiveBuffer(buffer);
         return;
     }
     throw new IllegalArgumentException("Unexpected buffer type " + type.toString());
@@ -183,7 +188,7 @@ public class Buffers {
   /**
    * Releases a previously acquired buffer.
    */
-  private static void releaseBuffer(ByteBuffer bb, DMStats stats, boolean send) {
+  private void releaseBuffer(ByteBuffer bb, boolean send) {
     if (useDirectBuffers) {
       BBSoftReference bbRef = new BBSoftReference(bb, send);
       bufferQueue.offer(bbRef);
@@ -196,20 +201,6 @@ public class Buffers {
     }
   }
 
-  public static void initBufferStats(DMStats stats) { // fixes 46773
-    if (useDirectBuffers) {
-      for (BBSoftReference ref : bufferQueue) {
-        if (ref.getBB() != null) {
-          if (ref.getSend()) { // fix bug 46773
-            stats.incSenderBufferSize(ref.getSize(), true);
-          } else {
-            stats.incReceiverBufferSize(ref.getSize(), true);
-          }
-        }
-      }
-    }
-  }
-
   /**
    * A soft reference that remembers the size of the byte buffer it refers to. TODO Dan - I really
    * think this should be a weak reference. The JVM doesn't seem to clear soft references if it is
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
index 6cb40ec..8e41ef1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
@@ -18,8 +18,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 
-import org.apache.geode.distributed.internal.DMStats;
-
 /**
  * Prior to transmitting a buffer or processing a received buffer
  * a NioFilter should be called to wrap (transmit) or unwrap (received)
@@ -44,7 +42,7 @@ public interface NioFilter {
    * This must be invoked before readAtLeast. A new buffer may be returned by this method.
    */
   ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
-      Buffers.BufferType bufferType, DMStats stats);
+      BufferPool.BufferType bufferType);
 
   /**
    * read at least the indicated amount of bytes from the given
@@ -55,8 +53,8 @@ public interface NioFilter {
    * wrappedBuffer = filter.ensureWrappedCapacity(amount, wrappedBuffer, etc.);<br>
    * unwrappedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.)
    */
-  ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer,
-      DMStats stats) throws IOException;
+  ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer)
+      throws IOException;
 
   /**
    * You must invoke this when done reading from the unwrapped buffer
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
index 8a3e3fb..32fa297 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
@@ -20,24 +20,22 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.logging.LogService;
 
 /**
  * A pass-through implementation of NioFilter. Use this if you don't need
  * secure communications.
  */
 public class NioPlainEngine implements NioFilter {
-  private static final Logger logger = LogService.getLogger();
+  private final BufferPool bufferPool;
 
   int lastReadPosition;
   int lastProcessedPosition;
 
 
-  public NioPlainEngine() {}
+  public NioPlainEngine(BufferPool bufferPool) {
+    this.bufferPool = bufferPool;
+  }
 
   @Override
   public ByteBuffer wrap(ByteBuffer buffer) {
@@ -52,11 +50,11 @@ public class NioPlainEngine implements NioFilter {
 
   @Override
   public ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
-      Buffers.BufferType bufferType, DMStats stats) {
+      BufferPool.BufferType bufferType) {
     ByteBuffer buffer = wrappedBuffer;
 
     if (buffer == null) {
-      buffer = Buffers.acquireBuffer(bufferType, amount, stats);
+      buffer = bufferPool.acquireBuffer(bufferType, amount);
       buffer.clear();
       lastProcessedPosition = 0;
       lastReadPosition = 0;
@@ -73,10 +71,10 @@ public class NioPlainEngine implements NioFilter {
       ByteBuffer oldBuffer = buffer;
       oldBuffer.limit(lastReadPosition);
       oldBuffer.position(lastProcessedPosition);
-      buffer = Buffers.acquireBuffer(bufferType, amount, stats);
+      buffer = bufferPool.acquireBuffer(bufferType, amount);
       buffer.clear();
       buffer.put(oldBuffer);
-      Buffers.releaseBuffer(bufferType, oldBuffer, stats);
+      bufferPool.releaseBuffer(bufferType, oldBuffer);
       lastReadPosition = buffer.position();
       lastProcessedPosition = 0;
     }
@@ -84,8 +82,8 @@ public class NioPlainEngine implements NioFilter {
   }
 
   @Override
-  public ByteBuffer readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer,
-      DMStats stats) throws IOException {
+  public ByteBuffer readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer)
+      throws IOException {
     ByteBuffer buffer = wrappedBuffer;
 
     Assert.assertTrue(buffer.capacity() - lastProcessedPosition >= bytes);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index dd71d75..9bf969d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -19,9 +19,8 @@ import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_TASK;
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
 import static javax.net.ssl.SSLEngineResult.Status.BUFFER_OVERFLOW;
 import static javax.net.ssl.SSLEngineResult.Status.OK;
-import static org.apache.geode.internal.net.Buffers.BufferType.TRACKED_RECEIVER;
-import static org.apache.geode.internal.net.Buffers.BufferType.TRACKED_SENDER;
-import static org.apache.geode.internal.net.Buffers.releaseBuffer;
+import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_RECEIVER;
+import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_SENDER;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -41,8 +40,8 @@ import javax.net.ssl.SSLSession;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.GemFireIOException;
-import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.net.BufferPool.BufferType;
 
 
 /**
@@ -53,7 +52,7 @@ import org.apache.geode.internal.logging.LogService;
 public class NioSslEngine implements NioFilter {
   private static final Logger logger = LogService.getLogger();
 
-  private final DMStats stats;
+  private final BufferPool bufferPool;
 
   private volatile boolean closed;
 
@@ -74,14 +73,14 @@ public class NioSslEngine implements NioFilter {
    */
   ByteBuffer handshakeBuffer;
 
-  NioSslEngine(SSLEngine engine, DMStats stats) {
-    this.stats = stats;
+  NioSslEngine(SSLEngine engine, BufferPool bufferPool) {
     SSLSession session = engine.getSession();
     int appBufferSize = session.getApplicationBufferSize();
     int packetBufferSize = engine.getSession().getPacketBufferSize();
     this.myNetData = ByteBuffer.allocate(packetBufferSize);
     this.peerAppData = ByteBuffer.allocate(appBufferSize);
     this.engine = engine;
+    this.bufferPool = bufferPool;
   }
 
   /**
@@ -97,7 +96,7 @@ public class NioSslEngine implements NioFilter {
         logger.debug("Allocating new buffer for SSL handshake");
       }
       this.handshakeBuffer =
-          Buffers.acquireReceiveBuffer(engine.getSession().getPacketBufferSize(), stats);
+          bufferPool.acquireReceiveBuffer(engine.getSession().getPacketBufferSize());
     } else {
       this.handshakeBuffer = peerNetData;
     }
@@ -154,8 +153,7 @@ public class NioSslEngine implements NioFilter {
 
           if (engineResult.getStatus() == BUFFER_OVERFLOW) {
             peerAppData =
-                expandWriteBuffer(TRACKED_RECEIVER, peerAppData, peerAppData.capacity() * 2,
-                    stats);
+                expandWriteBuffer(TRACKED_RECEIVER, peerAppData, peerAppData.capacity() * 2);
           }
           break;
 
@@ -172,7 +170,7 @@ public class NioSslEngine implements NioFilter {
             case BUFFER_OVERFLOW:
               myNetData =
                   expandWriteBuffer(TRACKED_SENDER, myNetData,
-                      myNetData.capacity() * 2, stats);
+                      myNetData.capacity() * 2);
               break;
             case OK:
               myNetData.flip();
@@ -216,9 +214,9 @@ public class NioSslEngine implements NioFilter {
     return true;
   }
 
-  ByteBuffer expandWriteBuffer(Buffers.BufferType type, ByteBuffer existing,
-      int desiredCapacity, DMStats stats) {
-    return Buffers.expandWriteBufferIfNeeded(type, existing, desiredCapacity, stats);
+  ByteBuffer expandWriteBuffer(BufferType type, ByteBuffer existing,
+      int desiredCapacity) {
+    return bufferPool.expandWriteBufferIfNeeded(type, existing, desiredCapacity);
   }
 
   void checkClosed() {
@@ -248,7 +246,7 @@ public class NioSslEngine implements NioFilter {
 
       if (remaining < (appData.remaining() * 2)) {
         int newCapacity = expandedCapacity(appData, myNetData);
-        myNetData = expandWriteBuffer(TRACKED_SENDER, myNetData, newCapacity, stats);
+        myNetData = expandWriteBuffer(TRACKED_SENDER, myNetData, newCapacity);
       }
 
       SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
@@ -303,27 +301,27 @@ public class NioSslEngine implements NioFilter {
   void expandPeerAppData(ByteBuffer wrappedBuffer) {
     if (peerAppData.capacity() - peerAppData.position() < 2 * wrappedBuffer.remaining()) {
       peerAppData =
-          Buffers.expandWriteBufferIfNeeded(TRACKED_RECEIVER, peerAppData,
-              expandedCapacity(wrappedBuffer, peerAppData), stats);
+          bufferPool.expandWriteBufferIfNeeded(TRACKED_RECEIVER, peerAppData,
+              expandedCapacity(wrappedBuffer, peerAppData));
     }
   }
 
   @Override
   public ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
-      Buffers.BufferType bufferType, DMStats stats) {
+      BufferType bufferType) {
     ByteBuffer buffer = wrappedBuffer;
     int requiredSize = engine.getSession().getPacketBufferSize();
     if (buffer == null) {
-      buffer = Buffers.acquireBuffer(bufferType, requiredSize, stats);
+      buffer = bufferPool.acquireBuffer(bufferType, requiredSize);
     } else if (buffer.capacity() < requiredSize) {
-      buffer = Buffers.expandWriteBufferIfNeeded(bufferType, buffer, requiredSize, stats);
+      buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, buffer, requiredSize);
     }
     return buffer;
   }
 
   @Override
   public ByteBuffer readAtLeast(SocketChannel channel, int bytes,
-      ByteBuffer wrappedBuffer, DMStats stats) throws IOException {
+      ByteBuffer wrappedBuffer) throws IOException {
     if (peerAppData.capacity() > bytes) {
       // we already have a buffer that's big enough
       if (peerAppData.capacity() - peerAppData.position() < bytes) {
@@ -332,7 +330,7 @@ public class NioSslEngine implements NioFilter {
       }
     } else {
       peerAppData =
-          Buffers.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, bytes, this.stats);
+          bufferPool.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, bytes);
     }
 
     while (peerAppData.remaining() < bytes) {
@@ -367,7 +365,7 @@ public class NioSslEngine implements NioFilter {
     // for TTLS the app-data buffers do not need to be tracked direct-buffers since we
     // do not use them for I/O operations
     peerAppData =
-        Buffers.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, amount, this.stats);
+        bufferPool.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, amount);
     return peerAppData;
   }
 
@@ -405,8 +403,8 @@ public class NioSslEngine implements NioFilter {
     } catch (IOException e) {
       throw new GemFireIOException("exception closing SSL session", e);
     } finally {
-      releaseBuffer(TRACKED_SENDER, myNetData, stats);
-      releaseBuffer(TRACKED_RECEIVER, peerAppData, stats);
+      bufferPool.releaseBuffer(TRACKED_SENDER, myNetData);
+      bufferPool.releaseBuffer(TRACKED_RECEIVER, peerAppData);
       this.closed = true;
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
index 412c423..bc136f5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
@@ -84,7 +84,6 @@ import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.cache.wan.GatewayTransportFilter;
 import org.apache.geode.distributed.ClientSocketFactory;
-import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionConfigImpl;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -940,7 +939,7 @@ public class SocketCreator {
       int timeout,
       boolean clientSocket,
       ByteBuffer peerNetBuffer,
-      DMStats stats)
+      BufferPool bufferPool)
       throws IOException {
     engine.setUseClientMode(clientSocket);
     while (!socketChannel.finishConnect()) {
@@ -954,7 +953,7 @@ public class SocketCreator {
       }
     }
 
-    NioSslEngine nioSslEngine = new NioSslEngine(engine, stats);
+    NioSslEngine nioSslEngine = new NioSslEngine(engine, bufferPool);
 
     boolean blocking = socketChannel.isBlocking();
     if (blocking) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 2ba313e..a9cb8d9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -18,8 +18,6 @@ import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER
 
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -78,7 +76,7 @@ import org.apache.geode.internal.Version;
 import org.apache.geode.internal.alerting.AlertingAction;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingThread;
-import org.apache.geode.internal.net.Buffers;
+import org.apache.geode.internal.net.BufferPool;
 import org.apache.geode.internal.net.NioFilter;
 import org.apache.geode.internal.net.NioPlainEngine;
 import org.apache.geode.internal.net.SocketCreator;
@@ -593,7 +591,7 @@ public class Connection implements Runnable {
     bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK;
     int allocSize = bytes.length;
     ByteBuffer bb;
-    if (Buffers.useDirectBuffers) {
+    if (BufferPool.useDirectBuffers) {
       bb = ByteBuffer.allocateDirect(allocSize);
     } else {
       bb = ByteBuffer.allocate(allocSize);
@@ -637,7 +635,7 @@ public class Connection implements Runnable {
     if (this.isReceiver) {
       DistributionConfig cfg = owner.getConduit().config;
       ByteBuffer bb;
-      if (Buffers.useDirectBuffers) {
+      if (BufferPool.useDirectBuffers) {
         bb = ByteBuffer.allocateDirect(128);
       } else {
         bb = ByteBuffer.allocate(128);
@@ -1200,7 +1198,7 @@ public class Connection implements Runnable {
   private BatchBufferFlusher batchFlusher;
 
   private void createBatchSendBuffer() {
-    if (Buffers.useDirectBuffers) {
+    if (BufferPool.useDirectBuffers) {
       this.fillBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
       this.sendBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
     } else {
@@ -1613,10 +1611,14 @@ public class Connection implements Runnable {
     if (tmp != null) {
       this.inputBuffer = null;
       final DMStats stats = this.owner.getConduit().getStats();
-      Buffers.releaseReceiveBuffer(tmp, stats);
+      getBufferPool().releaseReceiveBuffer(tmp);
     }
   }
 
+  BufferPool getBufferPool() {
+    return owner.getBufferPool();
+  }
+
   private String p2pReaderName() {
     StringBuilder sb = new StringBuilder(64);
     if (this.isReceiver) {
@@ -1840,9 +1842,9 @@ public class Connection implements Runnable {
           || (inputBuffer.capacity() < packetBufferSize)) {
         // TLS has a minimum input buffer size constraint
         if (inputBuffer != null) {
-          Buffers.releaseReceiveBuffer(inputBuffer, getConduit().getStats());
+          getBufferPool().releaseReceiveBuffer(inputBuffer);
         }
-        inputBuffer = Buffers.acquireReceiveBuffer(packetBufferSize, getConduit().getStats());
+        inputBuffer = getBufferPool().acquireReceiveBuffer(packetBufferSize);
       }
       if (channel.socket().getReceiveBufferSize() < packetBufferSize) {
         channel.socket().setReceiveBufferSize(packetBufferSize);
@@ -1851,9 +1853,10 @@ public class Connection implements Runnable {
         channel.socket().setSendBufferSize(packetBufferSize);
       }
       ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine,
-          getConduit().idleConnectionTimeout, clientSocket, inputBuffer, getConduit().getStats());
+          getConduit().idleConnectionTimeout, clientSocket, inputBuffer,
+          getBufferPool());
     } else {
-      ioFilter = new NioPlainEngine();
+      ioFilter = new NioPlainEngine(getBufferPool());
     }
   }
 
@@ -1958,42 +1961,6 @@ public class Connection implements Runnable {
   }
 
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE")
-  void readFully(InputStream input, byte[] buffer, int len) throws IOException {
-    int bytesSoFar = 0;
-    while (bytesSoFar < len) {
-      this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
-      try {
-        synchronized (stateLock) {
-          connectionState = STATE_READING;
-        }
-        int bytesThisTime = input.read(buffer, bytesSoFar, len - bytesSoFar);
-        if (bytesThisTime < 0) {
-          this.readerShuttingDown = true;
-          try {
-            requestClose("Stream read returned non-positive length");
-          } catch (Exception ignored) {
-          }
-          return;
-        }
-        bytesSoFar += bytesThisTime;
-      } catch (InterruptedIOException io) {
-        // Current thread has been interrupted. Regard it similar to an EOF
-        this.readerShuttingDown = true;
-        try {
-          requestClose("Current thread interrupted");
-        } catch (Exception ignored) {
-        }
-        Thread.currentThread().interrupt();
-        this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
-      } finally {
-        synchronized (stateLock) {
-          connectionState = STATE_IDLE;
-        }
-      }
-    } // while
-  }
-
   /**
    * sends a serialized message to the other end of this connection. This is used by the
    * DirectChannel in GemFire when the message is going to be sent to multiple recipients.
@@ -2011,7 +1978,7 @@ public class Connection implements Runnable {
       return;
     }
     final boolean origSocketInUse = this.socketInUse;
-    byte originalState = -1;
+    byte originalState;
     synchronized (stateLock) {
       originalState = this.connectionState;
       this.connectionState = STATE_SENDING;
@@ -2237,7 +2204,6 @@ public class Connection implements Runnable {
                 ck.setBuffer(oldBuffer);
               } else {
                 // old buffer was not large enough
-                oldBuffer = null;
                 ByteBuffer newbb = ByteBuffer.allocate(newBytes);
                 newbb.put(buffer);
                 newbb.flip();
@@ -2787,7 +2753,7 @@ public class Connection implements Runnable {
       if (allocSize == -1) {
         allocSize = this.owner.getConduit().tcpBufferSize;
       }
-      inputBuffer = Buffers.acquireReceiveBuffer(allocSize, this.owner.getConduit().getStats());
+      inputBuffer = getBufferPool().acquireReceiveBuffer(allocSize);
     }
     return inputBuffer;
   }
@@ -3398,13 +3364,13 @@ public class Connection implements Runnable {
       logger.info("Allocating larger network read buffer, new size is {} old size was {}.",
           allocSize, oldBufferSize);
       ByteBuffer oldBuffer = inputBuffer;
-      inputBuffer = Buffers.acquireReceiveBuffer(allocSize, stats);
+      inputBuffer = getBufferPool().acquireReceiveBuffer(allocSize);
 
       if (oldBuffer != null) {
         int oldByteCount = oldBuffer.remaining();
         inputBuffer.put(oldBuffer);
         inputBuffer.position(oldByteCount);
-        Buffers.releaseReceiveBuffer(oldBuffer, stats);
+        getBufferPool().releaseReceiveBuffer(oldBuffer);
       }
     } else {
       if (inputBuffer.position() != 0) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index f3a4432..50c3cf8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -48,6 +48,7 @@ import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.alerting.AlertingAction;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingExecutors;
+import org.apache.geode.internal.net.BufferPool;
 import org.apache.geode.internal.net.SocketCloser;
 
 /**
@@ -124,6 +125,8 @@ public class ConnectionTable {
    */
   private final TCPConduit owner;
 
+  private final BufferPool bufferPool;
+
   /**
    * true if this table is no longer in use
    */
@@ -199,6 +202,7 @@ public class ConnectionTable {
     this.threadConnectionMap = new ConcurrentHashMap();
     this.p2pReaderThreadPool = createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets());
     this.socketCloser = new SocketCloser();
+    this.bufferPool = new BufferPool(owner.getStats());
   }
 
   private Executor createThreadPoolForIO(boolean conserveSockets) {
@@ -611,6 +615,10 @@ public class ConnectionTable {
     return owner;
   }
 
+  public BufferPool getBufferPool() {
+    return bufferPool;
+  }
+
   public boolean isClosed() {
     return this.closed;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
index 73d1582..dbb828d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
@@ -65,7 +65,8 @@ class DirectReplySender implements ReplySender {
     }
     ArrayList<Connection> conns = new ArrayList<Connection>(1);
     conns.add(conn);
-    MsgStreamer ms = (MsgStreamer) MsgStreamer.create(conns, msg, false, DUMMY_STATS);
+    MsgStreamer ms = (MsgStreamer) MsgStreamer.create(conns, msg, false, DUMMY_STATS,
+        conn.getBufferPool());
     try {
       ms.writeMessage();
       ConnectExceptions ce = ms.getConnectExceptions();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
index 2d767b8..98669b3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.ObjToByteArraySerializer;
-import org.apache.geode.internal.net.Buffers;
+import org.apache.geode.internal.net.BufferPool;
 
 /**
  * MsgOutputStream should no longer be used except in Connection to do the handshake. Otherwise
@@ -38,7 +38,7 @@ public class MsgOutputStream extends OutputStream implements ObjToByteArraySeria
    * The caller of this constructor is responsible for managing the allocated instance.
    */
   public MsgOutputStream(int allocSize) {
-    if (Buffers.useDirectBuffers) {
+    if (BufferPool.useDirectBuffers) {
       this.buffer = ByteBuffer.allocateDirect(allocSize);
     } else {
       this.buffer = ByteBuffer.allocate(allocSize);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index afb0272..0a33428 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -27,7 +27,7 @@ import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.net.Buffers;
+import org.apache.geode.internal.net.BufferPool;
 import org.apache.geode.internal.net.NioFilter;
 
 /**
@@ -125,13 +125,13 @@ public class MsgReader {
 
   private ByteBuffer readAtLeast(int bytes) throws IOException {
     peerNetData = ioFilter.ensureWrappedCapacity(bytes, peerNetData,
-        Buffers.BufferType.TRACKED_RECEIVER, getStats());
-    return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData, getStats());
+        BufferPool.BufferType.TRACKED_RECEIVER);
+    return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData);
   }
 
   public void close() {
     if (peerNetData != null) {
-      Buffers.releaseReceiveBuffer(peerNetData, getStats());
+      conn.getBufferPool().releaseReceiveBuffer(peerNetData);
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
index 22a385b..d42bef1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
@@ -25,7 +25,6 @@ import java.util.List;
 import it.unimi.dsi.fastutil.objects.Object2ObjectMap;
 import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.objects.ObjectIterator;
-import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.distributed.internal.DMStats;
@@ -37,8 +36,7 @@ import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.ObjToByteArraySerializer;
 import org.apache.geode.internal.Version;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.net.Buffers;
+import org.apache.geode.internal.net.BufferPool;
 
 /**
  * <p>
@@ -52,13 +50,13 @@ import org.apache.geode.internal.net.Buffers;
 public class MsgStreamer extends OutputStream
     implements ObjToByteArraySerializer, BaseMsgStreamer, ByteBufferWriter {
 
-  private static final Logger logger = LogService.getLogger();
-
   /**
    * List of connections to send this msg to.
    */
   private final List<?> cons;
 
+  private final BufferPool bufferPool;
+
   /**
    * Any exceptions that happen during sends
    */
@@ -98,7 +96,7 @@ public class MsgStreamer extends OutputStream
     MsgIdGenerator.release(this.msgId);
     this.buffer.clear();
     this.overflowBuf = null;
-    Buffers.releaseSenderBuffer(this.buffer, this.stats);
+    bufferPool.releaseSenderBuffer(this.buffer);
   }
 
   /**
@@ -126,15 +124,16 @@ public class MsgStreamer extends OutputStream
    * now be used.
    */
   MsgStreamer(List<?> cons, DistributionMessage msg, boolean directReply, DMStats stats,
-      int sendBufferSize) {
+      int sendBufferSize, BufferPool bufferPool) {
     this.stats = stats;
     this.msg = msg;
     this.cons = cons;
-    this.buffer = Buffers.acquireSenderBuffer(sendBufferSize, stats);
+    this.buffer = bufferPool.acquireSenderBuffer(sendBufferSize);
     this.buffer.clear();
     this.buffer.position(Connection.MSG_HEADER_BYTES);
     this.msgId = MsgIdGenerator.NO_MSG_ID;
     this.directReply = directReply;
+    this.bufferPool = bufferPool;
     startSerialization();
   }
 
@@ -144,7 +143,7 @@ public class MsgStreamer extends OutputStream
    * List of MsgStreamer objects.
    */
   public static BaseMsgStreamer create(List<?> cons, final DistributionMessage msg,
-      final boolean directReply, final DMStats stats) {
+      final boolean directReply, final DMStats stats, BufferPool bufferPool) {
     final Connection firstCon = (Connection) cons.get(0);
     // split into different versions if required
     Version version;
@@ -170,7 +169,8 @@ public class MsgStreamer extends OutputStream
         }
       }
       if (versionToConnMap == null) {
-        return new MsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize());
+        return new MsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize(),
+            bufferPool);
       } else {
         // if there is a versioned stream created, then split remaining
         // connections to unversioned stream
@@ -187,7 +187,8 @@ public class MsgStreamer extends OutputStream
               unversionedCons.add(con);
             }
           }
-          streamers.add(new MsgStreamer(unversionedCons, msg, directReply, stats, sendBufferSize));
+          streamers.add(new MsgStreamer(unversionedCons, msg, directReply, stats, sendBufferSize,
+              bufferPool));
         }
         for (ObjectIterator<Object2ObjectMap.Entry> itr =
             versionToConnMap.object2ObjectEntrySet().fastIterator(); itr.hasNext();) {
@@ -195,15 +196,17 @@ public class MsgStreamer extends OutputStream
           Object ver = entry.getKey();
           Object l = entry.getValue();
           streamers.add(new VersionedMsgStreamer((List<?>) l, msg, directReply, stats,
-              sendBufferSize, (Version) ver));
+              bufferPool, sendBufferSize, (Version) ver));
         }
         return new MsgStreamerList(streamers);
       }
     } else if ((version = firstCon.getRemoteVersion()) == null) {
-      return new MsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize());
+      return new MsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize(),
+          bufferPool);
     } else {
       // create a single VersionedMsgStreamer
-      return new VersionedMsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize(),
+      return new VersionedMsgStreamer(cons, msg, directReply, stats, bufferPool,
+          firstCon.getSendBufferSize(),
           version);
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 97d748f..699c706 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -47,6 +47,7 @@ import org.apache.geode.internal.alerting.AlertingAction;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingThread;
 import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.net.BufferPool;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.net.SocketCreatorFactory;
 import org.apache.geode.internal.security.SecurableCommunicationChannel;
@@ -975,6 +976,10 @@ public class TCPConduit implements Runnable {
     return useSSL;
   }
 
+  public BufferPool getBufferPool() {
+    return this.conTable.getBufferPool();
+  }
+
   protected class Stopper extends CancelCriterion {
 
     @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java
index 4fe7b32..72b68f1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java
@@ -21,6 +21,7 @@ import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.VersionedDataStream;
+import org.apache.geode.internal.net.BufferPool;
 
 /**
  * An extension of {@link MsgStreamer} that implements {@link VersionedDataStream}.
@@ -32,8 +33,8 @@ class VersionedMsgStreamer extends MsgStreamer implements VersionedDataStream {
   private final Version version;
 
   VersionedMsgStreamer(List<?> cons, DistributionMessage msg, boolean directReply, DMStats stats,
-      int sendBufferSize, Version version) {
-    super(cons, msg, directReply, stats, sendBufferSize);
+      BufferPool bufferPool, int sendBufferSize, Version version) {
+    super(cons, msg, directReply, stats, sendBufferSize, bufferPool);
     this.version = version;
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
similarity index 84%
rename from geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java
rename to geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
index 96a4ac6..cc441e4 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
@@ -21,11 +21,19 @@ import static org.mockito.Mockito.mock;
 
 import java.nio.ByteBuffer;
 
+import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.distributed.internal.DMStats;
 
-public class BuffersTest {
+public class BufferPoolTest {
+
+  private BufferPool bufferPool;
+
+  @Before
+  public void setup() {
+    bufferPool = new BufferPool(mock(DMStats.class));
+  }
 
   @Test
   public void expandBuffer() throws Exception {
@@ -50,8 +58,7 @@ public class BuffersTest {
   private void createAndVerifyNewWriteBuffer(ByteBuffer buffer, boolean useDirectBuffer) {
     buffer.position(buffer.capacity());
     ByteBuffer newBuffer =
-        Buffers.expandWriteBufferIfNeeded(Buffers.BufferType.UNTRACKED, buffer, 500,
-            mock(DMStats.class));
+        bufferPool.expandWriteBufferIfNeeded(BufferPool.BufferType.UNTRACKED, buffer, 500);
     assertEquals(buffer.position(), newBuffer.position());
     assertEquals(500, newBuffer.capacity());
     newBuffer.flip();
@@ -66,8 +73,7 @@ public class BuffersTest {
     buffer.position(0);
     buffer.limit(256);
     ByteBuffer newBuffer =
-        Buffers.expandReadBufferIfNeeded(Buffers.BufferType.UNTRACKED, buffer, 500,
-            mock(DMStats.class));
+        bufferPool.expandReadBufferIfNeeded(BufferPool.BufferType.UNTRACKED, buffer, 500);
     assertEquals(0, newBuffer.position());
     assertEquals(500, newBuffer.capacity());
     for (int i = 0; i < 256; i++) {
@@ -84,8 +90,9 @@ public class BuffersTest {
     ByteBuffer buffer = ByteBuffer.allocate(33842);
     buffer.position(7);
     buffer.limit(16384);
-    ByteBuffer newBuffer = Buffers.expandReadBufferIfNeeded(Buffers.BufferType.UNTRACKED, buffer,
-        40899, mock(DMStats.class));
+    ByteBuffer newBuffer =
+        bufferPool.expandReadBufferIfNeeded(BufferPool.BufferType.UNTRACKED, buffer,
+            40899);
     assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(40899);
     // buffer should be ready to read the same amount of data
     assertThat(newBuffer.position()).isEqualTo(0);
@@ -98,8 +105,9 @@ public class BuffersTest {
     ByteBuffer buffer = ByteBuffer.allocate(33842);
     buffer.position(16384);
     buffer.limit(buffer.capacity());
-    ByteBuffer newBuffer = Buffers.expandWriteBufferIfNeeded(Buffers.BufferType.UNTRACKED, buffer,
-        40899, mock(DMStats.class));
+    ByteBuffer newBuffer =
+        bufferPool.expandWriteBufferIfNeeded(BufferPool.BufferType.UNTRACKED, buffer,
+            40899);
     assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(40899);
     // buffer should have the same amount of data as the old one
     assertThat(newBuffer.position()).isEqualTo(16384);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
index 5fe4def..133d827 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
@@ -36,17 +36,15 @@ import org.apache.geode.distributed.internal.DMStats;
 
 public class NioPlainEngineTest {
 
-  private static final int netBufferSize = 10000;
-  private static final int appBufferSize = 20000;
-
   private DMStats mockStats;
   private NioPlainEngine nioEngine;
+  private BufferPool bufferPool;
 
   @Before
   public void setUp() throws Exception {
     mockStats = mock(DMStats.class);
-
-    nioEngine = new NioPlainEngine();
+    bufferPool = new BufferPool(mockStats);
+    nioEngine = new NioPlainEngine(bufferPool);
   }
 
   @Test
@@ -60,13 +58,13 @@ public class NioPlainEngineTest {
   @Test
   @Ignore("Pending fix of GEODE-6733 to remove static from Buffers implementation")
   public void ensureWrappedCapacity() {
-    ByteBuffer wrappedBuffer = Buffers.acquireReceiveBuffer(100, mockStats);
+    ByteBuffer wrappedBuffer = bufferPool.acquireReceiveBuffer(100);
     verify(mockStats, times(1)).incReceiverBufferSize(any(Integer.class), any(Boolean.class));
     wrappedBuffer.put(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
     nioEngine.lastReadPosition = 10;
     int requestedCapacity = 210;
     ByteBuffer result = nioEngine.ensureWrappedCapacity(requestedCapacity, wrappedBuffer,
-        Buffers.BufferType.TRACKED_RECEIVER, mockStats);
+        BufferPool.BufferType.TRACKED_RECEIVER);
     verify(mockStats, times(2)).incReceiverBufferSize(any(Integer.class), any(Boolean.class));
     assertThat(result.capacity()).isGreaterThanOrEqualTo(requestedCapacity);
     assertThat(result).isNotSameAs(wrappedBuffer);
@@ -90,7 +88,7 @@ public class NioPlainEngineTest {
     nioEngine.lastReadPosition = consumedDataPresentInBuffer + unconsumedDataPresentInBuffer;
     ByteBuffer result =
         wrappedBuffer = nioEngine.ensureWrappedCapacity(requestedCapacity, wrappedBuffer,
-            Buffers.BufferType.UNTRACKED, mockStats);
+            BufferPool.BufferType.UNTRACKED);
     assertThat(result.capacity()).isEqualTo(requestedCapacity + unconsumedDataPresentInBuffer);
     assertThat(result).isSameAs(wrappedBuffer);
     // make sure that data was transferred to the new buffer
@@ -121,14 +119,14 @@ public class NioPlainEngineTest {
 
     nioEngine.lastReadPosition = 10;
 
-    ByteBuffer data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats);
+    ByteBuffer data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
     verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
     assertThat(data.position()).isEqualTo(0);
     assertThat(data.limit()).isEqualTo(amountToRead);
     assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 3 + preexistingBytes);
     assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead);
 
-    data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats);
+    data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
     verify(mockChannel, times(5)).read(any(ByteBuffer.class));
     // at end of last readAtLeast data
     assertThat(data.position()).isEqualTo(amountToRead);
@@ -152,7 +150,7 @@ public class NioPlainEngineTest {
 
     nioEngine.lastReadPosition = 10;
 
-    nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats);
+    nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
   }
 
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index b12df09..7236895 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -80,7 +80,7 @@ public class NioSslEngineTest {
 
     mockStats = mock(DMStats.class);
 
-    nioSslEngine = new NioSslEngine(mockEngine, mockStats);
+    nioSslEngine = new NioSslEngine(mockEngine, new BufferPool(mockStats));
     spyNioSslEngine = spy(nioSslEngine);
   }
 
@@ -113,8 +113,8 @@ public class NioSslEngineTest {
     verify(mockEngine, atLeast(2)).getHandshakeStatus();
     verify(mockEngine, times(3)).wrap(any(ByteBuffer.class), any(ByteBuffer.class));
     verify(mockEngine, times(3)).unwrap(any(ByteBuffer.class), any(ByteBuffer.class));
-    verify(spyNioSslEngine, times(2)).expandWriteBuffer(any(Buffers.BufferType.class),
-        any(ByteBuffer.class), any(Integer.class), any(DMStats.class));
+    verify(spyNioSslEngine, times(2)).expandWriteBuffer(any(BufferPool.BufferType.class),
+        any(ByteBuffer.class), any(Integer.class));
     verify(spyNioSslEngine, times(1)).handleBlockingTasks();
     verify(mockChannel, times(3)).read(any(ByteBuffer.class));
   }
@@ -228,8 +228,8 @@ public class NioSslEngineTest {
 
     ByteBuffer wrappedBuffer = spyNioSslEngine.wrap(appData);
 
-    verify(spyNioSslEngine, times(1)).expandWriteBuffer(any(Buffers.BufferType.class),
-        any(ByteBuffer.class), any(Integer.class), any(DMStats.class));
+    verify(spyNioSslEngine, times(1)).expandWriteBuffer(any(BufferPool.BufferType.class),
+        any(ByteBuffer.class), any(Integer.class));
     appData.flip();
     assertThat(wrappedBuffer).isEqualTo(appData);
     verify(spyNioSslEngine, times(1)).handleBlockingTasks();
@@ -376,14 +376,14 @@ public class NioSslEngineTest {
   public void ensureWrappedCapacityOfSmallMessage() {
     ByteBuffer buffer = ByteBuffer.allocate(netBufferSize);
     assertThat(
-        nioSslEngine.ensureWrappedCapacity(10, buffer, Buffers.BufferType.UNTRACKED, mockStats))
+        nioSslEngine.ensureWrappedCapacity(10, buffer, BufferPool.BufferType.UNTRACKED))
             .isEqualTo(buffer);
   }
 
   @Test
   public void ensureWrappedCapacityWithNoBuffer() {
     assertThat(
-        nioSslEngine.ensureWrappedCapacity(10, null, Buffers.BufferType.UNTRACKED, mockStats)
+        nioSslEngine.ensureWrappedCapacity(10, null, BufferPool.BufferType.UNTRACKED)
             .capacity())
                 .isEqualTo(netBufferSize);
   }
@@ -415,7 +415,7 @@ public class NioSslEngineTest {
     testSSLEngine.addReturnResult(new SSLEngineResult(OK, NEED_UNWRAP, 0, 0));
     nioSslEngine.engine = testSSLEngine;
 
-    ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats);
+    ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
     verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
     assertThat(data.position()).isEqualTo(0);
     assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
@@ -459,7 +459,7 @@ public class NioSslEngineTest {
         new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); // 130 + 60 bytes = 190
     nioSslEngine.engine = testSSLEngine;
 
-    ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats);
+    ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
     verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
     assertThat(data.position()).isEqualTo(0);
     assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
index 73cf06c..854685f 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
@@ -31,6 +31,7 @@ import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.distributed.internal.membership.MembershipManager;
+import org.apache.geode.internal.net.BufferPool;
 import org.apache.geode.internal.net.SocketCloser;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.test.junit.categories.MembershipTest;
@@ -52,10 +53,12 @@ public class ConnectionJUnitTest {
     DistributionManager distMgr = mock(DistributionManager.class);
     MembershipManager membership = mock(MembershipManager.class);
     TCPConduit conduit = mock(TCPConduit.class);
+    DMStats stats = mock(DMStats.class);
 
     // mock the connection table and conduit
 
     when(table.getConduit()).thenReturn(conduit);
+    when(table.getBufferPool()).thenReturn(new BufferPool(stats));
 
     CancelCriterion stopper = mock(CancelCriterion.class);
     when(stopper.cancelInProgress()).thenReturn(null);
@@ -67,7 +70,7 @@ public class ConnectionJUnitTest {
     // mock the distribution manager and membership manager
     when(distMgr.getMembershipManager()).thenReturn(membership);
     when(conduit.getDM()).thenReturn(distMgr);
-    when(conduit.getStats()).thenReturn(mock(DMStats.class));
+    when(conduit.getStats()).thenReturn(stats);
     when(table.getDM()).thenReturn(distMgr);
     SocketCloser closer = mock(SocketCloser.class);
     when(table.getSocketCloser()).thenReturn(closer);