You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/05/17 22:59:59 UTC

[geode] branch feature/GEODE-6733 created (now a1b9538)

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a change to branch feature/GEODE-6733
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at a1b9538  GEODE-6733 Remove mutable static org.apache.geode.internal.net.Buffers.buffersQueue

This branch includes the following new commits:

     new a1b9538  GEODE-6733 Remove mutable static org.apache.geode.internal.net.Buffers.buffersQueue

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/01: GEODE-6733 Remove mutable static org.apache.geode.internal.net.Buffers.buffersQueue

Posted by bs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-6733
in repository https://gitbox.apache.org/repos/asf/geode.git

commit a1b95388c76522be199ebb50d8b455e0e6411f3c
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri May 17 15:57:56 2019 -0700

    GEODE-6733 Remove mutable static org.apache.geode.internal.net.Buffers.buffersQueue
    
    Converted static Buffers class to be a non-static buffer pool.
---
 .../internal/net/SSLSocketIntegrationTest.java     |  4 +-
 .../distributed/internal/DistributionStats.java    |  2 -
 .../distributed/internal/direct/DirectChannel.java |  3 +-
 .../internal/net/{Buffers.java => BufferPool.java} | 75 ++++++++++------------
 .../org/apache/geode/internal/net/NioFilter.java   |  8 +--
 .../apache/geode/internal/net/NioPlainEngine.java  | 22 +++----
 .../apache/geode/internal/net/NioSslEngine.java    | 48 +++++++-------
 .../apache/geode/internal/net/SocketCreator.java   |  5 +-
 .../org/apache/geode/internal/tcp/Connection.java  | 70 ++++++--------------
 .../apache/geode/internal/tcp/ConnectionTable.java |  8 +++
 .../geode/internal/tcp/DirectReplySender.java      |  3 +-
 .../apache/geode/internal/tcp/MsgOutputStream.java |  4 +-
 .../org/apache/geode/internal/tcp/MsgReader.java   |  8 +--
 .../org/apache/geode/internal/tcp/MsgStreamer.java | 31 +++++----
 .../org/apache/geode/internal/tcp/TCPConduit.java  |  5 ++
 .../geode/internal/tcp/VersionedMsgStreamer.java   |  5 +-
 .../net/{BuffersTest.java => BufferPoolTest.java}  | 26 +++++---
 .../geode/internal/net/NioPlainEngineTest.java     | 20 +++---
 .../geode/internal/net/NioSslEngineTest.java       | 18 +++---
 .../geode/internal/tcp/ConnectionJUnitTest.java    |  5 +-
 20 files changed, 173 insertions(+), 197 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
index 8e27671..5a09285 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
@@ -216,7 +216,7 @@ public class SSLSocketIntegrationTest {
     NioSslEngine engine =
         clusterSocketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(),
             clusterSocketCreator.createSSLEngine("localhost", 1234), 0, true,
-            ByteBuffer.allocate(65535), mock(DMStats.class));
+            ByteBuffer.allocate(65535), new BufferPool(mock(DMStats.class)));
     clientChannel.configureBlocking(true);
 
     // transmit expected string from Client to Server
@@ -264,7 +264,7 @@ public class SSLSocketIntegrationTest {
                 timeoutMillis,
                 false,
                 ByteBuffer.allocate(500),
-                mock(DMStats.class));
+                new BufferPool(mock(DMStats.class)));
 
         readMessageFromNIOSSLClient(socket, buffer, engine);
         readMessageFromNIOSSLClient(socket, buffer, engine);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
index 91c47e2..13a77fb 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
@@ -25,7 +25,6 @@ import org.apache.geode.annotations.Immutable;
 import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.internal.NanoTimer;
 import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.net.Buffers;
 import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl;
 import org.apache.geode.internal.util.Breadcrumbs;
 
@@ -954,7 +953,6 @@ public class DistributionStats implements DMStats {
     // this.replyWaitHistogram = new HistogramStats("ReplyWait", "nanoseconds", f,
     // new long[] {100000, 200000, 300000, 400000, 500000, 600000, 700000, 800000, 900000, 1000000},
     // false);
-    Buffers.initBufferStats(this);
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index 7d6d046..ecf37f2 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -373,7 +373,8 @@ public class DirectChannel {
           DMStats stats = getDMStats();
           List<?> sentCons; // used for cons we sent to this time
 
-          final BaseMsgStreamer ms = MsgStreamer.create(cons, msg, directReply, stats);
+          final BaseMsgStreamer ms =
+              MsgStreamer.create(cons, msg, directReply, stats, getConduit().getBufferPool());
           try {
             startTime = 0;
             if (ackTimeout > 0) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
similarity index 73%
rename from geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java
rename to geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
index c77803d..d796ed6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
@@ -19,11 +19,12 @@ import java.nio.ByteBuffer;
 import java.util.IdentityHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.internal.Assert;
 
-public class Buffers {
+public class BufferPool {
+  private final DMStats stats;
+
   /**
    * Buffers may be acquired from the Buffers pool
    * or they may be allocated using Buffer.allocate(). This enum is used
@@ -34,11 +35,15 @@ public class Buffers {
     UNTRACKED, TRACKED_SENDER, TRACKED_RECEIVER
   }
 
+
+  public BufferPool(DMStats stats) {
+    this.stats = stats;
+  }
+
   /**
    * A list of soft references to byte buffers.
    */
-  @MakeNotStatic
-  private static final ConcurrentLinkedQueue<BBSoftReference> bufferQueue =
+  private final ConcurrentLinkedQueue<BBSoftReference> bufferQueue =
       new ConcurrentLinkedQueue<>();
 
   /**
@@ -51,15 +56,15 @@ public class Buffers {
    *
    * @return a byte buffer to be used for sending on this connection.
    */
-  public static ByteBuffer acquireSenderBuffer(int size, DMStats stats) {
-    return acquireBuffer(size, stats, true);
+  public ByteBuffer acquireSenderBuffer(int size) {
+    return acquireBuffer(size, true);
   }
 
-  public static ByteBuffer acquireReceiveBuffer(int size, DMStats stats) {
-    return acquireBuffer(size, stats, false);
+  public ByteBuffer acquireReceiveBuffer(int size) {
+    return acquireBuffer(size, false);
   }
 
-  private static ByteBuffer acquireBuffer(int size, DMStats stats, boolean send) {
+  private ByteBuffer acquireBuffer(int size, boolean send) {
     ByteBuffer result;
     if (useDirectBuffers) {
       IdentityHashMap<BBSoftReference, BBSoftReference> alreadySeen = null; // keys are used like a
@@ -109,19 +114,19 @@ public class Buffers {
     return result;
   }
 
-  public static void releaseSenderBuffer(ByteBuffer bb, DMStats stats) {
-    releaseBuffer(bb, stats, true);
+  public void releaseSenderBuffer(ByteBuffer bb) {
+    releaseBuffer(bb, true);
   }
 
-  public static void releaseReceiveBuffer(ByteBuffer bb, DMStats stats) {
-    releaseBuffer(bb, stats, false);
+  public void releaseReceiveBuffer(ByteBuffer bb) {
+    releaseBuffer(bb, false);
   }
 
   /**
    * expand a buffer that's currently being read from
    */
-  static ByteBuffer expandReadBufferIfNeeded(BufferType type, ByteBuffer existing,
-      int desiredCapacity, DMStats stats) {
+  ByteBuffer expandReadBufferIfNeeded(BufferType type, ByteBuffer existing,
+      int desiredCapacity) {
     if (existing.capacity() >= desiredCapacity) {
       if (existing.position() > 0) {
         existing.compact();
@@ -129,51 +134,51 @@ public class Buffers {
       }
       return existing;
     }
-    ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity, stats);
+    ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity);
     newBuffer.clear();
     newBuffer.put(existing);
     newBuffer.flip();
-    releaseBuffer(type, existing, stats);
+    releaseBuffer(type, existing);
     return newBuffer;
   }
 
   /**
    * expand a buffer that's currently being written to
    */
-  static ByteBuffer expandWriteBufferIfNeeded(BufferType type, ByteBuffer existing,
-      int desiredCapacity, DMStats stats) {
+  ByteBuffer expandWriteBufferIfNeeded(BufferType type, ByteBuffer existing,
+      int desiredCapacity) {
     if (existing.capacity() >= desiredCapacity) {
       return existing;
     }
-    ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity, stats);
+    ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity);
     newBuffer.clear();
     existing.flip();
     newBuffer.put(existing);
-    releaseBuffer(type, existing, stats);
+    releaseBuffer(type, existing);
     return newBuffer;
   }
 
-  static ByteBuffer acquireBuffer(Buffers.BufferType type, int capacity, DMStats stats) {
+  ByteBuffer acquireBuffer(BufferPool.BufferType type, int capacity) {
     switch (type) {
       case UNTRACKED:
         return ByteBuffer.allocate(capacity);
       case TRACKED_SENDER:
-        return Buffers.acquireSenderBuffer(capacity, stats);
+        return acquireSenderBuffer(capacity);
       case TRACKED_RECEIVER:
-        return Buffers.acquireReceiveBuffer(capacity, stats);
+        return acquireReceiveBuffer(capacity);
     }
     throw new IllegalArgumentException("Unexpected buffer type " + type.toString());
   }
 
-  static void releaseBuffer(Buffers.BufferType type, ByteBuffer buffer, DMStats stats) {
+  void releaseBuffer(BufferPool.BufferType type, ByteBuffer buffer) {
     switch (type) {
       case UNTRACKED:
         return;
       case TRACKED_SENDER:
-        Buffers.releaseSenderBuffer(buffer, stats);
+        releaseSenderBuffer(buffer);
         return;
       case TRACKED_RECEIVER:
-        Buffers.releaseReceiveBuffer(buffer, stats);
+        releaseReceiveBuffer(buffer);
         return;
     }
     throw new IllegalArgumentException("Unexpected buffer type " + type.toString());
@@ -183,7 +188,7 @@ public class Buffers {
   /**
    * Releases a previously acquired buffer.
    */
-  private static void releaseBuffer(ByteBuffer bb, DMStats stats, boolean send) {
+  private void releaseBuffer(ByteBuffer bb, boolean send) {
     if (useDirectBuffers) {
       BBSoftReference bbRef = new BBSoftReference(bb, send);
       bufferQueue.offer(bbRef);
@@ -196,20 +201,6 @@ public class Buffers {
     }
   }
 
-  public static void initBufferStats(DMStats stats) { // fixes 46773
-    if (useDirectBuffers) {
-      for (BBSoftReference ref : bufferQueue) {
-        if (ref.getBB() != null) {
-          if (ref.getSend()) { // fix bug 46773
-            stats.incSenderBufferSize(ref.getSize(), true);
-          } else {
-            stats.incReceiverBufferSize(ref.getSize(), true);
-          }
-        }
-      }
-    }
-  }
-
   /**
    * A soft reference that remembers the size of the byte buffer it refers to. TODO Dan - I really
    * think this should be a weak reference. The JVM doesn't seem to clear soft references if it is
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
index 6cb40ec..8e41ef1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
@@ -18,8 +18,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 
-import org.apache.geode.distributed.internal.DMStats;
-
 /**
  * Prior to transmitting a buffer or processing a received buffer
  * a NioFilter should be called to wrap (transmit) or unwrap (received)
@@ -44,7 +42,7 @@ public interface NioFilter {
    * This must be invoked before readAtLeast. A new buffer may be returned by this method.
    */
   ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
-      Buffers.BufferType bufferType, DMStats stats);
+      BufferPool.BufferType bufferType);
 
   /**
    * read at least the indicated amount of bytes from the given
@@ -55,8 +53,8 @@ public interface NioFilter {
    * wrappedBuffer = filter.ensureWrappedCapacity(amount, wrappedBuffer, etc.);<br>
    * unwrappedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.)
    */
-  ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer,
-      DMStats stats) throws IOException;
+  ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer)
+      throws IOException;
 
   /**
    * You must invoke this when done reading from the unwrapped buffer
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
index 8a3e3fb..32fa297 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
@@ -20,24 +20,22 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.logging.LogService;
 
 /**
  * A pass-through implementation of NioFilter. Use this if you don't need
  * secure communications.
  */
 public class NioPlainEngine implements NioFilter {
-  private static final Logger logger = LogService.getLogger();
+  private final BufferPool bufferPool;
 
   int lastReadPosition;
   int lastProcessedPosition;
 
 
-  public NioPlainEngine() {}
+  public NioPlainEngine(BufferPool bufferPool) {
+    this.bufferPool = bufferPool;
+  }
 
   @Override
   public ByteBuffer wrap(ByteBuffer buffer) {
@@ -52,11 +50,11 @@ public class NioPlainEngine implements NioFilter {
 
   @Override
   public ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
-      Buffers.BufferType bufferType, DMStats stats) {
+      BufferPool.BufferType bufferType) {
     ByteBuffer buffer = wrappedBuffer;
 
     if (buffer == null) {
-      buffer = Buffers.acquireBuffer(bufferType, amount, stats);
+      buffer = bufferPool.acquireBuffer(bufferType, amount);
       buffer.clear();
       lastProcessedPosition = 0;
       lastReadPosition = 0;
@@ -73,10 +71,10 @@ public class NioPlainEngine implements NioFilter {
       ByteBuffer oldBuffer = buffer;
       oldBuffer.limit(lastReadPosition);
       oldBuffer.position(lastProcessedPosition);
-      buffer = Buffers.acquireBuffer(bufferType, amount, stats);
+      buffer = bufferPool.acquireBuffer(bufferType, amount);
       buffer.clear();
       buffer.put(oldBuffer);
-      Buffers.releaseBuffer(bufferType, oldBuffer, stats);
+      bufferPool.releaseBuffer(bufferType, oldBuffer);
       lastReadPosition = buffer.position();
       lastProcessedPosition = 0;
     }
@@ -84,8 +82,8 @@ public class NioPlainEngine implements NioFilter {
   }
 
   @Override
-  public ByteBuffer readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer,
-      DMStats stats) throws IOException {
+  public ByteBuffer readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer)
+      throws IOException {
     ByteBuffer buffer = wrappedBuffer;
 
     Assert.assertTrue(buffer.capacity() - lastProcessedPosition >= bytes);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index dd71d75..9bf969d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -19,9 +19,8 @@ import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_TASK;
 import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
 import static javax.net.ssl.SSLEngineResult.Status.BUFFER_OVERFLOW;
 import static javax.net.ssl.SSLEngineResult.Status.OK;
-import static org.apache.geode.internal.net.Buffers.BufferType.TRACKED_RECEIVER;
-import static org.apache.geode.internal.net.Buffers.BufferType.TRACKED_SENDER;
-import static org.apache.geode.internal.net.Buffers.releaseBuffer;
+import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_RECEIVER;
+import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_SENDER;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -41,8 +40,8 @@ import javax.net.ssl.SSLSession;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.GemFireIOException;
-import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.net.BufferPool.BufferType;
 
 
 /**
@@ -53,7 +52,7 @@ import org.apache.geode.internal.logging.LogService;
 public class NioSslEngine implements NioFilter {
   private static final Logger logger = LogService.getLogger();
 
-  private final DMStats stats;
+  private final BufferPool bufferPool;
 
   private volatile boolean closed;
 
@@ -74,14 +73,14 @@ public class NioSslEngine implements NioFilter {
    */
   ByteBuffer handshakeBuffer;
 
-  NioSslEngine(SSLEngine engine, DMStats stats) {
-    this.stats = stats;
+  NioSslEngine(SSLEngine engine, BufferPool bufferPool) {
     SSLSession session = engine.getSession();
     int appBufferSize = session.getApplicationBufferSize();
     int packetBufferSize = engine.getSession().getPacketBufferSize();
     this.myNetData = ByteBuffer.allocate(packetBufferSize);
     this.peerAppData = ByteBuffer.allocate(appBufferSize);
     this.engine = engine;
+    this.bufferPool = bufferPool;
   }
 
   /**
@@ -97,7 +96,7 @@ public class NioSslEngine implements NioFilter {
         logger.debug("Allocating new buffer for SSL handshake");
       }
       this.handshakeBuffer =
-          Buffers.acquireReceiveBuffer(engine.getSession().getPacketBufferSize(), stats);
+          bufferPool.acquireReceiveBuffer(engine.getSession().getPacketBufferSize());
     } else {
       this.handshakeBuffer = peerNetData;
     }
@@ -154,8 +153,7 @@ public class NioSslEngine implements NioFilter {
 
           if (engineResult.getStatus() == BUFFER_OVERFLOW) {
             peerAppData =
-                expandWriteBuffer(TRACKED_RECEIVER, peerAppData, peerAppData.capacity() * 2,
-                    stats);
+                expandWriteBuffer(TRACKED_RECEIVER, peerAppData, peerAppData.capacity() * 2);
           }
           break;
 
@@ -172,7 +170,7 @@ public class NioSslEngine implements NioFilter {
             case BUFFER_OVERFLOW:
               myNetData =
                   expandWriteBuffer(TRACKED_SENDER, myNetData,
-                      myNetData.capacity() * 2, stats);
+                      myNetData.capacity() * 2);
               break;
             case OK:
               myNetData.flip();
@@ -216,9 +214,9 @@ public class NioSslEngine implements NioFilter {
     return true;
   }
 
-  ByteBuffer expandWriteBuffer(Buffers.BufferType type, ByteBuffer existing,
-      int desiredCapacity, DMStats stats) {
-    return Buffers.expandWriteBufferIfNeeded(type, existing, desiredCapacity, stats);
+  ByteBuffer expandWriteBuffer(BufferType type, ByteBuffer existing,
+      int desiredCapacity) {
+    return bufferPool.expandWriteBufferIfNeeded(type, existing, desiredCapacity);
   }
 
   void checkClosed() {
@@ -248,7 +246,7 @@ public class NioSslEngine implements NioFilter {
 
       if (remaining < (appData.remaining() * 2)) {
         int newCapacity = expandedCapacity(appData, myNetData);
-        myNetData = expandWriteBuffer(TRACKED_SENDER, myNetData, newCapacity, stats);
+        myNetData = expandWriteBuffer(TRACKED_SENDER, myNetData, newCapacity);
       }
 
       SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
@@ -303,27 +301,27 @@ public class NioSslEngine implements NioFilter {
   void expandPeerAppData(ByteBuffer wrappedBuffer) {
     if (peerAppData.capacity() - peerAppData.position() < 2 * wrappedBuffer.remaining()) {
       peerAppData =
-          Buffers.expandWriteBufferIfNeeded(TRACKED_RECEIVER, peerAppData,
-              expandedCapacity(wrappedBuffer, peerAppData), stats);
+          bufferPool.expandWriteBufferIfNeeded(TRACKED_RECEIVER, peerAppData,
+              expandedCapacity(wrappedBuffer, peerAppData));
     }
   }
 
   @Override
   public ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
-      Buffers.BufferType bufferType, DMStats stats) {
+      BufferType bufferType) {
     ByteBuffer buffer = wrappedBuffer;
     int requiredSize = engine.getSession().getPacketBufferSize();
     if (buffer == null) {
-      buffer = Buffers.acquireBuffer(bufferType, requiredSize, stats);
+      buffer = bufferPool.acquireBuffer(bufferType, requiredSize);
     } else if (buffer.capacity() < requiredSize) {
-      buffer = Buffers.expandWriteBufferIfNeeded(bufferType, buffer, requiredSize, stats);
+      buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, buffer, requiredSize);
     }
     return buffer;
   }
 
   @Override
   public ByteBuffer readAtLeast(SocketChannel channel, int bytes,
-      ByteBuffer wrappedBuffer, DMStats stats) throws IOException {
+      ByteBuffer wrappedBuffer) throws IOException {
     if (peerAppData.capacity() > bytes) {
       // we already have a buffer that's big enough
       if (peerAppData.capacity() - peerAppData.position() < bytes) {
@@ -332,7 +330,7 @@ public class NioSslEngine implements NioFilter {
       }
     } else {
       peerAppData =
-          Buffers.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, bytes, this.stats);
+          bufferPool.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, bytes);
     }
 
     while (peerAppData.remaining() < bytes) {
@@ -367,7 +365,7 @@ public class NioSslEngine implements NioFilter {
     // for TTLS the app-data buffers do not need to be tracked direct-buffers since we
     // do not use them for I/O operations
     peerAppData =
-        Buffers.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, amount, this.stats);
+        bufferPool.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, amount);
     return peerAppData;
   }
 
@@ -405,8 +403,8 @@ public class NioSslEngine implements NioFilter {
     } catch (IOException e) {
       throw new GemFireIOException("exception closing SSL session", e);
     } finally {
-      releaseBuffer(TRACKED_SENDER, myNetData, stats);
-      releaseBuffer(TRACKED_RECEIVER, peerAppData, stats);
+      bufferPool.releaseBuffer(TRACKED_SENDER, myNetData);
+      bufferPool.releaseBuffer(TRACKED_RECEIVER, peerAppData);
       this.closed = true;
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
index 412c423..bc136f5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
@@ -84,7 +84,6 @@ import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.cache.wan.GatewayTransportFilter;
 import org.apache.geode.distributed.ClientSocketFactory;
-import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionConfigImpl;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -940,7 +939,7 @@ public class SocketCreator {
       int timeout,
       boolean clientSocket,
       ByteBuffer peerNetBuffer,
-      DMStats stats)
+      BufferPool bufferPool)
       throws IOException {
     engine.setUseClientMode(clientSocket);
     while (!socketChannel.finishConnect()) {
@@ -954,7 +953,7 @@ public class SocketCreator {
       }
     }
 
-    NioSslEngine nioSslEngine = new NioSslEngine(engine, stats);
+    NioSslEngine nioSslEngine = new NioSslEngine(engine, bufferPool);
 
     boolean blocking = socketChannel.isBlocking();
     if (blocking) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 2ba313e..a9cb8d9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -18,8 +18,6 @@ import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER
 
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -78,7 +76,7 @@ import org.apache.geode.internal.Version;
 import org.apache.geode.internal.alerting.AlertingAction;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingThread;
-import org.apache.geode.internal.net.Buffers;
+import org.apache.geode.internal.net.BufferPool;
 import org.apache.geode.internal.net.NioFilter;
 import org.apache.geode.internal.net.NioPlainEngine;
 import org.apache.geode.internal.net.SocketCreator;
@@ -593,7 +591,7 @@ public class Connection implements Runnable {
     bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK;
     int allocSize = bytes.length;
     ByteBuffer bb;
-    if (Buffers.useDirectBuffers) {
+    if (BufferPool.useDirectBuffers) {
       bb = ByteBuffer.allocateDirect(allocSize);
     } else {
       bb = ByteBuffer.allocate(allocSize);
@@ -637,7 +635,7 @@ public class Connection implements Runnable {
     if (this.isReceiver) {
       DistributionConfig cfg = owner.getConduit().config;
       ByteBuffer bb;
-      if (Buffers.useDirectBuffers) {
+      if (BufferPool.useDirectBuffers) {
         bb = ByteBuffer.allocateDirect(128);
       } else {
         bb = ByteBuffer.allocate(128);
@@ -1200,7 +1198,7 @@ public class Connection implements Runnable {
   private BatchBufferFlusher batchFlusher;
 
   private void createBatchSendBuffer() {
-    if (Buffers.useDirectBuffers) {
+    if (BufferPool.useDirectBuffers) {
       this.fillBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
       this.sendBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
     } else {
@@ -1613,10 +1611,14 @@ public class Connection implements Runnable {
     if (tmp != null) {
       this.inputBuffer = null;
       final DMStats stats = this.owner.getConduit().getStats();
-      Buffers.releaseReceiveBuffer(tmp, stats);
+      getBufferPool().releaseReceiveBuffer(tmp);
     }
   }
 
+  BufferPool getBufferPool() {
+    return owner.getBufferPool();
+  }
+
   private String p2pReaderName() {
     StringBuilder sb = new StringBuilder(64);
     if (this.isReceiver) {
@@ -1840,9 +1842,9 @@ public class Connection implements Runnable {
           || (inputBuffer.capacity() < packetBufferSize)) {
         // TLS has a minimum input buffer size constraint
         if (inputBuffer != null) {
-          Buffers.releaseReceiveBuffer(inputBuffer, getConduit().getStats());
+          getBufferPool().releaseReceiveBuffer(inputBuffer);
         }
-        inputBuffer = Buffers.acquireReceiveBuffer(packetBufferSize, getConduit().getStats());
+        inputBuffer = getBufferPool().acquireReceiveBuffer(packetBufferSize);
       }
       if (channel.socket().getReceiveBufferSize() < packetBufferSize) {
         channel.socket().setReceiveBufferSize(packetBufferSize);
@@ -1851,9 +1853,10 @@ public class Connection implements Runnable {
         channel.socket().setSendBufferSize(packetBufferSize);
       }
       ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine,
-          getConduit().idleConnectionTimeout, clientSocket, inputBuffer, getConduit().getStats());
+          getConduit().idleConnectionTimeout, clientSocket, inputBuffer,
+          getBufferPool());
     } else {
-      ioFilter = new NioPlainEngine();
+      ioFilter = new NioPlainEngine(getBufferPool());
     }
   }
 
@@ -1958,42 +1961,6 @@ public class Connection implements Runnable {
   }
 
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE")
-  void readFully(InputStream input, byte[] buffer, int len) throws IOException {
-    int bytesSoFar = 0;
-    while (bytesSoFar < len) {
-      this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
-      try {
-        synchronized (stateLock) {
-          connectionState = STATE_READING;
-        }
-        int bytesThisTime = input.read(buffer, bytesSoFar, len - bytesSoFar);
-        if (bytesThisTime < 0) {
-          this.readerShuttingDown = true;
-          try {
-            requestClose("Stream read returned non-positive length");
-          } catch (Exception ignored) {
-          }
-          return;
-        }
-        bytesSoFar += bytesThisTime;
-      } catch (InterruptedIOException io) {
-        // Current thread has been interrupted. Regard it similar to an EOF
-        this.readerShuttingDown = true;
-        try {
-          requestClose("Current thread interrupted");
-        } catch (Exception ignored) {
-        }
-        Thread.currentThread().interrupt();
-        this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
-      } finally {
-        synchronized (stateLock) {
-          connectionState = STATE_IDLE;
-        }
-      }
-    } // while
-  }
-
   /**
    * sends a serialized message to the other end of this connection. This is used by the
    * DirectChannel in GemFire when the message is going to be sent to multiple recipients.
@@ -2011,7 +1978,7 @@ public class Connection implements Runnable {
       return;
     }
     final boolean origSocketInUse = this.socketInUse;
-    byte originalState = -1;
+    byte originalState;
     synchronized (stateLock) {
       originalState = this.connectionState;
       this.connectionState = STATE_SENDING;
@@ -2237,7 +2204,6 @@ public class Connection implements Runnable {
                 ck.setBuffer(oldBuffer);
               } else {
                 // old buffer was not large enough
-                oldBuffer = null;
                 ByteBuffer newbb = ByteBuffer.allocate(newBytes);
                 newbb.put(buffer);
                 newbb.flip();
@@ -2787,7 +2753,7 @@ public class Connection implements Runnable {
       if (allocSize == -1) {
         allocSize = this.owner.getConduit().tcpBufferSize;
       }
-      inputBuffer = Buffers.acquireReceiveBuffer(allocSize, this.owner.getConduit().getStats());
+      inputBuffer = getBufferPool().acquireReceiveBuffer(allocSize);
     }
     return inputBuffer;
   }
@@ -3398,13 +3364,13 @@ public class Connection implements Runnable {
       logger.info("Allocating larger network read buffer, new size is {} old size was {}.",
           allocSize, oldBufferSize);
       ByteBuffer oldBuffer = inputBuffer;
-      inputBuffer = Buffers.acquireReceiveBuffer(allocSize, stats);
+      inputBuffer = getBufferPool().acquireReceiveBuffer(allocSize);
 
       if (oldBuffer != null) {
         int oldByteCount = oldBuffer.remaining();
         inputBuffer.put(oldBuffer);
         inputBuffer.position(oldByteCount);
-        Buffers.releaseReceiveBuffer(oldBuffer, stats);
+        getBufferPool().releaseReceiveBuffer(oldBuffer);
       }
     } else {
       if (inputBuffer.position() != 0) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index f3a4432..50c3cf8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -48,6 +48,7 @@ import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.alerting.AlertingAction;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingExecutors;
+import org.apache.geode.internal.net.BufferPool;
 import org.apache.geode.internal.net.SocketCloser;
 
 /**
@@ -124,6 +125,8 @@ public class ConnectionTable {
    */
   private final TCPConduit owner;
 
+  private final BufferPool bufferPool;
+
   /**
    * true if this table is no longer in use
    */
@@ -199,6 +202,7 @@ public class ConnectionTable {
     this.threadConnectionMap = new ConcurrentHashMap();
     this.p2pReaderThreadPool = createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets());
     this.socketCloser = new SocketCloser();
+    this.bufferPool = new BufferPool(owner.getStats());
   }
 
   private Executor createThreadPoolForIO(boolean conserveSockets) {
@@ -611,6 +615,10 @@ public class ConnectionTable {
     return owner;
   }
 
+  public BufferPool getBufferPool() {
+    return bufferPool;
+  }
+
   public boolean isClosed() {
     return this.closed;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
index 73d1582..dbb828d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
@@ -65,7 +65,8 @@ class DirectReplySender implements ReplySender {
     }
     ArrayList<Connection> conns = new ArrayList<Connection>(1);
     conns.add(conn);
-    MsgStreamer ms = (MsgStreamer) MsgStreamer.create(conns, msg, false, DUMMY_STATS);
+    MsgStreamer ms = (MsgStreamer) MsgStreamer.create(conns, msg, false, DUMMY_STATS,
+        conn.getBufferPool());
     try {
       ms.writeMessage();
       ConnectExceptions ce = ms.getConnectExceptions();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
index 2d767b8..98669b3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.ObjToByteArraySerializer;
-import org.apache.geode.internal.net.Buffers;
+import org.apache.geode.internal.net.BufferPool;
 
 /**
  * MsgOutputStream should no longer be used except in Connection to do the handshake. Otherwise
@@ -38,7 +38,7 @@ public class MsgOutputStream extends OutputStream implements ObjToByteArraySeria
    * The caller of this constructor is responsible for managing the allocated instance.
    */
   public MsgOutputStream(int allocSize) {
-    if (Buffers.useDirectBuffers) {
+    if (BufferPool.useDirectBuffers) {
       this.buffer = ByteBuffer.allocateDirect(allocSize);
     } else {
       this.buffer = ByteBuffer.allocate(allocSize);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index afb0272..0a33428 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -27,7 +27,7 @@ import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.net.Buffers;
+import org.apache.geode.internal.net.BufferPool;
 import org.apache.geode.internal.net.NioFilter;
 
 /**
@@ -125,13 +125,13 @@ public class MsgReader {
 
   private ByteBuffer readAtLeast(int bytes) throws IOException {
     peerNetData = ioFilter.ensureWrappedCapacity(bytes, peerNetData,
-        Buffers.BufferType.TRACKED_RECEIVER, getStats());
-    return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData, getStats());
+        BufferPool.BufferType.TRACKED_RECEIVER);
+    return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData);
   }
 
   public void close() {
     if (peerNetData != null) {
-      Buffers.releaseReceiveBuffer(peerNetData, getStats());
+      conn.getBufferPool().releaseReceiveBuffer(peerNetData);
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
index 22a385b..d42bef1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
@@ -25,7 +25,6 @@ import java.util.List;
 import it.unimi.dsi.fastutil.objects.Object2ObjectMap;
 import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.objects.ObjectIterator;
-import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.distributed.internal.DMStats;
@@ -37,8 +36,7 @@ import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.ObjToByteArraySerializer;
 import org.apache.geode.internal.Version;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.net.Buffers;
+import org.apache.geode.internal.net.BufferPool;
 
 /**
  * <p>
@@ -52,13 +50,13 @@ import org.apache.geode.internal.net.Buffers;
 public class MsgStreamer extends OutputStream
     implements ObjToByteArraySerializer, BaseMsgStreamer, ByteBufferWriter {
 
-  private static final Logger logger = LogService.getLogger();
-
   /**
    * List of connections to send this msg to.
    */
   private final List<?> cons;
 
+  private final BufferPool bufferPool;
+
   /**
    * Any exceptions that happen during sends
    */
@@ -98,7 +96,7 @@ public class MsgStreamer extends OutputStream
     MsgIdGenerator.release(this.msgId);
     this.buffer.clear();
     this.overflowBuf = null;
-    Buffers.releaseSenderBuffer(this.buffer, this.stats);
+    bufferPool.releaseSenderBuffer(this.buffer);
   }
 
   /**
@@ -126,15 +124,16 @@ public class MsgStreamer extends OutputStream
    * now be used.
    */
   MsgStreamer(List<?> cons, DistributionMessage msg, boolean directReply, DMStats stats,
-      int sendBufferSize) {
+      int sendBufferSize, BufferPool bufferPool) {
     this.stats = stats;
     this.msg = msg;
     this.cons = cons;
-    this.buffer = Buffers.acquireSenderBuffer(sendBufferSize, stats);
+    this.buffer = bufferPool.acquireSenderBuffer(sendBufferSize);
     this.buffer.clear();
     this.buffer.position(Connection.MSG_HEADER_BYTES);
     this.msgId = MsgIdGenerator.NO_MSG_ID;
     this.directReply = directReply;
+    this.bufferPool = bufferPool;
     startSerialization();
   }
 
@@ -144,7 +143,7 @@ public class MsgStreamer extends OutputStream
    * List of MsgStreamer objects.
    */
   public static BaseMsgStreamer create(List<?> cons, final DistributionMessage msg,
-      final boolean directReply, final DMStats stats) {
+      final boolean directReply, final DMStats stats, BufferPool bufferPool) {
     final Connection firstCon = (Connection) cons.get(0);
     // split into different versions if required
     Version version;
@@ -170,7 +169,8 @@ public class MsgStreamer extends OutputStream
         }
       }
       if (versionToConnMap == null) {
-        return new MsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize());
+        return new MsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize(),
+            bufferPool);
       } else {
         // if there is a versioned stream created, then split remaining
         // connections to unversioned stream
@@ -187,7 +187,8 @@ public class MsgStreamer extends OutputStream
               unversionedCons.add(con);
             }
           }
-          streamers.add(new MsgStreamer(unversionedCons, msg, directReply, stats, sendBufferSize));
+          streamers.add(new MsgStreamer(unversionedCons, msg, directReply, stats, sendBufferSize,
+              bufferPool));
         }
         for (ObjectIterator<Object2ObjectMap.Entry> itr =
             versionToConnMap.object2ObjectEntrySet().fastIterator(); itr.hasNext();) {
@@ -195,15 +196,17 @@ public class MsgStreamer extends OutputStream
           Object ver = entry.getKey();
           Object l = entry.getValue();
           streamers.add(new VersionedMsgStreamer((List<?>) l, msg, directReply, stats,
-              sendBufferSize, (Version) ver));
+              bufferPool, sendBufferSize, (Version) ver));
         }
         return new MsgStreamerList(streamers);
       }
     } else if ((version = firstCon.getRemoteVersion()) == null) {
-      return new MsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize());
+      return new MsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize(),
+          bufferPool);
     } else {
       // create a single VersionedMsgStreamer
-      return new VersionedMsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize(),
+      return new VersionedMsgStreamer(cons, msg, directReply, stats, bufferPool,
+          firstCon.getSendBufferSize(),
           version);
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 97d748f..699c706 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -47,6 +47,7 @@ import org.apache.geode.internal.alerting.AlertingAction;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingThread;
 import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.net.BufferPool;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.net.SocketCreatorFactory;
 import org.apache.geode.internal.security.SecurableCommunicationChannel;
@@ -975,6 +976,10 @@ public class TCPConduit implements Runnable {
     return useSSL;
   }
 
+  public BufferPool getBufferPool() {
+    return this.conTable.getBufferPool();
+  }
+
   protected class Stopper extends CancelCriterion {
 
     @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java
index 4fe7b32..72b68f1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java
@@ -21,6 +21,7 @@ import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.VersionedDataStream;
+import org.apache.geode.internal.net.BufferPool;
 
 /**
  * An extension of {@link MsgStreamer} that implements {@link VersionedDataStream}.
@@ -32,8 +33,8 @@ class VersionedMsgStreamer extends MsgStreamer implements VersionedDataStream {
   private final Version version;
 
   VersionedMsgStreamer(List<?> cons, DistributionMessage msg, boolean directReply, DMStats stats,
-      int sendBufferSize, Version version) {
-    super(cons, msg, directReply, stats, sendBufferSize);
+      BufferPool bufferPool, int sendBufferSize, Version version) {
+    super(cons, msg, directReply, stats, sendBufferSize, bufferPool);
     this.version = version;
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
similarity index 84%
rename from geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java
rename to geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
index 96a4ac6..cc441e4 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
@@ -21,11 +21,19 @@ import static org.mockito.Mockito.mock;
 
 import java.nio.ByteBuffer;
 
+import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.distributed.internal.DMStats;
 
-public class BuffersTest {
+public class BufferPoolTest {
+
+  private BufferPool bufferPool;
+
+  @Before
+  public void setup() {
+    bufferPool = new BufferPool(mock(DMStats.class));
+  }
 
   @Test
   public void expandBuffer() throws Exception {
@@ -50,8 +58,7 @@ public class BuffersTest {
   private void createAndVerifyNewWriteBuffer(ByteBuffer buffer, boolean useDirectBuffer) {
     buffer.position(buffer.capacity());
     ByteBuffer newBuffer =
-        Buffers.expandWriteBufferIfNeeded(Buffers.BufferType.UNTRACKED, buffer, 500,
-            mock(DMStats.class));
+        bufferPool.expandWriteBufferIfNeeded(BufferPool.BufferType.UNTRACKED, buffer, 500);
     assertEquals(buffer.position(), newBuffer.position());
     assertEquals(500, newBuffer.capacity());
     newBuffer.flip();
@@ -66,8 +73,7 @@ public class BuffersTest {
     buffer.position(0);
     buffer.limit(256);
     ByteBuffer newBuffer =
-        Buffers.expandReadBufferIfNeeded(Buffers.BufferType.UNTRACKED, buffer, 500,
-            mock(DMStats.class));
+        bufferPool.expandReadBufferIfNeeded(BufferPool.BufferType.UNTRACKED, buffer, 500);
     assertEquals(0, newBuffer.position());
     assertEquals(500, newBuffer.capacity());
     for (int i = 0; i < 256; i++) {
@@ -84,8 +90,9 @@ public class BuffersTest {
     ByteBuffer buffer = ByteBuffer.allocate(33842);
     buffer.position(7);
     buffer.limit(16384);
-    ByteBuffer newBuffer = Buffers.expandReadBufferIfNeeded(Buffers.BufferType.UNTRACKED, buffer,
-        40899, mock(DMStats.class));
+    ByteBuffer newBuffer =
+        bufferPool.expandReadBufferIfNeeded(BufferPool.BufferType.UNTRACKED, buffer,
+            40899);
     assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(40899);
     // buffer should be ready to read the same amount of data
     assertThat(newBuffer.position()).isEqualTo(0);
@@ -98,8 +105,9 @@ public class BuffersTest {
     ByteBuffer buffer = ByteBuffer.allocate(33842);
     buffer.position(16384);
     buffer.limit(buffer.capacity());
-    ByteBuffer newBuffer = Buffers.expandWriteBufferIfNeeded(Buffers.BufferType.UNTRACKED, buffer,
-        40899, mock(DMStats.class));
+    ByteBuffer newBuffer =
+        bufferPool.expandWriteBufferIfNeeded(BufferPool.BufferType.UNTRACKED, buffer,
+            40899);
     assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(40899);
     // buffer should have the same amount of data as the old one
     assertThat(newBuffer.position()).isEqualTo(16384);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
index 5fe4def..133d827 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
@@ -36,17 +36,15 @@ import org.apache.geode.distributed.internal.DMStats;
 
 public class NioPlainEngineTest {
 
-  private static final int netBufferSize = 10000;
-  private static final int appBufferSize = 20000;
-
   private DMStats mockStats;
   private NioPlainEngine nioEngine;
+  private BufferPool bufferPool;
 
   @Before
   public void setUp() throws Exception {
     mockStats = mock(DMStats.class);
-
-    nioEngine = new NioPlainEngine();
+    bufferPool = new BufferPool(mockStats);
+    nioEngine = new NioPlainEngine(bufferPool);
   }
 
   @Test
@@ -60,13 +58,13 @@ public class NioPlainEngineTest {
   @Test
   @Ignore("Pending fix of GEODE-6733 to remove static from Buffers implementation")
   public void ensureWrappedCapacity() {
-    ByteBuffer wrappedBuffer = Buffers.acquireReceiveBuffer(100, mockStats);
+    ByteBuffer wrappedBuffer = bufferPool.acquireReceiveBuffer(100);
     verify(mockStats, times(1)).incReceiverBufferSize(any(Integer.class), any(Boolean.class));
     wrappedBuffer.put(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
     nioEngine.lastReadPosition = 10;
     int requestedCapacity = 210;
     ByteBuffer result = nioEngine.ensureWrappedCapacity(requestedCapacity, wrappedBuffer,
-        Buffers.BufferType.TRACKED_RECEIVER, mockStats);
+        BufferPool.BufferType.TRACKED_RECEIVER);
     verify(mockStats, times(2)).incReceiverBufferSize(any(Integer.class), any(Boolean.class));
     assertThat(result.capacity()).isGreaterThanOrEqualTo(requestedCapacity);
     assertThat(result).isNotSameAs(wrappedBuffer);
@@ -90,7 +88,7 @@ public class NioPlainEngineTest {
     nioEngine.lastReadPosition = consumedDataPresentInBuffer + unconsumedDataPresentInBuffer;
     ByteBuffer result =
         wrappedBuffer = nioEngine.ensureWrappedCapacity(requestedCapacity, wrappedBuffer,
-            Buffers.BufferType.UNTRACKED, mockStats);
+            BufferPool.BufferType.UNTRACKED);
     assertThat(result.capacity()).isEqualTo(requestedCapacity + unconsumedDataPresentInBuffer);
     assertThat(result).isSameAs(wrappedBuffer);
     // make sure that data was transferred to the new buffer
@@ -121,14 +119,14 @@ public class NioPlainEngineTest {
 
     nioEngine.lastReadPosition = 10;
 
-    ByteBuffer data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats);
+    ByteBuffer data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
     verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
     assertThat(data.position()).isEqualTo(0);
     assertThat(data.limit()).isEqualTo(amountToRead);
     assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 3 + preexistingBytes);
     assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead);
 
-    data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats);
+    data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
     verify(mockChannel, times(5)).read(any(ByteBuffer.class));
     // at end of last readAtLeast data
     assertThat(data.position()).isEqualTo(amountToRead);
@@ -152,7 +150,7 @@ public class NioPlainEngineTest {
 
     nioEngine.lastReadPosition = 10;
 
-    nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats);
+    nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
   }
 
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index b12df09..7236895 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -80,7 +80,7 @@ public class NioSslEngineTest {
 
     mockStats = mock(DMStats.class);
 
-    nioSslEngine = new NioSslEngine(mockEngine, mockStats);
+    nioSslEngine = new NioSslEngine(mockEngine, new BufferPool(mockStats));
     spyNioSslEngine = spy(nioSslEngine);
   }
 
@@ -113,8 +113,8 @@ public class NioSslEngineTest {
     verify(mockEngine, atLeast(2)).getHandshakeStatus();
     verify(mockEngine, times(3)).wrap(any(ByteBuffer.class), any(ByteBuffer.class));
     verify(mockEngine, times(3)).unwrap(any(ByteBuffer.class), any(ByteBuffer.class));
-    verify(spyNioSslEngine, times(2)).expandWriteBuffer(any(Buffers.BufferType.class),
-        any(ByteBuffer.class), any(Integer.class), any(DMStats.class));
+    verify(spyNioSslEngine, times(2)).expandWriteBuffer(any(BufferPool.BufferType.class),
+        any(ByteBuffer.class), any(Integer.class));
     verify(spyNioSslEngine, times(1)).handleBlockingTasks();
     verify(mockChannel, times(3)).read(any(ByteBuffer.class));
   }
@@ -228,8 +228,8 @@ public class NioSslEngineTest {
 
     ByteBuffer wrappedBuffer = spyNioSslEngine.wrap(appData);
 
-    verify(spyNioSslEngine, times(1)).expandWriteBuffer(any(Buffers.BufferType.class),
-        any(ByteBuffer.class), any(Integer.class), any(DMStats.class));
+    verify(spyNioSslEngine, times(1)).expandWriteBuffer(any(BufferPool.BufferType.class),
+        any(ByteBuffer.class), any(Integer.class));
     appData.flip();
     assertThat(wrappedBuffer).isEqualTo(appData);
     verify(spyNioSslEngine, times(1)).handleBlockingTasks();
@@ -376,14 +376,14 @@ public class NioSslEngineTest {
   public void ensureWrappedCapacityOfSmallMessage() {
     ByteBuffer buffer = ByteBuffer.allocate(netBufferSize);
     assertThat(
-        nioSslEngine.ensureWrappedCapacity(10, buffer, Buffers.BufferType.UNTRACKED, mockStats))
+        nioSslEngine.ensureWrappedCapacity(10, buffer, BufferPool.BufferType.UNTRACKED))
             .isEqualTo(buffer);
   }
 
   @Test
   public void ensureWrappedCapacityWithNoBuffer() {
     assertThat(
-        nioSslEngine.ensureWrappedCapacity(10, null, Buffers.BufferType.UNTRACKED, mockStats)
+        nioSslEngine.ensureWrappedCapacity(10, null, BufferPool.BufferType.UNTRACKED)
             .capacity())
                 .isEqualTo(netBufferSize);
   }
@@ -415,7 +415,7 @@ public class NioSslEngineTest {
     testSSLEngine.addReturnResult(new SSLEngineResult(OK, NEED_UNWRAP, 0, 0));
     nioSslEngine.engine = testSSLEngine;
 
-    ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats);
+    ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
     verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
     assertThat(data.position()).isEqualTo(0);
     assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
@@ -459,7 +459,7 @@ public class NioSslEngineTest {
         new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); // 130 + 60 bytes = 190
     nioSslEngine.engine = testSSLEngine;
 
-    ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats);
+    ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
     verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
     assertThat(data.position()).isEqualTo(0);
     assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
index 73cf06c..854685f 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
@@ -31,6 +31,7 @@ import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.distributed.internal.membership.MembershipManager;
+import org.apache.geode.internal.net.BufferPool;
 import org.apache.geode.internal.net.SocketCloser;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.test.junit.categories.MembershipTest;
@@ -52,10 +53,12 @@ public class ConnectionJUnitTest {
     DistributionManager distMgr = mock(DistributionManager.class);
     MembershipManager membership = mock(MembershipManager.class);
     TCPConduit conduit = mock(TCPConduit.class);
+    DMStats stats = mock(DMStats.class);
 
     // mock the connection table and conduit
 
     when(table.getConduit()).thenReturn(conduit);
+    when(table.getBufferPool()).thenReturn(new BufferPool(stats));
 
     CancelCriterion stopper = mock(CancelCriterion.class);
     when(stopper.cancelInProgress()).thenReturn(null);
@@ -67,7 +70,7 @@ public class ConnectionJUnitTest {
     // mock the distribution manager and membership manager
     when(distMgr.getMembershipManager()).thenReturn(membership);
     when(conduit.getDM()).thenReturn(distMgr);
-    when(conduit.getStats()).thenReturn(mock(DMStats.class));
+    when(conduit.getStats()).thenReturn(stats);
     when(table.getDM()).thenReturn(distMgr);
     SocketCloser closer = mock(SocketCloser.class);
     when(table.getSocketCloser()).thenReturn(closer);