You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2020/10/30 17:28:59 UTC
[geode] 01/01: GEODE-8652: NioSslEngine.close() Bypasses Locks (#5666)
This is an automated email from the ASF dual-hosted git repository.
burcham pushed a commit to branch backport-1-12-GEODE-8652-and-friends
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 1395c6afe4d943bae87665f580501e5df64de111
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Thu Oct 29 16:38:25 2020 -0700
GEODE-8652: NioSslEngine.close() Bypasses Locks (#5666)
- NioSslEngine.close() proceeds even if readers (or writers) are
operating on its ByteBuffers, allowing Connection.close() to close
its socket and proceed.
- NioSslEngine.close() needed a lock only on the output buffer, so
we split what was a single lock into two. Also instead of using
synchronized we use a ReentrantLock so we can
call tryLock() and time out if needed in NioSslEngine.close().
- Since readers/writers may hold locks on these input/output buffers
when NioSslEngine.close() is called a reference count is maintained
and the buffers are returned to the pool only when the last user
is done.
- To manage the locking and reference counting a new AutoCloseable
ByteBufferSharing interface is introduced with a trivial
implementation: ByteBufferSharingNoOp and a real implementation:
ByteBufferSharingImpl.
Co-authored-by: Bill Burcham <bi...@gmail.com>
Co-authored-by: Darrel Schneider <ds...@pivotal.io>
Co-authored-by: Ernie Burghardt <bu...@vmware.com>
(cherry picked from commit 08e9e9673d0ed05555a3d74c6d16e706817cab09)
---
.../tcp/ConnectionCloseSSLTLSDUnitTest.java | 235 ++++++++++++
.../org/apache/geode/internal/tcp/server.keystore | Bin 0 -> 1256 bytes
...LSocketHostNameVerificationIntegrationTest.java | 4 +-
.../internal/net/SSLSocketIntegrationTest.java | 57 +--
.../apache/geode/codeAnalysis/excludedClasses.txt | 1 +
.../geode/internal/net/ByteBufferSharing.java | 55 +++
.../geode/internal/net/ByteBufferSharingImpl.java | 148 ++++++++
.../geode/internal/net/ByteBufferSharingNoOp.java | 52 +++
.../org/apache/geode/internal/net/NioFilter.java | 69 ++--
.../apache/geode/internal/net/NioPlainEngine.java | 27 +-
.../apache/geode/internal/net/NioSslEngine.java | 357 +++++++++---------
.../org/apache/geode/internal/tcp/Connection.java | 34 +-
.../org/apache/geode/internal/tcp/MsgReader.java | 15 +-
.../internal/net/ByteBufferSharingImplTest.java | 163 +++++++++
.../geode/internal/net/NioPlainEngineTest.java | 47 ++-
.../geode/internal/net/NioSslEngineTest.java | 397 +++++++++++----------
16 files changed, 1199 insertions(+), 462 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java
new file mode 100644
index 0000000..586cd53
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/ConnectionCloseSSLTLSDUnitTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.tcp;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.geode.distributed.ConfigurationProperties.CONSERVE_SOCKETS;
+import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.NAME;
+import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_BUFFER_SIZE;
+import static org.apache.geode.distributed.ConfigurationProperties.SOCKET_LEASE_TIME;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_ENABLED_COMPONENTS;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_PROTOCOLS;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
+import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase.getBlackboard;
+import static org.apache.geode.test.util.ResourceUtils.createTempFileFromResource;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Fail.fail;
+
+import java.io.File;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.DistributedSystemDisconnectedException;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave;
+import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+
+/**
+ * Tests that the {@link Connection} class can be closed while readers and writers hold locks on
+ * its internal TLS {@link ByteBuffer}s.
+ *
+ * It would be nice if this test didn't need to use the cache. But it does use the cache (region)
+ * because that enables us to use existing cache messaging and to use the
+ * DistributionMessageObserver (observer) hooks.
+ *
+ * see also ClusterCommunicationsDUnitTest
+ */
+public class ConnectionCloseSSLTLSDUnitTest implements Serializable {
+
+  private static final int SMALL_BUFFER_SIZE = 8000;
+  // blackboard gate signalled by the receiver once the observer hook has been entered
+  private static final String UPDATE_ENTERED_GATE = "connectionCloseDUnitTest.regionUpdateEntered";
+  // blackboard gate the observer hook waits on; signalling it lets message processing continue
+  private static final String SUSPEND_UPDATE_GATE = "connectionCloseDUnitTest.suspendRegionUpdate";
+  private static final String regionName = "connectionCloseDUnitTestRegion";
+  private static final Logger logger = LogService.getLogger();
+
+  // static so that lambdas invoked in a member VM can reach the cache created there earlier
+  private static Cache cache;
+
+  @Rule
+  public DistributedRule distributedRule =
+      DistributedRule.builder().withVMCount(3).build();
+
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
+
+  private VM locator;
+  private VM sender;
+  private VM receiver;
+
+  @Before
+  public void before() {
+    locator = getVM(0);
+    sender = getVM(1);
+    receiver = getVM(2);
+  }
+
+  /**
+   * Removes the {@link DistributionMessageObserver} that the test installed in the receiver.
+   */
+  @After
+  public void after() {
+    receiver.invoke(() -> {
+      DistributionMessageObserver.setInstance(null);
+    });
+  }
+
+  /**
+   * Blocks processing of the sender's update message on the receiver, then closes the sender's
+   * cache while the put is still outstanding, and expects the put to fail with a
+   * {@link DistributedSystemDisconnectedException} rather than hang.
+   */
+  @Test
+  public void connectionWithHungReaderIsCloseableAndUnhangsReader()
+      throws InterruptedException, TimeoutException {
+
+    getBlackboard().initBlackboard();
+
+    final int locatorPort = createLocator(locator);
+    createCacheAndRegion(sender, locatorPort);
+    createCacheAndRegion(receiver, locatorPort);
+
+    receiver
+        .invoke("set up DistributionMessageObserver to 'hang' sender's put (on receiver)",
+            () -> {
+              final DistributionMessageObserver observer =
+                  new DistributionMessageObserver() {
+
+                    @Override
+                    public void beforeProcessMessage(final ClusterDistributionManager dm,
+                        final DistributionMessage message) {
+                      guardMessageProcessingHook(message, () -> {
+                        try {
+                          getBlackboard().signalGate(UPDATE_ENTERED_GATE);
+                          getBlackboard().waitForGate(SUSPEND_UPDATE_GATE, 5, MINUTES);
+                        } catch (TimeoutException | InterruptedException e) {
+                          if (e instanceof InterruptedException) {
+                            // fail() replaces the exception, so keep the interrupt status
+                            Thread.currentThread().interrupt();
+                          }
+                          fail("message observus interruptus", e);
+                        }
+                        logger.info("BGB: got before process message: " + message);
+                      });
+                    }
+                  };
+              DistributionMessageObserver.setInstance(observer);
+            });
+
+    try {
+      final AsyncInvocation<Object> putInvocation = sender.invokeAsync("try a put", () -> {
+        final Region<Object, Object> region = cache.getRegion(regionName);
+        // test is going to close the cache while we are waiting for our ack
+        assertThatThrownBy(() -> {
+          region.put("hello", "world");
+        }).isInstanceOf(DistributedSystemDisconnectedException.class);
+      });
+
+      // wait until our message observer is blocked
+      getBlackboard().waitForGate(UPDATE_ENTERED_GATE, 5, MINUTES);
+
+      // at this point our put() is blocked waiting for a direct ack
+      assertThat(putInvocation.isAlive()).as("put is waiting for remote region to ack").isTrue();
+
+      /*
+       * Now close the cache. The point of calling it is to test that we don't block while trying
+       * to close connections. Cache.close() calls DistributedSystem.disconnect() which in turn
+       * closes all the connections (and their sockets.) We want the sockets to close because
+       * that'll cause our hung put() to see a DistributedSystemDisconnectedException.
+       */
+      sender.invoke("close cache while put is outstanding", () -> cache.close());
+
+      // wait for put task to complete: with an exception, that is!
+      putInvocation.get();
+    } finally {
+      // un-stick our message observer, even when the test has failed, so the hook in the
+      // receiver is not left waiting on the gate until its five-minute timeout
+      getBlackboard().signalGate(SUSPEND_UPDATE_GATE);
+    }
+  }
+
+  /**
+   * Runs {@code runnable} only if {@code message} is an {@link UpdateMessage} for this test's
+   * region; every other message is ignored.
+   */
+  private void guardMessageProcessingHook(final DistributionMessage message,
+      final Runnable runnable) {
+    if (message instanceof UpdateMessage) {
+      final UpdateMessage updateMessage = (UpdateMessage) message;
+      if (updateMessage.getRegionPath().equals("/" + regionName)) {
+        runnable.run();
+      }
+    }
+  }
+
+  /**
+   * Starts a locator (and distributed system) in {@code memberVM}.
+   *
+   * @return the port the locator reports via {@code getPort()}
+   */
+  private int createLocator(VM memberVM) {
+    return memberVM.invoke("create locator", () -> {
+      // if you need to debug SSL communications use this property:
+      // System.setProperty("javax.net.debug", "all");
+      System.setProperty(GMSJoinLeave.BYPASS_DISCOVERY_PROPERTY, "true");
+      return Locator.startLocatorAndDS(0, new File(""), getDistributedSystemProperties())
+          .getPort();
+    });
+  }
+
+  /**
+   * Creates the cache in {@code memberVM}, stores it in the static {@link #cache} field of that
+   * VM, and creates the replicated test region.
+   */
+  private void createCacheAndRegion(VM memberVM, int locatorPort) {
+    memberVM.invoke("start cache and create region", () -> {
+      cache = createCache(locatorPort);
+      cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+    });
+  }
+
+  private Cache createCache(int locatorPort) {
+    // if you need to debug SSL communications use this property:
+    // System.setProperty("javax.net.debug", "all");
+    Properties properties = getDistributedSystemProperties();
+    properties.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
+    return new CacheFactory(properties).create();
+  }
+
+  /**
+   * Builds the properties shared by the locator and both members: cluster configuration off,
+   * a small socket buffer, and TLSv1.2 for the cluster and locator components. The same
+   * {@code server.keystore} resource is used as both key store and trust store.
+   */
+  private Properties getDistributedSystemProperties() {
+    Properties properties = new Properties();
+    properties.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
+    properties.setProperty(USE_CLUSTER_CONFIGURATION, "false");
+    properties.setProperty(NAME, "vm" + VM.getCurrentVMNum());
+    properties.setProperty(CONSERVE_SOCKETS, "false"); // we are testing direct ack
+    properties.setProperty(SOCKET_LEASE_TIME, "10000");
+    properties.setProperty(SOCKET_BUFFER_SIZE, "" + SMALL_BUFFER_SIZE);
+
+    properties.setProperty(SSL_ENABLED_COMPONENTS, "cluster,locator");
+    properties
+        .setProperty(SSL_KEYSTORE, createTempFileFromResource(getClass(), "server.keystore")
+            .getAbsolutePath());
+    properties.setProperty(SSL_TRUSTSTORE,
+        createTempFileFromResource(getClass(), "server.keystore")
+            .getAbsolutePath());
+    properties.setProperty(SSL_PROTOCOLS, "TLSv1.2");
+    properties.setProperty(SSL_KEYSTORE_PASSWORD, "password");
+    properties.setProperty(SSL_TRUSTSTORE_PASSWORD, "password");
+    properties.setProperty(SSL_REQUIRE_AUTHENTICATION, "true");
+    return properties;
+  }
+
+}
diff --git a/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore b/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore
new file mode 100644
index 0000000..8b5305f
Binary files /dev/null and b/geode-core/src/distributedTest/resources/org/apache/geode/internal/tcp/server.keystore differ
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
index 5483457..91e5f55 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java
@@ -215,7 +215,9 @@ public class SSLSocketHostNameVerificationIntegrationTest {
final NioSslEngine nioSslEngine = engine;
engine.close(socket.getChannel());
assertThatThrownBy(() -> {
- nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]));
+ try (final ByteBufferSharing unused =
+ nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]))) {
+ }
})
.isInstanceOf(IOException.class);
}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
index 4e6747b..7a3759b 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java
@@ -244,11 +244,13 @@ public class SSLSocketIntegrationTest {
ByteBuffer buffer = bbos.getContentBuffer();
System.out.println(
"client buffer position is " + buffer.position() + " and limit is " + buffer.limit());
- ByteBuffer wrappedBuffer = engine.wrap(buffer);
- System.out.println("client wrapped buffer position is " + wrappedBuffer.position()
- + " and limit is " + wrappedBuffer.limit());
- int bytesWritten = clientChannel.write(wrappedBuffer);
- System.out.println("client bytes written is " + bytesWritten);
+ try (final ByteBufferSharing outputSharing = engine.wrap(buffer)) {
+ ByteBuffer wrappedBuffer = outputSharing.getBuffer();
+ System.out.println("client wrapped buffer position is " + wrappedBuffer.position()
+ + " and limit is " + wrappedBuffer.limit());
+ int bytesWritten = clientChannel.write(wrappedBuffer);
+ System.out.println("client bytes written is " + bytesWritten);
+ }
}
private Thread startServerNIO(final ServerSocket serverSocket, int timeoutMillis)
@@ -279,7 +281,9 @@ public class SSLSocketIntegrationTest {
final NioSslEngine nioSslEngine = engine;
engine.close(socket.getChannel());
assertThatThrownBy(() -> {
- nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]));
+ try (final ByteBufferSharing unused =
+ nioSslEngine.unwrap(ByteBuffer.wrap(new byte[0]))) {
+ }
})
.isInstanceOf(IOException.class);
}
@@ -293,24 +297,35 @@ public class SSLSocketIntegrationTest {
private void readMessageFromNIOSSLClient(Socket socket, ByteBuffer buffer, NioSslEngine engine)
throws IOException {
- ByteBuffer unwrapped = engine.getUnwrappedBuffer(buffer);
- // if we already have unencrypted data skip unwrapping
- if (unwrapped.position() == 0) {
- int bytesRead;
- // if we already have encrypted data skip reading from the socket
- if (buffer.position() == 0) {
- bytesRead = socket.getChannel().read(buffer);
- buffer.flip();
+ try (final ByteBufferSharing sharedBuffer = engine.getUnwrappedBuffer()) {
+ final ByteBuffer unwrapped = sharedBuffer.getBuffer();
+ // if we already have unencrypted data skip unwrapping
+ if (unwrapped.position() == 0) {
+ int bytesRead;
+ // if we already have encrypted data skip reading from the socket
+ if (buffer.position() == 0) {
+ bytesRead = socket.getChannel().read(buffer);
+ buffer.flip();
+ } else {
+ bytesRead = buffer.remaining();
+ }
+ System.out.println("server bytes read is " + bytesRead + ": buffer position is "
+ + buffer.position() + " and limit is " + buffer.limit());
+ try (final ByteBufferSharing sharedBuffer2 = engine.unwrap(buffer)) {
+ final ByteBuffer unwrapped2 = sharedBuffer2.getBuffer();
+
+ unwrapped2.flip();
+ System.out.println("server unwrapped buffer position is " + unwrapped2.position()
+ + " and limit is " + unwrapped2.limit());
+ finishReadMessageFromNIOSSLClient(unwrapped2);
+ }
} else {
- bytesRead = buffer.remaining();
+ finishReadMessageFromNIOSSLClient(unwrapped);
}
- System.out.println("server bytes read is " + bytesRead + ": buffer position is "
- + buffer.position() + " and limit is " + buffer.limit());
- unwrapped = engine.unwrap(buffer);
- unwrapped.flip();
- System.out.println("server unwrapped buffer position is " + unwrapped.position()
- + " and limit is " + unwrapped.limit());
}
+ }
+
+ private void finishReadMessageFromNIOSSLClient(final ByteBuffer unwrapped) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(unwrapped);
DataInputStream dis = new DataInputStream(bbis);
String welcome = dis.readUTF();
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index 0de147d..af3bd1e 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -104,3 +104,4 @@ org/apache/geode/cache/query/internal/xml/ElementType
org/apache/geode/cache/query/internal/xml/ElementType$1
org/apache/geode/cache/query/internal/xml/ElementType$2
org/apache/geode/cache/query/internal/xml/ElementType$3
+org/apache/geode/internal/net/ByteBufferSharingImpl$OpenAttemptTimedOut
\ No newline at end of file
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
new file mode 100644
index 0000000..cdfa897
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.net;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+
+/**
+ * A handle on a shared {@link ByteBuffer}, meant to be acquired in a try-with-resources statement.
+ * The buffer is available (for reading and modification) within the scope of that try block.
+ *
+ * An implementation may return a pooled buffer to its pool once the last reference is dropped.
+ */
+public interface ByteBufferSharing extends AutoCloseable {
+
+ /**
+ * Call this method only within a try-with-resources in which this {@link ByteBufferSharing} was
+ * acquired. Retain the reference only within the scope of that try-with-resources.
+ *
+ * @return the buffer: manipulable only within the scope of the try-with-resources
+ * @throws IOException if the buffer is no longer accessible
+ */
+ ByteBuffer getBuffer() throws IOException;
+
+ /**
+ * Expand the buffer if needed. This may return a different object so be sure to pay attention to
+ * the return value if you need access to the potentially-expanded buffer. Subsequent calls to
+ * {@link #getBuffer()} will return that new buffer too.
+ *
+ * @param newCapacity the capacity being requested for the buffer
+ * @return the same buffer or a different (bigger) buffer
+ * @throws IOException if the buffer is no longer accessible
+ */
+ ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException;
+
+ /**
+ * Override {@link AutoCloseable#close()} without a throws clause since we don't need one.
+ */
+ @Override
+ void close();
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java
new file mode 100644
index 0000000..e9a941e
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingImpl.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.net;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.internal.net.BufferPool.BufferType;
+
+/**
+ * An {@link AutoCloseable} meant to be acquired in a try-with-resources statement. The resource (a
+ * {@link ByteBuffer}) is available (for reading and modification) in the scope of the
+ * try-with-resources.
+ *
+ * References are counted. The buffer is handed back to the {@link BufferPool} when the count
+ * reaches zero, and it is handed back at most once.
+ */
+class ByteBufferSharingImpl implements ByteBufferSharing {
+
+  static class OpenAttemptTimedOut extends Exception {
+  }
+
+  private final Lock lock;
+  private final AtomicBoolean isClosed;
+  // flipped when the buffer is returned to the pool so that it is never returned twice
+  private final AtomicBoolean isReleased;
+  // mutable because in general our ByteBuffer may need to be resized (grown or compacted)
+  private ByteBuffer buffer;
+  private final BufferType bufferType;
+  private final AtomicInteger counter;
+  private final BufferPool bufferPool;
+
+  /**
+   * This constructor is for use only by the owner of the shared resource (a {@link ByteBuffer}).
+   *
+   * A resource owner must invoke {@link #open()} once for each reference that escapes (is passed
+   * to an external object or is returned to an external caller.)
+   *
+   * This constructor acquires no lock. The reference count will be 1 after this constructor
+   * completes.
+   */
+  ByteBufferSharingImpl(final ByteBuffer buffer, final BufferType bufferType,
+      final BufferPool bufferPool) {
+    this.buffer = buffer;
+    this.bufferType = bufferType;
+    this.bufferPool = bufferPool;
+    lock = new ReentrantLock();
+    counter = new AtomicInteger(1);
+    isClosed = new AtomicBoolean(false);
+    isReleased = new AtomicBoolean(false);
+  }
+
+  /**
+   * The destructor. Called by the resource owner to undo the work of the constructor. Only the
+   * first call drops the owner's reference; later calls do nothing.
+   */
+  void destruct() {
+    if (isClosed.compareAndSet(false, true)) {
+      dropReference();
+    }
+  }
+
+  /**
+   * This method is for use only by the owner of the shared resource. It's used for handing out
+   * references to the shared resource. So it does reference counting and also acquires a lock.
+   *
+   * Resource owners call this method as the last thing before returning a reference to the caller.
+   * That caller binds that reference to a variable in a try-with-resources statement and relies on
+   * the AutoCloseable protocol to invoke {@link #close()} on the object at the end of the block.
+   *
+   * A reference obtained after {@link #destruct()} can still be closed normally, but
+   * {@link #getBuffer()} will throw for it.
+   */
+  ByteBufferSharing open() {
+    lock.lock();
+    addReference();
+    return this;
+  }
+
+  /**
+   * This variant throws {@link OpenAttemptTimedOut} if it can't acquire the lock in time. The
+   * same exception is thrown if the thread is interrupted while waiting; in that case the
+   * thread's interrupt status is set again before throwing.
+   */
+  ByteBufferSharing open(final long time, final TimeUnit unit) throws OpenAttemptTimedOut {
+    try {
+      if (!lock.tryLock(time, unit)) {
+        throw new OpenAttemptTimedOut();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new OpenAttemptTimedOut();
+    }
+    addReference();
+    return this;
+  }
+
+  @Override
+  public ByteBuffer getBuffer() throws IOException {
+    if (isClosed.get()) {
+      throw new IOException("NioSslEngine has been closed");
+    } else {
+      return buffer;
+    }
+  }
+
+  @Override
+  public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException {
+    // getBuffer() throws if we are closed, so a closed sharing is never expanded
+    return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity);
+  }
+
+  @Override
+  public void close() {
+    /*
+     * We are counting on our ReentrantLock throwing an exception if the current thread
+     * does not hold the lock. In that case dropReference() will not be called. This
+     * prevents ill-behaved clients (clients that call close() too many times) from
+     * corrupting our reference count.
+     */
+    lock.unlock();
+    dropReference();
+  }
+
+  private int addReference() {
+    return counter.incrementAndGet();
+  }
+
+  /**
+   * Decrements the reference count and returns the buffer to the pool when the count reaches
+   * zero for the first time.
+   *
+   * @return the reference count after the decrement
+   */
+  private int dropReference() {
+    final int usages = counter.decrementAndGet();
+    /*
+     * A reference can be opened and closed after destruct(), which takes the count from 0 to 1
+     * and back to 0. Release only the first time so the pool is never given the same buffer
+     * twice.
+     */
+    if (usages == 0 && isReleased.compareAndSet(false, true)) {
+      bufferPool.releaseBuffer(bufferType, buffer);
+    }
+    return usages;
+  }
+
+  @VisibleForTesting
+  public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) {
+    buffer = newBufferForTesting;
+  }
+
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
new file mode 100644
index 0000000..bd707e3
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.net;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * The do-nothing flavor of {@link ByteBufferSharing}: it hands back the {@link ByteBuffer} it was
+ * constructed with. Acquire it in a try-with-resources statement like any other
+ * {@link ByteBufferSharing}; the buffer is available (for reading and modification) in the scope
+ * of that statement.
+ *
+ * No locking and no reference counting happen here. It's meant for use with the
+ * {@link NioPlainEngine} only, since that engine keeps no buffers and so, needs no reference
+ * counting on buffers, nor any synchronization around access to buffers.
+ *
+ * See also {@link ByteBufferSharingImpl}
+ */
+class ByteBufferSharingNoOp implements ByteBufferSharing {
+
+  private final ByteBuffer buffer;
+
+  ByteBufferSharingNoOp(final ByteBuffer buffer) {
+    this.buffer = buffer;
+  }
+
+  /**
+   * @return the buffer this object was constructed with
+   */
+  @Override
+  public ByteBuffer getBuffer() {
+    return this.buffer;
+  }
+
+  /**
+   * Not supported by this implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException {
+    final String reason = "Can't expand buffer when using NioPlainEngine";
+    throw new UnsupportedOperationException(reason);
+  }
+
+  /**
+   * Nothing to unlock and nothing to release.
+   */
+  @Override
+  public void close() {
+    // intentionally empty
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
index 9c437ad..eb53f0e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
@@ -19,47 +19,53 @@ import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
- * Prior to transmitting a buffer or processing a received buffer
- * a NioFilter should be called to wrap (transmit) or unwrap (received)
- * the buffer in case SSL is being used.<br>
- * Implementations of this class may not be thread-safe in regard to
- * the buffers their methods return. These may be internal state that,
- * if used concurrently by multiple threads could cause corruption.
- * Appropriate external synchronization must be used in order to provide
- * thread-safety. Do this by invoking getSynchObject() and synchronizing on
- * the returned object while using the buffer.
+ * Prior to transmitting a buffer or processing a received buffer a NioFilter should be called to
+ * wrap (transmit) or unwrap (received) the buffer in case SSL is being used.<br>
+ * Implementations of
+ * this class may not be thread-safe in regard to the buffers their methods return. These may be
+ * internal state that, if used concurrently by multiple threads could cause corruption. Appropriate
+ * external synchronization must be used in order to provide thread-safety. Do this by invoking
+ * getSynchObject() and synchronizing on the returned object while using the buffer.
*/
public interface NioFilter {
/**
* wrap bytes for transmission to another process
+ *
+ * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is
+ * to call this method in a try-with-resources statement.
*/
- ByteBuffer wrap(ByteBuffer buffer) throws IOException;
+ ByteBufferSharing wrap(ByteBuffer buffer) throws IOException;
/**
- * unwrap bytes received from another process. The unwrapped
- * buffer should be flipped before reading. When done reading invoke
- * doneReading() to reset for future read ops
+ * unwrap bytes received from another process. The unwrapped buffer should be flipped before
+ * reading. When done reading invoke doneReading() to reset for future read ops
+ *
+ * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is
+ * to call this method in a try-with-resources statement.
*/
- ByteBuffer unwrap(ByteBuffer wrappedBuffer) throws IOException;
+ ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException;
/**
- * ensure that the wrapped buffer has enough room to read the given amount of data.
- * This must be invoked before readAtLeast. A new buffer may be returned by this method.
+ * ensure that the wrapped buffer has enough room to read the given amount of data. This must be
+ * invoked before readAtLeast. A new buffer may be returned by this method.
*/
ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
BufferPool.BufferType bufferType);
/**
- * read at least the indicated amount of bytes from the given
- * socket. The buffer position will be ready for reading
- * the data when this method returns. Note: you must invoke ensureWrappedCapacity
- * with the given amount prior to each invocation of this method.
+ * read at least the indicated amount of bytes from the given socket. The buffer position will be
+ * ready for reading the data when this method returns. Note: you must invoke
+ * ensureWrappedCapacity with the given amount prior to each invocation of this method.
* <br>
* wrappedBuffer = filter.ensureWrappedCapacity(amount, wrappedBuffer, etc.);<br>
- * unwrappedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.)
+ * sharedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.);<br>
+ * unwrappedBuffer = sharedBuffer.getBuffer()
+ *
+ * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is
+ * to call this method in a try-with-resources statement.
*/
- ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer)
+ ByteBufferSharing readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer)
throws IOException;
/**
@@ -81,28 +87,19 @@ public interface NioFilter {
}
}
- default boolean isClosed() {
- return false;
- }
-
/**
* invoke this method when you are done using the NioFilter
- *
*/
default void close(SocketChannel socketChannel) {
// nothing by default
}
/**
- * returns the unwrapped byte buffer associated with the given wrapped buffer.
+ * Returns the sharing object for the {@link NioFilter}'s unwrapped buffer, if one exists.
+ *
+ * Be sure to call close() on the returned {@link ByteBufferSharing}. The best way to do that is
+ * to call this method in a try-with-resources statement.
*/
- ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer);
+ ByteBufferSharing getUnwrappedBuffer();
- /**
- * returns an object to be used in synchronizing on the use of buffers returned by
- * a NioFilter.
- */
- default Object getSynchObject() {
- return this;
- }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
index 3ebce38..8b5df96 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
+import org.apache.geode.annotations.internal.MakeImmutable;
import org.apache.geode.internal.Assert;
/**
@@ -27,6 +28,12 @@ import org.apache.geode.internal.Assert;
* secure communications.
*/
public class NioPlainEngine implements NioFilter {
+
+ // this variable requires the MakeImmutable annotation but the buffer is empty and
+ // not really modifiable
+ @MakeImmutable
+ private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+
private final BufferPool bufferPool;
int lastReadPosition;
@@ -38,14 +45,14 @@ public class NioPlainEngine implements NioFilter {
}
@Override
- public ByteBuffer wrap(ByteBuffer buffer) {
- return buffer;
+ public ByteBufferSharing wrap(ByteBuffer buffer) {
+ return shareBuffer(buffer);
}
@Override
- public ByteBuffer unwrap(ByteBuffer wrappedBuffer) {
+ public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) {
wrappedBuffer.position(wrappedBuffer.limit());
- return wrappedBuffer;
+ return shareBuffer(wrappedBuffer);
}
@Override
@@ -82,7 +89,7 @@ public class NioPlainEngine implements NioFilter {
}
@Override
- public ByteBuffer readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer)
+ public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer)
throws IOException {
ByteBuffer buffer = wrappedBuffer;
@@ -108,7 +115,7 @@ public class NioPlainEngine implements NioFilter {
buffer.position(lastProcessedPosition);
lastProcessedPosition += bytes;
- return buffer;
+ return shareBuffer(buffer);
}
public void doneReading(ByteBuffer unwrappedBuffer) {
@@ -121,8 +128,12 @@ public class NioPlainEngine implements NioFilter {
}
@Override
- public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) {
- return wrappedBuffer;
+ public ByteBufferSharing getUnwrappedBuffer() {
+ return shareBuffer(EMPTY_BUFFER);
+ }
+
+ private ByteBufferSharingNoOp shareBuffer(final ByteBuffer wrappedBuffer) {
+ return new ByteBufferSharingNoOp(wrappedBuffer);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 2398b35..7e642ce 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -40,48 +40,48 @@ import javax.net.ssl.SSLSession;
import org.apache.logging.log4j.Logger;
import org.apache.geode.GemFireIOException;
-import org.apache.geode.annotations.internal.MakeImmutable;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.internal.net.BufferPool.BufferType;
+import org.apache.geode.internal.net.ByteBufferSharingImpl.OpenAttemptTimedOut;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
- * NioSslEngine uses an SSLEngine to bind SSL logic to a data source. This class is not thread
- * safe. Its use should be confined to one thread or should be protected by external
- * synchronization.
+ * NioSslEngine uses an SSLEngine to bind SSL logic to a data source. This class is not thread safe.
+ * Its use should be confined to one thread or should be protected by external synchronization.
*/
public class NioSslEngine implements NioFilter {
private static final Logger logger = LogService.getLogger();
- // this variable requires the MakeImmutable annotation but the buffer is empty and
- // not really modifiable
- @MakeImmutable
- private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
-
private final BufferPool bufferPool;
- private volatile boolean closed;
+ private boolean closed;
SSLEngine engine;
/**
- * myNetData holds bytes wrapped by the SSLEngine
+ * holds bytes wrapped by the SSLEngine; a.k.a. myNetData
*/
- ByteBuffer myNetData;
+ private final ByteBufferSharingImpl outputSharing;
/**
- * peerAppData holds the last unwrapped data from a peer
+ * holds the last unwrapped data from a peer; a.k.a. peerAppData
*/
- ByteBuffer peerAppData;
+ private final ByteBufferSharingImpl inputSharing;
NioSslEngine(SSLEngine engine, BufferPool bufferPool) {
SSLSession session = engine.getSession();
int appBufferSize = session.getApplicationBufferSize();
int packetBufferSize = engine.getSession().getPacketBufferSize();
+ closed = false;
this.engine = engine;
this.bufferPool = bufferPool;
- this.myNetData = bufferPool.acquireDirectSenderBuffer(packetBufferSize);
- this.peerAppData = bufferPool.acquireNonDirectReceiveBuffer(appBufferSize);
+ outputSharing =
+ new ByteBufferSharingImpl(bufferPool.acquireDirectSenderBuffer(packetBufferSize),
+ TRACKED_SENDER, bufferPool);
+ inputSharing =
+ new ByteBufferSharingImpl(bufferPool.acquireNonDirectReceiveBuffer(appBufferSize),
+ TRACKED_RECEIVER, bufferPool);
}
/**
@@ -135,57 +135,65 @@ public class NioSslEngine implements NioFilter {
switch (status) {
case NEED_UNWRAP:
- // Receive handshaking data from peer
- int dataRead = socketChannel.read(handshakeBuffer);
-
- // Process incoming handshaking data
- handshakeBuffer.flip();
- engineResult = engine.unwrap(handshakeBuffer, peerAppData);
- handshakeBuffer.compact();
- status = engineResult.getHandshakeStatus();
-
- // if we're not finished, there's nothing to process and no data was read let's hang out
- // for a little
- if (peerAppData.remaining() == 0 && dataRead == 0 && status == NEED_UNWRAP) {
- Thread.sleep(10);
- }
+ try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
+ final ByteBuffer peerAppData = inputSharing.getBuffer();
+
+ // Receive handshaking data from peer
+ int dataRead = socketChannel.read(handshakeBuffer);
+
+ // Process incoming handshaking data
+ handshakeBuffer.flip();
+
+
+ engineResult = engine.unwrap(handshakeBuffer, peerAppData);
+ handshakeBuffer.compact();
+ status = engineResult.getHandshakeStatus();
+
+ // if we're not finished, there's nothing to process and no data was read let's hang out
+ // for a little
+ if (peerAppData.remaining() == 0 && dataRead == 0 && status == NEED_UNWRAP) {
+ Thread.sleep(10);
+ }
- if (engineResult.getStatus() == BUFFER_OVERFLOW) {
- peerAppData =
- expandWriteBuffer(TRACKED_RECEIVER, peerAppData, peerAppData.capacity() * 2);
+ if (engineResult.getStatus() == BUFFER_OVERFLOW) {
+ inputSharing.expandWriteBufferIfNeeded(peerAppData.capacity() * 2);
+ }
+ break;
}
- break;
case NEED_WRAP:
- // Empty the local network packet buffer.
- myNetData.clear();
-
- // Generate handshaking data
- engineResult = engine.wrap(myAppData, myNetData);
- status = engineResult.getHandshakeStatus();
-
- // Check status
- switch (engineResult.getStatus()) {
- case BUFFER_OVERFLOW:
- myNetData =
- expandWriteBuffer(TRACKED_SENDER, myNetData,
- myNetData.capacity() * 2);
- break;
- case OK:
- myNetData.flip();
- // Send the handshaking data to peer
- while (myNetData.hasRemaining()) {
- socketChannel.write(myNetData);
- }
- break;
- case CLOSED:
- break;
- default:
- logger.info("handshake terminated with illegal state due to {}", status);
- throw new IllegalStateException(
- "Unknown SSLEngineResult status: " + engineResult.getStatus());
+ try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
+ final ByteBuffer myNetData = outputSharing.getBuffer();
+
+ // Empty the local network packet buffer.
+ myNetData.clear();
+
+ // Generate handshaking data
+ engineResult = engine.wrap(myAppData, myNetData);
+ status = engineResult.getHandshakeStatus();
+
+ // Check status
+ switch (engineResult.getStatus()) {
+ case BUFFER_OVERFLOW:
+ // no need to assign return value because we will never reference it
+ outputSharing.expandWriteBufferIfNeeded(myNetData.capacity() * 2);
+ break;
+ case OK:
+ myNetData.flip();
+ // Send the handshaking data to peer
+ while (myNetData.hasRemaining()) {
+ socketChannel.write(myNetData);
+ }
+ break;
+ case CLOSED:
+ break;
+ default:
+ logger.info("handshake terminated with illegal state due to {}", status);
+ throw new IllegalStateException(
+ "Unknown SSLEngineResult status: " + engineResult.getStatus());
+ }
+ break;
}
- break;
case NEED_TASK:
// Handle blocking tasks
handleBlockingTasks();
@@ -213,17 +221,6 @@ public class NioSslEngine implements NioFilter {
return true;
}
- ByteBuffer expandWriteBuffer(BufferType type, ByteBuffer existing,
- int desiredCapacity) {
- return bufferPool.expandWriteBufferIfNeeded(type, existing, desiredCapacity);
- }
-
- void checkClosed() throws IOException {
- if (closed) {
- throw new IOException("NioSslEngine has been closed");
- }
- }
-
void handleBlockingTasks() {
Runnable task;
while ((task = engine.getDelegatedTask()) != null) {
@@ -233,72 +230,77 @@ public class NioSslEngine implements NioFilter {
}
@Override
- public synchronized ByteBuffer wrap(ByteBuffer appData) throws IOException {
- checkClosed();
+ public ByteBufferSharing wrap(ByteBuffer appData) throws IOException {
+ try (final ByteBufferSharing outputSharing = shareOutputBuffer()) {
- myNetData.clear();
+ ByteBuffer myNetData = outputSharing.getBuffer();
- while (appData.hasRemaining()) {
- // ensure we have lots of capacity since encrypted data might
- // be larger than the app data
- int remaining = myNetData.capacity() - myNetData.position();
+ myNetData.clear();
- if (remaining < (appData.remaining() * 2)) {
- int newCapacity = expandedCapacity(appData, myNetData);
- myNetData = expandWriteBuffer(TRACKED_SENDER, myNetData, newCapacity);
- }
+ while (appData.hasRemaining()) {
+ // ensure we have lots of capacity since encrypted data might
+ // be larger than the app data
+ int remaining = myNetData.capacity() - myNetData.position();
- SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
+ if (remaining < (appData.remaining() * 2)) {
+ int newCapacity = expandedCapacity(appData, myNetData);
+ myNetData = outputSharing.expandWriteBufferIfNeeded(newCapacity);
+ }
- if (wrapResult.getHandshakeStatus() == NEED_TASK) {
- handleBlockingTasks();
- }
+ SSLEngineResult wrapResult = engine.wrap(appData, myNetData);
- if (wrapResult.getStatus() != OK) {
- throw new SSLException("Error encrypting data: " + wrapResult);
+ if (wrapResult.getHandshakeStatus() == NEED_TASK) {
+ handleBlockingTasks();
+ }
+
+ if (wrapResult.getStatus() != OK) {
+ throw new SSLException("Error encrypting data: " + wrapResult);
+ }
}
- }
- myNetData.flip();
+ myNetData.flip();
- return myNetData;
+ return shareOutputBuffer();
+ }
}
@Override
- public synchronized ByteBuffer unwrap(ByteBuffer wrappedBuffer) throws IOException {
- checkClosed();
-
- // note that we do not clear peerAppData as it may hold a partial
- // message. TcpConduit, for instance, uses message chunking to
- // transmit large payloads and we may have read a partial chunk
- // during the previous unwrap
-
- peerAppData.limit(peerAppData.capacity());
- while (wrappedBuffer.hasRemaining()) {
- SSLEngineResult unwrapResult = engine.unwrap(wrappedBuffer, peerAppData);
- switch (unwrapResult.getStatus()) {
- case BUFFER_OVERFLOW:
- // buffer overflow expand and try again - double the available decryption space
- int newCapacity =
- (peerAppData.capacity() - peerAppData.position()) * 2 + peerAppData.position();
- newCapacity = Math.max(newCapacity, peerAppData.capacity() / 2 * 3);
- peerAppData =
- bufferPool.expandWriteBufferIfNeeded(TRACKED_RECEIVER, peerAppData, newCapacity);
- peerAppData.limit(peerAppData.capacity());
- break;
- case BUFFER_UNDERFLOW:
- // partial data - need to read more. When this happens the SSLEngine will not have
- // changed the buffer position
- wrappedBuffer.compact();
- return peerAppData;
- case OK:
- break;
- default:
- throw new SSLException("Error decrypting data: " + unwrapResult);
+ public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException {
+ try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
+
+ ByteBuffer peerAppData = inputSharing.getBuffer();
+
+ // note that we do not clear peerAppData as it may hold a partial
+ // message. TcpConduit, for instance, uses message chunking to
+ // transmit large payloads and we may have read a partial chunk
+ // during the previous unwrap
+
+ peerAppData.limit(peerAppData.capacity());
+ while (wrappedBuffer.hasRemaining()) {
+ SSLEngineResult unwrapResult = engine.unwrap(wrappedBuffer, peerAppData);
+ switch (unwrapResult.getStatus()) {
+ case BUFFER_OVERFLOW:
+ // buffer overflow expand and try again - double the available decryption space
+ int newCapacity =
+ (peerAppData.capacity() - peerAppData.position()) * 2 + peerAppData.position();
+ newCapacity = Math.max(newCapacity, peerAppData.capacity() / 2 * 3);
+ peerAppData = inputSharing.expandWriteBufferIfNeeded(newCapacity);
+ peerAppData.limit(peerAppData.capacity());
+ break;
+ case BUFFER_UNDERFLOW:
+ // partial data - need to read more. When this happens the SSLEngine will not have
+ // changed the buffer position
+ wrappedBuffer.compact();
+ return shareInputBuffer();
+ case OK:
+ break;
+ default:
+ throw new SSLException("Error decrypting data: " + unwrapResult);
+ }
}
+ wrappedBuffer.clear();
+ return shareInputBuffer();
}
- wrappedBuffer.clear();
- return peerAppData;
}
@Override
@@ -315,50 +317,45 @@ public class NioSslEngine implements NioFilter {
}
@Override
- public ByteBuffer readAtLeast(SocketChannel channel, int bytes,
+ public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes,
ByteBuffer wrappedBuffer) throws IOException {
- if (peerAppData.capacity() > bytes) {
- // we already have a buffer that's big enough
- if (peerAppData.capacity() - peerAppData.position() < bytes) {
- peerAppData.compact();
- peerAppData.flip();
- }
- }
+ try (final ByteBufferSharing inputSharing = shareInputBuffer()) {
- while (peerAppData.remaining() < bytes) {
- wrappedBuffer.limit(wrappedBuffer.capacity());
- int amountRead = channel.read(wrappedBuffer);
- if (amountRead < 0) {
- throw new EOFException();
+ ByteBuffer peerAppData = inputSharing.getBuffer();
+
+ if (peerAppData.capacity() > bytes) {
+ // we already have a buffer that's big enough
+ if (peerAppData.capacity() - peerAppData.position() < bytes) {
+ peerAppData.compact();
+ peerAppData.flip();
+ }
}
- if (amountRead > 0) {
- wrappedBuffer.flip();
- // prep the decoded buffer for writing
- peerAppData.compact();
- peerAppData = unwrap(wrappedBuffer);
- // done writing to the decoded buffer - prep it for reading again
- peerAppData.flip();
+
+ while (peerAppData.remaining() < bytes) {
+ wrappedBuffer.limit(wrappedBuffer.capacity());
+ int amountRead = channel.read(wrappedBuffer);
+ if (amountRead < 0) {
+ throw new EOFException();
+ }
+ if (amountRead > 0) {
+ wrappedBuffer.flip();
+ // prep the decoded buffer for writing
+ peerAppData.compact();
+ try (final ByteBufferSharing inputSharing2 = unwrap(wrappedBuffer)) {
+ // done writing to the decoded buffer - prep it for reading again
+ final ByteBuffer peerAppDataNew = inputSharing2.getBuffer();
+ peerAppDataNew.flip();
+ peerAppData = peerAppDataNew; // loop needs new reference!
+ }
+ }
}
+ return shareInputBuffer();
}
- return peerAppData;
}
@Override
- public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) {
- return peerAppData;
- }
-
- /**
- * ensures that the unwrapped buffer associated with the given wrapped buffer has
- * sufficient capacity for the given amount of bytes. This may compact the
- * buffer or it may return a new buffer.
- */
- public ByteBuffer ensureUnwrappedCapacity(int amount) {
- // for TTLS the app-data buffers do not need to be tracked direct-buffers since we
- // do not use them for I/O operations
- peerAppData =
- bufferPool.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, amount);
- return peerAppData;
+ public ByteBufferSharing getUnwrappedBuffer() {
+ return shareInputBuffer();
}
@Override
@@ -369,16 +366,14 @@ public class NioSslEngine implements NioFilter {
}
@Override
- public synchronized boolean isClosed() {
- return closed;
- }
-
- @Override
- public void close(SocketChannel socketChannel) {
+ public synchronized void close(SocketChannel socketChannel) {
if (closed) {
return;
}
- try {
+ closed = true;
+ inputSharing.destruct();
+ try (final ByteBufferSharing outputSharing = shareOutputBuffer(1, TimeUnit.MINUTES)) {
+ final ByteBuffer myNetData = outputSharing.getBuffer();
if (!engine.isOutboundDone()) {
ByteBuffer empty = ByteBuffer.wrap(new byte[0]);
@@ -405,14 +400,13 @@ public class NioSslEngine implements NioFilter {
// we can't send a close message if the channel is closed
} catch (IOException e) {
throw new GemFireIOException("exception closing SSL session", e);
+ } catch (final OpenAttemptTimedOut _unused) {
+ logger.info(String.format("Couldn't get output lock in time, eliding TLS close message"));
+ if (!engine.isOutboundDone()) {
+ engine.closeOutbound();
+ }
} finally {
- ByteBuffer netData = myNetData;
- ByteBuffer appData = peerAppData;
- myNetData = null;
- peerAppData = EMPTY_BUFFER;
- bufferPool.releaseBuffer(TRACKED_SENDER, netData);
- bufferPool.releaseBuffer(TRACKED_RECEIVER, appData);
- this.closed = true;
+ outputSharing.destruct();
}
}
@@ -421,4 +415,17 @@ public class NioSslEngine implements NioFilter {
targetBuffer.capacity() * 2);
}
+ @VisibleForTesting
+ public ByteBufferSharing shareOutputBuffer() {
+ return outputSharing.open();
+ }
+
+ private ByteBufferSharing shareOutputBuffer(final long time, final TimeUnit unit)
+ throws OpenAttemptTimedOut {
+ return outputSharing.open(time, unit);
+ }
+
+ public ByteBufferSharing shareInputBuffer() {
+ return inputSharing.open();
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index d93b75c..95e1953 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -78,6 +78,7 @@ import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.SystemTimer.SystemTimerTask;
import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.ByteBufferSharing;
import org.apache.geode.internal.net.NioFilter;
import org.apache.geode.internal.net.NioPlainEngine;
import org.apache.geode.internal.net.SocketCreator;
@@ -789,11 +790,12 @@ public class Connection implements Runnable {
@VisibleForTesting
void clearSSLInputBuffer() {
if (getConduit().useSSL() && ioFilter != null) {
- synchronized (ioFilter.getSynchObject()) {
- if (!ioFilter.isClosed()) {
- // clear out any remaining handshake bytes
- ByteBuffer buffer = ioFilter.getUnwrappedBuffer(inputBuffer);
- buffer.position(0).limit(0);
+ try (final ByteBufferSharing sharedBuffer = ioFilter.getUnwrappedBuffer()) {
+ // clear out any remaining handshake bytes
+ try {
+ sharedBuffer.getBuffer().position(0).limit(0);
+ } catch (IOException e) {
+ // means the NioFilter was already closed
}
}
}
@@ -2403,8 +2405,9 @@ public class Connection implements Runnable {
long queueTimeoutTarget = now + asyncQueueTimeout;
channel.configureBlocking(false);
try {
- synchronized (ioFilter.getSynchObject()) {
- ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
+ try (final ByteBufferSharing outputSharing = ioFilter.wrap(buffer)) {
+ final ByteBuffer wrappedBuffer = outputSharing.getBuffer();
+
int waitTime = 1;
do {
owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
@@ -2557,9 +2560,9 @@ public class Connection implements Runnable {
}
// fall through
}
- // synchronize on the ioFilter while using its network buffer
- synchronized (ioFilter.getSynchObject()) {
- ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
+ try (final ByteBufferSharing outputSharing = ioFilter.wrap(buffer)) {
+ final ByteBuffer wrappedBuffer = outputSharing.getBuffer();
+
while (wrappedBuffer.remaining() > 0) {
int amtWritten = 0;
long start = stats.startSocketWrite(true);
@@ -2611,10 +2614,12 @@ public class Connection implements Runnable {
final Version version = getRemoteVersion();
try {
msgReader = new MsgReader(this, ioFilter, version);
+
ReplyMessage msg;
int len;
- synchronized (ioFilter.getSynchObject()) {
+ // (we have to lock here to protect between reading header and message body)
+ try (final ByteBufferSharing _unused = ioFilter.getUnwrappedBuffer()) {
Header header = msgReader.readHeader();
if (header.getMessageType() == NORMAL_MSG_TYPE) {
@@ -2631,7 +2636,7 @@ public class Connection implements Runnable {
releaseMsgDestreamer(header.getMessageId(), destreamer);
len = destreamer.size();
}
- } // sync
+ }
// I'd really just like to call dispatchMessage here. However,
// that call goes through a bunch of checks that knock about
// 10% of the performance. Since this direct-ack stuff is all
@@ -2698,8 +2703,9 @@ public class Connection implements Runnable {
private void processInputBuffer() throws ConnectionException, IOException {
inputBuffer.flip();
- synchronized (ioFilter.getSynchObject()) {
- ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer);
+ try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) {
+ final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer();
+
peerDataBuffer.flip();
boolean done = false;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index 396ece2..503e48b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -26,6 +26,7 @@ import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.ByteBufferSharing;
import org.apache.geode.internal.net.NioFilter;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -54,8 +55,8 @@ public class MsgReader {
}
Header readHeader() throws IOException {
- synchronized (ioFilter.getSynchObject()) {
- ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
+ try (final ByteBufferSharing sharedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES)) {
+ ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer();
Assert.assertTrue(unwrappedBuffer.remaining() >= Connection.MSG_HEADER_BYTES);
@@ -89,8 +90,8 @@ public class MsgReader {
*/
DistributionMessage readMessage(Header header)
throws IOException, ClassNotFoundException {
- synchronized (ioFilter.getSynchObject()) {
- ByteBuffer nioInputBuffer = readAtLeast(header.messageLength);
+ try (final ByteBufferSharing sharedBuffer = readAtLeast(header.messageLength)) {
+ ByteBuffer nioInputBuffer = sharedBuffer.getBuffer();
Assert.assertTrue(nioInputBuffer.remaining() >= header.messageLength);
this.getStats().incMessagesBeingReceived(true, header.messageLength);
long startSer = this.getStats().startMsgDeserialization();
@@ -112,8 +113,8 @@ public class MsgReader {
void readChunk(Header header, MsgDestreamer md)
throws IOException {
- synchronized (ioFilter.getSynchObject()) {
- ByteBuffer unwrappedBuffer = readAtLeast(header.messageLength);
+ try (final ByteBufferSharing sharedBuffer = readAtLeast(header.messageLength)) {
+ ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer();
this.getStats().incMessagesBeingReceived(md.size() == 0, header.messageLength);
md.addChunk(unwrappedBuffer, header.messageLength);
// show that the bytes have been consumed by adjusting the buffer's position
@@ -123,7 +124,7 @@ public class MsgReader {
- private ByteBuffer readAtLeast(int bytes) throws IOException {
+ private ByteBufferSharing readAtLeast(int bytes) throws IOException {
peerNetData = ioFilter.ensureWrappedCapacity(bytes, peerNetData,
BufferPool.BufferType.TRACKED_RECEIVER);
return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java
new file mode 100644
index 0000000..bb5a75f
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferSharingImplTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.net;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class ByteBufferSharingImplTest {
+
+ private ByteBufferSharingImpl sharing;
+ private BufferPool poolMock;
+ private CountDownLatch clientHasOpenedResource;
+ private CountDownLatch clientMayComplete;
+
+ @Before
+ public void before() {
+ poolMock = mock(BufferPool.class);
+ sharing =
+ new ByteBufferSharingImpl(mock(ByteBuffer.class), BufferPool.BufferType.TRACKED_SENDER,
+ poolMock);
+ clientHasOpenedResource = new CountDownLatch(1);
+ clientMayComplete = new CountDownLatch(1);
+ }
+
+ @Test
+ public void balancedCloseOwnerIsLastReferenceHolder() throws InterruptedException {
+ resourceOwnerIsLastReferenceHolder("client with balanced close calls", () -> {
+ try (final ByteBufferSharing _unused = sharing.open()) {
+ }
+ });
+ }
+
+ @Test
+ public void extraCloseOwnerIsLastReferenceHolder() throws InterruptedException {
+ resourceOwnerIsLastReferenceHolder("client with extra close calls", () -> {
+ final ByteBufferSharing sharing2 = sharing.open();
+ sharing2.close();
+ verify(poolMock, times(0)).releaseBuffer(any(), any());
+ assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
+ verify(poolMock, times(0)).releaseBuffer(any(), any());
+ });
+ }
+
+ @Test
+ public void balancedCloseClientIsLastReferenceHolder() throws InterruptedException {
+ clientIsLastReferenceHolder("client with balanced close calls", () -> {
+ try (final ByteBufferSharing _unused = sharing.open()) {
+ clientHasOpenedResource.countDown();
+ blockClient();
+ }
+ });
+ }
+
+ @Test
+ public void extraCloseClientIsLastReferenceHolder() throws InterruptedException {
+ clientIsLastReferenceHolder("client with extra close calls", () -> {
+ final ByteBufferSharing sharing2 = sharing.open();
+ clientHasOpenedResource.countDown();
+ blockClient();
+ sharing2.close();
+ verify(poolMock, times(1)).releaseBuffer(any(), any());
+ assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
+ System.out.println("here");
+ });
+ }
+
+ @Test
+ public void extraCloseDoesNotPrematurelyReturnBufferToPool() {
+ final ByteBufferSharing sharing2 = sharing.open();
+ sharing2.close();
+ assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
+ verify(poolMock, times(0)).releaseBuffer(any(), any());
+ sharing.destruct();
+ verify(poolMock, times(1)).releaseBuffer(any(), any());
+ }
+
+ @Test
+ public void extraCloseDoesNotDecrementRefCount() {
+ final ByteBufferSharing sharing2 = sharing.open();
+ sharing2.close();
+ assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class);
+ final ByteBufferSharing sharing3 = this.sharing.open();
+ sharing.destruct();
+ verify(poolMock, times(0)).releaseBuffer(any(), any());
+ }
+
+ private void resourceOwnerIsLastReferenceHolder(final String name, final Runnable client)
+ throws InterruptedException {
+ /*
+ * Thread.currentThread() is playing the role of the (ByteBuffer) resource owner
+ */
+
+ /*
+ * clientThread thread is playing the role of the client (of the resource owner)
+ */
+ final Thread clientThread = new Thread(client, name);
+ clientThread.start();
+ clientThread.join();
+
+ verify(poolMock, times(0)).releaseBuffer(any(), any());
+
+ sharing.destruct();
+
+ verify(poolMock, times(1)).releaseBuffer(any(), any());
+ }
+
+ private void clientIsLastReferenceHolder(final String name, final Runnable client)
+ throws InterruptedException {
+ /*
+ * Thread.currentThread() is playing the role of the (ByteBuffer) resource owner
+ */
+
+ /*
+ * clientThread thread is playing the role of the client (of the resource owner)
+ */
+ final Thread clientThread = new Thread(client, name);
+ clientThread.start();
+
+ clientHasOpenedResource.await();
+
+ sharing.destruct();
+
+ verify(poolMock, times(0)).releaseBuffer(any(), any());
+
+ clientMayComplete.countDown(); // let client finish
+
+ clientThread.join();
+
+ verify(poolMock, times(1)).releaseBuffer(any(), any());
+ }
+
+ private void blockClient() {
+ try {
+ clientMayComplete.await();
+ } catch (InterruptedException e) {
+ fail("test client thread interrupted: " + e);
+ }
+ }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
index 166c2a5..bbf7a3b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java
@@ -50,7 +50,8 @@ public class NioPlainEngineTest {
public void unwrap() {
ByteBuffer buffer = ByteBuffer.allocate(100);
buffer.position(0).limit(buffer.capacity());
- nioEngine.unwrap(buffer);
+ try (final ByteBufferSharing unused = nioEngine.unwrap(buffer)) {
+ }
assertThat(buffer.position()).isEqualTo(buffer.limit());
}
@@ -116,23 +117,29 @@ public class NioPlainEngineTest {
nioEngine.lastReadPosition = 10;
- ByteBuffer data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
- verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
- assertThat(data.position()).isEqualTo(0);
- assertThat(data.limit()).isEqualTo(amountToRead);
- assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 3 + preexistingBytes);
- assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead);
-
- data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
- verify(mockChannel, times(5)).read(any(ByteBuffer.class));
- // at end of last readAtLeast data
- assertThat(data.position()).isEqualTo(amountToRead);
- // we read amountToRead bytes
- assertThat(data.limit()).isEqualTo(amountToRead * 2);
- // we did 2 more reads from the network
- assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 5 + preexistingBytes);
- // the next read will start at the end of consumed data
- assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead * 2);
+ try (final ByteBufferSharing sharedBuffer =
+ nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+ ByteBuffer data = sharedBuffer.getBuffer();
+ verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
+ assertThat(data.position()).isEqualTo(0);
+ assertThat(data.limit()).isEqualTo(amountToRead);
+ assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 3 + preexistingBytes);
+ assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead);
+ }
+
+ try (final ByteBufferSharing sharedBuffer =
+ nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+ final ByteBuffer data = sharedBuffer.getBuffer();
+ verify(mockChannel, times(5)).read(any(ByteBuffer.class));
+ // at end of last readAtLeast data
+ assertThat(data.position()).isEqualTo(amountToRead);
+ // we read amountToRead bytes
+ assertThat(data.limit()).isEqualTo(amountToRead * 2);
+ // we did 2 more reads from the network
+ assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 5 + preexistingBytes);
+ // the next read will start at the end of consumed data
+ assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead * 2);
+ }
}
@@ -147,7 +154,9 @@ public class NioPlainEngineTest {
nioEngine.lastReadPosition = 10;
- nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
+ try (final ByteBufferSharing unused =
+ nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+ }
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index aef1672..e9b01cf 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -62,13 +62,14 @@ import org.apache.geode.test.junit.categories.MembershipTest;
@Category({MembershipTest.class})
public class NioSslEngineTest {
- private static final int netBufferSize = 10000;
- private static final int appBufferSize = 20000;
+ private static final int netBufferSize = 4096;
+ private static final int appBufferSize = 32768;
private SSLEngine mockEngine;
private DMStats mockStats;
private NioSslEngine nioSslEngine;
private NioSslEngine spyNioSslEngine;
+ private BufferPool spyBufferPool;
@Before
public void setUp() throws Exception {
@@ -81,11 +82,20 @@ public class NioSslEngineTest {
mockStats = mock(DMStats.class);
- nioSslEngine = new NioSslEngine(mockEngine, new BufferPool(mockStats));
+ final BufferPool bufferPool = new BufferPool(mockStats);
+ spyBufferPool = spy(bufferPool);
+ nioSslEngine = new NioSslEngine(mockEngine, spyBufferPool);
spyNioSslEngine = spy(nioSslEngine);
}
@Test
+ public void engineUsesDirectBuffers() throws IOException {
+ try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
+ assertThat(outputSharing.getBuffer().isDirect()).isTrue();
+ }
+ }
+
+ @Test
public void handshake() throws Exception {
SocketChannel mockChannel = mock(SocketChannel.class);
when(mockChannel.read(any(ByteBuffer.class))).thenReturn(100, 100, 100, 0);
@@ -114,7 +124,7 @@ public class NioSslEngineTest {
verify(mockEngine, atLeast(2)).getHandshakeStatus();
verify(mockEngine, times(3)).wrap(any(ByteBuffer.class), any(ByteBuffer.class));
verify(mockEngine, times(3)).unwrap(any(ByteBuffer.class), any(ByteBuffer.class));
- verify(spyNioSslEngine, times(2)).expandWriteBuffer(any(BufferPool.BufferType.class),
+ verify(spyBufferPool, times(2)).expandWriteBufferIfNeeded(any(BufferPool.BufferType.class),
any(ByteBuffer.class), any(Integer.class));
verify(spyNioSslEngine, times(1)).handleBlockingTasks();
verify(mockChannel, times(3)).read(any(ByteBuffer.class));
@@ -178,148 +188,148 @@ public class NioSslEngineTest {
.hasMessageContaining("SSL Handshake terminated with status");
}
-
- @Test
- public void checkClosed() throws Exception {
- nioSslEngine.checkClosed();
- }
-
- @Test(expected = IOException.class)
- public void checkClosedThrows() throws Exception {
- when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
- new SSLEngineResult(CLOSED, FINISHED, 0, 100));
- nioSslEngine.close(mock(SocketChannel.class));
- nioSslEngine.checkClosed();
- }
-
- @Test
- public void synchObjectIsSelf() {
- // for thread-safety the synchronization object given to outside entities
- // must be the the engine itself. This allows external manipulation or
- // use of the engine's buffers to be protected in the same way as its synchronized
- // methods
- assertThat(nioSslEngine.getSynchObject()).isSameAs(nioSslEngine);
- }
-
@Test
public void wrap() throws Exception {
- // make the application data too big to fit into the engine's encryption buffer
- ByteBuffer appData = ByteBuffer.allocate(nioSslEngine.myNetData.capacity() + 100);
- byte[] appBytes = new byte[appData.capacity()];
- Arrays.fill(appBytes, (byte) 0x1F);
- appData.put(appBytes);
- appData.flip();
-
- // create an engine that will transfer bytes from the application buffer to the encrypted buffer
- TestSSLEngine testEngine = new TestSSLEngine();
- testEngine.addReturnResult(
- new SSLEngineResult(OK, NEED_TASK, appData.remaining(), appData.remaining()));
- spyNioSslEngine.engine = testEngine;
-
- ByteBuffer wrappedBuffer = spyNioSslEngine.wrap(appData);
-
- verify(spyNioSslEngine, times(1)).expandWriteBuffer(any(BufferPool.BufferType.class),
- any(ByteBuffer.class), any(Integer.class));
- appData.flip();
- assertThat(wrappedBuffer).isEqualTo(appData);
- verify(spyNioSslEngine, times(1)).handleBlockingTasks();
+ try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
+
+ // make the application data too big to fit into the engine's encryption buffer
+ ByteBuffer appData =
+ ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100);
+ byte[] appBytes = new byte[appData.capacity()];
+ Arrays.fill(appBytes, (byte) 0x1F);
+ appData.put(appBytes);
+ appData.flip();
+
+ // create an engine that will transfer bytes from the application buffer to the encrypted
+ // buffer
+ TestSSLEngine testEngine = new TestSSLEngine();
+ testEngine.addReturnResult(
+ new SSLEngineResult(OK, NEED_TASK, appData.remaining(), appData.remaining()));
+ spyNioSslEngine.engine = testEngine;
+
+ try (final ByteBufferSharing outputSharing2 = spyNioSslEngine.wrap(appData)) {
+ ByteBuffer wrappedBuffer = outputSharing2.getBuffer();
+
+ verify(spyBufferPool, times(1)).expandWriteBufferIfNeeded(any(BufferPool.BufferType.class),
+ any(ByteBuffer.class), any(Integer.class));
+ appData.flip();
+ assertThat(wrappedBuffer).isEqualTo(appData);
+ }
+ verify(spyNioSslEngine, times(1)).handleBlockingTasks();
+ }
}
@Test
- public void wrapFails() {
- // make the application data too big to fit into the engine's encryption buffer
- ByteBuffer appData = ByteBuffer.allocate(nioSslEngine.myNetData.capacity() + 100);
- byte[] appBytes = new byte[appData.capacity()];
- Arrays.fill(appBytes, (byte) 0x1F);
- appData.put(appBytes);
- appData.flip();
-
- // create an engine that will transfer bytes from the application buffer to the encrypted buffer
- TestSSLEngine testEngine = new TestSSLEngine();
- testEngine.addReturnResult(
- new SSLEngineResult(CLOSED, NEED_TASK, appData.remaining(), appData.remaining()));
- spyNioSslEngine.engine = testEngine;
-
- assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)).isInstanceOf(SSLException.class)
- .hasMessageContaining("Error encrypting data");
+ public void wrapFails() throws IOException {
+ try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
+ // make the application data too big to fit into the engine's encryption buffer
+ ByteBuffer appData =
+ ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100);
+ byte[] appBytes = new byte[appData.capacity()];
+ Arrays.fill(appBytes, (byte) 0x1F);
+ appData.put(appBytes);
+ appData.flip();
+
+ // create an engine that will transfer bytes from the application buffer to the encrypted
+ // buffer
+ TestSSLEngine testEngine = new TestSSLEngine();
+ testEngine.addReturnResult(
+ new SSLEngineResult(CLOSED, NEED_TASK, appData.remaining(), appData.remaining()));
+ spyNioSslEngine.engine = testEngine;
+
+ assertThatThrownBy(() -> spyNioSslEngine.wrap(appData)).isInstanceOf(SSLException.class)
+ .hasMessageContaining("Error encrypting data");
+ }
}
@Test
public void unwrapWithBufferOverflow() throws Exception {
- // make the application data too big to fit into the engine's encryption buffer
- int originalPeerAppDataCapacity = nioSslEngine.peerAppData.capacity();
- int originalPeerAppDataPosition = originalPeerAppDataCapacity / 2;
- nioSslEngine.peerAppData.position(originalPeerAppDataPosition);
- ByteBuffer wrappedData = ByteBuffer.allocate(originalPeerAppDataCapacity + 100);
- byte[] netBytes = new byte[wrappedData.capacity()];
- Arrays.fill(netBytes, (byte) 0x1F);
- wrappedData.put(netBytes);
- wrappedData.flip();
-
- // create an engine that will transfer bytes from the application buffer to the encrypted buffer
- TestSSLEngine testEngine = new TestSSLEngine();
- spyNioSslEngine.engine = testEngine;
-
- testEngine.addReturnResult(
- new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // results in 30,000 byte buffer
- new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 50,000 bytes
- new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // 90,000 bytes
- new SSLEngineResult(OK, FINISHED, netBytes.length, netBytes.length));
-
- int expectedCapacity = 2 * originalPeerAppDataCapacity - originalPeerAppDataPosition;
- expectedCapacity =
- 2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition;
- expectedCapacity =
- 2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition;
- ByteBuffer unwrappedBuffer = spyNioSslEngine.unwrap(wrappedData);
- unwrappedBuffer.flip();
- assertThat(unwrappedBuffer.capacity()).isEqualTo(expectedCapacity);
+ try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+ // make the application data too big to fit into the engine's encryption buffer
+ final ByteBuffer peerAppData = inputSharing.getBuffer();
+
+ int originalPeerAppDataCapacity = peerAppData.capacity();
+ int originalPeerAppDataPosition = originalPeerAppDataCapacity / 2;
+ peerAppData.position(originalPeerAppDataPosition);
+ ByteBuffer wrappedData = ByteBuffer.allocate(originalPeerAppDataCapacity + 100);
+ byte[] netBytes = new byte[wrappedData.capacity()];
+ Arrays.fill(netBytes, (byte) 0x1F);
+ wrappedData.put(netBytes);
+ wrappedData.flip();
+
+ // create an engine that will transfer bytes from the application buffer to the encrypted
+ // buffer
+ TestSSLEngine testEngine = new TestSSLEngine();
+ spyNioSslEngine.engine = testEngine;
+
+ testEngine.addReturnResult(
+ new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // expect 1.5x the original capacity
+ new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // expect 2.5x the original capacity
+ new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0), // expect 4.5x the original capacity
+ new SSLEngineResult(OK, FINISHED, netBytes.length, netBytes.length));
+
+ int expectedCapacity = 2 * originalPeerAppDataCapacity - originalPeerAppDataPosition;
+ expectedCapacity =
+ 2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition;
+ expectedCapacity =
+ 2 * (expectedCapacity - originalPeerAppDataPosition) + originalPeerAppDataPosition;
+ try (final ByteBufferSharing sharedBuffer = spyNioSslEngine.unwrap(wrappedData)) {
+ ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer();
+ unwrappedBuffer.flip();
+ assertThat(unwrappedBuffer.capacity()).isEqualTo(expectedCapacity);
+ }
+ }
}
@Test
public void unwrapWithBufferUnderflow() throws Exception {
- ByteBuffer wrappedData = ByteBuffer.allocate(nioSslEngine.peerAppData.capacity());
- byte[] netBytes = new byte[wrappedData.capacity() / 2];
- Arrays.fill(netBytes, (byte) 0x1F);
- wrappedData.put(netBytes);
- wrappedData.flip();
-
- // create an engine that will transfer bytes from the application buffer to the encrypted buffer
- TestSSLEngine testEngine = new TestSSLEngine();
- testEngine.addReturnResult(new SSLEngineResult(BUFFER_UNDERFLOW, NEED_TASK, 0, 0));
- spyNioSslEngine.engine = testEngine;
-
- ByteBuffer unwrappedBuffer = spyNioSslEngine.unwrap(wrappedData);
- unwrappedBuffer.flip();
- assertThat(unwrappedBuffer.remaining()).isEqualTo(0);
- assertThat(wrappedData.position()).isEqualTo(netBytes.length);
- }
-
- @Test
- public void unwrapWithDecryptionError() {
- // make the application data too big to fit into the engine's encryption buffer
- ByteBuffer wrappedData = ByteBuffer.allocate(nioSslEngine.peerAppData.capacity());
- byte[] netBytes = new byte[wrappedData.capacity() / 2];
- Arrays.fill(netBytes, (byte) 0x1F);
- wrappedData.put(netBytes);
- wrappedData.flip();
-
- // create an engine that will transfer bytes from the application buffer to the encrypted buffer
- TestSSLEngine testEngine = new TestSSLEngine();
- testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0));
- spyNioSslEngine.engine = testEngine;
-
- assertThatThrownBy(() -> spyNioSslEngine.unwrap(wrappedData)).isInstanceOf(SSLException.class)
- .hasMessageContaining("Error decrypting data");
+ try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+ ByteBuffer wrappedData =
+ ByteBuffer.allocate(inputSharing.getBuffer().capacity());
+ byte[] netBytes = new byte[wrappedData.capacity() / 2];
+ Arrays.fill(netBytes, (byte) 0x1F);
+ wrappedData.put(netBytes);
+ wrappedData.flip();
+
+ // create an engine that will transfer bytes from the application buffer to the encrypted
+ // buffer
+ TestSSLEngine testEngine = new TestSSLEngine();
+ testEngine.addReturnResult(new SSLEngineResult(BUFFER_UNDERFLOW, NEED_TASK, 0, 0));
+ spyNioSslEngine.engine = testEngine;
+
+ try (final ByteBufferSharing sharedBuffer = spyNioSslEngine.unwrap(wrappedData)) {
+ ByteBuffer unwrappedBuffer = sharedBuffer.getBuffer();
+ unwrappedBuffer.flip();
+ assertThat(unwrappedBuffer.remaining()).isEqualTo(0);
+ }
+ assertThat(wrappedData.position()).isEqualTo(netBytes.length);
+ }
}
@Test
- public void ensureUnwrappedCapacity() {
- ByteBuffer wrappedBuffer = ByteBuffer.allocate(netBufferSize);
- int requestedCapacity = nioSslEngine.getUnwrappedBuffer(wrappedBuffer).capacity() * 2;
- ByteBuffer unwrappedBuffer = nioSslEngine.ensureUnwrappedCapacity(requestedCapacity);
- assertThat(unwrappedBuffer.capacity()).isGreaterThanOrEqualTo(requestedCapacity);
+ public void unwrapWithDecryptionError() throws IOException {
+ try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+ // half-fill a buffer sized like the engine's input buffer with dummy wrapped data
+ ByteBuffer wrappedData =
+ ByteBuffer.allocate(inputSharing.getBuffer().capacity());
+ byte[] netBytes = new byte[wrappedData.capacity() / 2];
+ Arrays.fill(netBytes, (byte) 0x1F);
+ wrappedData.put(netBytes);
+ wrappedData.flip();
+
+ // create an engine that will transfer bytes from the application buffer to the encrypted
+ // buffer
+ TestSSLEngine testEngine = new TestSSLEngine();
+ testEngine.addReturnResult(new SSLEngineResult(CLOSED, FINISHED, 0, 0));
+ spyNioSslEngine.engine = testEngine;
+
+ assertThatThrownBy(() -> {
+ try (final ByteBufferSharing unused = spyNioSslEngine.unwrap(wrappedData)) {
+ }
+ }).isInstanceOf(SSLException.class)
+ .hasMessageContaining("Error decrypting data");
+ }
}
@Test
@@ -333,7 +343,11 @@ public class NioSslEngineTest {
when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
new SSLEngineResult(CLOSED, FINISHED, 0, 0));
nioSslEngine.close(mockChannel);
- assertThatThrownBy(() -> nioSslEngine.checkClosed()).isInstanceOf(IOException.class)
+ assertThatThrownBy(() -> nioSslEngine.shareOutputBuffer().getBuffer())
+ .isInstanceOf(IOException.class)
+ .hasMessageContaining("NioSslEngine has been closed");
+ assertThatThrownBy(() -> nioSslEngine.shareInputBuffer().getBuffer())
+ .isInstanceOf(IOException.class)
.hasMessageContaining("NioSslEngine has been closed");
nioSslEngine.close(mockChannel);
}
@@ -362,10 +376,12 @@ public class NioSslEngineTest {
when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE);
when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenAnswer((x) -> {
- // give the NioSslEngine something to write on its socket channel, simulating a TLS close
- // message
- nioSslEngine.myNetData.put("Goodbye cruel world".getBytes());
- return new SSLEngineResult(CLOSED, FINISHED, 0, 0);
+ try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) {
+ // give the NioSslEngine something to write on its socket channel, simulating a TLS close
+ // message
+ outputSharing.getBuffer().put("Goodbye cruel world".getBytes());
+ return new SSLEngineResult(CLOSED, FINISHED, 0, 0);
+ }
});
when(mockChannel.write(any(ByteBuffer.class))).thenThrow(new ClosedChannelException());
nioSslEngine.close(mockChannel);
@@ -396,37 +412,42 @@ public class NioSslEngineTest {
ByteBuffer wrappedBuffer = ByteBuffer.allocate(1000);
SocketChannel mockChannel = mock(SocketChannel.class);
- // force a compaction by making the decoded buffer appear near to being full
- ByteBuffer unwrappedBuffer = nioSslEngine.peerAppData;
- unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead);
- unwrappedBuffer.limit(unwrappedBuffer.position() + preexistingBytes);
-
- // simulate some socket reads
- when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
- @Override
- public Integer answer(InvocationOnMock invocation) throws Throwable {
- ByteBuffer buffer = invocation.getArgument(0);
- buffer.position(buffer.position() + individualRead);
- return individualRead;
+ try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+ // force a compaction by making the decoded buffer appear near to being full
+ ByteBuffer unwrappedBuffer = inputSharing.getBuffer();
+ unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead);
+ unwrappedBuffer.limit(unwrappedBuffer.position() + preexistingBytes);
+
+ // simulate some socket reads
+ when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
+ @Override
+ public Integer answer(InvocationOnMock invocation) throws Throwable {
+ ByteBuffer buffer = invocation.getArgument(0);
+ buffer.position(buffer.position() + individualRead);
+ return individualRead;
+ }
+ });
+
+ TestSSLEngine testSSLEngine = new TestSSLEngine();
+ testSSLEngine.addReturnResult(new SSLEngineResult(OK, NEED_UNWRAP, 0, 0));
+ nioSslEngine.engine = testSSLEngine;
+
+ try (final ByteBufferSharing sharedBuffer =
+ nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+ ByteBuffer data = sharedBuffer.getBuffer();
+ verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
+ assertThat(data.position()).isEqualTo(0);
+ assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
}
- });
-
- TestSSLEngine testSSLEngine = new TestSSLEngine();
- testSSLEngine.addReturnResult(new SSLEngineResult(OK, NEED_UNWRAP, 0, 0));
- nioSslEngine.engine = testSSLEngine;
-
- ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
- verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
- assertThat(data.position()).isEqualTo(0);
- assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
+ }
}
/**
- * This tests the case where a message header has been read and part of a message has been
- * read, but the decoded buffer is too small to hold all of the message. In this case
- * the readAtLeast method will have to expand the capacity of the decoded buffer and return
- * the new, expanded, buffer as the method result.
+ * This tests the case where a message header has been read and part of a message has been read,
+ * but the decoded buffer is too small to hold all of the message. In this case the readAtLeast
+ * method will have to expand the capacity of the decoded buffer and return the new, expanded,
+ * buffer as the method result.
*/
@Test
public void readAtLeastUsingSmallAppBuffer() throws Exception {
@@ -440,7 +461,11 @@ public class NioSslEngineTest {
int initialUnwrappedBufferSize = 100;
ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
- nioSslEngine.peerAppData = unwrappedBuffer;
+
+ try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+ final ByteBufferSharingImpl inputSharingImpl = (ByteBufferSharingImpl) inputSharing;
+ inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
+ }
// simulate some socket reads
when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
@@ -460,22 +485,26 @@ public class NioSslEngineTest {
new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); // 130 + 60 bytes = 190
nioSslEngine.engine = testSSLEngine;
- ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
- verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
- assertThat(data.position()).isEqualTo(0);
- assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
- // The initial available space in the unwrapped buffer should have doubled
- int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes;
- assertThat(nioSslEngine.peerAppData.capacity())
- .isEqualTo(2 * initialFreeSpace + preexistingBytes);
+ try (final ByteBufferSharing sharedBuffer =
+ nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+ ByteBuffer data = sharedBuffer.getBuffer();
+ verify(mockChannel, times(3)).read(isA(ByteBuffer.class));
+ assertThat(data.position()).isEqualTo(0);
+ assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes);
+ // The initial available space in the unwrapped buffer should have doubled
+ int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes;
+ try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+ assertThat(inputSharing.getBuffer().capacity())
+ .isEqualTo(2 * initialFreeSpace + preexistingBytes);
+ }
+ }
}
/**
- * This tests the case where a message header has been read and part of a message has been
- * read, but the decoded buffer is too small to hold all of the message. In this case
- * the buffer is completely full and should only take one overflow response to resolve
- * the problem.
+ * This tests the case where a message header has been read and part of a message has been read,
+ * but the decoded buffer is too small to hold all of the message. In this case the buffer is
+ * completely full and should only take one overflow response to resolve the problem.
*/
@Test
public void readAtLeastUsingSmallAppBufferAtWriteLimit() throws Exception {
@@ -490,7 +519,10 @@ public class NioSslEngineTest {
// force buffer expansion by making a small decoded buffer appear near to being full
ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize);
unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored
- nioSslEngine.peerAppData = unwrappedBuffer;
+ try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) {
+ final ByteBufferSharingImpl inputSharingImpl = (ByteBufferSharingImpl) inputSharing;
+ inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer);
+ }
// simulate some socket reads
when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() {
@@ -510,11 +542,14 @@ public class NioSslEngineTest {
new SSLEngineResult(OK, NEED_UNWRAP, 0, 0));
nioSslEngine.engine = testSSLEngine;
- ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer);
- verify(mockChannel, times(1)).read(isA(ByteBuffer.class));
- assertThat(data.position()).isEqualTo(0);
- assertThat(data.limit())
- .isEqualTo(individualRead * testSSLEngine.getNumberOfUnwraps() + preexistingBytes);
+ try (final ByteBufferSharing sharedBuffer =
+ nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer)) {
+ ByteBuffer data = sharedBuffer.getBuffer();
+ verify(mockChannel, times(1)).read(isA(ByteBuffer.class));
+ assertThat(data.position()).isEqualTo(0);
+ assertThat(data.limit())
+ .isEqualTo(individualRead * testSSLEngine.getNumberOfUnwraps() + preexistingBytes);
+ }
}
@@ -652,8 +687,8 @@ public class NioSslEngineTest {
}
/**
- * add an engine operation result to be returned by wrap or unwrap.
- * Like Mockito's thenReturn(), the last return result will repeat forever
+ * add an engine operation result to be returned by wrap or unwrap. Like Mockito's thenReturn(),
+ * the last return result will repeat forever
*/
void addReturnResult(SSLEngineResult... sslEngineResult) {
for (SSLEngineResult result : sslEngineResult) {