You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2020/10/27 23:18:10 UTC

[geode] branch support/1.13 updated: GEODE-8651: MsgReader's readHeader and readMessage should be synchron… (#5665)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.13 by this push:
     new 174c7cb  GEODE-8651: MsgReader's readHeader and readMessage should be synchron… (#5665)
174c7cb is described below

commit 174c7cbca04204d3a130c377e39e041f8b55515f
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Tue Oct 27 14:16:04 2020 -0700

    GEODE-8651: MsgReader's readHeader and readMessage should be synchron… (#5665)
    
    
        Co-authored-by: Anil <ag...@pivotal.io>
        Co-authored-by: Darrel Schneider <da...@vmware.com>
        Co-authored-by: Bill Burcham <bi...@gmail.com>
        Co-authored-by: Ernie Burghardt <eb...@pivotal.io>
    
    (cherry picked from commit c6a0b31cd663b43fb67d00ac3de863644c0bb6cc)
---
 .../org/apache/geode/internal/tcp/Connection.java  | 46 +++++++++++-------
 .../apache/geode/internal/tcp/ConnectionTest.java  | 54 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 16 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 5dcaa7c..9292727 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -797,7 +797,8 @@ public class Connection implements Runnable {
     }
   }
 
-  private void notifyHandshakeWaiter(boolean success) {
+  @VisibleForTesting
+  void clearSSLInputBuffer() {
     if (getConduit().useSSL() && ioFilter != null) {
       synchronized (ioFilter.getSynchObject()) {
         if (!ioFilter.isClosed()) {
@@ -807,7 +808,18 @@ public class Connection implements Runnable {
         }
       }
     }
+  }
+
+  @VisibleForTesting
+  void notifyHandshakeWaiter(boolean success) {
     synchronized (handshakeSync) {
+      /*
+       * Return early to avoid modifying ioFilter's buffer more than once.
+       */
+      if (handshakeRead || handshakeCancelled) {
+        return;
+      }
+      clearSSLInputBuffer();
       if (success) {
         handshakeRead = true;
       } else {
@@ -2646,25 +2658,27 @@ public class Connection implements Runnable {
     final Version version = getRemoteVersion();
     try {
       msgReader = new MsgReader(this, ioFilter, version);
-
-      Header header = msgReader.readHeader();
-
       ReplyMessage msg;
       int len;
-      if (header.getMessageType() == NORMAL_MSG_TYPE) {
-        msg = (ReplyMessage) msgReader.readMessage(header);
-        len = header.getMessageLength();
-      } else {
-        MsgDestreamer destreamer = obtainMsgDestreamer(header.getMessageId(), version);
-        while (header.getMessageType() == CHUNKED_MSG_TYPE) {
+
+      synchronized (ioFilter.getSynchObject()) {
+        Header header = msgReader.readHeader();
+
+        if (header.getMessageType() == NORMAL_MSG_TYPE) {
+          msg = (ReplyMessage) msgReader.readMessage(header);
+          len = header.getMessageLength();
+        } else {
+          MsgDestreamer destreamer = obtainMsgDestreamer(header.getMessageId(), version);
+          while (header.getMessageType() == CHUNKED_MSG_TYPE) {
+            msgReader.readChunk(header, destreamer);
+            header = msgReader.readHeader();
+          }
           msgReader.readChunk(header, destreamer);
-          header = msgReader.readHeader();
+          msg = (ReplyMessage) destreamer.getMessage();
+          releaseMsgDestreamer(header.getMessageId(), destreamer);
+          len = destreamer.size();
         }
-        msgReader.readChunk(header, destreamer);
-        msg = (ReplyMessage) destreamer.getMessage();
-        releaseMsgDestreamer(header.getMessageId(), destreamer);
-        len = destreamer.size();
-      }
+      } // sync
       // I'd really just like to call dispatchMessage here. However,
       // that call goes through a bunch of checks that knock about
       // 10% of the performance. Since this direct-ack stuff is all
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
index 233de78..c064afb 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
@@ -19,10 +19,12 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.UnknownHostException;
@@ -113,4 +115,56 @@ public class ConnectionTest {
       assertThat(connection.getP2PConnectTimeout(distributionConfig)).isEqualTo(100);
     });
   }
+
+  private Connection createSpiedConnection() throws IOException {
+    ConnectionTable connectionTable = mock(ConnectionTable.class);
+    Distribution distribution = mock(Distribution.class);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    DMStats dmStats = mock(DMStats.class);
+    CancelCriterion stopper = mock(CancelCriterion.class);
+    SocketCloser socketCloser = mock(SocketCloser.class);
+    TCPConduit tcpConduit = mock(TCPConduit.class);
+
+    when(connectionTable.getBufferPool()).thenReturn(new BufferPool(dmStats));
+    when(connectionTable.getConduit()).thenReturn(tcpConduit);
+    when(connectionTable.getDM()).thenReturn(distributionManager);
+    when(connectionTable.getSocketCloser()).thenReturn(socketCloser);
+    when(distributionManager.getDistribution()).thenReturn(distribution);
+    when(stopper.cancelInProgress()).thenReturn(null);
+    when(tcpConduit.getCancelCriterion()).thenReturn(stopper);
+    when(tcpConduit.getDM()).thenReturn(distributionManager);
+    when(tcpConduit.getSocketId()).thenReturn(new InetSocketAddress(getLocalHost(), 10337));
+    when(tcpConduit.getStats()).thenReturn(dmStats);
+
+    SocketChannel channel = SocketChannel.open();
+
+    Connection connection = new Connection(connectionTable, channel.socket());
+    connection = spy(connection);
+    return connection;
+  }
+
+  @Test
+  public void firstCallToNotifyHandshakeWaiterWillClearSSLInputBuffer() throws Exception {
+    Connection connection = createSpiedConnection();
+    connection.notifyHandshakeWaiter(true);
+    verify(connection, times(1)).clearSSLInputBuffer();
+  }
+
+  @Test
+  public void secondCallWithTrueToNotifyHandshakeWaiterShouldNotClearSSLInputBuffer()
+      throws Exception {
+    Connection connection = createSpiedConnection();
+    connection.notifyHandshakeWaiter(true);
+    connection.notifyHandshakeWaiter(true);
+    verify(connection, times(1)).clearSSLInputBuffer();
+  }
+
+  @Test
+  public void secondCallWithFalseToNotifyHandshakeWaiterShouldNotClearSSLInputBuffer()
+      throws Exception {
+    Connection connection = createSpiedConnection();
+    connection.notifyHandshakeWaiter(true);
+    connection.notifyHandshakeWaiter(false);
+    verify(connection, times(1)).clearSSLInputBuffer();
+  }
 }