You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/04/12 23:22:20 UTC
[geode] branch develop updated: GEODE-3948 fixing handling of
sotimeout in Message.receive()
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 2758f58 GEODE-3948 fixing handling of sotimeout in Message.receive()
2758f58 is described below
commit 2758f58b9df310a79d7b77daa234398a1dbf8e0e
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Apr 12 16:20:24 2019 -0700
GEODE-3948 fixing handling of sotimeout in Message.receive()
Changes made for GEODE-3948 caused the Message.receive() method to block
indefinitely waiting for a response. That wasn't the intent - it should
honor the current setting of sotimeout.
---
.../geode/internal/cache/tier/sockets/Message.java | 26 ++++++++----
.../cache/tier/sockets/MessageJUnitTest.java | 46 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 8 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
index b9c4a03..40d09e0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
@@ -111,8 +111,6 @@ public class Message {
@Immutable
private static final byte[] FALSE = defineFalse();
- private static final int NO_HEADER_READ_TIMEOUT = 0;
-
private static byte[] defineTrue() {
try (HeapDataOutputStream hdos = new HeapDataOutputStream(10, null)) {
BlobHelper.serializeTo(Boolean.TRUE, hdos);
@@ -665,16 +663,22 @@ public class Message {
cb.clear();
}
- private void readHeaderAndBody(int headerReadTimeoutMillis) throws IOException {
+ private void readHeaderAndBody(boolean setHeaderReadTimeout, int headerReadTimeoutMillis)
+ throws IOException {
clearParts();
// TODO: for server changes make sure sc is not null as this class also used by client
- int timeout = socket.getSoTimeout();
- try {
+ int oldTimeout = -1;
+ if (setHeaderReadTimeout) {
+ oldTimeout = socket.getSoTimeout();
socket.setSoTimeout(headerReadTimeoutMillis);
+ }
+ try {
fetchHeader();
} finally {
- socket.setSoTimeout(timeout);
+ if (setHeaderReadTimeout) {
+ socket.setSoTimeout(oldTimeout);
+ }
}
final ByteBuffer cb = getCommBuffer();
@@ -1133,7 +1137,7 @@ public class Message {
public void receiveWithHeaderReadTimeout(int timeoutMillis) throws IOException {
if (this.socket != null) {
synchronized (getCommBuffer()) {
- readHeaderAndBody(timeoutMillis);
+ readHeaderAndBody(true, timeoutMillis);
}
} else {
throw new IOException("Dead Connection");
@@ -1144,7 +1148,13 @@ public class Message {
* Populates the state of this {@code Message} with information received via its socket
*/
public void receive() throws IOException {
- receiveWithHeaderReadTimeout(NO_HEADER_READ_TIMEOUT);
+ if (this.socket != null) {
+ synchronized (getCommBuffer()) {
+ readHeaderAndBody(false, -1);
+ }
+ } else {
+ throw new IOException("Dead Connection");
+ }
}
public void receive(ServerConnection sc, int maxMessageLength, Semaphore dataLimiter,
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
index 7937cda..1eded76 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
@@ -172,4 +172,50 @@ public class MessageJUnitTest {
}
}
}
+
+ @Test(expected = SocketTimeoutException.class)
+ public void messageWillTimeoutDuringRecvOnInactiveSocketWithoutExplicitTimeoutSetting()
+ throws Exception {
+ final ServerSocket serverSocket = new ServerSocket();
+ serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0));
+ Thread serverThread = new Thread("acceptor thread") {
+ @Override
+ public void run() {
+ Socket client = null;
+ try {
+ client = serverSocket.accept();
+ Thread.sleep(12000);
+ } catch (InterruptedException e) {
+
+ } catch (IOException e) {
+
+ } finally {
+ if (client != null && !client.isClosed()) {
+ try {
+ client.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+ }
+ };
+ serverThread.setDaemon(true);
+ serverThread.start();
+
+ try {
+ Socket socket = new Socket(serverSocket.getInetAddress(), serverSocket.getLocalPort());
+ socket.setSoTimeout(500);
+ MessageStats messageStats = mock(MessageStats.class);
+
+ message.setComms(socket, ByteBuffer.allocate(100), messageStats);
+ message.receive();
+
+ } finally {
+ serverThread.interrupt();
+ if (serverSocket != null && !serverSocket.isClosed()) {
+ serverSocket.close();
+ }
+ }
+ }
+
}