You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/04/12 23:22:20 UTC

[geode] branch develop updated: GEODE-3948 fixing handling of sotimeout in Message.receive()

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2758f58  GEODE-3948 fixing handling of sotimeout in Message.receive()
2758f58 is described below

commit 2758f58b9df310a79d7b77daa234398a1dbf8e0e
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Apr 12 16:20:24 2019 -0700

    GEODE-3948 fixing handling of sotimeout in Message.receive()
    
    Changes made for GEODE-3948 caused the Message.receive() method to block
    indefinitely waiting for a response.  That wasn't the intent - it should
    honor the socket's current read timeout (SO_TIMEOUT, a.k.a. sotimeout).
---
 .../geode/internal/cache/tier/sockets/Message.java | 26 ++++++++----
 .../cache/tier/sockets/MessageJUnitTest.java       | 46 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 8 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
index b9c4a03..40d09e0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
@@ -111,8 +111,6 @@ public class Message {
   @Immutable
   private static final byte[] FALSE = defineFalse();
 
-  private static final int NO_HEADER_READ_TIMEOUT = 0;
-
   private static byte[] defineTrue() {
     try (HeapDataOutputStream hdos = new HeapDataOutputStream(10, null)) {
       BlobHelper.serializeTo(Boolean.TRUE, hdos);
@@ -665,16 +663,22 @@ public class Message {
     cb.clear();
   }
 
-  private void readHeaderAndBody(int headerReadTimeoutMillis) throws IOException {
+  private void readHeaderAndBody(boolean setHeaderReadTimeout, int headerReadTimeoutMillis)
+      throws IOException {
     clearParts();
     // TODO: for server changes make sure sc is not null as this class also used by client
 
-    int timeout = socket.getSoTimeout();
-    try {
+    int oldTimeout = -1;
+    if (setHeaderReadTimeout) {
+      oldTimeout = socket.getSoTimeout();
       socket.setSoTimeout(headerReadTimeoutMillis);
+    }
+    try {
       fetchHeader();
     } finally {
-      socket.setSoTimeout(timeout);
+      if (setHeaderReadTimeout) {
+        socket.setSoTimeout(oldTimeout);
+      }
     }
 
     final ByteBuffer cb = getCommBuffer();
@@ -1133,7 +1137,7 @@ public class Message {
   public void receiveWithHeaderReadTimeout(int timeoutMillis) throws IOException {
     if (this.socket != null) {
       synchronized (getCommBuffer()) {
-        readHeaderAndBody(timeoutMillis);
+        readHeaderAndBody(true, timeoutMillis);
       }
     } else {
       throw new IOException("Dead Connection");
@@ -1144,7 +1148,13 @@ public class Message {
    * Populates the state of this {@code Message} with information received via its socket
    */
   public void receive() throws IOException {
-    receiveWithHeaderReadTimeout(NO_HEADER_READ_TIMEOUT);
+    if (this.socket != null) {
+      synchronized (getCommBuffer()) {
+        readHeaderAndBody(false, -1);
+      }
+    } else {
+      throw new IOException("Dead Connection");
+    }
   }
 
   public void receive(ServerConnection sc, int maxMessageLength, Semaphore dataLimiter,
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
index 7937cda..1eded76 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
@@ -172,4 +172,50 @@ public class MessageJUnitTest {
       }
     }
   }
+
+  @Test(expected = SocketTimeoutException.class)
+  public void messageWillTimeoutDuringRecvOnInactiveSocketWithoutExplicitTimeoutSetting()
+      throws Exception {
+    final ServerSocket serverSocket = new ServerSocket();
+    serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0));
+    Thread serverThread = new Thread("acceptor thread") {
+      @Override
+      public void run() {
+        Socket client = null;
+        try {
+          client = serverSocket.accept();
+          Thread.sleep(12000);
+        } catch (InterruptedException e) {
+
+        } catch (IOException e) {
+
+        } finally {
+          if (client != null && !client.isClosed()) {
+            try {
+              client.close();
+            } catch (IOException e) {
+            }
+          }
+        }
+      }
+    };
+    serverThread.setDaemon(true);
+    serverThread.start();
+
+    try {
+      Socket socket = new Socket(serverSocket.getInetAddress(), serverSocket.getLocalPort());
+      socket.setSoTimeout(500);
+      MessageStats messageStats = mock(MessageStats.class);
+
+      message.setComms(socket, ByteBuffer.allocate(100), messageStats);
+      message.receive();
+
+    } finally {
+      serverThread.interrupt();
+      if (serverSocket != null && !serverSocket.isClosed()) {
+        serverSocket.close();
+      }
+    }
+  }
+
 }