You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2020/10/27 23:18:10 UTC
[geode] branch support/1.13 updated: GEODE-8651: MsgReader's readHeader and readMessage should be synchron… (#5665)
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.13 by this push:
new 174c7cb GEODE-8651: MsgReader's readHeader and readMessage should be synchron… (#5665)
174c7cb is described below
commit 174c7cbca04204d3a130c377e39e041f8b55515f
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Tue Oct 27 14:16:04 2020 -0700
GEODE-8651: MsgReader's readHeader and readMessage should be synchron… (#5665)
Co-authored-by: Anil <ag...@pivotal.io>
Co-authored-by: Darrel Schneider <da...@vmware.com>
Co-authored-by: Bill Burcham <bi...@gmail.com>
Co-authored-by: Ernie Burghardt <eb...@pivotal.io>
(cherry picked from commit c6a0b31cd663b43fb67d00ac3de863644c0bb6cc)
---
.../org/apache/geode/internal/tcp/Connection.java | 46 +++++++++++-------
.../apache/geode/internal/tcp/ConnectionTest.java | 54 ++++++++++++++++++++++
2 files changed, 84 insertions(+), 16 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 5dcaa7c..9292727 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -797,7 +797,8 @@ public class Connection implements Runnable {
}
}
- private void notifyHandshakeWaiter(boolean success) {
+ @VisibleForTesting
+ void clearSSLInputBuffer() {
if (getConduit().useSSL() && ioFilter != null) {
synchronized (ioFilter.getSynchObject()) {
if (!ioFilter.isClosed()) {
@@ -807,7 +808,18 @@ public class Connection implements Runnable {
}
}
}
+ }
+
+ @VisibleForTesting
+ void notifyHandshakeWaiter(boolean success) {
synchronized (handshakeSync) {
+ /*
+ * Return early to avoid modifying ioFilter's buffer more than once.
+ */
+ if (handshakeRead || handshakeCancelled) {
+ return;
+ }
+ clearSSLInputBuffer();
if (success) {
handshakeRead = true;
} else {
@@ -2646,25 +2658,27 @@ public class Connection implements Runnable {
final Version version = getRemoteVersion();
try {
msgReader = new MsgReader(this, ioFilter, version);
-
- Header header = msgReader.readHeader();
-
ReplyMessage msg;
int len;
- if (header.getMessageType() == NORMAL_MSG_TYPE) {
- msg = (ReplyMessage) msgReader.readMessage(header);
- len = header.getMessageLength();
- } else {
- MsgDestreamer destreamer = obtainMsgDestreamer(header.getMessageId(), version);
- while (header.getMessageType() == CHUNKED_MSG_TYPE) {
+
+ synchronized (ioFilter.getSynchObject()) {
+ Header header = msgReader.readHeader();
+
+ if (header.getMessageType() == NORMAL_MSG_TYPE) {
+ msg = (ReplyMessage) msgReader.readMessage(header);
+ len = header.getMessageLength();
+ } else {
+ MsgDestreamer destreamer = obtainMsgDestreamer(header.getMessageId(), version);
+ while (header.getMessageType() == CHUNKED_MSG_TYPE) {
+ msgReader.readChunk(header, destreamer);
+ header = msgReader.readHeader();
+ }
msgReader.readChunk(header, destreamer);
- header = msgReader.readHeader();
+ msg = (ReplyMessage) destreamer.getMessage();
+ releaseMsgDestreamer(header.getMessageId(), destreamer);
+ len = destreamer.size();
}
- msgReader.readChunk(header, destreamer);
- msg = (ReplyMessage) destreamer.getMessage();
- releaseMsgDestreamer(header.getMessageId(), destreamer);
- len = destreamer.size();
- }
+ } // sync
// I'd really just like to call dispatchMessage here. However,
// that call goes through a bunch of checks that knock about
// 10% of the performance. Since this direct-ack stuff is all
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
index 233de78..c064afb 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
@@ -19,10 +19,12 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
@@ -113,4 +115,56 @@ public class ConnectionTest {
assertThat(connection.getP2PConnectTimeout(distributionConfig)).isEqualTo(100);
});
}
+
+ private Connection createSpiedConnection() throws IOException {
+ ConnectionTable connectionTable = mock(ConnectionTable.class);
+ Distribution distribution = mock(Distribution.class);
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ DMStats dmStats = mock(DMStats.class);
+ CancelCriterion stopper = mock(CancelCriterion.class);
+ SocketCloser socketCloser = mock(SocketCloser.class);
+ TCPConduit tcpConduit = mock(TCPConduit.class);
+
+ when(connectionTable.getBufferPool()).thenReturn(new BufferPool(dmStats));
+ when(connectionTable.getConduit()).thenReturn(tcpConduit);
+ when(connectionTable.getDM()).thenReturn(distributionManager);
+ when(connectionTable.getSocketCloser()).thenReturn(socketCloser);
+ when(distributionManager.getDistribution()).thenReturn(distribution);
+ when(stopper.cancelInProgress()).thenReturn(null);
+ when(tcpConduit.getCancelCriterion()).thenReturn(stopper);
+ when(tcpConduit.getDM()).thenReturn(distributionManager);
+ when(tcpConduit.getSocketId()).thenReturn(new InetSocketAddress(getLocalHost(), 10337));
+ when(tcpConduit.getStats()).thenReturn(dmStats);
+
+ SocketChannel channel = SocketChannel.open();
+
+ Connection connection = new Connection(connectionTable, channel.socket());
+ connection = spy(connection);
+ return connection;
+ }
+
+ @Test
+ public void firstCallToNotifyHandshakeWaiterWillClearSSLInputBuffer() throws Exception {
+ Connection connection = createSpiedConnection();
+ connection.notifyHandshakeWaiter(true);
+ verify(connection, times(1)).clearSSLInputBuffer();
+ }
+
+ @Test
+ public void secondCallWithTrueToNotifyHandshakeWaiterShouldNotClearSSLInputBuffer()
+ throws Exception {
+ Connection connection = createSpiedConnection();
+ connection.notifyHandshakeWaiter(true);
+ connection.notifyHandshakeWaiter(true);
+ verify(connection, times(1)).clearSSLInputBuffer();
+ }
+
+ @Test
+ public void secondCallWithFalseToNotifyHandshakeWaiterShouldNotClearSSLInputBuffer()
+ throws Exception {
+ Connection connection = createSpiedConnection();
+ connection.notifyHandshakeWaiter(true);
+ connection.notifyHandshakeWaiter(false);
+ verify(connection, times(1)).clearSSLInputBuffer();
+ }
}