You are viewing a plain-text version of this content. (The canonical link for it was a hyperlink, which does not survive in plain text.)
Posted to commits@geode.apache.org by wi...@apache.org on 2018/06/01 16:29:17 UTC

[geode] branch develop updated: GEODE-5268: fixing race condition in GMSHealthMonitor (#2005)

This is an automated email from the ASF dual-hosted git repository.

wirebaron pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new d3a9f75  GEODE-5268: fixing race condition in GMSHealthMonitor (#2005)
d3a9f75 is described below

commit d3a9f75157fac632ca0f82aab1426e2f9beab9ab
Author: Brian Rowe <br...@pivotal.io>
AuthorDate: Fri Jun 1 09:28:43 2018 -0700

    GEODE-5268: fixing race condition in GMSHealthMonitor (#2005)
    
    * GEODE-5268: fixing race condition in GMSHealthMonitor
---
 .../membership/gms/fd/GMSHealthMonitor.java        |  6 --
 .../gms/fd/GMSHealthMonitorJUnitTest.java          | 95 ++++++++++++++++++++++
 2 files changed, 95 insertions(+), 6 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 9378ec6..13af6d4 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -731,7 +731,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         if (!ssocket.isClosed()) {
           try {
             ssocket.close();
-            serverSocket = null;
             logger.info("GMSHealthMonitor server socket closed.");
           } catch (IOException e) {
             logger.debug("Unexpected exception", e);
@@ -979,7 +978,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       if (serverSocket != null && !serverSocket.isClosed()) {
         try {
           serverSocket.close();
-          serverSocket = null;
           logger.info("GMSHealthMonitor server socket is closed in stopServices().");
         } catch (IOException e) {
           logger.trace("Unexpected exception", e);
@@ -994,10 +992,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       logger.info("GMSHealthMonitor serverSocketExecutor is "
           + (serverSocketExecutor.isTerminated() ? "terminated" : "not terminated"));
     }
-
-    // if (suspectRequestCollectorThread != null) {
-    // suspectRequestCollectorThread.shutdown();
-    // }
   }
 
   /***
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index 468943a..3ed2537 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -44,11 +44,15 @@ import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.awaitility.Awaitility;
 import org.jgroups.util.UUID;
@@ -186,6 +190,16 @@ public class GMSHealthMonitorJUnitTest {
     assertEquals(1, gmsHealthMonitor.getStats().getHeartbeatsSent());
   }
 
+  @Test
+  public void testHMServiceHandlesShutdownRace() throws IOException {
+    // The health monitor starts a thread to monitor the TCP socket; both that thread and the
+    // stopServices call will attempt to shut down the socket during a normal close. This test tries
+    // to create a problematic ordering to make sure we still shut down properly.
+    ((GMSHealthMonitorTest) gmsHealthMonitor).useBlockingSocket = true; // wrap socket in TrickySocket
+    gmsHealthMonitor.started();
+    gmsHealthMonitor.stop(); // no asserts: passes if this returns; NOTE(review): no timeout, may hang
+  }
+
   /**
    * checks who is next neighbor
    */
@@ -799,6 +813,8 @@ public class GMSHealthMonitorJUnitTest {
   }
 
   public class GMSHealthMonitorTest extends GMSHealthMonitor {
+    public boolean useBlockingSocket = false;
+
     @Override
     boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
       if (useGMSHealthMonitorTestClass) {
@@ -809,5 +825,84 @@ public class GMSHealthMonitorJUnitTest {
       }
       return super.doTCPCheckMember(suspectMember, port);
     }
+
+    @Override
+    ServerSocket createServerSocket(InetAddress socketAddress, int[] portRange) {
+      final ServerSocket serverSocket = super.createServerSocket(socketAddress, portRange);
+      if (useBlockingSocket) {
+        try {
+          return new TrickySocket(serverSocket);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      } else {
+        return serverSocket;
+      }
+    }
+  }
+
+  public class TrickySocket extends ServerSocket { // delegates to a real socket; 1st isClosed() blocks
+    ServerSocket wrappedSocket; // the real socket every overridden call is forwarded to
+    final Lock lock = new ReentrantLock();
+    boolean firstWait = true; // only the first isClosed() call parks; later calls return at once
+    final Condition block = lock.newCondition(); // signalled by close() to release that first caller
+
+    public TrickySocket(ServerSocket wrappee) throws IOException {
+      wrappedSocket = wrappee;
+    }
+
+    @Override
+    public void bind(SocketAddress endpoint) throws IOException {
+      wrappedSocket.bind(endpoint);
+    }
+
+    @Override
+    public void bind(SocketAddress endpoint, int backlog) throws IOException {
+      wrappedSocket.bind(endpoint, backlog);
+    }
+
+    @Override
+    public InetAddress getInetAddress() {
+      return wrappedSocket.getInetAddress();
+    }
+
+    @Override
+    public int getLocalPort() {
+      return wrappedSocket.getLocalPort();
+    }
+
+    @Override
+    public SocketAddress getLocalSocketAddress() {
+      return wrappedSocket.getLocalSocketAddress();
+    }
+
+    @Override
+    public Socket accept() throws IOException {
+      return wrappedSocket.accept();
+    }
+
+    @Override
+    public void close() throws IOException { // closes the real socket, then wakes a parked isClosed()
+      wrappedSocket.close();
+      lock.lock();
+      block.signal(); // NOTE(review): lost if no thread is awaiting yet
+      lock.unlock();
+    }
+
+    @Override
+    public boolean isClosed() {
+      final boolean closed = wrappedSocket.isClosed(); // sampled before parking: 1st caller gets stale value
+      lock.lock();
+      if (firstWait) {
+        firstWait = false;
+        try {
+          block.await(); // NOTE(review): no timeout; hangs if close() already signalled
+        } catch (InterruptedException e) {
+          e.printStackTrace(); // NOTE(review): swallows interrupt; consider re-interrupting
+        }
+      }
+      lock.unlock();
+      return closed;
+    }
   }
 }

-- 
To stop receiving notification emails like this one, please contact
wirebaron@apache.org.