You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/05/21 15:43:31 UTC
[geode] branch develop updated: GEODE-6733 Remove mutable static
org.apache.geode.internal.net.Buffers.buffersQueue
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 3ec8459 GEODE-6733 Remove mutable static org.apache.geode.internal.net.Buffers.buffersQueue
3ec8459 is described below
commit 3ec8459e9254b7b5a553965c40b75668f9e8b673
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Tue May 21 08:42:21 2019 -0700
GEODE-6733 Remove mutable static org.apache.geode.internal.net.Buffers.buffersQueue
Converted static Buffers class to be a non-static buffer pool.
---
.../internal/net/SSLSocketIntegrationTest.java | 4 +-
.../distributed/internal/DistributionStats.java | 2 -
.../distributed/internal/direct/DirectChannel.java | 3 +-
.../internal/net/{Buffers.java => BufferPool.java} | 75 ++++++++++------------
.../org/apache/geode/internal/net/NioFilter.java | 8 +--
.../apache/geode/internal/net/NioPlainEngine.java | 22 +++----
.../apache/geode/internal/net/NioSslEngine.java | 48 +++++++-------
.../apache/geode/internal/net/SocketCreator.java | 5 +-
.../org/apache/geode/internal/tcp/Connection.java | 70 ++++++--------------
.../apache/geode/internal/tcp/ConnectionTable.java | 8 +++
.../geode/internal/tcp/DirectReplySender.java | 3 +-
.../apache/geode/internal/tcp/MsgOutputStream.java | 4 +-
.../org/apache/geode/internal/tcp/MsgReader.java | 8 +--
.../org/apache/geode/internal/tcp/MsgStreamer.java | 31 +++++----
.../org/apache/geode/internal/tcp/TCPConduit.java | 5 ++
.../geode/internal/tcp/VersionedMsgStreamer.java | 5 +-
.../sanctioned-geode-core-serializables.txt | 2 +-
.../net/{BuffersTest.java => BufferPoolTest.java} | 26 +++++---
.../geode/internal/net/NioPlainEngineTest.java | 20 +++---
.../geode/internal/net/NioSslEngineTest.java | 18 +++---
.../geode/internal/tcp/ConnectionJUnitTest.java | 5 +-
21 files changed, 174 insertions(+), 198 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
index 8e27671..5a09285 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
@@ -216,7 +216,7 @@ public class SSLSocketIntegrationTest {
NioSslEngine engine =
clusterSocketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(),
clusterSocketCreator.createSSLEngine("localhost", 1234), 0, true,
- ByteBuffer.allocate(65535), mock(DMStats.class));
+ ByteBuffer.allocate(65535), new BufferPool(mock(DMStats.class)));
clientChannel.configureBlocking(true);
// transmit expected string from Client to Server
@@ -264,7 +264,7 @@ public class SSLSocketIntegrationTest {
timeoutMillis,
false,
ByteBuffer.allocate(500),
- mock(DMStats.class));
+ new BufferPool(mock(DMStats.class)));
readMessageFromNIOSSLClient(socket, buffer, engine);
readMessageFromNIOSSLClient(socket, buffer, engine);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
index 91c47e2..13a77fb 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
@@ -25,7 +25,6 @@ import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.internal.NanoTimer;
import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.net.Buffers;
import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl;
import org.apache.geode.internal.util.Breadcrumbs;
@@ -954,7 +953,6 @@ public class DistributionStats implements DMStats {
// this.replyWaitHistogram = new HistogramStats("ReplyWait", "nanoseconds", f,
// new long[] {100000, 200000, 300000, 400000, 500000, 600000, 700000, 800000, 900000, 1000000},
// false);
- Buffers.initBufferStats(this);
}
/**
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index 7d6d046..ecf37f2 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -373,7 +373,8 @@ public class DirectChannel {
DMStats stats = getDMStats();
List<?> sentCons; // used for cons we sent to this time
- final BaseMsgStreamer ms = MsgStreamer.create(cons, msg, directReply, stats);
+ final BaseMsgStreamer ms =
+ MsgStreamer.create(cons, msg, directReply, stats, getConduit().getBufferPool());
try {
startTime = 0;
if (ackTimeout > 0) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
similarity index 73%
rename from geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java
rename to geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
index c77803d..d796ed6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
@@ -19,11 +19,12 @@ import java.nio.ByteBuffer;
import java.util.IdentityHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.internal.Assert;
-public class Buffers {
+public class BufferPool {
+ private final DMStats stats;
+
/**
* Buffers may be acquired from the Buffers pool
* or they may be allocated using Buffer.allocate(). This enum is used
@@ -34,11 +35,15 @@ public class Buffers {
UNTRACKED, TRACKED_SENDER, TRACKED_RECEIVER
}
+
+ public BufferPool(DMStats stats) {
+ this.stats = stats;
+ }
+
/**
* A list of soft references to byte buffers.
*/
- @MakeNotStatic
- private static final ConcurrentLinkedQueue<BBSoftReference> bufferQueue =
+ private final ConcurrentLinkedQueue<BBSoftReference> bufferQueue =
new ConcurrentLinkedQueue<>();
/**
@@ -51,15 +56,15 @@ public class Buffers {
*
* @return a byte buffer to be used for sending on this connection.
*/
- public static ByteBuffer acquireSenderBuffer(int size, DMStats stats) {
- return acquireBuffer(size, stats, true);
+ public ByteBuffer acquireSenderBuffer(int size) {
+ return acquireBuffer(size, true);
}
- public static ByteBuffer acquireReceiveBuffer(int size, DMStats stats) {
- return acquireBuffer(size, stats, false);
+ public ByteBuffer acquireReceiveBuffer(int size) {
+ return acquireBuffer(size, false);
}
- private static ByteBuffer acquireBuffer(int size, DMStats stats, boolean send) {
+ private ByteBuffer acquireBuffer(int size, boolean send) {
ByteBuffer result;
if (useDirectBuffers) {
IdentityHashMap<BBSoftReference, BBSoftReference> alreadySeen = null; // keys are used like a
@@ -109,19 +114,19 @@ public class Buffers {
return result;
}
- public static void releaseSenderBuffer(ByteBuffer bb, DMStats stats) {
- releaseBuffer(bb, stats, true);
+ public void releaseSenderBuffer(ByteBuffer bb) {
+ releaseBuffer(bb, true);
}
- public static void releaseReceiveBuffer(ByteBuffer bb, DMStats stats) {
- releaseBuffer(bb, stats, false);
+ public void releaseReceiveBuffer(ByteBuffer bb) {
+ releaseBuffer(bb, false);
}
/**
* expand a buffer that's currently being read from
*/
- static ByteBuffer expandReadBufferIfNeeded(BufferType type, ByteBuffer existing,
- int desiredCapacity, DMStats stats) {
+ ByteBuffer expandReadBufferIfNeeded(BufferType type, ByteBuffer existing,
+ int desiredCapacity) {
if (existing.capacity() >= desiredCapacity) {
if (existing.position() > 0) {
existing.compact();
@@ -129,51 +134,51 @@ public class Buffers {
}
return existing;
}
- ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity, stats);
+ ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity);
newBuffer.clear();
newBuffer.put(existing);
newBuffer.flip();
- releaseBuffer(type, existing, stats);
+ releaseBuffer(type, existing);
return newBuffer;
}
/**
* expand a buffer that's currently being written to
*/
- static ByteBuffer expandWriteBufferIfNeeded(BufferType type, ByteBuffer existing,
- int desiredCapacity, DMStats stats) {
+ ByteBuffer expandWriteBufferIfNeeded(BufferType type, ByteBuffer existing,
+ int desiredCapacity) {
if (existing.capacity() >= desiredCapacity) {
return existing;
}
- ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity, stats);
+ ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity);
newBuffer.clear();
existing.flip();
newBuffer.put(existing);
- releaseBuffer(type, existing, stats);
+ releaseBuffer(type, existing);
return newBuffer;
}
- static ByteBuffer acquireBuffer(Buffers.BufferType type, int capacity, DMStats stats) {
+ ByteBuffer acquireBuffer(BufferPool.BufferType type, int capacity) {
switch (type) {
case UNTRACKED:
return ByteBuffer.allocate(capacity);
case TRACKED_SENDER:
- return Buffers.acquireSenderBuffer(capacity, stats);
+ return acquireSenderBuffer(capacity);
case TRACKED_RECEIVER:
- return Buffers.acquireReceiveBuffer(capacity, stats);
+ return acquireReceiveBuffer(capacity);
}
throw new IllegalArgumentException("Unexpected buffer type " + type.toString());
}
- static void releaseBuffer(Buffers.BufferType type, ByteBuffer buffer, DMStats stats) {
+ void releaseBuffer(BufferPool.BufferType type, ByteBuffer buffer) {
switch (type) {
case UNTRACKED:
return;
case TRACKED_SENDER:
- Buffers.releaseSenderBuffer(buffer, stats);
+ releaseSenderBuffer(buffer);
return;
case TRACKED_RECEIVER:
- Buffers.releaseReceiveBuffer(buffer, stats);
+ releaseReceiveBuffer(buffer);
return;
}
throw new IllegalArgumentException("Unexpected buffer type " + type.toString());
@@ -183,7 +188,7 @@ public class Buffers {
/**
* Releases a previously acquired buffer.
*/
- private static void releaseBuffer(ByteBuffer bb, DMStats stats, boolean send) {
+ private void releaseBuffer(ByteBuffer bb, boolean send) {
if (useDirectBuffers) {
BBSoftReference bbRef = new BBSoftReference(bb, send);
bufferQueue.offer(bbRef);
@@ -196,20 +201,6 @@ public class Buffers {
}
}
- public static void initBufferStats(DMStats stats) { // fixes 46773
- if (useDirectBuffers) {
- for (BBSoftReference ref : bufferQueue) {
- if (ref.getBB() != null) {
- if (ref.getSend()) { // fix bug 46773
- stats.incSenderBufferSize(ref.getSize(), true);
- } else {
- stats.incReceiverBufferSize(ref.getSize(), true);
- }
- }
- }
- }
- }
-
/**
* A soft reference that remembers the size of the byte buffer it refers to. TODO Dan - I really
* think this should be a weak reference. The JVM doesn't seem to clear soft references if it is
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
index 6cb40ec..8e41ef1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
@@ -18,8 +18,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-import org.apache.geode.distributed.internal.DMStats;
-
/**
* Prior to transmitting a buffer or processing a received buffer
* a NioFilter should be called to wrap (transmit) or unwrap (received)
@@ -44,7 +42,7 @@ public interface NioFilter {
* This must be invoked before readAtLeast. A new buffer may be returned by this method.
*/
ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
- Buffers.BufferType bufferType, DMStats stats);
+ BufferPool.BufferType bufferType);
/**
* read at least the indicated amount of bytes from the given
@@ -55,8 +53,8 @@ public interface NioFilter {
* wrappedBuffer = filter.ensureWrappedCapacity(amount, wrappedBuffer, etc.);<br>
* unwrappedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.)
*/
- ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer,
- DMStats stats) throws IOException;
+ ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer)
+ throws IOException;
/**
* You must invoke this when done reading from the unwrapped buffer
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
index 8a3e3fb..32fa297 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
@@ -20,24 +20,22 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.logging.LogService;
/**
* A pass-through implementation of NioFilter. Use this if you don't need
* secure communications.
*/
public class NioPlainEngine implements NioFilter {
- private static final Logger logger = LogService.getLogger();
+ private final BufferPool bufferPool;
int lastReadPosition;
int lastProcessedPosition;
- public NioPlainEngine() {}
+ public NioPlainEngine(BufferPool bufferPool) {
+ this.bufferPool = bufferPool;
+ }
@Override
public ByteBuffer wrap(ByteBuffer buffer) {
@@ -52,11 +50,11 @@ public class NioPlainEngine implements NioFilter {
@Override
public ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
- Buffers.BufferType bufferType, DMStats stats) {
+ BufferPool.BufferType bufferType) {
ByteBuffer buffer = wrappedBuffer;
if (buffer == null) {
- buffer = Buffers.acquireBuffer(bufferType, amount, stats);
+ buffer = bufferPool.acquireBuffer(bufferType, amount);
buffer.clear();
lastProcessedPosition = 0;
lastReadPosition = 0;
@@ -73,10 +71,10 @@ public class NioPlainEngine implements NioFilter {
ByteBuffer oldBuffer = buffer;
oldBuffer.limit(lastReadPosition);
oldBuffer.position(lastProcessedPosition);
- buffer = Buffers.acquireBuffer(bufferType, amount, stats);
+ buffer = bufferPool.acquireBuffer(bufferType, amount);
buffer.clear();
buffer.put(oldBuffer);
- Buffers.releaseBuffer(bufferType, oldBuffer, stats);
+ bufferPool.releaseBuffer(bufferType, oldBuffer);
lastReadPosition = buffer.position();
lastProcessedPosition = 0;
}
@@ -84,8 +82,8 @@ public class NioPlainEngine implements NioFilter {
}
@Override
- public ByteBuffer readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer,
- DMStats stats) throws IOException {
+ public ByteBuffer readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer)
+ throws IOException {
ByteBuffer buffer = wrappedBuffer;
Assert.assertTrue(buffer.capacity() - lastProcessedPosition >= bytes);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index dd71d75..9bf969d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -19,9 +19,8 @@ import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_TASK;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP;
import static javax.net.ssl.SSLEngineResult.Status.BUFFER_OVERFLOW;
import static javax.net.ssl.SSLEngineResult.Status.OK;
-import static org.apache.geode.internal.net.Buffers.BufferType.TRACKED_RECEIVER;
-import static org.apache.geode.internal.net.Buffers.BufferType.TRACKED_SENDER;
-import static org.apache.geode.internal.net.Buffers.releaseBuffer;
+import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_RECEIVER;
+import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_SENDER;
import java.io.EOFException;
import java.io.IOException;
@@ -41,8 +40,8 @@ import javax.net.ssl.SSLSession;
import org.apache.logging.log4j.Logger;
import org.apache.geode.GemFireIOException;
-import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.net.BufferPool.BufferType;
/**
@@ -53,7 +52,7 @@ import org.apache.geode.internal.logging.LogService;
public class NioSslEngine implements NioFilter {
private static final Logger logger = LogService.getLogger();
- private final DMStats stats;
+ private final BufferPool bufferPool;
private volatile boolean closed;
@@ -74,14 +73,14 @@ public class NioSslEngine implements NioFilter {
*/
ByteBuffer handshakeBuffer;
- NioSslEngine(SSLEngine engine, DMStats stats) {
- this.stats = stats;
+ NioSslEngine(SSLEngine engine, BufferPool bufferPool) {
SSLSession session = engine.getSession();
int appBufferSize = session.getApplicationBufferSize();
int packetBufferSize = engine.getSession().getPacketBufferSize();
this.myNetData = ByteBuffer.allocate(packetBufferSize);
this.peerAppData = ByteBuffer.allocate(appBufferSize);
this.engine = engine;
+ this.bufferPool = bufferPool;
}
/**
@@ -97,7 +96,7 @@ public class NioSslEngine implements NioFilter {
logger.debug("Allocating new buffer for SSL handshake");
}
this.handshakeBuffer =
- Buffers.acquireReceiveBuffer(engine.getSession().getPacketBufferSize(), stats);
+ bufferPool.acquireReceiveBuffer(engine.getSession().getPacketBufferSize());
} else {
this.handshakeBuffer = peerNetData;
}
@@ -154,8 +153,7 @@ public class NioSslEngine implements NioFilter {
if (engineResult.getStatus() == BUFFER_OVERFLOW) {
peerAppData =
- expandWriteBuffer(TRACKED_RECEIVER, peerAppData, peerAppData.capacity() * 2,
- stats);
+ expandWriteBuffer(TRACKED_RECEIVER, peerAppData, peerAppData.capacity() * 2);
}
break;
@@ -172,7 +170,7 @@ public class NioSslEngine implements NioFilter {
case BUFFER_OVERFLOW:
myNetData =
expandWriteBuffer(TRACKED_SENDER, myNetData,
- myNetData.capacity() * 2, stats);
+ myNetData.capacity() * 2);
break;
case OK:
myNetData.flip();
@@ -216,9 +214,9 @@ public class NioSslEngine implements NioFilter {
return true;
}
- ByteBuffer expandWriteBuffer(Buffers.BufferType type, ByteBuffer existing,
- int desiredCapacity, DMStats stats) {
- return Buffers.expandWriteBufferIfNeeded(type, existing, desiredCapacity, stats);
+ ByteBuffer expandWriteBuffer(BufferType type, ByteBuffer existing,
+ int desiredCapacity) {
+ return bufferPool.expandWriteBufferIfNeeded(type, existing, desiredCapacity);
}
void checkClosed() {
@@ -248,7 +246,7 @@ public class NioSslEngine implements NioFilter {
if (remaining < (appData.remaining() * 2)) {
int newCapacity = expandedCapacity(appData, myNetData);
- myNetData = expandWriteBuffer(TRACKED_SENDER, myNetData, newCapacity, stats);
+ myNetData = expandWriteBuffer(TRACKED_SENDER, myNetData, newCapacity);
}
SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
@@ -303,27 +301,27 @@ public class NioSslEngine implements NioFilter {
void expandPeerAppData(ByteBuffer wrappedBuffer) {
if (peerAppData.capacity() - peerAppData.position() < 2 * wrappedBuffer.remaining()) {
peerAppData =
- Buffers.expandWriteBufferIfNeeded(TRACKED_RECEIVER, peerAppData,
- expandedCapacity(wrappedBuffer, peerAppData), stats);
+ bufferPool.expandWriteBufferIfNeeded(TRACKED_RECEIVER, peerAppData,
+ expandedCapacity(wrappedBuffer, peerAppData));
}
}
@Override
public ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
- Buffers.BufferType bufferType, DMStats stats) {
+ BufferType bufferType) {
ByteBuffer buffer = wrappedBuffer;
int requiredSize = engine.getSession().getPacketBufferSize();
if (buffer == null) {
- buffer = Buffers.acquireBuffer(bufferType, requiredSize, stats);
+ buffer = bufferPool.acquireBuffer(bufferType, requiredSize);
} else if (buffer.capacity() < requiredSize) {
- buffer = Buffers.expandWriteBufferIfNeeded(bufferType, buffer, requiredSize, stats);
+ buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, buffer, requiredSize);
}
return buffer;
}
@Override
public ByteBuffer readAtLeast(SocketChannel channel, int bytes,
- ByteBuffer wrappedBuffer, DMStats stats) throws IOException {
+ ByteBuffer wrappedBuffer) throws IOException {
if (peerAppData.capacity() > bytes) {
// we already have a buffer that's big enough
if (peerAppData.capacity() - peerAppData.position() < bytes) {
@@ -332,7 +330,7 @@ public class NioSslEngine implements NioFilter {
}
} else {
peerAppData =
- Buffers.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, bytes, this.stats);
+ bufferPool.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, bytes);
}
while (peerAppData.remaining() < bytes) {
@@ -367,7 +365,7 @@ public class NioSslEngine implements NioFilter {
// for TTLS the app-data buffers do not need to be tracked direct-buffers since we
// do not use them for I/O operations
peerAppData =
- Buffers.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, amount, this.stats);
+ bufferPool.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, amount);
return peerAppData;
}
@@ -405,8 +403,8 @@ public class NioSslEngine implements NioFilter {
} catch (IOException e) {
throw new GemFireIOException("exception closing SSL session", e);
} finally {
- releaseBuffer(TRACKED_SENDER, myNetData, stats);
- releaseBuffer(TRACKED_RECEIVER, peerAppData, stats);
+ bufferPool.releaseBuffer(TRACKED_SENDER, myNetData);
+ bufferPool.releaseBuffer(TRACKED_RECEIVER, peerAppData);
this.closed = true;
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
index 412c423..bc136f5 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
@@ -84,7 +84,6 @@ import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.distributed.ClientSocketFactory;
-import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -940,7 +939,7 @@ public class SocketCreator {
int timeout,
boolean clientSocket,
ByteBuffer peerNetBuffer,
- DMStats stats)
+ BufferPool bufferPool)
throws IOException {
engine.setUseClientMode(clientSocket);
while (!socketChannel.finishConnect()) {
@@ -954,7 +953,7 @@ public class SocketCreator {
}
}
- NioSslEngine nioSslEngine = new NioSslEngine(engine, stats);
+ NioSslEngine nioSslEngine = new NioSslEngine(engine, bufferPool);
boolean blocking = socketChannel.isBlocking();
if (blocking) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 2ba313e..a9cb8d9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -18,8 +18,6 @@ import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER
import java.io.DataInputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -78,7 +76,7 @@ import org.apache.geode.internal.Version;
import org.apache.geode.internal.alerting.AlertingAction;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThread;
-import org.apache.geode.internal.net.Buffers;
+import org.apache.geode.internal.net.BufferPool;
import org.apache.geode.internal.net.NioFilter;
import org.apache.geode.internal.net.NioPlainEngine;
import org.apache.geode.internal.net.SocketCreator;
@@ -593,7 +591,7 @@ public class Connection implements Runnable {
bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK;
int allocSize = bytes.length;
ByteBuffer bb;
- if (Buffers.useDirectBuffers) {
+ if (BufferPool.useDirectBuffers) {
bb = ByteBuffer.allocateDirect(allocSize);
} else {
bb = ByteBuffer.allocate(allocSize);
@@ -637,7 +635,7 @@ public class Connection implements Runnable {
if (this.isReceiver) {
DistributionConfig cfg = owner.getConduit().config;
ByteBuffer bb;
- if (Buffers.useDirectBuffers) {
+ if (BufferPool.useDirectBuffers) {
bb = ByteBuffer.allocateDirect(128);
} else {
bb = ByteBuffer.allocate(128);
@@ -1200,7 +1198,7 @@ public class Connection implements Runnable {
private BatchBufferFlusher batchFlusher;
private void createBatchSendBuffer() {
- if (Buffers.useDirectBuffers) {
+ if (BufferPool.useDirectBuffers) {
this.fillBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
this.sendBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
} else {
@@ -1613,10 +1611,14 @@ public class Connection implements Runnable {
if (tmp != null) {
this.inputBuffer = null;
final DMStats stats = this.owner.getConduit().getStats();
- Buffers.releaseReceiveBuffer(tmp, stats);
+ getBufferPool().releaseReceiveBuffer(tmp);
}
}
+ BufferPool getBufferPool() {
+ return owner.getBufferPool();
+ }
+
private String p2pReaderName() {
StringBuilder sb = new StringBuilder(64);
if (this.isReceiver) {
@@ -1840,9 +1842,9 @@ public class Connection implements Runnable {
|| (inputBuffer.capacity() < packetBufferSize)) {
// TLS has a minimum input buffer size constraint
if (inputBuffer != null) {
- Buffers.releaseReceiveBuffer(inputBuffer, getConduit().getStats());
+ getBufferPool().releaseReceiveBuffer(inputBuffer);
}
- inputBuffer = Buffers.acquireReceiveBuffer(packetBufferSize, getConduit().getStats());
+ inputBuffer = getBufferPool().acquireReceiveBuffer(packetBufferSize);
}
if (channel.socket().getReceiveBufferSize() < packetBufferSize) {
channel.socket().setReceiveBufferSize(packetBufferSize);
@@ -1851,9 +1853,10 @@ public class Connection implements Runnable {
channel.socket().setSendBufferSize(packetBufferSize);
}
ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine,
- getConduit().idleConnectionTimeout, clientSocket, inputBuffer, getConduit().getStats());
+ getConduit().idleConnectionTimeout, clientSocket, inputBuffer,
+ getBufferPool());
} else {
- ioFilter = new NioPlainEngine();
+ ioFilter = new NioPlainEngine(getBufferPool());
}
}
@@ -1958,42 +1961,6 @@ public class Connection implements Runnable {
}
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE")
- void readFully(InputStream input, byte[] buffer, int len) throws IOException {
- int bytesSoFar = 0;
- while (bytesSoFar < len) {
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
- try {
- synchronized (stateLock) {
- connectionState = STATE_READING;
- }
- int bytesThisTime = input.read(buffer, bytesSoFar, len - bytesSoFar);
- if (bytesThisTime < 0) {
- this.readerShuttingDown = true;
- try {
- requestClose("Stream read returned non-positive length");
- } catch (Exception ignored) {
- }
- return;
- }
- bytesSoFar += bytesThisTime;
- } catch (InterruptedIOException io) {
- // Current thread has been interrupted. Regard it similar to an EOF
- this.readerShuttingDown = true;
- try {
- requestClose("Current thread interrupted");
- } catch (Exception ignored) {
- }
- Thread.currentThread().interrupt();
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
- } finally {
- synchronized (stateLock) {
- connectionState = STATE_IDLE;
- }
- }
- } // while
- }
-
/**
* sends a serialized message to the other end of this connection. This is used by the
* DirectChannel in GemFire when the message is going to be sent to multiple recipients.
@@ -2011,7 +1978,7 @@ public class Connection implements Runnable {
return;
}
final boolean origSocketInUse = this.socketInUse;
- byte originalState = -1;
+ byte originalState;
synchronized (stateLock) {
originalState = this.connectionState;
this.connectionState = STATE_SENDING;
@@ -2237,7 +2204,6 @@ public class Connection implements Runnable {
ck.setBuffer(oldBuffer);
} else {
// old buffer was not large enough
- oldBuffer = null;
ByteBuffer newbb = ByteBuffer.allocate(newBytes);
newbb.put(buffer);
newbb.flip();
@@ -2787,7 +2753,7 @@ public class Connection implements Runnable {
if (allocSize == -1) {
allocSize = this.owner.getConduit().tcpBufferSize;
}
- inputBuffer = Buffers.acquireReceiveBuffer(allocSize, this.owner.getConduit().getStats());
+ inputBuffer = getBufferPool().acquireReceiveBuffer(allocSize);
}
return inputBuffer;
}
@@ -3398,13 +3364,13 @@ public class Connection implements Runnable {
logger.info("Allocating larger network read buffer, new size is {} old size was {}.",
allocSize, oldBufferSize);
ByteBuffer oldBuffer = inputBuffer;
- inputBuffer = Buffers.acquireReceiveBuffer(allocSize, stats);
+ inputBuffer = getBufferPool().acquireReceiveBuffer(allocSize);
if (oldBuffer != null) {
int oldByteCount = oldBuffer.remaining();
inputBuffer.put(oldBuffer);
inputBuffer.position(oldByteCount);
- Buffers.releaseReceiveBuffer(oldBuffer, stats);
+ getBufferPool().releaseReceiveBuffer(oldBuffer);
}
} else {
if (inputBuffer.position() != 0) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index f3a4432..50c3cf8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -48,6 +48,7 @@ import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.alerting.AlertingAction;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingExecutors;
+import org.apache.geode.internal.net.BufferPool;
import org.apache.geode.internal.net.SocketCloser;
/**
@@ -124,6 +125,8 @@ public class ConnectionTable {
*/
private final TCPConduit owner;
+ private final BufferPool bufferPool;
+
/**
* true if this table is no longer in use
*/
@@ -199,6 +202,7 @@ public class ConnectionTable {
this.threadConnectionMap = new ConcurrentHashMap();
this.p2pReaderThreadPool = createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets());
this.socketCloser = new SocketCloser();
+ this.bufferPool = new BufferPool(owner.getStats());
}
private Executor createThreadPoolForIO(boolean conserveSockets) {
@@ -611,6 +615,10 @@ public class ConnectionTable {
return owner;
}
+ public BufferPool getBufferPool() {
+ return bufferPool;
+ }
+
public boolean isClosed() {
return this.closed;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
index 73d1582..dbb828d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
@@ -65,7 +65,8 @@ class DirectReplySender implements ReplySender {
}
ArrayList<Connection> conns = new ArrayList<Connection>(1);
conns.add(conn);
- MsgStreamer ms = (MsgStreamer) MsgStreamer.create(conns, msg, false, DUMMY_STATS);
+ MsgStreamer ms = (MsgStreamer) MsgStreamer.create(conns, msg, false, DUMMY_STATS,
+ conn.getBufferPool());
try {
ms.writeMessage();
ConnectExceptions ce = ms.getConnectExceptions();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
index 2d767b8..98669b3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
import org.apache.geode.DataSerializer;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.ObjToByteArraySerializer;
-import org.apache.geode.internal.net.Buffers;
+import org.apache.geode.internal.net.BufferPool;
/**
* MsgOutputStream should no longer be used except in Connection to do the handshake. Otherwise
@@ -38,7 +38,7 @@ public class MsgOutputStream extends OutputStream implements ObjToByteArraySeria
* The caller of this constructor is responsible for managing the allocated instance.
*/
public MsgOutputStream(int allocSize) {
- if (Buffers.useDirectBuffers) {
+ if (BufferPool.useDirectBuffers) {
this.buffer = ByteBuffer.allocateDirect(allocSize);
} else {
this.buffer = ByteBuffer.allocate(allocSize);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index afb0272..0a33428 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -27,7 +27,7 @@ import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.net.Buffers;
+import org.apache.geode.internal.net.BufferPool;
import org.apache.geode.internal.net.NioFilter;
/**
@@ -125,13 +125,13 @@ public class MsgReader {
private ByteBuffer readAtLeast(int bytes) throws IOException {
peerNetData = ioFilter.ensureWrappedCapacity(bytes, peerNetData,
- Buffers.BufferType.TRACKED_RECEIVER, getStats());
- return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData, getStats());
+ BufferPool.BufferType.TRACKED_RECEIVER);
+ return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData);
}
public void close() {
if (peerNetData != null) {
- Buffers.releaseReceiveBuffer(peerNetData, getStats());
+ conn.getBufferPool().releaseReceiveBuffer(peerNetData);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
index 22a385b..d42bef1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
@@ -25,7 +25,6 @@ import java.util.List;
import it.unimi.dsi.fastutil.objects.Object2ObjectMap;
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
-import org.apache.logging.log4j.Logger;
import org.apache.geode.DataSerializer;
import org.apache.geode.distributed.internal.DMStats;
@@ -37,8 +36,7 @@ import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.ObjToByteArraySerializer;
import org.apache.geode.internal.Version;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.net.Buffers;
+import org.apache.geode.internal.net.BufferPool;
/**
* <p>
@@ -52,13 +50,13 @@ import org.apache.geode.internal.net.Buffers;
public class MsgStreamer extends OutputStream
implements ObjToByteArraySerializer, BaseMsgStreamer, ByteBufferWriter {
- private static final Logger logger = LogService.getLogger();
-
/**
* List of connections to send this msg to.
*/
private final List<?> cons;
+ private final BufferPool bufferPool;
+
/**
* Any exceptions that happen during sends
*/
@@ -98,7 +96,7 @@ public class MsgStreamer extends OutputStream
MsgIdGenerator.release(this.msgId);
this.buffer.clear();
this.overflowBuf = null;
- Buffers.releaseSenderBuffer(this.buffer, this.stats);
+ bufferPool.releaseSenderBuffer(this.buffer);
}
/**
@@ -126,15 +124,16 @@ public class MsgStreamer extends OutputStream
* now be used.
*/
MsgStreamer(List<?> cons, DistributionMessage msg, boolean directReply, DMStats stats,
- int sendBufferSize) {
+ int sendBufferSize, BufferPool bufferPool) {
this.stats = stats;
this.msg = msg;
this.cons = cons;
- this.buffer = Buffers.acquireSenderBuffer(sendBufferSize, stats);
+ this.buffer = bufferPool.acquireSenderBuffer(sendBufferSize);
this.buffer.clear();
this.buffer.position(Connection.MSG_HEADER_BYTES);
this.msgId = MsgIdGenerator.NO_MSG_ID;
this.directReply = directReply;
+ this.bufferPool = bufferPool;
startSerialization();
}
@@ -144,7 +143,7 @@ public class MsgStreamer extends OutputStream
* List of MsgStreamer objects.
*/
public static BaseMsgStreamer create(List<?> cons, final DistributionMessage msg,
- final boolean directReply, final DMStats stats) {
+ final boolean directReply, final DMStats stats, BufferPool bufferPool) {
final Connection firstCon = (Connection) cons.get(0);
// split into different versions if required
Version version;
@@ -170,7 +169,8 @@ public class MsgStreamer extends OutputStream
}
}
if (versionToConnMap == null) {
- return new MsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize());
+ return new MsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize(),
+ bufferPool);
} else {
// if there is a versioned stream created, then split remaining
// connections to unversioned stream
@@ -187,7 +187,8 @@ public class MsgStreamer extends OutputStream
unversionedCons.add(con);
}
}
- streamers.add(new MsgStreamer(unversionedCons, msg, directReply, stats, sendBufferSize));
+ streamers.add(new MsgStreamer(unversionedCons, msg, directReply, stats, sendBufferSize,
+ bufferPool));
}
for (ObjectIterator<Object2ObjectMap.Entry> itr =
versionToConnMap.object2ObjectEntrySet().fastIterator(); itr.hasNext();) {
@@ -195,15 +196,17 @@ public class MsgStreamer extends OutputStream
Object ver = entry.getKey();
Object l = entry.getValue();
streamers.add(new VersionedMsgStreamer((List<?>) l, msg, directReply, stats,
- sendBufferSize, (Version) ver));
+ bufferPool, sendBufferSize, (Version) ver));
}
return new MsgStreamerList(streamers);
}
} else if ((version = firstCon.getRemoteVersion()) == null) {
- return new MsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize());
+ return new MsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize(),
+ bufferPool);
} else {
// create a single VersionedMsgStreamer
- return new VersionedMsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize(),
+ return new VersionedMsgStreamer(cons, msg, directReply, stats, bufferPool,
+ firstCon.getSendBufferSize(),
version);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 97d748f..699c706 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -47,6 +47,7 @@ import org.apache.geode.internal.alerting.AlertingAction;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThread;
import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.net.BufferPool;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
@@ -975,6 +976,10 @@ public class TCPConduit implements Runnable {
return useSSL;
}
+ public BufferPool getBufferPool() {
+ return this.conTable.getBufferPool();
+ }
+
protected class Stopper extends CancelCriterion {
@Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java
index 4fe7b32..72b68f1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java
@@ -21,6 +21,7 @@ import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.VersionedDataStream;
+import org.apache.geode.internal.net.BufferPool;
/**
* An extension of {@link MsgStreamer} that implements {@link VersionedDataStream}.
@@ -32,8 +33,8 @@ class VersionedMsgStreamer extends MsgStreamer implements VersionedDataStream {
private final Version version;
VersionedMsgStreamer(List<?> cons, DistributionMessage msg, boolean directReply, DMStats stats,
- int sendBufferSize, Version version) {
- super(cons, msg, directReply, stats, sendBufferSize);
+ BufferPool bufferPool, int sendBufferSize, Version version) {
+ super(cons, msg, directReply, stats, sendBufferSize, bufferPool);
this.version = version;
}
diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
index 9cce709..ed9bbf5 100644
--- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
+++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
@@ -388,7 +388,7 @@ org/apache/geode/internal/jta/TransactionManagerImpl,true,5033392316185449821,gl
org/apache/geode/internal/jta/TransactionManagerImpl$GlobalTransactionComparator,false
org/apache/geode/internal/jta/UserTransactionImpl,true,2994652455204901910,storedTimeOut:int,tm:javax/transaction/TransactionManager
org/apache/geode/internal/monitoring/ThreadsMonitoring$Mode,false
-org/apache/geode/internal/net/Buffers$BufferType,false
+org/apache/geode/internal/net/BufferPool$BufferType,false
org/apache/geode/internal/offheap/MemoryBlock$State,false
org/apache/geode/internal/offheap/OffHeapStorage$1,false
org/apache/geode/internal/offheap/OffHeapStorage$2,false
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
similarity index 84%
rename from geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java
rename to geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
index 96a4ac6..cc441e4 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java
@@ -21,11 +21,19 @@ import static org.mockito.Mockito.mock;
import java.nio.ByteBuffer;
+import org.junit.Before;
import org.junit.Test;
import org.apache.geode.distributed.internal.DMStats;
-public class BuffersTest {
+public class BufferPoolTest {
+
+ private BufferPool bufferPool;
+
+ @Before
+ public void setup() {
+ bufferPool = new BufferPool(mock(DMStats.class));
+ }
@Test
public void expandBuffer() throws Exception {
@@ -50,8 +58,7 @@ public class BuffersTest {
private void createAndVerifyNewWriteBuffer(ByteBuffer buffer, boolean useDirectBuffer) {
buffer.position(buffer.capacity());
ByteBuffer newBuffer =
- Buffers.expandWriteBufferIfNeeded(Buffers.BufferType.UNTRACKED, buffer, 500,
- mock(DMStats.class));
+ bufferPool.expandWriteBufferIfNeeded(BufferPool.BufferType.UNTRACKED, buffer, 500);
assertEquals(buffer.position(), newBuffer.position());
assertEquals(500, newBuffer.capacity());
newBuffer.flip();
@@ -66,8 +73,7 @@ public class BuffersTest {
buffer.position(0);
buffer.limit(256);
ByteBuffer newBuffer =
- Buffers.expandReadBufferIfNeeded(Buffers.BufferType.UNTRACKED, buffer, 500,
- mock(DMStats.class));
+ bufferPool.expandReadBufferIfNeeded(BufferPool.BufferType.UNTRACKED, buffer, 500);
assertEquals(0, newBuffer.position());
assertEquals(500, newBuffer.capacity());
for (int i = 0; i < 256; i++) {
@@ -84,8 +90,9 @@ public class BuffersTest {
ByteBuffer buffer = ByteBuffer.allocate(33842);
buffer.position(7);
buffer.limit(16384);
- ByteBuffer newBuffer = Buffers.expandReadBufferIfNeeded(Buffers.BufferType.UNTRACKED, buffer,
- 40899, mock(DMStats.class));
+ ByteBuffer newBuffer =
+ bufferPool.expandReadBufferIfNeeded(BufferPool.BufferType.UNTRACKED, buffer,
+ 40899);
assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(40899);
// buffer should be ready to read the same amount of data
assertThat(newBuffer.position()).isEqualTo(0);
@@ -98,8 +105,9 @@ public class BuffersTest {
ByteBuffer buffer = ByteBuffer.allocate(33842);
buffer.position(16384);
buffer.limit(buffer.capacity());
- ByteBuffer newBuffer = Buffers.expandWriteBufferIfNeeded(Buffers.BufferType.UNTRACKED, buffer,
- 40899, mock(DMStats.class));
+ ByteBuffer newBuffer =
+ bufferPool.expandWriteBufferIfNeeded(BufferPool.BufferType.UNTRACKED, buffer,
+ 40899);
assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(40899);
// buffer should have the same amount of data as the old one
assertThat(newBuffer.position()).isEqualTo(16384);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
index 5fe4def..133d827 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
@@ -36,17 +36,15 @@ import org.apache.geode.distributed.internal.DMStats;
public class NioPlainEngineTest {
- private static final int netBufferSize = 10000;
- private static final int appBufferSize = 20000;
-
private DMStats mockStats;
private NioPlainEngine nioEngine;
+ private BufferPool bufferPool;
@Before
public void setUp() throws Exception {
mockStats = mock(DMStats.class);
-
- nioEngine = new NioPlainEngine();
+ bufferPool = new BufferPool(mockStats);
+ nioEngine = new NioPlainEngine(bufferPool);
}
@Test
@@ -60,13 +58,13 @@ public class NioPlainEngineTest {
@Test
@Ignore("Pending fix of GEODE-6733 to remove static from Buffers implementation")
public void ensureWrappedCapacity() {
- ByteBuffer wrappedBuffer = Buffers.acquireReceiveBuffer(100, mockStats);
+ ByteBuffer wrappedBuffer = bufferPool.acquireReceiveBuffer(100);
verify(mockStats, times(1)).incReceiverBufferSize(any(Integer.class), any(Boolean.class));
wrappedBuffer.put(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
nioEngine.lastReadPosition = 10;
int requestedCapacity = 210;
ByteBuffer result = nioEngine.ensureWrappedCapacity(requestedCapacity, wrappedBuffer,
- Buffers.BufferType.TRACKED_RECEIVER, mockStats);
+ BufferPool.BufferType.TRACKED_RECEIVER);
verify(mockStats, times(2)).incReceiverBufferSize(any(Integer.class), any(Boolean.class));
assertThat(result.capacity()).isGreaterThanOrEqualTo(requestedCapacity);
assertThat(result).isNotSameAs(wrappedBuffer);
@@ -90,7 +88,7 @@ public class NioPlainEngineTest {
nioEngine.lastReadPosition = consumedDataPresentInBuffer + unconsumedDataPresentInBuffer;
ByteBuffer result =
wrappedBuffer = nioEngine.ensureWrappedCapacity(requestedCapacity, wrappedBuffer,
- Buffers.BufferType.UNTRACKED, mockStats);
+ BufferPool.BufferType.UNTRACKED);
assertThat(result.capacity()).isEqualTo(requestedCapacity + unconsumedDataPresentInBuffer);
assertThat(result).isSameAs(wrappedBuffer);
// make sure that data was transferred to the new buffer
@@ -121,14 +119,14 @@ public class NioPlainEngineTest {
nioEngine.lastReadPosition = 10;
- ByteBuffer data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats);
+ ByteBuffer data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
assertThat(data.position()).isEqualTo(0);
assertThat(data.limit()).isEqualTo(amountToRead);
assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 3 + preexistingBytes);
assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead);
- data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats);
+ data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
verify(mockChannel, times(5)).read(any(ByteBuffer.class));
// at end of last readAtLeast data
assertThat(data.position()).isEqualTo(amountToRead);
@@ -152,7 +150,7 @@ public class NioPlainEngineTest {
nioEngine.lastReadPosition = 10;
- nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats);
+ nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index b12df09..7236895 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -80,7 +80,7 @@ public class NioSslEngineTest {
mockStats = mock(DMStats.class);
- nioSslEngine = new NioSslEngine(mockEngine, mockStats);
+ nioSslEngine = new NioSslEngine(mockEngine, new BufferPool(mockStats));
spyNioSslEngine = spy(nioSslEngine);
}
@@ -113,8 +113,8 @@ public class NioSslEngineTest {
verify(mockEngine, atLeast(2)).getHandshakeStatus();
verify(mockEngine, times(3)).wrap(any(ByteBuffer.class), any(ByteBuffer.class));
verify(mockEngine, times(3)).unwrap(any(ByteBuffer.class), any(ByteBuffer.class));
- verify(spyNioSslEngine, times(2)).expandWriteBuffer(any(Buffers.BufferType.class),
- any(ByteBuffer.class), any(Integer.class), any(DMStats.class));
+ verify(spyNioSslEngine, times(2)).expandWriteBuffer(any(BufferPool.BufferType.class),
+ any(ByteBuffer.class), any(Integer.class));
verify(spyNioSslEngine, times(1)).handleBlockingTasks();
verify(mockChannel, times(3)).read(any(ByteBuffer.class));
}
@@ -228,8 +228,8 @@ public class NioSslEngineTest {
ByteBuffer wrappedBuffer = spyNioSslEngine.wrap(appData);
- verify(spyNioSslEngine, times(1)).expandWriteBuffer(any(Buffers.BufferType.class),
- any(ByteBuffer.class), any(Integer.class), any(DMStats.class));
+ verify(spyNioSslEngine, times(1)).expandWriteBuffer(any(BufferPool.BufferType.class),
+ any(ByteBuffer.class), any(Integer.class));
appData.flip();
assertThat(wrappedBuffer).isEqualTo(appData);
verify(spyNioSslEngine, times(1)).handleBlockingTasks();
@@ -376,14 +376,14 @@ public class NioSslEngineTest {
public void ensureWrappedCapacityOfSmallMessage() {
ByteBuffer buffer = ByteBuffer.allocate(netBufferSize);
assertThat(
- nioSslEngine.ensureWrappedCapacity(10, buffer, Buffers.BufferType.UNTRACKED, mockStats))
+ nioSslEngine.ensureWrappedCapacity(10, buffer, BufferPool.BufferType.UNTRACKED))
.isEqualTo(buffer);
}
@Test
public void ensureWrappedCapacityWithNoBuffer() {
assertThat(
- nioSslEngine.ensureWrappedCapacity(10, null, Buffers.BufferType.UNTRACKED, mockStats)
+ nioSslEngine.ensureWrappedCapacity(10, null, BufferPool.BufferType.UNTRACKED)
.capacity())
.isEqualTo(netBufferSize);
}
@@ -415,7 +415,7 @@ public class NioSslEngineTest {
testSSLEngine.addReturnResult(new SSLEngineResult(OK, NEED_UNWRAP, 0, 0));
nioSslEngine.engine = testSSLEngine;
- ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats);
+ ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
assertThat(data.position()).isEqualTo(0);
assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
@@ -459,7 +459,7 @@ public class NioSslEngineTest {
new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); // 130 + 60 bytes = 190
nioSslEngine.engine = testSSLEngine;
- ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats);
+ ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
assertThat(data.position()).isEqualTo(0);
assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
index 73cf06c..854685f 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
@@ -31,6 +31,7 @@ import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.MembershipManager;
+import org.apache.geode.internal.net.BufferPool;
import org.apache.geode.internal.net.SocketCloser;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.test.junit.categories.MembershipTest;
@@ -52,10 +53,12 @@ public class ConnectionJUnitTest {
DistributionManager distMgr = mock(DistributionManager.class);
MembershipManager membership = mock(MembershipManager.class);
TCPConduit conduit = mock(TCPConduit.class);
+ DMStats stats = mock(DMStats.class);
// mock the connection table and conduit
when(table.getConduit()).thenReturn(conduit);
+ when(table.getBufferPool()).thenReturn(new BufferPool(stats));
CancelCriterion stopper = mock(CancelCriterion.class);
when(stopper.cancelInProgress()).thenReturn(null);
@@ -67,7 +70,7 @@ public class ConnectionJUnitTest {
// mock the distribution manager and membership manager
when(distMgr.getMembershipManager()).thenReturn(membership);
when(conduit.getDM()).thenReturn(distMgr);
- when(conduit.getStats()).thenReturn(mock(DMStats.class));
+ when(conduit.getStats()).thenReturn(stats);
when(table.getDM()).thenReturn(distMgr);
SocketCloser closer = mock(SocketCloser.class);
when(table.getSocketCloser()).thenReturn(closer);