You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2020/10/08 21:42:31 UTC

[geode] branch support/1.12 updated: GEODE-8584: Message transmission fails with IllegalStateException in socket i/o code (#5605)

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.12 by this push:
     new 2ccaf7b  GEODE-8584: Message transmission fails with IllegalStateException in socket i/o code (#5605)
2ccaf7b is described below

commit 2ccaf7b8fe1cb490a3d3001b61b12ea52c5ed1d0
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Thu Oct 8 14:09:42 2020 -0700

    GEODE-8584: Message transmission fails with IllegalStateException in socket i/o code (#5605)
    
    * GEODE-8584: Message transmission fails with IllegalStateException in socket i/o code
    
    Add appropriate synchronization when using ioFilter's buffers.
    to do: add testing
    to do: document the need for synchronization in the NioFilter interface.
    
    * revised synchronization across all uses of NioFilter, added test
    
    * remove dangling debug logging
    
    * fix pmd problem
    
    * fix pmd problem
    
    * remove unnecessary volatile modifier
    
    (cherry picked from commit f4d44d658a6273d7de27479a67c05117310205a7)
---
 .../org/apache/geode/internal/net/BufferPool.java  |   7 +-
 .../org/apache/geode/internal/net/NioFilter.java   |  22 +-
 .../apache/geode/internal/net/NioSslEngine.java    |  18 +-
 .../org/apache/geode/internal/tcp/Connection.java  | 333 +++++++++++----------
 .../org/apache/geode/internal/tcp/MsgReader.java   |  82 ++---
 .../geode/internal/net/NioSslEngineTest.java       |  26 +-
 6 files changed, 266 insertions(+), 222 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
index e119250..3938422 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
@@ -255,10 +255,15 @@ public class BufferPool {
    * "slice" of the buffer having the requested capacity and hand that out instead.
    * When we put the buffer back in the pool we need to find the original, non-sliced,
    * buffer. This is held in DirectBuffer in its "attachment" field, which is a public
-   * method, though DirectBuffer is package-private.
+   * method, though DirectBuffer is package-private. This method is visible for use
+   * in debugging and testing. For debugging, invoke this method if you need to see
+   * the non-sliced buffer for some reason, such as logging its hashcode.
    */
   @VisibleForTesting
   public ByteBuffer getPoolableBuffer(ByteBuffer buffer) {
+    if (!buffer.isDirect()) {
+      return buffer;
+    }
     ByteBuffer result = buffer;
     if (parentOfSliceMethod == null) {
       Class clazz = buffer.getClass();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
index 01556dc..9c437ad 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
@@ -21,7 +21,13 @@ import java.nio.channels.SocketChannel;
 /**
  * Prior to transmitting a buffer or processing a received buffer
  * a NioFilter should be called to wrap (transmit) or unwrap (received)
- * the buffer in case SSL is being used.
+ * the buffer in case SSL is being used.<br>
+ * Implementations of this interface may not be thread-safe in regard to
+ * the buffers their methods return. These may be internal state that,
+ * if used concurrently by multiple threads, could cause corruption.
+ * Appropriate external synchronization must be used in order to provide
+ * thread-safety. Do this by invoking getSynchObject() and synchronizing on
+ * the returned object while using the buffer.
  */
 public interface NioFilter {
 
@@ -75,6 +81,10 @@ public interface NioFilter {
     }
   }
 
+  default boolean isClosed() {
+    return false;
+  }
+
   /**
    * invoke this method when you are done using the NioFilter
    *
@@ -84,9 +94,15 @@ public interface NioFilter {
   }
 
   /**
-   * returns the unwrapped byte buffer associated with the given wrapped buffer
+   * returns the unwrapped byte buffer associated with the given wrapped buffer.
    */
   ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer);
 
-
+  /**
+   * returns an object to be used in synchronizing on the use of buffers returned by
+   * a NioFilter.
+   */
+  default Object getSynchObject() {
+    return this;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 2d55fa3..2398b35 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -40,6 +40,7 @@ import javax.net.ssl.SSLSession;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.GemFireIOException;
+import org.apache.geode.annotations.internal.MakeImmutable;
 import org.apache.geode.internal.net.BufferPool.BufferType;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
@@ -52,6 +53,11 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
 public class NioSslEngine implements NioFilter {
   private static final Logger logger = LogService.getLogger();
 
+  // this variable requires the MakeImmutable annotation but the buffer is empty and
+  // not really modifiable
+  @MakeImmutable
+  private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+
   private final BufferPool bufferPool;
 
   private volatile boolean closed;
@@ -362,6 +368,10 @@ public class NioSslEngine implements NioFilter {
     // read-operations
   }
 
+  @Override
+  public synchronized boolean isClosed() {
+    return closed;
+  }
 
   @Override
   public void close(SocketChannel socketChannel) {
@@ -396,8 +406,12 @@ public class NioSslEngine implements NioFilter {
     } catch (IOException e) {
       throw new GemFireIOException("exception closing SSL session", e);
     } finally {
-      bufferPool.releaseBuffer(TRACKED_SENDER, myNetData);
-      bufferPool.releaseBuffer(TRACKED_RECEIVER, peerAppData);
+      ByteBuffer netData = myNetData;
+      ByteBuffer appData = peerAppData;
+      myNetData = null;
+      peerAppData = EMPTY_BUFFER;
+      bufferPool.releaseBuffer(TRACKED_SENDER, netData);
+      bufferPool.releaseBuffer(TRACKED_RECEIVER, appData);
       this.closed = true;
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index ee9892b..f02beaf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -788,9 +788,13 @@ public class Connection implements Runnable {
 
   private void notifyHandshakeWaiter(boolean success) {
     if (getConduit().useSSL() && ioFilter != null) {
-      // clear out any remaining handshake bytes
-      ByteBuffer buffer = ioFilter.getUnwrappedBuffer(inputBuffer);
-      buffer.position(0).limit(0);
+      synchronized (ioFilter.getSynchObject()) {
+        if (!ioFilter.isClosed()) {
+          // clear out any remaining handshake bytes
+          ByteBuffer buffer = ioFilter.getUnwrappedBuffer(inputBuffer);
+          buffer.position(0).limit(0);
+        }
+      }
     }
     synchronized (handshakeSync) {
       if (success) {
@@ -2387,115 +2391,117 @@ public class Connection implements Runnable {
         long queueTimeoutTarget = now + asyncQueueTimeout;
         channel.configureBlocking(false);
         try {
-          ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
-          int waitTime = 1;
-          do {
-            owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
-            retries++;
-            int amtWritten;
-            if (FORCE_ASYNC_QUEUE) {
-              amtWritten = 0;
-            } else {
-              amtWritten = channel.write(wrappedBuffer);
-            }
-            if (amtWritten == 0) {
-              now = System.currentTimeMillis();
-              long timeoutTarget;
-              if (!forceAsync) {
-                if (now > distributionTimeoutTarget) {
-                  if (logger.isDebugEnabled()) {
-                    if (distributionTimeoutTarget == 0) {
-                      logger.debug(
-                          "Starting async pusher to handle async queue because distribution-timeout is 1 and the last socket write would have blocked.");
-                    } else {
-                      long blockedMs = now - distributionTimeoutTarget;
-                      blockedMs += asyncDistributionTimeout;
-                      logger.debug(
-                          "Blocked for {}ms which is longer than the max of {}ms so starting async pusher to handle async queue.",
-                          blockedMs, asyncDistributionTimeout);
+          synchronized (ioFilter.getSynchObject()) {
+            ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
+            int waitTime = 1;
+            do {
+              owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+              retries++;
+              int amtWritten;
+              if (FORCE_ASYNC_QUEUE) {
+                amtWritten = 0;
+              } else {
+                amtWritten = channel.write(wrappedBuffer);
+              }
+              if (amtWritten == 0) {
+                now = System.currentTimeMillis();
+                long timeoutTarget;
+                if (!forceAsync) {
+                  if (now > distributionTimeoutTarget) {
+                    if (logger.isDebugEnabled()) {
+                      if (distributionTimeoutTarget == 0) {
+                        logger.debug(
+                            "Starting async pusher to handle async queue because distribution-timeout is 1 and the last socket write would have blocked.");
+                      } else {
+                        long blockedMs = now - distributionTimeoutTarget;
+                        blockedMs += asyncDistributionTimeout;
+                        logger.debug(
+                            "Blocked for {}ms which is longer than the max of {}ms so starting async pusher to handle async queue.",
+                            blockedMs, asyncDistributionTimeout);
+                      }
+                    }
+                    stats.incAsyncDistributionTimeoutExceeded();
+                    if (totalAmtWritten > 0) {
+                      // we have written part of the msg to the socket buffer
+                      // and we are going to queue the remainder.
+                      // We set msg to null so that the partial msg
+                      // will not be a candidate for conflation.
+                      msg = null;
                     }
+                    if (handleBlockedWrite(wrappedBuffer, msg)) {
+                      return;
+                    }
+                  }
+                  timeoutTarget = distributionTimeoutTarget;
+                } else {
+                  boolean disconnectNeeded = false;
+                  long curQueuedBytes = queuedBytes;
+                  if (curQueuedBytes > asyncMaxQueueSize) {
+                    logger.warn(
+                        "Queued bytes {} exceeds max of {}, asking slow receiver {} to disconnect.",
+                        curQueuedBytes, asyncMaxQueueSize, remoteAddr);
+                    stats.incAsyncQueueSizeExceeded(1);
+                    disconnectNeeded = true;
                   }
-                  stats.incAsyncDistributionTimeoutExceeded();
-                  if (totalAmtWritten > 0) {
-                    // we have written part of the msg to the socket buffer
-                    // and we are going to queue the remainder.
-                    // We set msg to null so that will not make
-                    // the partial msg a candidate for conflation.
-                    msg = null;
+                  if (now > queueTimeoutTarget) {
+                    // we have waited long enough; the pusher has been idle too long!
+                    long blockedMs = now - queueTimeoutTarget;
+                    blockedMs += asyncQueueTimeout;
+                    logger.warn(
+                        "Blocked for {}ms which is longer than the max of {}ms, asking slow receiver {} to disconnect.",
+                        blockedMs,
+                        asyncQueueTimeout, remoteAddr);
+                    stats.incAsyncQueueTimeouts(1);
+                    disconnectNeeded = true;
                   }
-                  if (handleBlockedWrite(wrappedBuffer, msg)) {
+                  if (disconnectNeeded) {
+                    disconnectSlowReceiver();
+                    synchronized (outgoingQueue) {
+                      asyncQueuingInProgress = false;
+                      outgoingQueue.notifyAll();
+                    }
                     return;
                   }
+                  timeoutTarget = queueTimeoutTarget;
                 }
-                timeoutTarget = distributionTimeoutTarget;
-              } else {
-                boolean disconnectNeeded = false;
-                long curQueuedBytes = queuedBytes;
-                if (curQueuedBytes > asyncMaxQueueSize) {
-                  logger.warn(
-                      "Queued bytes {} exceeds max of {}, asking slow receiver {} to disconnect.",
-                      curQueuedBytes, asyncMaxQueueSize, remoteAddr);
-                  stats.incAsyncQueueSizeExceeded(1);
-                  disconnectNeeded = true;
-                }
-                if (now > queueTimeoutTarget) {
-                  // we have waited long enough the pusher has been idle too long!
-                  long blockedMs = now - queueTimeoutTarget;
-                  blockedMs += asyncQueueTimeout;
-                  logger.warn(
-                      "Blocked for {}ms which is longer than the max of {}ms, asking slow receiver {} to disconnect.",
-                      blockedMs,
-                      asyncQueueTimeout, remoteAddr);
-                  stats.incAsyncQueueTimeouts(1);
-                  disconnectNeeded = true;
-                }
-                if (disconnectNeeded) {
-                  disconnectSlowReceiver();
-                  synchronized (outgoingQueue) {
-                    asyncQueuingInProgress = false;
-                    outgoingQueue.notifyAll();
+                {
+                  long msToWait = waitTime;
+                  long msRemaining = timeoutTarget - now;
+                  if (msRemaining > 0) {
+                    msRemaining /= 2;
                   }
-                  return;
-                }
-                timeoutTarget = queueTimeoutTarget;
-              }
-              {
-                long msToWait = waitTime;
-                long msRemaining = timeoutTarget - now;
-                if (msRemaining > 0) {
-                  msRemaining /= 2;
-                }
-                if (msRemaining < msToWait) {
-                  msToWait = msRemaining;
-                }
-                if (msToWait <= 0) {
-                  Thread.yield();
-                } else {
-                  boolean interrupted = Thread.interrupted();
-                  try {
-                    Thread.sleep(msToWait);
-                  } catch (InterruptedException ex) {
-                    interrupted = true;
-                    owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
-                  } finally {
-                    if (interrupted) {
-                      Thread.currentThread().interrupt();
+                  if (msRemaining < msToWait) {
+                    msToWait = msRemaining;
+                  }
+                  if (msToWait <= 0) {
+                    Thread.yield();
+                  } else {
+                    boolean interrupted = Thread.interrupted();
+                    try {
+                      Thread.sleep(msToWait);
+                    } catch (InterruptedException ex) {
+                      interrupted = true;
+                      owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
+                    } finally {
+                      if (interrupted) {
+                        Thread.currentThread().interrupt();
+                      }
                     }
                   }
                 }
+                if (waitTime < MAX_WAIT_TIME) {
+                  // double it since it is not yet the max
+                  waitTime <<= 1;
+                }
+              } // amtWritten == 0
+              else {
+                totalAmtWritten += amtWritten;
+                // reset queueTimeoutTarget since we made some progress
+                queueTimeoutTarget = System.currentTimeMillis() + asyncQueueTimeout;
+                waitTime = 1;
               }
-              if (waitTime < MAX_WAIT_TIME) {
-                // double it since it is not yet the max
-                waitTime <<= 1;
-              }
-            } // amtWritten == 0
-            else {
-              totalAmtWritten += amtWritten;
-              // reset queueTimeoutTarget since we made some progress
-              queueTimeoutTarget = System.currentTimeMillis() + asyncQueueTimeout;
-              waitTime = 1;
-            }
-          } while (wrappedBuffer.remaining() > 0);
+            } while (wrappedBuffer.remaining() > 0);
+          }
         } finally {
           channel.configureBlocking(true);
         }
@@ -2539,17 +2545,20 @@ public class Connection implements Runnable {
           }
           // fall through
         }
-        ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
-        while (wrappedBuffer.remaining() > 0) {
-          int amtWritten = 0;
-          long start = stats.startSocketWrite(true);
-          try {
-            amtWritten = channel.write(wrappedBuffer);
-          } finally {
-            stats.endSocketWrite(true, start, amtWritten, 0);
+        // synchronize on the ioFilter while using its network buffer
+        synchronized (ioFilter.getSynchObject()) {
+          ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
+          while (wrappedBuffer.remaining() > 0) {
+            int amtWritten = 0;
+            long start = stats.startSocketWrite(true);
+            try {
+              amtWritten = channel.write(wrappedBuffer);
+            } finally {
+              stats.endSocketWrite(true, start, amtWritten, 0);
+            }
           }
-        }
 
+        }
       }
     } else {
       writeAsync(channel, buffer, forceAsync, msg, stats);
@@ -2675,68 +2684,70 @@ public class Connection implements Runnable {
   private void processInputBuffer() throws ConnectionException, IOException {
     inputBuffer.flip();
 
-    ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer);
-    peerDataBuffer.flip();
+    synchronized (ioFilter.getSynchObject()) {
+      ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer);
+      peerDataBuffer.flip();
 
-    boolean done = false;
+      boolean done = false;
 
-    while (!done && connected) {
-      owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
-      int remaining = peerDataBuffer.remaining();
-      if (lengthSet || remaining >= MSG_HEADER_BYTES) {
-        if (!lengthSet) {
-          if (readMessageHeader(peerDataBuffer)) {
-            break;
+      while (!done && connected) {
+        owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+        int remaining = peerDataBuffer.remaining();
+        if (lengthSet || remaining >= MSG_HEADER_BYTES) {
+          if (!lengthSet) {
+            if (readMessageHeader(peerDataBuffer)) {
+              break;
+            }
           }
-        }
-        if (remaining >= messageLength + MSG_HEADER_BYTES) {
-          lengthSet = false;
-          peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES);
-          // don't trust the message deserialization to leave the position in
-          // the correct spot. Some of the serialization uses buffered
-          // streams that can leave the position at the wrong spot
-          int startPos = peerDataBuffer.position();
-          int oldLimit = peerDataBuffer.limit();
-          peerDataBuffer.limit(startPos + messageLength);
-
-          if (handshakeRead) {
-            try {
-              readMessage(peerDataBuffer);
-            } catch (SerializationException e) {
-              logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit);
-              throw e;
+          if (remaining >= messageLength + MSG_HEADER_BYTES) {
+            lengthSet = false;
+            peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES);
+            // don't trust the message deserialization to leave the position in
+            // the correct spot. Some of the serialization uses buffered
+            // streams that can leave the position at the wrong spot
+            int startPos = peerDataBuffer.position();
+            int oldLimit = peerDataBuffer.limit();
+            peerDataBuffer.limit(startPos + messageLength);
+
+            if (handshakeRead) {
+              try {
+                readMessage(peerDataBuffer);
+              } catch (SerializationException e) {
+                logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit);
+                throw e;
+              }
+            } else {
+              ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
+              DataInputStream dis = new DataInputStream(bbis);
+              if (!isReceiver) {
+                // we read the handshake and then stop processing since we don't want
+                // to process the input buffer anymore in a handshake thread
+                readHandshakeForSender(dis, peerDataBuffer);
+                return;
+              }
+              if (readHandshakeForReceiver(dis)) {
+                ioFilter.doneReading(peerDataBuffer);
+                return;
+              }
             }
-          } else {
-            ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
-            DataInputStream dis = new DataInputStream(bbis);
-            if (!isReceiver) {
-              // we read the handshake and then stop processing since we don't want
-              // to process the input buffer anymore in a handshake thread
-              readHandshakeForSender(dis, peerDataBuffer);
-              return;
+            if (!connected) {
+              continue;
             }
-            if (readHandshakeForReceiver(dis)) {
+            accessed();
+            peerDataBuffer.limit(oldLimit);
+            peerDataBuffer.position(startPos + messageLength);
+          } else {
+            done = true;
+            if (getConduit().useSSL()) {
               ioFilter.doneReading(peerDataBuffer);
-              return;
+            } else {
+              compactOrResizeBuffer(messageLength);
             }
           }
-          if (!connected) {
-            continue;
-          }
-          accessed();
-          peerDataBuffer.limit(oldLimit);
-          peerDataBuffer.position(startPos + messageLength);
         } else {
+          ioFilter.doneReading(peerDataBuffer);
           done = true;
-          if (getConduit().useSSL()) {
-            ioFilter.doneReading(peerDataBuffer);
-          } else {
-            compactOrResizeBuffer(messageLength);
-          }
         }
-      } else {
-        ioFilter.doneReading(peerDataBuffer);
-        done = true;
       }
     }
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index 4561562..396ece2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -54,28 +54,30 @@ public class MsgReader {
   }
 
   Header readHeader() throws IOException {
-    ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
+    synchronized (ioFilter.getSynchObject()) {
+      ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
 
-    Assert.assertTrue(unwrappedBuffer.remaining() >= Connection.MSG_HEADER_BYTES);
+      Assert.assertTrue(unwrappedBuffer.remaining() >= Connection.MSG_HEADER_BYTES);
 
-    try {
-      int nioMessageLength = unwrappedBuffer.getInt();
-      /* nioMessageVersion = */
-      Connection.calcHdrVersion(nioMessageLength);
-      nioMessageLength = Connection.calcMsgByteSize(nioMessageLength);
-      byte nioMessageType = unwrappedBuffer.get();
-      short nioMsgId = unwrappedBuffer.getShort();
+      try {
+        int nioMessageLength = unwrappedBuffer.getInt();
+        /* nioMessageVersion = */
+        Connection.calcHdrVersion(nioMessageLength);
+        nioMessageLength = Connection.calcMsgByteSize(nioMessageLength);
+        byte nioMessageType = unwrappedBuffer.get();
+        short nioMsgId = unwrappedBuffer.getShort();
 
-      boolean directAck = (nioMessageType & Connection.DIRECT_ACK_BIT) != 0;
-      if (directAck) {
-        nioMessageType &= ~Connection.DIRECT_ACK_BIT; // clear the ack bit
-      }
+        boolean directAck = (nioMessageType & Connection.DIRECT_ACK_BIT) != 0;
+        if (directAck) {
+          nioMessageType &= ~Connection.DIRECT_ACK_BIT; // clear the ack bit
+        }
 
-      header.setFields(nioMessageLength, nioMessageType, nioMsgId);
+        header.setFields(nioMessageLength, nioMessageType, nioMsgId);
 
-      return header;
-    } catch (BufferUnderflowException e) {
-      throw e;
+        return header;
+      } catch (BufferUnderflowException e) {
+        throw e;
+      }
     }
 
   }
@@ -87,32 +89,36 @@ public class MsgReader {
    */
   DistributionMessage readMessage(Header header)
       throws IOException, ClassNotFoundException {
-    ByteBuffer nioInputBuffer = readAtLeast(header.messageLength);
-    Assert.assertTrue(nioInputBuffer.remaining() >= header.messageLength);
-    this.getStats().incMessagesBeingReceived(true, header.messageLength);
-    long startSer = this.getStats().startMsgDeserialization();
-    try {
-      byteBufferInputStream.setBuffer(nioInputBuffer);
-      ReplyProcessor21.initMessageRPId();
-      return (DistributionMessage) InternalDataSerializer.readDSFID(byteBufferInputStream);
-    } catch (RuntimeException e) {
-      throw e;
-    } catch (IOException e) {
-      throw e;
-    } finally {
-      this.getStats().endMsgDeserialization(startSer);
-      this.getStats().decMessagesBeingReceived(header.messageLength);
-      ioFilter.doneReadingDirectAck(nioInputBuffer);
+    synchronized (ioFilter.getSynchObject()) {
+      ByteBuffer nioInputBuffer = readAtLeast(header.messageLength);
+      Assert.assertTrue(nioInputBuffer.remaining() >= header.messageLength);
+      this.getStats().incMessagesBeingReceived(true, header.messageLength);
+      long startSer = this.getStats().startMsgDeserialization();
+      try {
+        byteBufferInputStream.setBuffer(nioInputBuffer);
+        ReplyProcessor21.initMessageRPId();
+        return (DistributionMessage) InternalDataSerializer.readDSFID(byteBufferInputStream);
+      } catch (RuntimeException e) {
+        throw e;
+      } catch (IOException e) {
+        throw e;
+      } finally {
+        this.getStats().endMsgDeserialization(startSer);
+        this.getStats().decMessagesBeingReceived(header.messageLength);
+        ioFilter.doneReadingDirectAck(nioInputBuffer);
+      }
     }
   }
 
   void readChunk(Header header, MsgDestreamer md)
       throws IOException {
-    ByteBuffer unwrappedBuffer = readAtLeast(header.messageLength);
-    this.getStats().incMessagesBeingReceived(md.size() == 0, header.messageLength);
-    md.addChunk(unwrappedBuffer, header.messageLength);
-    // show that the bytes have been consumed by adjusting the buffer's position
-    unwrappedBuffer.position(unwrappedBuffer.position() + header.messageLength);
+    synchronized (ioFilter.getSynchObject()) {
+      ByteBuffer unwrappedBuffer = readAtLeast(header.messageLength);
+      this.getStats().incMessagesBeingReceived(md.size() == 0, header.messageLength);
+      md.addChunk(unwrappedBuffer, header.messageLength);
+      // show that the bytes have been consumed by adjusting the buffer's position
+      unwrappedBuffer.position(unwrappedBuffer.position() + header.messageLength);
+    }
   }
 
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index ef16a21..aef1672 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -128,23 +128,6 @@ public class NioSslEngineTest {
     when(mockChannel.socket()).thenReturn(mockSocket);
     when(mockSocket.isClosed()).thenReturn(false);
 
-    // // initial read of handshake status followed by read of handshake status after task execution
-    // when(mockEngine.getHandshakeStatus()).thenReturn(NEED_UNWRAP, NEED_WRAP);
-    //
-    // // interleaved wraps/unwraps/task-execution
-    // when(mockEngine.unwrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
-    // new SSLEngineResult(OK, NEED_WRAP, 100, 100),
-    // new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0),
-    // new SSLEngineResult(OK, NEED_TASK, 100, 0));
-    //
-    // when(mockEngine.getDelegatedTask()).thenReturn(() -> {
-    // }, (Runnable) null);
-    //
-    // when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn(
-    // new SSLEngineResult(OK, NEED_UNWRAP, 100, 100),
-    // new SSLEngineResult(BUFFER_OVERFLOW, NEED_WRAP, 0, 0),
-    // new SSLEngineResult(CLOSED, FINISHED, 100, 0));
-    //
     assertThatThrownBy(() -> spyNioSslEngine.handshake(mockChannel, 10000,
         ByteBuffer.allocate(netBufferSize / 2))).isExactlyInstanceOf(IllegalArgumentException.class)
             .hasMessageContaining("Provided buffer is too small");
@@ -210,6 +193,15 @@ public class NioSslEngineTest {
   }
 
   @Test
+  public void synchObjectIsSelf() {
+    // for thread-safety the synchronization object given to outside entities
+    // must be the engine itself. This allows external manipulation or
+    // use of the engine's buffers to be protected in the same way as its synchronized
+    // methods
+    assertThat(nioSslEngine.getSynchObject()).isSameAs(nioSslEngine);
+  }
+
+  @Test
   public void wrap() throws Exception {
     // make the application data too big to fit into the engine's encryption buffer
     ByteBuffer appData = ByteBuffer.allocate(nioSslEngine.myNetData.capacity() + 100);