You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mi...@apache.org on 2020/04/29 05:22:52 UTC

[geode] branch develop updated: GEODE-6636: Create multiple buffer pools (#4234)

This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new e7b0186  GEODE-6636: Create multiple buffer pools (#4234)
e7b0186 is described below

commit e7b018623430a959c4edbf856934588dacea7392
Author: Mario Ivanac <48...@users.noreply.github.com>
AuthorDate: Wed Apr 29 07:22:17 2020 +0200

    GEODE-6636: Create multiple buffer pools (#4234)
    
    * GEODE-6636: Create multiple buffer pools
    
    * GEODE-6636: Remove new alerts
    
    * GEODE-6636: Bug fix
    
    * GEODE-6636: Update after review
    
    * GEODE-6636: Added SMALL, MEDIUM constants
    
    * GEODE-6636: Fix non-direct buffer added to direct buffer pool
    
    * GEODE-6636: Update after rebase
    
    * GEODE-6636: Update after rebase
---
 .../org/apache/geode/internal/net/BufferPool.java  | 172 +++++++++++++++------
 .../org/apache/geode/internal/tcp/Connection.java  |   2 +-
 .../apache/geode/internal/net/BufferPoolTest.java  |  48 ++++++
 .../geode/internal/net/NioPlainEngineTest.java     |   4 +-
 .../geode/internal/net/NioSslEngineTest.java       |   4 +-
 5 files changed, 179 insertions(+), 51 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
index 0997c6e..c156c2c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
@@ -20,7 +20,9 @@ import java.util.IdentityHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.tcp.Connection;
 
 public class BufferPool {
   private final DMStats stats;
@@ -41,12 +43,30 @@ public class BufferPool {
   }
 
   /**
-   * A list of soft references to byte buffers.
+   * A list of soft references to small byte buffers.
    */
-  private final ConcurrentLinkedQueue<BBSoftReference> bufferQueue =
+  private final ConcurrentLinkedQueue<BBSoftReference> bufferSmallQueue =
       new ConcurrentLinkedQueue<>();
 
   /**
+   * A list of soft references to medium-sized byte buffers.
+   */
+  private final ConcurrentLinkedQueue<BBSoftReference> bufferMiddleQueue =
+      new ConcurrentLinkedQueue<>();
+
+  /**
+   * A list of soft references to large byte buffers.
+   */
+  private final ConcurrentLinkedQueue<BBSoftReference> bufferLargeQueue =
+      new ConcurrentLinkedQueue<>();
+
+  private final int SMALL_BUFFER_SIZE = Connection.SMALL_BUFFER_SIZE;
+
+
+  private final int MEDIUM_BUFFER_SIZE = DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+
+
+  /**
    * use direct ByteBuffers instead of heap ByteBuffers for NIO operations
    */
   public static final boolean useDirectBuffers = !Boolean.getBoolean("p2p.nodirectBuffers");
@@ -69,51 +89,18 @@ public class BufferPool {
    */
   private ByteBuffer acquireDirectBuffer(int size, boolean send) {
     ByteBuffer result;
+
     if (useDirectBuffers) {
-      IdentityHashMap<BBSoftReference, BBSoftReference> alreadySeen = null; // keys are used like a
-                                                                            // set
-      BBSoftReference ref = bufferQueue.poll();
-      while (ref != null) {
-        ByteBuffer bb = ref.getBB();
-        if (bb == null) {
-          // it was garbage collected
-          int refSize = ref.consumeSize();
-          if (refSize > 0) {
-            if (ref.getSend()) { // fix bug 46773
-              stats.incSenderBufferSize(-refSize, true);
-            } else {
-              stats.incReceiverBufferSize(-refSize, true);
-            }
-          }
-        } else if (bb.capacity() >= size) {
-          bb.rewind();
-          bb.limit(size);
-          return bb;
-        } else {
-          // wasn't big enough so put it back in the queue
-          Assert.assertTrue(bufferQueue.offer(ref));
-          if (alreadySeen == null) {
-            alreadySeen = new IdentityHashMap<>();
-          }
-          if (alreadySeen.put(ref, ref) != null) {
-            // if it returns non-null then we have already seen this item
-            // so we have worked all the way through the queue once.
-            // So it is time to give up and allocate a new buffer.
-            break;
-          }
-        }
-        ref = bufferQueue.poll();
+      if (size <= MEDIUM_BUFFER_SIZE) {
+        return acquirePredefinedFixedBuffer(send, size);
+      } else {
+        return acquireLargeBuffer(send, size);
       }
-      result = ByteBuffer.allocateDirect(size);
     } else {
       // if we are using heap buffers then don't bother with keeping them around
       result = ByteBuffer.allocate(size);
     }
-    if (send) {
-      stats.incSenderBufferSize(size, useDirectBuffers);
-    } else {
-      stats.incReceiverBufferSize(size, useDirectBuffers);
-    }
+    updateBufferStats(size, send, false);
     return result;
   }
 
@@ -129,6 +116,86 @@ public class BufferPool {
     return result;
   }
 
+  /**
+   * Acquire a pooled or new direct buffer of SMALL_BUFFER_SIZE or MEDIUM_BUFFER_SIZE capacity
+   */
+  private ByteBuffer acquirePredefinedFixedBuffer(boolean send, int size) {
+    // choose the queue and fixed capacity (small or medium) that fit the requested size
+    int defaultSize;
+    ConcurrentLinkedQueue<BBSoftReference> bufferTempQueue;
+    ByteBuffer result;
+
+    if (size <= SMALL_BUFFER_SIZE) {
+      defaultSize = SMALL_BUFFER_SIZE;
+      bufferTempQueue = bufferSmallQueue;
+    } else {
+      defaultSize = MEDIUM_BUFFER_SIZE;
+      bufferTempQueue = bufferMiddleQueue;
+    }
+
+    BBSoftReference ref = bufferTempQueue.poll();
+    while (ref != null) {
+      ByteBuffer bb = ref.getBB();
+      if (bb == null) {
+        // it was garbage collected; NOTE(review): assumes its capacity was defaultSize - confirm
+        updateBufferStats(-defaultSize, ref.getSend(), true);
+      } else {
+        bb.clear();
+        if (defaultSize > size) {
+          bb.limit(size);
+        }
+        return bb;
+      }
+      ref = bufferTempQueue.poll();
+    }
+    result = ByteBuffer.allocateDirect(defaultSize);
+    updateBufferStats(defaultSize, send, true);
+    if (defaultSize > size) {
+      result.limit(size);
+    }
+    return result;
+  }
+
+  private ByteBuffer acquireLargeBuffer(boolean send, int size) {
+    // reuse a pooled large buffer with capacity >= size, else allocate a new direct buffer
+    ByteBuffer result;
+    IdentityHashMap<BBSoftReference, BBSoftReference> alreadySeen = null; // keys are used like a
+    // set
+    BBSoftReference ref = bufferLargeQueue.poll();
+    while (ref != null) {
+      ByteBuffer bb = ref.getBB();
+      if (bb == null) {
+        // it was garbage collected
+        int refSize = ref.consumeSize();
+        if (refSize > 0) {
+          updateBufferStats(-refSize, ref.getSend(), true);
+        }
+      } else if (bb.capacity() >= size) {
+        bb.clear();
+        if (bb.capacity() > size) {
+          bb.limit(size);
+        }
+        return bb;
+      } else {
+        // wasn't big enough so put it back in the queue
+        Assert.assertTrue(bufferLargeQueue.offer(ref));
+        if (alreadySeen == null) {
+          alreadySeen = new IdentityHashMap<>();
+        }
+        if (alreadySeen.put(ref, ref) != null) {
+          // if it returns non-null then we have already seen this item
+          // so we have worked all the way through the queue once.
+          // So it is time to give up and allocate a new buffer.
+          break;
+        }
+      }
+      ref = bufferLargeQueue.poll();
+    }
+    result = ByteBuffer.allocateDirect(size);
+    updateBufferStats(size, send, true);
+    return result;
+  }
+
   public void releaseSenderBuffer(ByteBuffer bb) {
     releaseBuffer(bb, true);
   }
@@ -228,13 +295,26 @@ public class BufferPool {
   private void releaseBuffer(ByteBuffer bb, boolean send) {
     if (bb.isDirect()) {
       BBSoftReference bbRef = new BBSoftReference(bb, send);
-      bufferQueue.offer(bbRef);
-    } else {
-      if (send) {
-        stats.incSenderBufferSize(-bb.capacity(), false);
+      if (bb.capacity() <= SMALL_BUFFER_SIZE) {
+        bufferSmallQueue.offer(bbRef);
+      } else if (bb.capacity() <= MEDIUM_BUFFER_SIZE) {
+        bufferMiddleQueue.offer(bbRef);
       } else {
-        stats.incReceiverBufferSize(-bb.capacity(), false);
+        bufferLargeQueue.offer(bbRef);
       }
+    } else {
+      updateBufferStats(-bb.capacity(), send, false);
+    }
+  }
+
+  /**
+   * Add size (negative to subtract) to the sender or receiver buffer-size stat, per the send flag.
+   */
+  private void updateBufferStats(int size, boolean send, boolean direct) {
+    if (send) {
+      stats.incSenderBufferSize(size, direct);
+    } else {
+      stats.incReceiverBufferSize(size, direct);
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index f8f6932..089eb2e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -116,7 +116,7 @@ public class Connection implements Runnable {
    * Small buffer used for send socket buffer on receiver connections and receive buffer on sender
    * connections.
    */
-  static final int SMALL_BUFFER_SIZE =
+  public static final int SMALL_BUFFER_SIZE =
       Integer.getInteger(GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096);
 
   /**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
index cc441e4..81382f0 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
@@ -113,4 +113,52 @@ public class BufferPoolTest {
     assertThat(newBuffer.position()).isEqualTo(16384);
     assertThat(newBuffer.limit()).isEqualTo(newBuffer.capacity());
   }
+
+
+  @Test
+  public void checkBufferSizeAfterAllocation() throws Exception {
+    ByteBuffer buffer = bufferPool.acquireDirectReceiveBuffer(100);
+
+    ByteBuffer newBuffer =
+        bufferPool.acquireDirectReceiveBuffer(10000);
+    assertThat(buffer.capacity()).isGreaterThanOrEqualTo(4096);
+    assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(32768);
+
+    // buffer should be ready to read the same amount of data
+    assertThat(buffer.position()).isEqualTo(0);
+    assertThat(buffer.limit()).isEqualTo(100);
+    assertThat(newBuffer.position()).isEqualTo(0);
+    assertThat(newBuffer.limit()).isEqualTo(10000);
+  }
+
+  @Test
+  public void checkBufferSizeAfterAcquire() throws Exception {
+    ByteBuffer buffer = bufferPool.acquireDirectReceiveBuffer(100);
+
+    ByteBuffer newBuffer =
+        bufferPool.acquireDirectReceiveBuffer(10000);
+    assertThat(buffer.capacity()).isGreaterThanOrEqualTo(4096);
+    assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(32768);
+
+    assertThat(buffer.position()).isEqualTo(0);
+    assertThat(buffer.limit()).isEqualTo(100);
+    assertThat(newBuffer.position()).isEqualTo(0);
+    assertThat(newBuffer.limit()).isEqualTo(10000);
+    bufferPool.releaseReceiveBuffer(buffer);
+    bufferPool.releaseReceiveBuffer(newBuffer);
+
+    buffer = bufferPool.acquireDirectReceiveBuffer(1000);
+
+    newBuffer =
+        bufferPool.acquireDirectReceiveBuffer(15000);
+
+    assertThat(buffer.capacity()).isGreaterThanOrEqualTo(4096);
+    assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(32768);
+
+    assertThat(buffer.position()).isEqualTo(0);
+    assertThat(buffer.limit()).isEqualTo(1000);
+    assertThat(newBuffer.position()).isEqualTo(0);
+    assertThat(newBuffer.limit()).isEqualTo(15000);
+  }
+
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
index e9785de..b4eae2f 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
@@ -63,9 +63,9 @@ public class NioPlainEngineTest {
     int requestedCapacity = 210;
     ByteBuffer result = nioEngine.ensureWrappedCapacity(requestedCapacity, wrappedBuffer,
         BufferPool.BufferType.TRACKED_RECEIVER);
-    verify(mockStats, times(2)).incReceiverBufferSize(any(Integer.class), any(Boolean.class));
+    verify(mockStats, times(1)).incReceiverBufferSize(any(Integer.class), any(Boolean.class));
     assertThat(result.capacity()).isGreaterThanOrEqualTo(requestedCapacity);
-    assertThat(result).isNotSameAs(wrappedBuffer);
+    assertThat(result).isGreaterThanOrEqualTo(wrappedBuffer);
     // make sure that data was transferred to the new buffer
     for (int i = 0; i < 10; i++) {
       assertThat(result.get(i)).isEqualTo(wrappedBuffer.get(i));
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index 2c9be77..720ef62 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -62,8 +62,8 @@ import org.apache.geode.test.junit.categories.MembershipTest;
 
 @Category({MembershipTest.class})
 public class NioSslEngineTest {
-  private static final int netBufferSize = 10000;
-  private static final int appBufferSize = 20000;
+  private static final int netBufferSize = 4096;
+  private static final int appBufferSize = 32768;
 
   private SSLEngine mockEngine;
   private DMStats mockStats;