You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ec...@apache.org on 2021/06/17 15:41:19 UTC
[geode] 01/02: Revert "GEODE-9141: (2 of 2) Handle in-buffer
concurrency"
This is an automated email from the ASF dual-hosted git repository.
echobravo pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git
commit c13a62199a3720c69c142b0dcd2f20130ac3b1a4
Author: Ernest Burghardt <eb...@pivotal.io>
AuthorDate: Thu Jun 17 10:39:47 2021 -0500
Revert "GEODE-9141: (2 of 2) Handle in-buffer concurrency"
This reverts commit 1a8eb5aec580eb75871060793ea65d62f5f2d959.
---
...LSocketHostNameVerificationIntegrationTest.java | 6 +-
.../internal/net/SSLSocketIntegrationTest.java | 3 +-
.../apache/geode/codeAnalysis/excludedClasses.txt | 2 +-
.../geode/internal/net/ByteBufferSharing.java | 15 -
.../geode/internal/net/ByteBufferSharingNoOp.java | 5 -
.../geode/internal/net/ByteBufferVendor.java | 144 +++------
.../apache/geode/internal/net/NioSslEngine.java | 50 +--
.../apache/geode/internal/net/SocketCreator.java | 9 +-
.../org/apache/geode/internal/tcp/Connection.java | 334 ++++++++++-----------
.../geode/internal/net/ByteBufferVendorTest.java | 36 +--
.../geode/internal/net/NioSslEngineTest.java | 41 ++-
.../apache/geode/internal/tcp/ConnectionTest.java | 1 -
12 files changed, 285 insertions(+), 361 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
index e86bfea..a70f3b1 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
@@ -103,9 +103,6 @@ public class SSLSocketHostNameVerificationIntegrationTest {
@Before
public void setUp() throws Exception {
-
- SocketCreatorFactory.close(); // to clear socket creators made in previous tests
-
IgnoredException.addIgnoredException("javax.net.ssl.SSLException: Read timed out");
this.localHost = InetAddress.getLoopbackAddress();
@@ -175,7 +172,7 @@ public class SSLSocketHostNameVerificationIntegrationTest {
try {
this.socketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(),
- sslEngine, 0,
+ sslEngine, 0, true,
ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize()),
new BufferPool(mock(DMStats.class)));
@@ -208,6 +205,7 @@ public class SSLSocketHostNameVerificationIntegrationTest {
sc.handshakeSSLSocketChannel(socket.getChannel(),
sslEngine,
timeoutMillis,
+ false,
ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize()),
new BufferPool(mock(DMStats.class)));
} catch (Throwable throwable) {
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
index 13e9d5b..e7ac191 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
@@ -217,7 +217,7 @@ public class SSLSocketIntegrationTest {
clientSocket = clientChannel.socket();
NioSslEngine engine =
clusterSocketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(),
- clusterSocketCreator.createSSLEngine("localhost", 1234, true), 0,
+ clusterSocketCreator.createSSLEngine("localhost", 1234, true), 0, true,
ByteBuffer.allocate(65535), new BufferPool(mock(DMStats.class)));
clientChannel.configureBlocking(true);
@@ -267,6 +267,7 @@ public class SSLSocketIntegrationTest {
sc.handshakeSSLSocketChannel(socket.getChannel(), sc.createSSLEngine("localhost", 1234,
false),
timeoutMillis,
+ false,
ByteBuffer.allocate(65535),
new BufferPool(mock(DMStats.class)));
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index cd1af3a..af3bd1e 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -104,4 +104,4 @@ org/apache/geode/cache/query/internal/xml/ElementType
org/apache/geode/cache/query/internal/xml/ElementType$1
org/apache/geode/cache/query/internal/xml/ElementType$2
org/apache/geode/cache/query/internal/xml/ElementType$3
-org/apache/geode/internal/net/ByteBufferVendor$OpenAttemptTimedOut
\ No newline at end of file
+org/apache/geode/internal/net/ByteBufferSharingImpl$OpenAttemptTimedOut
\ No newline at end of file
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
index c8a94ce..cdfa897 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
@@ -42,27 +42,12 @@ public interface ByteBufferSharing extends AutoCloseable {
*
* Subsequent calls to {@link #getBuffer()} will return that new buffer too.
*
- * This variant is for use when the buffer is being written to.
- *
* @return the same buffer or a different (bigger) buffer
* @throws IOException if the buffer is no longer accessible
*/
ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException;
/**
- * Expand the buffer if needed. This may return a different object so be sure to pay attention to
- * the return value if you need access to the potentially- expanded buffer.
- *
- * Subsequent calls to {@link #getBuffer()} will return that new buffer too.
- *
- * This variant is for use when the buffer is being read from.
- *
- * @return the same buffer or a different (bigger) buffer
- * @throws IOException if the buffer is no longer accessible
- */
- ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException;
-
- /**
* Override {@link AutoCloseable#close()} without throws clause since we don't need one.
*/
@Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
index 4f36e5b..4a8bc49 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
@@ -48,10 +48,5 @@ class ByteBufferSharingNoOp implements ByteBufferSharing {
}
@Override
- public ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException {
- throw new UnsupportedOperationException("Can't expand buffer when using NioPlainEngine");
- }
-
- @Override
public void close() {}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
index 1dc74f0..4933247 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java
@@ -17,7 +17,6 @@ package org.apache.geode.internal.net;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -28,49 +27,49 @@ import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.internal.net.BufferPool.BufferType;
/**
- * Produces (via {@link #open()}) an {@link ByteBufferSharing} meant to used only within a
- * try-with-resources block. The resource controls access to a secondary resource
- * (via {@link ByteBufferSharing#getBuffer()}) within the scope of try-with-resources.
- * Neither the object returned by {@link #open()}, nor the object returned by invoking
- * {@link ByteBufferSharing#getBuffer()} on that object may be used outside the scope of
+ * An {@link AutoCloseable} meant to be acquired in a try-with-resources statement. The resource (a
+ * {@link ByteBuffer}) is available (for reading and modification) in the scope of the
* try-with-resources.
*/
-public class ByteBufferVendor {
+class ByteBufferVendor implements ByteBufferSharing {
static class OpenAttemptTimedOut extends Exception {
}
- private interface ByteBufferSharingInternal extends ByteBufferSharing {
- void releaseBuffer();
- }
-
- private final Lock lock = new ReentrantLock();
- private final AtomicBoolean isDestructed = new AtomicBoolean(false);
- private final AtomicInteger counter = new AtomicInteger(1);
- // the object referenced by sharing is guarded by lock
- private final ByteBufferSharingInternal sharing;
+ private final Lock lock;
+ private final AtomicBoolean isDestructed;
+ // mutable because in general our ByteBuffer may need to be resized (grown or compacted)
+ private volatile ByteBuffer buffer;
+ private final BufferType bufferType;
+ private final AtomicInteger counter;
+ private final BufferPool bufferPool;
- /*
- * These constructors are for use only by the owner of the shared resource.
+ /**
+ * This constructor is for use only by the owner of the shared resource (a {@link ByteBuffer}).
*
* A resource owner must invoke {@link #open()} once for each reference that escapes (is passed
* to an external object or is returned to an external caller.)
*
- * Constructors acquire no locks. The reference count will be 1 after a constructor
+ * This constructor acquires no lock. The reference count will be 1 after this constructor
* completes.
*/
+ ByteBufferVendor(final ByteBuffer buffer, final BufferType bufferType,
+ final BufferPool bufferPool) {
+ this.buffer = buffer;
+ this.bufferType = bufferType;
+ this.bufferPool = bufferPool;
+ lock = new ReentrantLock();
+ counter = new AtomicInteger(1);
+ isDestructed = new AtomicBoolean(false);
+ }
/**
- * When you have a ByteBuffer available before construction, use this constructor.
- *
- * @param bufferArg is the ByteBuffer
- * @param bufferType needed for freeing the buffer later
- * @param bufferPool needed for freeing the buffer later
+ * The destructor. Called by the resource owner to undo the work of the constructor.
*/
- public ByteBufferVendor(final ByteBuffer bufferArg,
- final BufferType bufferType,
- final BufferPool bufferPool) {
- sharing = new ByteBufferSharingInternalImpl(bufferArg, bufferType, bufferPool);
+ void destruct() {
+ if (isDestructed.compareAndSet(false, true)) {
+ dropReference();
+ }
}
/**
@@ -79,19 +78,18 @@ public class ByteBufferVendor {
*
* Resource owners call this method as the last thing before returning a reference to the caller.
* That caller binds that reference to a variable in a try-with-resources statement and relies on
- * the AutoCloseable protocol to invoke {@link AutoCloseable#close()} on the object at
- * the end of the block.
+ * the AutoCloseable protocol to invoke {@link #close()} on the object at the end of the block.
*/
- public ByteBufferSharing open() throws IOException {
+ ByteBufferSharing open() throws IOException {
lock.lock();
addReferenceAfterLock();
- return sharing;
+ return this;
}
/**
* This variant throws {@link OpenAttemptTimedOut} if it can't acquire the lock in time.
*/
- public ByteBufferSharing open(final long time, final TimeUnit unit)
+ ByteBufferSharing open(final long time, final TimeUnit unit)
throws OpenAttemptTimedOut, IOException {
try {
if (!lock.tryLock(time, unit)) {
@@ -102,25 +100,24 @@ public class ByteBufferVendor {
throw new OpenAttemptTimedOut();
}
addReferenceAfterLock();
- return sharing;
- }
-
- /**
- * The destructor. Called by the resource owner to undo the work of the constructor.
- */
- public void destruct() {
- if (isDestructed.compareAndSet(false, true)) {
- dropReference();
- }
+ return this;
}
- private void exposingResource() throws IOException {
+ @Override
+ public ByteBuffer getBuffer() throws IOException {
if (isDestructed.get()) {
throwClosed();
}
+ return buffer;
}
- private void close() {
+ @Override
+ public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException {
+ return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity);
+ }
+
+ @Override
+ public void close() {
/*
* We are counting on our ReentrantLock throwing an exception if the current thread
* does not hold the lock. In that case dropReference() will not be called. This
@@ -145,11 +142,16 @@ public class ByteBufferVendor {
private int dropReference() {
final int usages = counter.decrementAndGet();
if (usages == 0) {
- sharing.releaseBuffer();
+ bufferPool.releaseBuffer(bufferType, buffer);
}
return usages;
}
+ @VisibleForTesting
+ public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) {
+ buffer = newBufferForTesting;
+ }
+
private void addReferenceAfterLock() throws IOException {
try {
addReference();
@@ -163,54 +165,4 @@ public class ByteBufferVendor {
throw new IOException("NioSslEngine has been closed");
}
- private class ByteBufferSharingInternalImpl implements ByteBufferSharingInternal {
-
- /*
- * mutable because in general our ByteBuffer may need to be resized (grown or compacted)
- * no concurrency concerns since ByteBufferSharingNotNull is guarded by ByteBufferVendor.lock
- */
- private ByteBuffer buffer;
- private final BufferType bufferType;
- private final BufferPool bufferPool;
-
- public ByteBufferSharingInternalImpl(final ByteBuffer buffer,
- final BufferType bufferType,
- final BufferPool bufferPool) {
- Objects.requireNonNull(buffer);
- this.buffer = buffer;
- this.bufferType = bufferType;
- this.bufferPool = bufferPool;
- }
-
- @Override
- public ByteBuffer getBuffer() throws IOException {
- exposingResource();
- return buffer;
- }
-
- @Override
- public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException {
- return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity);
- }
-
- @Override
- public ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException {
- return buffer = bufferPool.expandReadBufferIfNeeded(bufferType, getBuffer(), newCapacity);
- }
-
- @Override
- public void close() {
- ByteBufferVendor.this.close();
- }
-
- @Override
- public void releaseBuffer() {
- bufferPool.releaseBuffer(bufferType, buffer);
- }
- }
-
- @VisibleForTesting
- public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) {
- ((ByteBufferSharingInternalImpl) sharing).buffer = newBufferForTesting;
- }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 4c603a0..fc91a31 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -62,12 +62,12 @@ public class NioSslEngine implements NioFilter {
/**
* holds bytes wrapped by the SSLEngine; a.k.a. myNetData
*/
- private final ByteBufferVendor outputBufferVendor;
+ private final ByteBufferVendor outputSharing;
/**
* holds the last unwrapped data from a peer; a.k.a. peerAppData
*/
- private final ByteBufferVendor inputBufferVendor;
+ private final ByteBufferVendor inputSharing;
NioSslEngine(SSLEngine engine, BufferPool bufferPool) {
SSLSession session = engine.getSession();
@@ -76,10 +76,10 @@ public class NioSslEngine implements NioFilter {
closed = false;
this.engine = engine;
this.bufferPool = bufferPool;
- outputBufferVendor =
+ outputSharing =
new ByteBufferVendor(bufferPool.acquireDirectSenderBuffer(packetBufferSize),
TRACKED_SENDER, bufferPool);
- inputBufferVendor =
+ inputSharing =
new ByteBufferVendor(bufferPool.acquireNonDirectReceiveBuffer(appBufferSize),
TRACKED_RECEIVER, bufferPool);
}
@@ -98,7 +98,7 @@ public class NioSslEngine implements NioFilter {
peerNetData.capacity(), engine.getSession().getPacketBufferSize()));
}
- final ByteBuffer handshakeBuffer = peerNetData;
+ ByteBuffer handshakeBuffer = peerNetData;
handshakeBuffer.clear();
ByteBuffer myAppData = ByteBuffer.wrap(new byte[0]);
@@ -135,7 +135,7 @@ public class NioSslEngine implements NioFilter {
switch (status) {
case NEED_UNWRAP:
- try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
+ try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
final ByteBuffer peerAppData = inputSharing.getBuffer();
// Receive handshaking data from peer
@@ -162,7 +162,7 @@ public class NioSslEngine implements NioFilter {
}
case NEED_WRAP:
- try (final ByteBufferSharing outputSharing = outputBufferVendor.open()) {
+ try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
final ByteBuffer myNetData = outputSharing.getBuffer();
// Empty the local network packet buffer.
@@ -231,7 +231,7 @@ public class NioSslEngine implements NioFilter {
@Override
public ByteBufferSharing wrap(ByteBuffer appData) throws IOException {
- try (final ByteBufferSharing outputSharing = outputBufferVendor.open()) {
+ try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
ByteBuffer myNetData = outputSharing.getBuffer();
@@ -260,13 +260,13 @@ public class NioSslEngine implements NioFilter {
myNetData.flip();
- return outputBufferVendor.open();
+ return shareOutputBuffer();
}
}
@Override
public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException {
- try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
+ try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
ByteBuffer peerAppData = inputSharing.getBuffer();
@@ -292,7 +292,7 @@ public class NioSslEngine implements NioFilter {
// partial data - need to read more. When this happens the SSLEngine will not have
// changed the buffer position
wrappedBuffer.compact();
- return inputBufferVendor.open();
+ return shareInputBuffer();
case OK:
break;
default:// if there is data in the decrypted buffer return it. Otherwise signal that we're
@@ -305,7 +305,7 @@ public class NioSslEngine implements NioFilter {
}
}
wrappedBuffer.clear();
- return inputBufferVendor.open();
+ return shareInputBuffer();
}
}
@@ -325,7 +325,7 @@ public class NioSslEngine implements NioFilter {
@Override
public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes,
ByteBuffer wrappedBuffer) throws IOException {
- try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
+ try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
ByteBuffer peerAppData = inputSharing.getBuffer();
@@ -355,13 +355,13 @@ public class NioSslEngine implements NioFilter {
}
}
}
- return inputBufferVendor.open();
+ return shareInputBuffer();
}
}
@Override
public ByteBufferSharing getUnwrappedBuffer() throws IOException {
- return inputBufferVendor.open();
+ return shareInputBuffer();
}
@Override
@@ -377,8 +377,8 @@ public class NioSslEngine implements NioFilter {
return;
}
closed = true;
- inputBufferVendor.destruct();
- try (final ByteBufferSharing outputSharing = outputBufferVendor.open(1, TimeUnit.MINUTES)) {
+ inputSharing.destruct();
+ try (final ByteBufferSharing outputSharing = shareOutputBuffer(1, TimeUnit.MINUTES)) {
final ByteBuffer myNetData = outputSharing.getBuffer();
if (!engine.isOutboundDone()) {
@@ -412,7 +412,7 @@ public class NioSslEngine implements NioFilter {
engine.closeOutbound();
}
} finally {
- outputBufferVendor.destruct();
+ outputSharing.destruct();
}
}
@@ -422,12 +422,16 @@ public class NioSslEngine implements NioFilter {
}
@VisibleForTesting
- public ByteBufferVendor getOutputBufferVendorForTestingOnly() throws IOException {
- return outputBufferVendor;
+ public ByteBufferSharing shareOutputBuffer() throws IOException {
+ return outputSharing.open();
}
- @VisibleForTesting
- public ByteBufferVendor getInputBufferVendorForTestingOnly() throws IOException {
- return inputBufferVendor;
+ private ByteBufferSharing shareOutputBuffer(final long time, final TimeUnit unit)
+ throws OpenAttemptTimedOut, IOException {
+ return outputSharing.open(time, unit);
+ }
+
+ public ByteBufferSharing shareInputBuffer() throws IOException {
+ return inputSharing.open();
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
index a31bbb2..a232fca 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java
@@ -803,16 +803,21 @@ public class SocketCreator extends TcpSocketCreatorImpl {
* @param socketChannel the socket's NIO channel
* @param engine the sslEngine (see createSSLEngine)
* @param timeout handshake timeout in milliseconds. No timeout if <= 0
+ * @param clientSocket set to true if you initiated the connect(), false if you accepted it
* @param peerNetBuffer the buffer to use in reading data fron socketChannel. This should also be
* used in subsequent I/O operations
* @return The SSLEngine to be used in processing data for sending/receiving from the channel
*/
- public NioSslEngine handshakeSSLSocketChannel(SocketChannel socketChannel,
- SSLEngine engine,
+ public NioSslEngine handshakeSSLSocketChannel(SocketChannel socketChannel, SSLEngine engine,
int timeout,
+ boolean clientSocket,
ByteBuffer peerNetBuffer,
BufferPool bufferPool)
throws IOException {
+ engine.setUseClientMode(clientSocket);
+ if (!clientSocket) {
+ engine.setNeedClientAuth(sslConfig.isRequireAuth());
+ }
while (!socketChannel.finishConnect()) {
try {
Thread.sleep(50);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 107aa9f..f5a1886 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -18,7 +18,6 @@ import static java.lang.Boolean.FALSE;
import static java.lang.ThreadLocal.withInitial;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
import static org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
-import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_RECEIVER;
import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX;
import java.io.DataInput;
@@ -80,7 +79,6 @@ import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.SystemTimer.SystemTimerTask;
import org.apache.geode.internal.net.BufferPool;
import org.apache.geode.internal.net.ByteBufferSharing;
-import org.apache.geode.internal.net.ByteBufferVendor;
import org.apache.geode.internal.net.NioFilter;
import org.apache.geode.internal.net.NioPlainEngine;
import org.apache.geode.internal.net.SocketCreator;
@@ -119,7 +117,7 @@ public class Connection implements Runnable {
* Small buffer used for send socket buffer on receiver connections and receive buffer on sender
* connections.
*/
- public static final int SMALL_BUFFER_SIZE =
+ static final int SMALL_BUFFER_SIZE =
Integer.getInteger(GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096);
/**
@@ -311,14 +309,11 @@ public class Connection implements Runnable {
/** name of thread that we're currently performing an operation in (may be null) */
private String ackThreadName;
- /*
- * This object mediates access to the input ByteBuffer and ensures its return to
- * pool after last use. This reference couldn't be final since it is initialized
- * in createIoFilter() not in the constructors. It had to be initialized there
- * because in general we have to construct an SSLEngine before we know the buffer
- * size and createIoFilter() is where we create that object.
- */
- private ByteBufferVendor inputBufferVendor;
+ /** the buffer used for message receipt */
+ private ByteBuffer inputBuffer;
+
+ /** Lock used to protect the input buffer */
+ public final Object inputBufferLock = new Object();
/** the length of the next message to be dispatched */
private int messageLength;
@@ -894,28 +889,27 @@ public class Connection implements Runnable {
waitForAddressCompletion();
InternalDistributedMember myAddr = owner.getConduit().getMemberId();
- try (final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE)) {
- /*
- * Note a byte of zero is always written because old products serialized a member id with
- * always sends an ip address. My reading of the ip-address specs indicated that the first
- * byte of a valid address would never be 0.
- */
- connectHandshake.writeByte(0);
- connectHandshake.writeByte(HANDSHAKE_VERSION);
- // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
- InternalDataSerializer.invokeToData(myAddr, connectHandshake);
- connectHandshake.writeBoolean(sharedResource);
- connectHandshake.writeBoolean(preserveOrder);
- connectHandshake.writeLong(uniqueId);
- // write the product version ordinal
- Version.CURRENT.writeOrdinal(connectHandshake, true);
- connectHandshake.writeInt(dominoCount.get() + 1);
- // this writes the sending member + thread name that is stored in senderName
- // on the receiver to show the cause of reader thread creation
- connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, OperationExecutors.STANDARD_EXECUTOR,
- MsgIdGenerator.NO_MSG_ID);
- writeFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
- }
+ final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE);
+ /*
+ * Note a byte of zero is always written because old products serialized a member id with always
+ * sends an ip address. My reading of the ip-address specs indicated that the first byte of a
+ * valid address would never be 0.
+ */
+ connectHandshake.writeByte(0);
+ connectHandshake.writeByte(HANDSHAKE_VERSION);
+ // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
+ InternalDataSerializer.invokeToData(myAddr, connectHandshake);
+ connectHandshake.writeBoolean(sharedResource);
+ connectHandshake.writeBoolean(preserveOrder);
+ connectHandshake.writeLong(uniqueId);
+ // write the product version ordinal
+ Version.CURRENT.writeOrdinal(connectHandshake, true);
+ connectHandshake.writeInt(dominoCount.get() + 1);
+ // this writes the sending member + thread name that is stored in senderName
+ // on the receiver to show the cause of reader thread creation
+ connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, OperationExecutors.STANDARD_EXECUTOR,
+ MsgIdGenerator.NO_MSG_ID);
+ writeFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
}
/**
@@ -1359,7 +1353,7 @@ public class Connection implements Runnable {
if (!isReceiver && !hasResidualReaderThread()) {
// receivers release the input buffer when exiting run(). Senders use the
// inputBuffer for reading direct-reply responses
- inputBufferVendor.destruct();
+ releaseInputBuffer();
}
lengthSet = false;
}
@@ -1500,7 +1494,7 @@ public class Connection implements Runnable {
}
}
- inputBufferVendor.destruct();
+ releaseInputBuffer();
// make sure that if the reader thread exits we notify a thread waiting for the handshake.
notifyHandshakeWaiter(false);
@@ -1512,6 +1506,16 @@ public class Connection implements Runnable {
}
}
+ private void releaseInputBuffer() {
+ synchronized (inputBufferLock) {
+ ByteBuffer tmp = inputBuffer;
+ if (tmp != null) {
+ inputBuffer = null;
+ getBufferPool().releaseReceiveBuffer(tmp);
+ }
+ }
+ }
+
BufferPool getBufferPool() {
return owner.getBufferPool();
}
@@ -1533,6 +1537,9 @@ public class Connection implements Runnable {
}
private void readMessages() {
+ if (closing.get()) {
+ return;
+ }
// take a snapshot of uniqueId to detect reconnect attempts
SocketChannel channel;
try {
@@ -1578,6 +1585,8 @@ public class Connection implements Runnable {
// we should not change the state of the connection if we are a handshake reader thread
// as there is a race between this thread and the application thread doing direct ack
boolean handshakeHasBeenRead = false;
+ // if we're using SSL/TLS the input buffer may already have data to process
+ boolean skipInitialRead = getInputBuffer().position() > 0;
try {
for (boolean isInitialRead = true;;) {
if (stopped) {
@@ -1597,9 +1606,8 @@ public class Connection implements Runnable {
break;
}
- try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
- ByteBuffer buff = inputSharing.getBuffer();
-
+ try {
+ ByteBuffer buff = getInputBuffer();
synchronized (stateLock) {
connectionState = STATE_READING;
}
@@ -1608,8 +1616,6 @@ public class Connection implements Runnable {
amountRead = channel.read(buff);
} else {
isInitialRead = false;
- // if we're using SSL/TLS the input buffer may already have data to process
- final boolean skipInitialRead = buff.position() > 0;
if (!skipInitialRead) {
amountRead = channel.read(buff);
} else {
@@ -1674,11 +1680,11 @@ public class Connection implements Runnable {
} catch (IOException e) {
// "Socket closed" check needed for Solaris jdk 1.4.2_08
if (!isSocketClosed() && !"Socket closed".equalsIgnoreCase(e.getMessage())) {
- if (logger.isInfoEnabled() && !isIgnorableIOException(e)) {
- logger.info("{} io exception for {}", p2pReaderName(), this, e);
+ if (logger.isDebugEnabled() && !isIgnorableIOException(e)) {
+ logger.debug("{} io exception for {}", p2pReaderName(), this, e);
}
- if (logger.isDebugEnabled()) {
- if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) {
+ if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) {
+ if (logger.isDebugEnabled()) {
logger.debug(
"{} received unexpected WSACancelBlockingCall exception, which may result in a hang",
p2pReaderName());
@@ -1722,55 +1728,28 @@ public class Connection implements Runnable {
private void createIoFilter(SocketChannel channel, boolean clientSocket) throws IOException {
if (getConduit().useSSL() && channel != null) {
InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress();
- String hostName;
- if (remoteAddr != null) {
- hostName = remoteAddr.getHostName();
- } else {
- hostName = SocketCreator.getHostName(address.getAddress());
- }
SSLEngine engine =
- getConduit().getSocketCreator().createSSLEngine(hostName,
- address.getPort(), clientSocket);
-
- final int packetBufferSize = engine.getSession().getPacketBufferSize();
-
- inputBufferVendor =
- new ByteBufferVendor(
- getBufferPool().acquireDirectReceiveBuffer(packetBufferSize),
- TRACKED_RECEIVER,
- getBufferPool());
+ getConduit().getSocketCreator().createSSLEngine(address.getHostName(), address.getPort(),
+ clientSocket);
+ int packetBufferSize = engine.getSession().getPacketBufferSize();
+ if (inputBuffer == null || inputBuffer.capacity() < packetBufferSize) {
+ // TLS has a minimum input buffer size constraint
+ if (inputBuffer != null) {
+ getBufferPool().releaseReceiveBuffer(inputBuffer);
+ }
+ inputBuffer = getBufferPool().acquireDirectReceiveBuffer(packetBufferSize);
+ }
if (channel.socket().getReceiveBufferSize() < packetBufferSize) {
channel.socket().setReceiveBufferSize(packetBufferSize);
}
if (channel.socket().getSendBufferSize() < packetBufferSize) {
channel.socket().setSendBufferSize(packetBufferSize);
}
- try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
- final ByteBuffer inputBuffer = inputSharing.getBuffer();
- /*
- * It's ok to share the inputBuffer with handshakeSSLSocketChannel() since that method
- * accesses the referenced buffer for the handshake which completes before returning
- * control here. The NioSslEngine retains no reference to the buffer.
- */
- ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine,
- getConduit().idleConnectionTimeout, inputBuffer,
- getBufferPool());
- }
+ ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine,
+ getConduit().idleConnectionTimeout, clientSocket, inputBuffer,
+ getBufferPool());
} else {
- final int allocSize;
- if (recvBufferSize == -1) {
- allocSize = owner.getConduit().tcpBufferSize;
- } else {
- allocSize = recvBufferSize;
- }
-
- inputBufferVendor =
- new ByteBufferVendor(
- getBufferPool().acquireDirectReceiveBuffer(allocSize),
- TRACKED_RECEIVER,
- getBufferPool());
-
ioFilter = new NioPlainEngine(getBufferPool());
}
}
@@ -1802,13 +1781,9 @@ public class Connection implements Runnable {
}
msg = msg.toLowerCase();
-
- if (e instanceof SSLException && msg.contains("status = closed")) {
- return true; // engine has been closed - this is normal
- }
-
- return (msg.contains("forcibly closed") || msg.contains("reset by peer")
- || msg.contains("connection reset") || msg.contains("socket is closed"));
+ return msg.contains("forcibly closed")
+ || msg.contains("reset by peer")
+ || msg.contains("connection reset");
}
private static boolean validMsgType(int msgType) {
@@ -2652,6 +2627,20 @@ public class Connection implements Runnable {
}
/**
+ * gets the buffer for receiving message length bytes
+ */
+ private ByteBuffer getInputBuffer() {
+ if (inputBuffer == null) {
+ int allocSize = recvBufferSize;
+ if (allocSize == -1) {
+ allocSize = owner.getConduit().tcpBufferSize;
+ }
+ inputBuffer = getBufferPool().acquireDirectReceiveBuffer(allocSize);
+ }
+ return inputBuffer;
+ }
+
+ /**
* @throws SocketTimeoutException if wait expires.
* @throws ConnectionException if ack is not received
*/
@@ -2758,93 +2747,72 @@ public class Connection implements Runnable {
* deserialized and passed to TCPConduit for further processing
*/
private void processInputBuffer() throws ConnectionException, IOException {
- try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) {
- // can't be final because in some cases we expand the buffer (resulting in a new object)
- ByteBuffer inputBuffer = inputSharing.getBuffer();
- inputBuffer.flip();
+ inputBuffer.flip();
- try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) {
- final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer();
+ try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) {
+ final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer();
- peerDataBuffer.flip();
+ peerDataBuffer.flip();
- boolean done = false;
+ boolean done = false;
- while (!done && connected) {
- owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
- int remaining = peerDataBuffer.remaining();
- if (lengthSet || remaining >= MSG_HEADER_BYTES) {
- if (!lengthSet) {
- if (readMessageHeader(peerDataBuffer)) {
- break;
- }
+ while (!done && connected) {
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ int remaining = peerDataBuffer.remaining();
+ if (lengthSet || remaining >= MSG_HEADER_BYTES) {
+ if (!lengthSet) {
+ if (readMessageHeader(peerDataBuffer)) {
+ break;
}
- if (remaining >= messageLength + MSG_HEADER_BYTES) {
- lengthSet = false;
- peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES);
- // don't trust the message deserialization to leave the position in
- // the correct spot. Some of the serialization uses buffered
- // streams that can leave the position at the wrong spot
- int startPos = peerDataBuffer.position();
- int oldLimit = peerDataBuffer.limit();
- peerDataBuffer.limit(startPos + messageLength);
-
- if (handshakeRead) {
- try {
- readMessage(peerDataBuffer);
- } catch (SerializationException e) {
- logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit);
- throw e;
- }
- } else {
- try (ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
- DataInputStream dis = new DataInputStream(bbis)) {
- if (!isReceiver) {
- // we read the handshake and then stop processing since we don't want
- // to process the input buffer anymore in a handshake thread
- readHandshakeForSender(dis, peerDataBuffer);
- return;
- }
- if (readHandshakeForReceiver(dis)) {
- ioFilter.doneReading(peerDataBuffer);
- return;
- }
- }
- }
- if (!connected) {
- continue;
+ }
+ if (remaining >= messageLength + MSG_HEADER_BYTES) {
+ lengthSet = false;
+ peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES);
+ // don't trust the message deserialization to leave the position in
+ // the correct spot. Some of the serialization uses buffered
+ // streams that can leave the position at the wrong spot
+ int startPos = peerDataBuffer.position();
+ int oldLimit = peerDataBuffer.limit();
+ peerDataBuffer.limit(startPos + messageLength);
+
+ if (handshakeRead) {
+ try {
+ readMessage(peerDataBuffer);
+ } catch (SerializationException e) {
+ logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit);
+ throw e;
}
- accessed();
- peerDataBuffer.limit(oldLimit);
- peerDataBuffer.position(startPos + messageLength);
} else {
- done = true;
- if (getConduit().useSSL()) {
+ ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
+ DataInputStream dis = new DataInputStream(bbis);
+ if (!isReceiver) {
+ // we read the handshake and then stop processing since we don't want
+ // to process the input buffer anymore in a handshake thread
+ readHandshakeForSender(dis, peerDataBuffer);
+ return;
+ }
+ if (readHandshakeForReceiver(dis)) {
ioFilter.doneReading(peerDataBuffer);
- } else {
- // compact or resize the buffer
- final int oldBufferSize = inputBuffer.capacity();
- final int allocSize = messageLength + MSG_HEADER_BYTES;
- if (oldBufferSize < allocSize) {
- // need a bigger buffer
- logger.info(
- "Allocating larger network read buffer, new size is {} old size was {}.",
- allocSize, oldBufferSize);
- inputBuffer = inputSharing.expandReadBufferIfNeeded(allocSize);
- } else {
- if (inputBuffer.position() != 0) {
- inputBuffer.compact();
- } else {
- inputBuffer.position(inputBuffer.limit());
- inputBuffer.limit(inputBuffer.capacity());
- }
- }
+ return;
}
}
+ if (!connected) {
+ continue;
+ }
+ accessed();
+ peerDataBuffer.limit(oldLimit);
+ peerDataBuffer.position(startPos + messageLength);
} else {
- ioFilter.doneReading(peerDataBuffer);
done = true;
+ if (getConduit().useSSL()) {
+ ioFilter.doneReading(peerDataBuffer);
+ } else {
+ compactOrResizeBuffer(messageLength);
+ }
}
+ } else {
+ ioFilter.doneReading(peerDataBuffer);
+ done = true;
}
}
}
@@ -2979,9 +2947,10 @@ public class Connection implements Runnable {
private void readMessage(ByteBuffer peerDataBuffer) {
if (messageType == NORMAL_MSG_TYPE) {
owner.getConduit().getStats().incMessagesBeingReceived(true, messageLength);
- try (ByteBufferInputStream bbis =
+ ByteBufferInputStream bbis =
remoteVersion == null ? new ByteBufferInputStream(peerDataBuffer)
- : new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion)) {
+ : new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion);
+ try {
ReplyProcessor21.initMessageRPId();
// add serialization stats
long startSer = owner.getConduit().getStats().startMsgDeserialization();
@@ -3234,8 +3203,34 @@ public class Connection implements Runnable {
private void setThreadName(int dominoNumber) {
Thread.currentThread().setName(THREAD_KIND_IDENTIFIER + " for " + remoteAddr + " "
+ (sharedResource ? "" : "un") + "shared" + " " + (preserveOrder ? "" : "un")
- + "ordered sender uid=" + uniqueId + (dominoNumber > 0 ? " dom #" + dominoNumber : "")
- + " local port=" + socket.getLocalPort() + " remote port=" + socket.getPort());
+ + "ordered" + " uid=" + uniqueId + (dominoNumber > 0 ? " dom #" + dominoNumber : "")
+ + " port=" + socket.getPort());
+ }
+
+ private void compactOrResizeBuffer(int messageLength) {
+ final int oldBufferSize = inputBuffer.capacity();
+ int allocSize = messageLength + MSG_HEADER_BYTES;
+ if (oldBufferSize < allocSize) {
+ // need a bigger buffer
+ logger.info("Allocating larger network read buffer, new size is {} old size was {}.",
+ allocSize, oldBufferSize);
+ ByteBuffer oldBuffer = inputBuffer;
+ inputBuffer = getBufferPool().acquireDirectReceiveBuffer(allocSize);
+
+ if (oldBuffer != null) {
+ int oldByteCount = oldBuffer.remaining();
+ inputBuffer.put(oldBuffer);
+ inputBuffer.position(oldByteCount);
+ getBufferPool().releaseReceiveBuffer(oldBuffer);
+ }
+ } else {
+ if (inputBuffer.position() != 0) {
+ inputBuffer.compact();
+ } else {
+ inputBuffer.position(inputBuffer.limit());
+ inputBuffer.limit(inputBuffer.capacity());
+ }
+ }
}
private boolean dispatchMessage(DistributionMessage msg, int bytesRead, boolean directAck)
@@ -3320,7 +3315,6 @@ public class Connection implements Runnable {
* socket is properly closed at this end. When that is the case isResidualReaderThread
* will return true.
*/
- @VisibleForTesting
public boolean hasResidualReaderThread() {
return hasResidualReaderThread;
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
index fe4f08b..359d8aa 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java
@@ -32,11 +32,11 @@ import org.junit.Test;
public class ByteBufferVendorTest {
@FunctionalInterface
- private interface Foo {
+ private static interface Foo {
void run() throws IOException;
}
- private ByteBufferVendor sharingVendor;
+ private ByteBufferVendor sharing;
private BufferPool poolMock;
private CountDownLatch clientHasOpenedResource;
private CountDownLatch clientMayComplete;
@@ -44,7 +44,7 @@ public class ByteBufferVendorTest {
@Before
public void before() {
poolMock = mock(BufferPool.class);
- sharingVendor =
+ sharing =
new ByteBufferVendor(mock(ByteBuffer.class), BufferPool.BufferType.TRACKED_SENDER,
poolMock);
clientHasOpenedResource = new CountDownLatch(1);
@@ -54,7 +54,7 @@ public class ByteBufferVendorTest {
@Test
public void balancedCloseOwnerIsLastReferenceHolder() throws InterruptedException {
resourceOwnerIsLastReferenceHolder("client with balanced close calls", () -> {
- try (final ByteBufferSharing _unused = sharingVendor.open()) {
+ try (final ByteBufferSharing _unused = sharing.open()) {
}
});
}
@@ -62,7 +62,7 @@ public class ByteBufferVendorTest {
@Test
public void extraCloseOwnerIsLastReferenceHolder() throws InterruptedException {
resourceOwnerIsLastReferenceHolder("client with extra close calls", () -> {
- final ByteBufferSharing sharing2 = sharingVendor.open();
+ final ByteBufferSharing sharing2 = sharing.open();
sharing2.close();
verify(poolMock, times(0)).releaseBuffer(any(), any());
assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
@@ -73,7 +73,7 @@ public class ByteBufferVendorTest {
@Test
public void balancedCloseClientIsLastReferenceHolder() throws InterruptedException {
clientIsLastReferenceHolder("client with balanced close calls", () -> {
- try (final ByteBufferSharing _unused = sharingVendor.open()) {
+ try (final ByteBufferSharing _unused = sharing.open()) {
clientHasOpenedResource.countDown();
blockClient();
}
@@ -83,42 +83,36 @@ public class ByteBufferVendorTest {
@Test
public void extraCloseClientIsLastReferenceHolder() throws InterruptedException {
clientIsLastReferenceHolder("client with extra close calls", () -> {
- final ByteBufferSharing sharing2 = sharingVendor.open();
+ final ByteBufferSharing sharing2 = sharing.open();
clientHasOpenedResource.countDown();
blockClient();
sharing2.close();
verify(poolMock, times(1)).releaseBuffer(any(), any());
assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
+ System.out.println("here");
});
}
@Test
public void extraCloseDoesNotPrematurelyReturnBufferToPool() throws IOException {
- final ByteBufferSharing sharing2 = sharingVendor.open();
+ final ByteBufferSharing sharing2 = sharing.open();
sharing2.close();
assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
verify(poolMock, times(0)).releaseBuffer(any(), any());
- sharingVendor.destruct();
+ sharing.destruct();
verify(poolMock, times(1)).releaseBuffer(any(), any());
}
@Test
public void extraCloseDoesNotDecrementRefCount() throws IOException {
- final ByteBufferSharing sharing2 = sharingVendor.open();
+ final ByteBufferSharing sharing2 = sharing.open();
sharing2.close();
assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
- final ByteBufferSharing sharing3 = this.sharingVendor.open();
- sharingVendor.destruct();
+ final ByteBufferSharing sharing3 = this.sharing.open();
+ sharing.destruct();
verify(poolMock, times(0)).releaseBuffer(any(), any());
}
- @Test
- public void destructIsIdempotent() {
- sharingVendor.destruct();
- sharingVendor.destruct();
- verify(poolMock, times(1)).releaseBuffer(any(), any());
- }
-
private void resourceOwnerIsLastReferenceHolder(final String name, final Foo client)
throws InterruptedException {
/*
@@ -134,7 +128,7 @@ public class ByteBufferVendorTest {
verify(poolMock, times(0)).releaseBuffer(any(), any());
- sharingVendor.destruct();
+ sharing.destruct();
verify(poolMock, times(1)).releaseBuffer(any(), any());
}
@@ -153,7 +147,7 @@ public class ByteBufferVendorTest {
clientHasOpenedResource.await();
- sharingVendor.destruct();
+ sharing.destruct();
verify(poolMock, times(0)).releaseBuffer(any(), any());
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index d6b9aa6..62a858c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -90,8 +90,7 @@ public class NioSslEngineTest {
@Test
public void engineUsesDirectBuffers() throws IOException {
- try (final ByteBufferSharing outputSharing =
- nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
+ try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
assertThat(outputSharing.getBuffer().isDirect()).isTrue();
}
}
@@ -191,8 +190,7 @@ public class NioSslEngineTest {
@Test
public void wrap() throws Exception {
- try (final ByteBufferSharing outputSharing =
- nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
+ try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
// make the application data too big to fit into the engine's encryption buffer
ByteBuffer appData =
@@ -223,8 +221,7 @@ public class NioSslEngineTest {
@Test
public void wrapFails() throws IOException {
- try (final ByteBufferSharing outputSharing =
- nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
+ try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
// make the application data too big to fit into the engine's encryption buffer
ByteBuffer appData =
ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100);
@@ -247,8 +244,7 @@ public class NioSslEngineTest {
@Test
public void unwrapWithBufferOverflow() throws Exception {
- try (final ByteBufferSharing inputSharing =
- nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
+ try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
// make the application data too big to fit into the engine's encryption buffer
final ByteBuffer peerAppData = inputSharing.getBuffer();
@@ -288,8 +284,7 @@ public class NioSslEngineTest {
@Test
public void unwrapWithBufferUnderflow() throws Exception {
- try (final ByteBufferSharing inputSharing =
- nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
+ try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
ByteBuffer wrappedData =
ByteBuffer.allocate(inputSharing.getBuffer().capacity());
byte[] netBytes = new byte[wrappedData.capacity() / 2];
@@ -314,8 +309,7 @@ public class NioSslEngineTest {
@Test
public void unwrapWithDecryptionError() throws IOException {
- try (final ByteBufferSharing inputSharing =
- nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
+ try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
// make the application data too big to fit into the engine's encryption buffer
ByteBuffer wrappedData =
ByteBuffer.allocate(inputSharing.getBuffer().capacity());
@@ -374,10 +368,10 @@ public class NioSslEngineTest {
when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
new SSLEngineResult(CLOSED, FINISHED, 0, 0));
nioSslEngine.close(mockChannel);
- assertThatThrownBy(() -> nioSslEngine.getOutputBufferVendorForTestingOnly().open().getBuffer())
+ assertThatThrownBy(() -> nioSslEngine.shareOutputBuffer().getBuffer())
.isInstanceOf(IOException.class)
.hasMessageContaining("NioSslEngine has been closed");
- assertThatThrownBy(() -> nioSslEngine.getInputBufferVendorForTestingOnly().open().getBuffer())
+ assertThatThrownBy(() -> nioSslEngine.shareInputBuffer().getBuffer())
.isInstanceOf(IOException.class)
.hasMessageContaining("NioSslEngine has been closed");
nioSslEngine.close(mockChannel);
@@ -407,8 +401,7 @@ public class NioSslEngineTest {
when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenAnswer((x) -> {
- try (final ByteBufferSharing outputSharing =
- nioSslEngine.getOutputBufferVendorForTestingOnly().open()) {
+ try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
// give the NioSslEngine something to write on its socket channel, simulating a TLS close
// message
outputSharing.getBuffer().put("Goodbye cruel world".getBytes());
@@ -444,8 +437,7 @@ public class NioSslEngineTest {
ByteBuffer wrappedBuffer = ByteBuffer.allocate(1000);
SocketChannel mockChannel = mock(SocketChannel.class);
- try (final ByteBufferSharing inputSharing =
- nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
+ try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
// force a compaction by making the decoded buffer appear near to being full
ByteBuffer unwrappedBuffer = inputSharing.getBuffer();
unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead);
@@ -495,7 +487,10 @@ public class NioSslEngineTest {
ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
- nioSslEngine.getInputBufferVendorForTestingOnly().setBufferForTestingOnly(unwrappedBuffer);
+ try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+ final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) inputSharing;
+ inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
+ }
// simulate some socket reads
when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
@@ -523,8 +518,7 @@ public class NioSslEngineTest {
assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
// The initial available space in the unwrapped buffer should have doubled
int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes;
- try (final ByteBufferSharing inputSharing =
- nioSslEngine.getInputBufferVendorForTestingOnly().open()) {
+ try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
assertThat(inputSharing.getBuffer().capacity())
.isEqualTo(2 * initialFreeSpace + preexistingBytes);
}
@@ -550,7 +544,10 @@ public class NioSslEngineTest {
// force buffer expansion by making a small decoded buffer appear near to being full
ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
- nioSslEngine.getInputBufferVendorForTestingOnly().setBufferForTestingOnly(unwrappedBuffer);
+ try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+ final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) inputSharing;
+ inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
+ }
// simulate some socket reads
when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
index 40f1ed3..c064afb 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
@@ -103,7 +103,6 @@ public class ConnectionTest {
TCPConduit tcpConduit = mock(TCPConduit.class);
when(connectionTable.getConduit()).thenReturn(tcpConduit);
- when(connectionTable.getBufferPool()).thenReturn(mock(BufferPool.class));
when(distributionConfig.getMemberTimeout()).thenReturn(100);
when(tcpConduit.getSocketId()).thenReturn(new InetSocketAddress(getLocalHost(), 12345));