You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2020/10/08 21:42:31 UTC
[geode] branch support/1.12 updated: GEODE-8584: Message
transmission fails with IllegalStateException in socket i/o code (#5605)
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.12 by this push:
new 2ccaf7b GEODE-8584: Message transmission fails with IllegalStateException in socket i/o code (#5605)
2ccaf7b is described below
commit 2ccaf7b8fe1cb490a3d3001b61b12ea52c5ed1d0
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Thu Oct 8 14:09:42 2020 -0700
GEODE-8584: Message transmission fails with IllegalStateException in socket i/o code (#5605)
* GEODE-8584: Message transmission fails with IllegalStateException in socket i/o code
Add appropriate synchronization when using ioFilter's buffers.
to do: add testing
to do: document the need for synchronization in the NioFilter interface.
* revised synchronization across all uses of NioFilter, added test
* remove dangling debug logging
* fix pmd problem
* fix pmd problem
* remove unnecessary volatile modifier
(cherry picked from commit f4d44d658a6273d7de27479a67c05117310205a7)
---
.../org/apache/geode/internal/net/BufferPool.java | 7 +-
.../org/apache/geode/internal/net/NioFilter.java | 22 +-
.../apache/geode/internal/net/NioSslEngine.java | 18 +-
.../org/apache/geode/internal/tcp/Connection.java | 333 +++++++++++----------
.../org/apache/geode/internal/tcp/MsgReader.java | 82 ++---
.../geode/internal/net/NioSslEngineTest.java | 26 +-
6 files changed, 266 insertions(+), 222 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
index e119250..3938422 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
@@ -255,10 +255,15 @@ public class BufferPool {
* "slice" of the buffer having the requested capacity and hand that out instead.
* When we put the buffer back in the pool we need to find the original, non-sliced,
* buffer. This is held in DirectBuffer in its "attachment" field, which is a public
- * method, though DirectBuffer is package-private.
+ * method, though DirectBuffer is package-private. This method is visible for use
+ * in debugging and testing. For debugging, invoke this method if you need to see
+ * the non-sliced buffer for some reason, such as logging its hashcode.
*/
@VisibleForTesting
public ByteBuffer getPoolableBuffer(ByteBuffer buffer) {
+ if (!buffer.isDirect()) {
+ return buffer;
+ }
ByteBuffer result = buffer;
if (parentOfSliceMethod == null) {
Class clazz = buffer.getClass();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
index 01556dc..9c437ad 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
@@ -21,7 +21,13 @@ import java.nio.channels.SocketChannel;
/**
* Prior to transmitting a buffer or processing a received buffer
* a NioFilter should be called to wrap (transmit) or unwrap (received)
- * the buffer in case SSL is being used.
+ * the buffer in case SSL is being used.<br>
+ * Implementations of this interface may not be thread-safe in regard to
+ * the buffers their methods return. These may be internal state that,
+ * if used concurrently by multiple threads, could cause corruption.
+ * Appropriate external synchronization must be used in order to provide
+ * thread-safety. Do this by invoking getSynchObject() and synchronizing on
+ * the returned object while using the buffer.
*/
public interface NioFilter {
@@ -75,6 +81,10 @@ public interface NioFilter {
}
}
+ default boolean isClosed() {
+ return false;
+ }
+
/**
* invoke this method when you are done using the NioFilter
*
@@ -84,9 +94,15 @@ public interface NioFilter {
}
/**
- * returns the unwrapped byte buffer associated with the given wrapped buffer
+ * returns the unwrapped byte buffer associated with the given wrapped buffer.
*/
ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer);
-
+ /**
+ * returns an object to be used in synchronizing on the use of buffers returned by
+ * a NioFilter.
+ */
+ default Object getSynchObject() {
+ return this;
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 2d55fa3..2398b35 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -40,6 +40,7 @@ import javax.net.ssl.SSLSession;
import org.apache.logging.log4j.Logger;
import org.apache.geode.GemFireIOException;
+import org.apache.geode.annotations.internal.MakeImmutable;
import org.apache.geode.internal.net.BufferPool.BufferType;
import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -52,6 +53,11 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
public class NioSslEngine implements NioFilter {
private static final Logger logger = LogService.getLogger();
+ // this variable requires the MakeImmutable annotation but the buffer is empty and
+ // not really modifiable
+ @MakeImmutable
+ private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+
private final BufferPool bufferPool;
private volatile boolean closed;
@@ -362,6 +368,10 @@ public class NioSslEngine implements NioFilter {
// read-operations
}
+ @Override
+ public synchronized boolean isClosed() {
+ return closed;
+ }
@Override
public void close(SocketChannel socketChannel) {
@@ -396,8 +406,12 @@ public class NioSslEngine implements NioFilter {
} catch (IOException e) {
throw new GemFireIOException("exception closing SSL session", e);
} finally {
- bufferPool.releaseBuffer(TRACKED_SENDER, myNetData);
- bufferPool.releaseBuffer(TRACKED_RECEIVER, peerAppData);
+ ByteBuffer netData = myNetData;
+ ByteBuffer appData = peerAppData;
+ myNetData = null;
+ peerAppData = EMPTY_BUFFER;
+ bufferPool.releaseBuffer(TRACKED_SENDER, netData);
+ bufferPool.releaseBuffer(TRACKED_RECEIVER, appData);
this.closed = true;
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index ee9892b..f02beaf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -788,9 +788,13 @@ public class Connection implements Runnable {
private void notifyHandshakeWaiter(boolean success) {
if (getConduit().useSSL() && ioFilter != null) {
- // clear out any remaining handshake bytes
- ByteBuffer buffer = ioFilter.getUnwrappedBuffer(inputBuffer);
- buffer.position(0).limit(0);
+ synchronized (ioFilter.getSynchObject()) {
+ if (!ioFilter.isClosed()) {
+ // clear out any remaining handshake bytes
+ ByteBuffer buffer = ioFilter.getUnwrappedBuffer(inputBuffer);
+ buffer.position(0).limit(0);
+ }
+ }
}
synchronized (handshakeSync) {
if (success) {
@@ -2387,115 +2391,117 @@ public class Connection implements Runnable {
long queueTimeoutTarget = now + asyncQueueTimeout;
channel.configureBlocking(false);
try {
- ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
- int waitTime = 1;
- do {
- owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
- retries++;
- int amtWritten;
- if (FORCE_ASYNC_QUEUE) {
- amtWritten = 0;
- } else {
- amtWritten = channel.write(wrappedBuffer);
- }
- if (amtWritten == 0) {
- now = System.currentTimeMillis();
- long timeoutTarget;
- if (!forceAsync) {
- if (now > distributionTimeoutTarget) {
- if (logger.isDebugEnabled()) {
- if (distributionTimeoutTarget == 0) {
- logger.debug(
- "Starting async pusher to handle async queue because distribution-timeout is 1 and the last socket write would have blocked.");
- } else {
- long blockedMs = now - distributionTimeoutTarget;
- blockedMs += asyncDistributionTimeout;
- logger.debug(
- "Blocked for {}ms which is longer than the max of {}ms so starting async pusher to handle async queue.",
- blockedMs, asyncDistributionTimeout);
+ synchronized (ioFilter.getSynchObject()) {
+ ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
+ int waitTime = 1;
+ do {
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ retries++;
+ int amtWritten;
+ if (FORCE_ASYNC_QUEUE) {
+ amtWritten = 0;
+ } else {
+ amtWritten = channel.write(wrappedBuffer);
+ }
+ if (amtWritten == 0) {
+ now = System.currentTimeMillis();
+ long timeoutTarget;
+ if (!forceAsync) {
+ if (now > distributionTimeoutTarget) {
+ if (logger.isDebugEnabled()) {
+ if (distributionTimeoutTarget == 0) {
+ logger.debug(
+ "Starting async pusher to handle async queue because distribution-timeout is 1 and the last socket write would have blocked.");
+ } else {
+ long blockedMs = now - distributionTimeoutTarget;
+ blockedMs += asyncDistributionTimeout;
+ logger.debug(
+ "Blocked for {}ms which is longer than the max of {}ms so starting async pusher to handle async queue.",
+ blockedMs, asyncDistributionTimeout);
+ }
+ }
+ stats.incAsyncDistributionTimeoutExceeded();
+ if (totalAmtWritten > 0) {
+ // we have written part of the msg to the socket buffer
+ // and we are going to queue the remainder.
+ // We set msg to null so that will not make
+ // the partial msg a candidate for conflation.
+ msg = null;
}
+ if (handleBlockedWrite(wrappedBuffer, msg)) {
+ return;
+ }
+ }
+ timeoutTarget = distributionTimeoutTarget;
+ } else {
+ boolean disconnectNeeded = false;
+ long curQueuedBytes = queuedBytes;
+ if (curQueuedBytes > asyncMaxQueueSize) {
+ logger.warn(
+ "Queued bytes {} exceeds max of {}, asking slow receiver {} to disconnect.",
+ curQueuedBytes, asyncMaxQueueSize, remoteAddr);
+ stats.incAsyncQueueSizeExceeded(1);
+ disconnectNeeded = true;
}
- stats.incAsyncDistributionTimeoutExceeded();
- if (totalAmtWritten > 0) {
- // we have written part of the msg to the socket buffer
- // and we are going to queue the remainder.
- // We set msg to null so that will not make
- // the partial msg a candidate for conflation.
- msg = null;
+ if (now > queueTimeoutTarget) {
+ // we have waited long enough the pusher has been idle too long!
+ long blockedMs = now - queueTimeoutTarget;
+ blockedMs += asyncQueueTimeout;
+ logger.warn(
+ "Blocked for {}ms which is longer than the max of {}ms, asking slow receiver {} to disconnect.",
+ blockedMs,
+ asyncQueueTimeout, remoteAddr);
+ stats.incAsyncQueueTimeouts(1);
+ disconnectNeeded = true;
}
- if (handleBlockedWrite(wrappedBuffer, msg)) {
+ if (disconnectNeeded) {
+ disconnectSlowReceiver();
+ synchronized (outgoingQueue) {
+ asyncQueuingInProgress = false;
+ outgoingQueue.notifyAll();
+ }
return;
}
+ timeoutTarget = queueTimeoutTarget;
}
- timeoutTarget = distributionTimeoutTarget;
- } else {
- boolean disconnectNeeded = false;
- long curQueuedBytes = queuedBytes;
- if (curQueuedBytes > asyncMaxQueueSize) {
- logger.warn(
- "Queued bytes {} exceeds max of {}, asking slow receiver {} to disconnect.",
- curQueuedBytes, asyncMaxQueueSize, remoteAddr);
- stats.incAsyncQueueSizeExceeded(1);
- disconnectNeeded = true;
- }
- if (now > queueTimeoutTarget) {
- // we have waited long enough the pusher has been idle too long!
- long blockedMs = now - queueTimeoutTarget;
- blockedMs += asyncQueueTimeout;
- logger.warn(
- "Blocked for {}ms which is longer than the max of {}ms, asking slow receiver {} to disconnect.",
- blockedMs,
- asyncQueueTimeout, remoteAddr);
- stats.incAsyncQueueTimeouts(1);
- disconnectNeeded = true;
- }
- if (disconnectNeeded) {
- disconnectSlowReceiver();
- synchronized (outgoingQueue) {
- asyncQueuingInProgress = false;
- outgoingQueue.notifyAll();
+ {
+ long msToWait = waitTime;
+ long msRemaining = timeoutTarget - now;
+ if (msRemaining > 0) {
+ msRemaining /= 2;
}
- return;
- }
- timeoutTarget = queueTimeoutTarget;
- }
- {
- long msToWait = waitTime;
- long msRemaining = timeoutTarget - now;
- if (msRemaining > 0) {
- msRemaining /= 2;
- }
- if (msRemaining < msToWait) {
- msToWait = msRemaining;
- }
- if (msToWait <= 0) {
- Thread.yield();
- } else {
- boolean interrupted = Thread.interrupted();
- try {
- Thread.sleep(msToWait);
- } catch (InterruptedException ex) {
- interrupted = true;
- owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
+ if (msRemaining < msToWait) {
+ msToWait = msRemaining;
+ }
+ if (msToWait <= 0) {
+ Thread.yield();
+ } else {
+ boolean interrupted = Thread.interrupted();
+ try {
+ Thread.sleep(msToWait);
+ } catch (InterruptedException ex) {
+ interrupted = true;
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
}
}
}
+ if (waitTime < MAX_WAIT_TIME) {
+ // double it since it is not yet the max
+ waitTime <<= 1;
+ }
+ } // amtWritten == 0
+ else {
+ totalAmtWritten += amtWritten;
+ // reset queueTimeoutTarget since we made some progress
+ queueTimeoutTarget = System.currentTimeMillis() + asyncQueueTimeout;
+ waitTime = 1;
}
- if (waitTime < MAX_WAIT_TIME) {
- // double it since it is not yet the max
- waitTime <<= 1;
- }
- } // amtWritten == 0
- else {
- totalAmtWritten += amtWritten;
- // reset queueTimeoutTarget since we made some progress
- queueTimeoutTarget = System.currentTimeMillis() + asyncQueueTimeout;
- waitTime = 1;
- }
- } while (wrappedBuffer.remaining() > 0);
+ } while (wrappedBuffer.remaining() > 0);
+ }
} finally {
channel.configureBlocking(true);
}
@@ -2539,17 +2545,20 @@ public class Connection implements Runnable {
}
// fall through
}
- ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
- while (wrappedBuffer.remaining() > 0) {
- int amtWritten = 0;
- long start = stats.startSocketWrite(true);
- try {
- amtWritten = channel.write(wrappedBuffer);
- } finally {
- stats.endSocketWrite(true, start, amtWritten, 0);
+ // synchronize on the ioFilter while using its network buffer
+ synchronized (ioFilter.getSynchObject()) {
+ ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
+ while (wrappedBuffer.remaining() > 0) {
+ int amtWritten = 0;
+ long start = stats.startSocketWrite(true);
+ try {
+ amtWritten = channel.write(wrappedBuffer);
+ } finally {
+ stats.endSocketWrite(true, start, amtWritten, 0);
+ }
}
- }
+ }
}
} else {
writeAsync(channel, buffer, forceAsync, msg, stats);
@@ -2675,68 +2684,70 @@ public class Connection implements Runnable {
private void processInputBuffer() throws ConnectionException, IOException {
inputBuffer.flip();
- ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer);
- peerDataBuffer.flip();
+ synchronized (ioFilter.getSynchObject()) {
+ ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer);
+ peerDataBuffer.flip();
- boolean done = false;
+ boolean done = false;
- while (!done && connected) {
- owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
- int remaining = peerDataBuffer.remaining();
- if (lengthSet || remaining >= MSG_HEADER_BYTES) {
- if (!lengthSet) {
- if (readMessageHeader(peerDataBuffer)) {
- break;
+ while (!done && connected) {
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ int remaining = peerDataBuffer.remaining();
+ if (lengthSet || remaining >= MSG_HEADER_BYTES) {
+ if (!lengthSet) {
+ if (readMessageHeader(peerDataBuffer)) {
+ break;
+ }
}
- }
- if (remaining >= messageLength + MSG_HEADER_BYTES) {
- lengthSet = false;
- peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES);
- // don't trust the message deserialization to leave the position in
- // the correct spot. Some of the serialization uses buffered
- // streams that can leave the position at the wrong spot
- int startPos = peerDataBuffer.position();
- int oldLimit = peerDataBuffer.limit();
- peerDataBuffer.limit(startPos + messageLength);
-
- if (handshakeRead) {
- try {
- readMessage(peerDataBuffer);
- } catch (SerializationException e) {
- logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit);
- throw e;
+ if (remaining >= messageLength + MSG_HEADER_BYTES) {
+ lengthSet = false;
+ peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES);
+ // don't trust the message deserialization to leave the position in
+ // the correct spot. Some of the serialization uses buffered
+ // streams that can leave the position at the wrong spot
+ int startPos = peerDataBuffer.position();
+ int oldLimit = peerDataBuffer.limit();
+ peerDataBuffer.limit(startPos + messageLength);
+
+ if (handshakeRead) {
+ try {
+ readMessage(peerDataBuffer);
+ } catch (SerializationException e) {
+ logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit);
+ throw e;
+ }
+ } else {
+ ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
+ DataInputStream dis = new DataInputStream(bbis);
+ if (!isReceiver) {
+ // we read the handshake and then stop processing since we don't want
+ // to process the input buffer anymore in a handshake thread
+ readHandshakeForSender(dis, peerDataBuffer);
+ return;
+ }
+ if (readHandshakeForReceiver(dis)) {
+ ioFilter.doneReading(peerDataBuffer);
+ return;
+ }
}
- } else {
- ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
- DataInputStream dis = new DataInputStream(bbis);
- if (!isReceiver) {
- // we read the handshake and then stop processing since we don't want
- // to process the input buffer anymore in a handshake thread
- readHandshakeForSender(dis, peerDataBuffer);
- return;
+ if (!connected) {
+ continue;
}
- if (readHandshakeForReceiver(dis)) {
+ accessed();
+ peerDataBuffer.limit(oldLimit);
+ peerDataBuffer.position(startPos + messageLength);
+ } else {
+ done = true;
+ if (getConduit().useSSL()) {
ioFilter.doneReading(peerDataBuffer);
- return;
+ } else {
+ compactOrResizeBuffer(messageLength);
}
}
- if (!connected) {
- continue;
- }
- accessed();
- peerDataBuffer.limit(oldLimit);
- peerDataBuffer.position(startPos + messageLength);
} else {
+ ioFilter.doneReading(peerDataBuffer);
done = true;
- if (getConduit().useSSL()) {
- ioFilter.doneReading(peerDataBuffer);
- } else {
- compactOrResizeBuffer(messageLength);
- }
}
- } else {
- ioFilter.doneReading(peerDataBuffer);
- done = true;
}
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index 4561562..396ece2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -54,28 +54,30 @@ public class MsgReader {
}
Header readHeader() throws IOException {
- ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
+ synchronized (ioFilter.getSynchObject()) {
+ ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
- Assert.assertTrue(unwrappedBuffer.remaining() >= Connection.MSG_HEADER_BYTES);
+ Assert.assertTrue(unwrappedBuffer.remaining() >= Connection.MSG_HEADER_BYTES);
- try {
- int nioMessageLength = unwrappedBuffer.getInt();
- /* nioMessageVersion = */
- Connection.calcHdrVersion(nioMessageLength);
- nioMessageLength = Connection.calcMsgByteSize(nioMessageLength);
- byte nioMessageType = unwrappedBuffer.get();
- short nioMsgId = unwrappedBuffer.getShort();
+ try {
+ int nioMessageLength = unwrappedBuffer.getInt();
+ /* nioMessageVersion = */
+ Connection.calcHdrVersion(nioMessageLength);
+ nioMessageLength = Connection.calcMsgByteSize(nioMessageLength);
+ byte nioMessageType = unwrappedBuffer.get();
+ short nioMsgId = unwrappedBuffer.getShort();
- boolean directAck = (nioMessageType & Connection.DIRECT_ACK_BIT) != 0;
- if (directAck) {
- nioMessageType &= ~Connection.DIRECT_ACK_BIT; // clear the ack bit
- }
+ boolean directAck = (nioMessageType & Connection.DIRECT_ACK_BIT) != 0;
+ if (directAck) {
+ nioMessageType &= ~Connection.DIRECT_ACK_BIT; // clear the ack bit
+ }
- header.setFields(nioMessageLength, nioMessageType, nioMsgId);
+ header.setFields(nioMessageLength, nioMessageType, nioMsgId);
- return header;
- } catch (BufferUnderflowException e) {
- throw e;
+ return header;
+ } catch (BufferUnderflowException e) {
+ throw e;
+ }
}
}
@@ -87,32 +89,36 @@ public class MsgReader {
*/
DistributionMessage readMessage(Header header)
throws IOException, ClassNotFoundException {
- ByteBuffer nioInputBuffer = readAtLeast(header.messageLength);
- Assert.assertTrue(nioInputBuffer.remaining() >= header.messageLength);
- this.getStats().incMessagesBeingReceived(true, header.messageLength);
- long startSer = this.getStats().startMsgDeserialization();
- try {
- byteBufferInputStream.setBuffer(nioInputBuffer);
- ReplyProcessor21.initMessageRPId();
- return (DistributionMessage) InternalDataSerializer.readDSFID(byteBufferInputStream);
- } catch (RuntimeException e) {
- throw e;
- } catch (IOException e) {
- throw e;
- } finally {
- this.getStats().endMsgDeserialization(startSer);
- this.getStats().decMessagesBeingReceived(header.messageLength);
- ioFilter.doneReadingDirectAck(nioInputBuffer);
+ synchronized (ioFilter.getSynchObject()) {
+ ByteBuffer nioInputBuffer = readAtLeast(header.messageLength);
+ Assert.assertTrue(nioInputBuffer.remaining() >= header.messageLength);
+ this.getStats().incMessagesBeingReceived(true, header.messageLength);
+ long startSer = this.getStats().startMsgDeserialization();
+ try {
+ byteBufferInputStream.setBuffer(nioInputBuffer);
+ ReplyProcessor21.initMessageRPId();
+ return (DistributionMessage) InternalDataSerializer.readDSFID(byteBufferInputStream);
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (IOException e) {
+ throw e;
+ } finally {
+ this.getStats().endMsgDeserialization(startSer);
+ this.getStats().decMessagesBeingReceived(header.messageLength);
+ ioFilter.doneReadingDirectAck(nioInputBuffer);
+ }
}
}
void readChunk(Header header, MsgDestreamer md)
throws IOException {
- ByteBuffer unwrappedBuffer = readAtLeast(header.messageLength);
- this.getStats().incMessagesBeingReceived(md.size() == 0, header.messageLength);
- md.addChunk(unwrappedBuffer, header.messageLength);
- // show that the bytes have been consumed by adjusting the buffer's position
- unwrappedBuffer.position(unwrappedBuffer.position() + header.messageLength);
+ synchronized (ioFilter.getSynchObject()) {
+ ByteBuffer unwrappedBuffer = readAtLeast(header.messageLength);
+ this.getStats().incMessagesBeingReceived(md.size() == 0, header.messageLength);
+ md.addChunk(unwrappedBuffer, header.messageLength);
+ // show that the bytes have been consumed by adjusting the buffer's position
+ unwrappedBuffer.position(unwrappedBuffer.position() + header.messageLength);
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index ef16a21..aef1672 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -128,23 +128,6 @@ public class NioSslEngineTest {
when(mockChannel.socket()).thenReturn(mockSocket);
when(mockSocket.isClosed()).thenReturn(false);
- // // initial read of handshake status followed by read of handshake status after task execution
- // when(mockEngine.getHandshakeStatus()).thenReturn(NEED_UNWRAP, NEED_WRAP);
- //
- // // interleaved wraps/unwraps/task-execution
- // when(mockEngine.unwrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
- // new SSLEngineResult(OK, NEED_WRAP, 100, 100),
- // new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0),
- // new SSLEngineResult(OK, NEED_TASK, 100, 0));
- //
- // when(mockEngine.getDelegatedTask()).thenReturn(() -> {
- // }, (Runnable) null);
- //
- // when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
- // new SSLEngineResult(OK, NEED_UNWRAP, 100, 100),
- // new SSLEngineResult(BUFFER_OVERFLOW, NEED_WRAP, 0, 0),
- // new SSLEngineResult(CLOSED, FINISHED, 100, 0));
- //
assertThatThrownBy(() -> spyNioSslEngine.handshake(mockChannel, 10000,
ByteBuffer.allocate(netBufferSize / 2))).isExactlyInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Provided buffer is too small");
@@ -210,6 +193,15 @@ public class NioSslEngineTest {
}
@Test
+ public void synchObjectIsSelf() {
+ // for thread-safety the synchronization object given to outside entities
+ // must be the engine itself. This allows external manipulation or
+ // use of the engine's buffers to be protected in the same way as its synchronized
+ // methods
+ assertThat(nioSslEngine.getSynchObject()).isSameAs(nioSslEngine);
+ }
+
+ @Test
public void wrap() throws Exception {
// make the application data too big to fit into the engine's encryption buffer
ByteBuffer appData = ByteBuffer.allocate(nioSslEngine.myNetData.capacity() + 100);