You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mi...@apache.org on 2020/04/29 05:22:52 UTC
[geode] branch develop updated: GEODE-6636: Create multiple buffer
pools (#4234)
This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new e7b0186 GEODE-6636: Create multiple buffer pools (#4234)
e7b0186 is described below
commit e7b018623430a959c4edbf856934588dacea7392
Author: Mario Ivanac <48...@users.noreply.github.com>
AuthorDate: Wed Apr 29 07:22:17 2020 +0200
GEODE-6636: Create multiple buffer pools (#4234)
* GEODE-6636: Create multiple buffer pools
* GEODE-6636: Remove new alerts
* GEODE-6636: Bug fix
* GEODE-6636: Update after review
* GEODE-6636: Added SMALL, MEDIUM constants
* GEODE-6636: Fix non-direct buffer added to direct buffer pool
* GEODE-6636: Update after rebase
* GEODE-6636: Update after rebase
---
.../org/apache/geode/internal/net/BufferPool.java | 172 +++++++++++++++------
.../org/apache/geode/internal/tcp/Connection.java | 2 +-
.../apache/geode/internal/net/BufferPoolTest.java | 48 ++++++
.../geode/internal/net/NioPlainEngineTest.java | 4 +-
.../geode/internal/net/NioSslEngineTest.java | 4 +-
5 files changed, 179 insertions(+), 51 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
index 0997c6e..c156c2c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
@@ -20,7 +20,9 @@ import java.util.IdentityHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.tcp.Connection;
public class BufferPool {
private final DMStats stats;
@@ -41,12 +43,30 @@ public class BufferPool {
}
/**
- * A list of soft references to byte buffers.
+ * A list of soft references to small byte buffers.
*/
- private final ConcurrentLinkedQueue<BBSoftReference> bufferQueue =
+ private final ConcurrentLinkedQueue<BBSoftReference> bufferSmallQueue =
new ConcurrentLinkedQueue<>();
/**
+ * A list of soft references to middle byte buffers.
+ */
+ private final ConcurrentLinkedQueue<BBSoftReference> bufferMiddleQueue =
+ new ConcurrentLinkedQueue<>();
+
+ /**
+ * A list of soft references to large byte buffers.
+ */
+ private final ConcurrentLinkedQueue<BBSoftReference> bufferLargeQueue =
+ new ConcurrentLinkedQueue<>();
+
+ private final int SMALL_BUFFER_SIZE = Connection.SMALL_BUFFER_SIZE;
+
+
+ private final int MEDIUM_BUFFER_SIZE = DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+
+
+ /**
* use direct ByteBuffers instead of heap ByteBuffers for NIO operations
*/
public static final boolean useDirectBuffers = !Boolean.getBoolean("p2p.nodirectBuffers");
@@ -69,51 +89,18 @@ public class BufferPool {
*/
private ByteBuffer acquireDirectBuffer(int size, boolean send) {
ByteBuffer result;
+
if (useDirectBuffers) {
- IdentityHashMap<BBSoftReference, BBSoftReference> alreadySeen = null; // keys are used like a
- // set
- BBSoftReference ref = bufferQueue.poll();
- while (ref != null) {
- ByteBuffer bb = ref.getBB();
- if (bb == null) {
- // it was garbage collected
- int refSize = ref.consumeSize();
- if (refSize > 0) {
- if (ref.getSend()) { // fix bug 46773
- stats.incSenderBufferSize(-refSize, true);
- } else {
- stats.incReceiverBufferSize(-refSize, true);
- }
- }
- } else if (bb.capacity() >= size) {
- bb.rewind();
- bb.limit(size);
- return bb;
- } else {
- // wasn't big enough so put it back in the queue
- Assert.assertTrue(bufferQueue.offer(ref));
- if (alreadySeen == null) {
- alreadySeen = new IdentityHashMap<>();
- }
- if (alreadySeen.put(ref, ref) != null) {
- // if it returns non-null then we have already seen this item
- // so we have worked all the way through the queue once.
- // So it is time to give up and allocate a new buffer.
- break;
- }
- }
- ref = bufferQueue.poll();
+ if (size <= MEDIUM_BUFFER_SIZE) {
+ return acquirePredefinedFixedBuffer(send, size);
+ } else {
+ return acquireLargeBuffer(send, size);
}
- result = ByteBuffer.allocateDirect(size);
} else {
// if we are using heap buffers then don't bother with keeping them around
result = ByteBuffer.allocate(size);
}
- if (send) {
- stats.incSenderBufferSize(size, useDirectBuffers);
- } else {
- stats.incReceiverBufferSize(size, useDirectBuffers);
- }
+ updateBufferStats(size, send, false);
return result;
}
@@ -129,6 +116,86 @@ public class BufferPool {
return result;
}
+ /**
+ * Acquire direct buffer with predefined default capacity (4096 or 32768)
+ */
+ private ByteBuffer acquirePredefinedFixedBuffer(boolean send, int size) {
+ // set
+ int defaultSize;
+ ConcurrentLinkedQueue<BBSoftReference> bufferTempQueue;
+ ByteBuffer result;
+
+ if (size <= SMALL_BUFFER_SIZE) {
+ defaultSize = SMALL_BUFFER_SIZE;
+ bufferTempQueue = bufferSmallQueue;
+ } else {
+ defaultSize = MEDIUM_BUFFER_SIZE;
+ bufferTempQueue = bufferMiddleQueue;
+ }
+
+ BBSoftReference ref = bufferTempQueue.poll();
+ while (ref != null) {
+ ByteBuffer bb = ref.getBB();
+ if (bb == null) {
+ // it was garbage collected
+ updateBufferStats(-defaultSize, ref.getSend(), true);
+ } else {
+ bb.clear();
+ if (defaultSize > size) {
+ bb.limit(size);
+ }
+ return bb;
+ }
+ ref = bufferTempQueue.poll();
+ }
+ result = ByteBuffer.allocateDirect(defaultSize);
+ updateBufferStats(defaultSize, send, true);
+ if (defaultSize > size) {
+ result.limit(size);
+ }
+ return result;
+ }
+
+ private ByteBuffer acquireLargeBuffer(boolean send, int size) {
+ // set
+ ByteBuffer result;
+ IdentityHashMap<BBSoftReference, BBSoftReference> alreadySeen = null; // keys are used like a
+ // set
+ BBSoftReference ref = bufferLargeQueue.poll();
+ while (ref != null) {
+ ByteBuffer bb = ref.getBB();
+ if (bb == null) {
+ // it was garbage collected
+ int refSize = ref.consumeSize();
+ if (refSize > 0) {
+ updateBufferStats(-refSize, ref.getSend(), true);
+ }
+ } else if (bb.capacity() >= size) {
+ bb.clear();
+ if (bb.capacity() > size) {
+ bb.limit(size);
+ }
+ return bb;
+ } else {
+ // wasn't big enough so put it back in the queue
+ Assert.assertTrue(bufferLargeQueue.offer(ref));
+ if (alreadySeen == null) {
+ alreadySeen = new IdentityHashMap<>();
+ }
+ if (alreadySeen.put(ref, ref) != null) {
+ // if it returns non-null then we have already seen this item
+ // so we have worked all the way through the queue once.
+ // So it is time to give up and allocate a new buffer.
+ break;
+ }
+ }
+ ref = bufferLargeQueue.poll();
+ }
+ result = ByteBuffer.allocateDirect(size);
+ updateBufferStats(size, send, true);
+ return result;
+ }
+
public void releaseSenderBuffer(ByteBuffer bb) {
releaseBuffer(bb, true);
}
@@ -228,13 +295,26 @@ public class BufferPool {
private void releaseBuffer(ByteBuffer bb, boolean send) {
if (bb.isDirect()) {
BBSoftReference bbRef = new BBSoftReference(bb, send);
- bufferQueue.offer(bbRef);
- } else {
- if (send) {
- stats.incSenderBufferSize(-bb.capacity(), false);
+ if (bb.capacity() <= SMALL_BUFFER_SIZE) {
+ bufferSmallQueue.offer(bbRef);
+ } else if (bb.capacity() <= MEDIUM_BUFFER_SIZE) {
+ bufferMiddleQueue.offer(bbRef);
} else {
- stats.incReceiverBufferSize(-bb.capacity(), false);
+ bufferLargeQueue.offer(bbRef);
}
+ } else {
+ updateBufferStats(-bb.capacity(), send, false);
+ }
+ }
+
+ /**
+ * Update buffer stats.
+ */
+ private void updateBufferStats(int size, boolean send, boolean direct) {
+ if (send) {
+ stats.incSenderBufferSize(size, direct);
+ } else {
+ stats.incReceiverBufferSize(size, direct);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index f8f6932..089eb2e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -116,7 +116,7 @@ public class Connection implements Runnable {
* Small buffer used for send socket buffer on receiver connections and receive buffer on sender
* connections.
*/
- static final int SMALL_BUFFER_SIZE =
+ public static final int SMALL_BUFFER_SIZE =
Integer.getInteger(GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096);
/**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
index cc441e4..81382f0 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
@@ -113,4 +113,52 @@ public class BufferPoolTest {
assertThat(newBuffer.position()).isEqualTo(16384);
assertThat(newBuffer.limit()).isEqualTo(newBuffer.capacity());
}
+
+
+ @Test
+ public void checkBufferSizeAfterAllocation() throws Exception {
+ ByteBuffer buffer = bufferPool.acquireDirectReceiveBuffer(100);
+
+ ByteBuffer newBuffer =
+ bufferPool.acquireDirectReceiveBuffer(10000);
+ assertThat(buffer.capacity()).isGreaterThanOrEqualTo(4096);
+ assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(32768);
+
+ // buffer should be ready to read the same amount of data
+ assertThat(buffer.position()).isEqualTo(0);
+ assertThat(buffer.limit()).isEqualTo(100);
+ assertThat(newBuffer.position()).isEqualTo(0);
+ assertThat(newBuffer.limit()).isEqualTo(10000);
+ }
+
+ @Test
+ public void checkBufferSizeAfterAcquire() throws Exception {
+ ByteBuffer buffer = bufferPool.acquireDirectReceiveBuffer(100);
+
+ ByteBuffer newBuffer =
+ bufferPool.acquireDirectReceiveBuffer(10000);
+ assertThat(buffer.capacity()).isGreaterThanOrEqualTo(4096);
+ assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(32768);
+
+ assertThat(buffer.position()).isEqualTo(0);
+ assertThat(buffer.limit()).isEqualTo(100);
+ assertThat(newBuffer.position()).isEqualTo(0);
+ assertThat(newBuffer.limit()).isEqualTo(10000);
+ bufferPool.releaseReceiveBuffer(buffer);
+ bufferPool.releaseReceiveBuffer(newBuffer);
+
+ buffer = bufferPool.acquireDirectReceiveBuffer(1000);
+
+ newBuffer =
+ bufferPool.acquireDirectReceiveBuffer(15000);
+
+ assertThat(buffer.capacity()).isGreaterThanOrEqualTo(4096);
+ assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(32768);
+
+ assertThat(buffer.position()).isEqualTo(0);
+ assertThat(buffer.limit()).isEqualTo(1000);
+ assertThat(newBuffer.position()).isEqualTo(0);
+ assertThat(newBuffer.limit()).isEqualTo(15000);
+ }
+
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
index e9785de..b4eae2f 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
@@ -63,9 +63,9 @@ public class NioPlainEngineTest {
int requestedCapacity = 210;
ByteBuffer result = nioEngine.ensureWrappedCapacity(requestedCapacity, wrappedBuffer,
BufferPool.BufferType.TRACKED_RECEIVER);
- verify(mockStats, times(2)).incReceiverBufferSize(any(Integer.class), any(Boolean.class));
+ verify(mockStats, times(1)).incReceiverBufferSize(any(Integer.class), any(Boolean.class));
assertThat(result.capacity()).isGreaterThanOrEqualTo(requestedCapacity);
- assertThat(result).isNotSameAs(wrappedBuffer);
+ assertThat(result).isGreaterThanOrEqualTo(wrappedBuffer);
// make sure that data was transferred to the new buffer
for (int i = 0; i < 10; i++) {
assertThat(result.get(i)).isEqualTo(wrappedBuffer.get(i));
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index 2c9be77..720ef62 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -62,8 +62,8 @@ import org.apache.geode.test.junit.categories.MembershipTest;
@Category({MembershipTest.class})
public class NioSslEngineTest {
- private static final int netBufferSize = 10000;
- private static final int appBufferSize = 20000;
+ private static final int netBufferSize = 4096;
+ private static final int appBufferSize = 32768;
private SSLEngine mockEngine;
private DMStats mockStats;