You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/06/15 16:17:51 UTC

[18/33] incubator-geode git commit: GEODE-1542 shared/unordered tcp/ip connection times out, initiating suspicion

GEODE-1542 shared/unordered tcp/ip connection times out, initiating suspicion

This disables timing out of shared/unordered TcpConduit connections.  We don't
want them to time out because we are using them to initiate suspect processing
on other members.

The ticket also pointed out a problem with the "final check" mechanism in
the health monitor.  I tracked that down to improper use of SocketCreator
to create the server-socket in GMSHealthMonitor.  It was creating an SSL
socket if SSL is enabled but the client-side of the check uses non-SSL
sockets.  I changed the server to use non-SSL sockets as well since no
useful information is sent over the final-check TCP/IP connections & they
need to be lightweight and fast.

While looking at logs I also found that the heartbeat request sent at the
beginning of a final-check had a request-ID even though it's not waiting
for a response.  That causes processing of the response to do more work
than necessary so I changed it to remove the request-ID from the message.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/33ceb371
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/33ceb371
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/33ceb371

Branch: refs/heads/feature/GEODE-420
Commit: 33ceb371554a13c7643ddaf9488ffa83963de1e7
Parents: 4afc5b1
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Tue Jun 14 10:46:16 2016 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Tue Jun 14 10:47:53 2016 -0700

----------------------------------------------------------------------
 .../membership/gms/fd/GMSHealthMonitor.java     | 75 ++++++++++----------
 .../membership/gms/membership/GMSJoinLeave.java |  8 +--
 .../gms/messages/HeartbeatRequestMessage.java   |  8 +++
 .../gemfire/internal/SocketCreator.java         | 35 +++++++--
 .../gemfire/internal/tcp/Connection.java        |  4 +-
 5 files changed, 81 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/33ceb371/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index f27e0b8..203d9ce 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -19,6 +19,7 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.fd;
 import static com.gemstone.gemfire.internal.DataSerializableFixedID.HEARTBEAT_REQUEST;
 import static com.gemstone.gemfire.internal.DataSerializableFixedID.HEARTBEAT_RESPONSE;
 import static com.gemstone.gemfire.internal.DataSerializableFixedID.SUSPECT_MEMBERS_MESSAGE;
+import static com.sun.corba.se.impl.naming.cosnaming.NamingUtils.debug;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -195,7 +196,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       return timeStamp;
     }
 
-    public void setTimeStamp(long timeStamp) {
+    public void setTime(long timeStamp) {
       this.timeStamp = timeStamp;
     }
   }
@@ -289,48 +290,41 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         long uuidMSBs = in.readLong();
         GMSHealthMonitor.this.stats.incFinalCheckRequestsReceived();
         GMSHealthMonitor.this.stats.incTcpFinalCheckRequestsReceived();
-        boolean debug = logger.isDebugEnabled();
         GMSMember gmbr = (GMSMember) GMSHealthMonitor.this.localAddress.getNetMember();
         UUID myUUID = gmbr.getUUID();
         // during reconnect or rapid restart we will have a zero viewId but there may still
         // be an old ID in the membership view that we do not want to respond to
         int myVmViewId = gmbr.getVmViewId();
-        if (debug) {
-          if (playingDead) {
-            logger.debug("simulating sick member in health check");
-          } else if (vmViewId == myVmViewId
-            && uuidLSBs == myUUID.getLeastSignificantBits()
-            && uuidMSBs == myUUID.getMostSignificantBits()) {
-            logger.debug("UUID matches my own - sending OK reply");
-          } else {
-            logger.debug("GMSHealthMonitor my UUID is {},{} received is {},{}.  My viewID is {} received is {}",
-              Long.toHexString(myUUID.getMostSignificantBits()),
-              Long.toHexString(myUUID.getLeastSignificantBits()),
-              Long.toHexString(uuidMSBs), Long.toHexString(uuidLSBs),
-              myVmViewId, vmViewId);
-          }
-        }
-        if (!playingDead
-            && uuidLSBs == myUUID.getLeastSignificantBits()
-            && uuidMSBs == myUUID.getMostSignificantBits()
-            && vmViewId == myVmViewId) {
+        if (playingDead) {
+          logger.debug("HealthMonitor: simulating sick member in health check");
+        } else if (uuidLSBs == myUUID.getLeastSignificantBits()
+                   && uuidMSBs == myUUID.getMostSignificantBits()
+                   && vmViewId == myVmViewId) {
+          logger.debug("HealthMonitor: sending OK reply");
           out.write(OK);
           out.flush();
           socket.shutdownOutput();
           GMSHealthMonitor.this.stats.incFinalCheckResponsesSent();
           GMSHealthMonitor.this.stats.incTcpFinalCheckResponsesSent();
           if (debug) {
-            logger.debug("GMSHealthMonitor server socket replied OK.");
+            logger.debug("HealthMonitor: server replied OK.");
+          }
+        } else {
+          if (logger.isDebugEnabled()) {
+            logger.debug("HealthMonitor: sending ERROR reply - my UUID is {},{} received is {},{}.  My viewID is {} received is {}", 
+              Long.toHexString(myUUID.getMostSignificantBits()),
+              Long.toHexString(myUUID.getLeastSignificantBits()), 
+              Long.toHexString(uuidMSBs),
+              Long.toHexString(uuidLSBs),
+              myVmViewId, vmViewId);
           }
-        }
-        else {
           out.write(ERROR);
           out.flush();
           socket.shutdownOutput();
           GMSHealthMonitor.this.stats.incFinalCheckResponsesSent();
           GMSHealthMonitor.this.stats.incTcpFinalCheckResponsesSent();
           if (debug) {
-            logger.debug("GMSHealthMonitor server socket replied ERROR.");
+            logger.debug("HealthMonitor: server replied ERROR.");
           }
         }
       } catch (IOException e) {
@@ -376,8 +370,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   private void contactedBy(InternalDistributedMember sender, long timeStamp) {
     TimeStamp cTS = new TimeStamp(timeStamp);
     cTS = memberTimeStamps.putIfAbsent(sender, cTS);
-    if (cTS != null) {
-      cTS.setTimeStamp(timeStamp);
+    if (cTS != null && cTS.getTime() < timeStamp) {
+      cTS.setTime(timeStamp);
     }
     if (suspectedMemberInView.remove(sender) != null) {
       logger.info("No longer suspecting {}", sender);
@@ -405,8 +399,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
       @Override
       public void run() {
-        // TODO GemFire used the tcp/ip connection but this is using heartbeats
-
         boolean pinged = false;
         try {
           pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
@@ -455,9 +447,12 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     long startTime = System.currentTimeMillis();
     logger.trace("Checking member {}", member);
     final HeartbeatRequestMessage hrm = constructHeartbeatRequestMessage(member);
-    final Response pingResp = new Response();
-    if(waitForResponse) {
+    Response pingResp = null;
+    if (waitForResponse) {
+      pingResp = new Response();
       requestIdVsResponse.put(hrm.getRequestId(), pingResp);
+    } else {
+      hrm.clearRequestId();
     }
     try {
       Set<InternalDistributedMember> membersNotReceivedMsg = this.services.getMessenger().send(hrm);
@@ -484,7 +479,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
             logger.trace("received heartbeat from {}", member);
             this.stats.incHeartbeatsReceived();
             if (ts != null) {
-              ts.setTimeStamp(System.currentTimeMillis());
+              ts.setTime(System.currentTimeMillis());
             }
             return true;
           }
@@ -547,12 +542,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         DataOutputStream out = new DataOutputStream(clientSocket.getOutputStream());
         GMSMember gmbr = (GMSMember) suspectMember.getNetMember();
         writeMemberToStream(gmbr, out);
-        clientSocket.shutdownOutput();
         this.stats.incFinalCheckRequestsSent();
         this.stats.incTcpFinalCheckRequestsSent();
-        logger.debug("Connected - reading response from suspect member {}", suspectMember);
+        logger.debug("Connected to suspect member - reading response");
         int b = in.read();
-        logger.debug("Received {}", (b == OK ? "OK" : (b == ERROR ? "ERROR" : b)), suspectMember);
+        if (logger.isDebugEnabled()) {
+          logger.debug("Received {}", (b == OK ? "OK" : (b == ERROR ? "ERROR" : "unknown response: " + b)));
+        }
         if (b >= 0) {
           this.stats.incFinalCheckResponsesReceived();
           this.stats.incTcpFinalCheckResponsesReceived();
@@ -560,7 +556,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         if (b == OK) {
           TimeStamp ts = memberTimeStamps.get(suspectMember);
           if (ts != null) {
-            ts.setTimeStamp(System.currentTimeMillis());
+            ts.setTime(System.currentTimeMillis());
           }
           return true;
         } else {
@@ -659,7 +655,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   ServerSocket createServerSocket(InetAddress socketAddress, int[] portRange) {
     ServerSocket serverSocket = null;
     try {
-      serverSocket = SocketCreator.getDefaultInstance().createServerSocketUsingPortRange(socketAddress, 50/*backlog*/, true/*isBindAddress*/, false/*useNIO*/, 65536/*tcpBufferSize*/, portRange);
+      serverSocket = SocketCreator.getDefaultInstance().createServerSocketUsingPortRange(socketAddress, 50/*backlog*/, 
+        true/*isBindAddress*/, false/*useNIO*/, 65536/*tcpBufferSize*/, portRange, false);
       socketPort = serverSocket.getLocalPort();
     } catch (IOException e) {
       throw new GemFireConfigException("Unable to allocate a failure detection port in the membership-port range", e);
@@ -1285,8 +1282,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         //this will just send heartbeat request, it will not wait for response
         //if we will get heartbeat then it will change the timestamp, which we are 
         //checking below in case of tcp check failure..
-        GMSHealthMonitor.this.doCheckMember(mbr, false);
-        pinged = GMSHealthMonitor.this.doTCPCheckMember(mbr, port);
+        doCheckMember(mbr, false);
+        pinged = doTCPCheckMember(mbr, port);
       }
   
       if (!pinged && !isStopping) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/33ceb371/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 8dce1a5..87fac53 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -533,7 +533,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     }
 
     if (!isCoordinator && !isStopping && !services.getCancelCriterion().isCancelInProgress()) {
-      logger.debug("JoinLeave is checking to see if I should become coordinator");
+      logger.debug("Checking to see if I should become coordinator");
       NetView check = new NetView(v, v.getViewId() + 1);
       check.remove(incomingRequest.getMemberID());
       synchronized (removedMembers) {
@@ -600,7 +600,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     }
 
     if (!isCoordinator && !isStopping && !services.getCancelCriterion().isCancelInProgress()) {
-      logger.debug("JoinLeave is checking to see if I should become coordinator");
+      logger.debug("Checking to see if I should become coordinator");
       NetView check = new NetView(v, v.getViewId() + 1);
       synchronized (removedMembers) {
         removedMembers.add(mbr);
@@ -630,7 +630,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
   }
 
   private void recordViewRequest(DistributionMessage request) {
-    logger.debug("JoinLeave is recording the request to be processed in the next membership view");
+    logger.debug("Recording the request to be processed in the next membership view");
     synchronized (viewRequests) {
       viewRequests.add(request);
       viewRequests.notifyAll();
@@ -1441,7 +1441,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       if (view != null) {
         if (view.size() > 1) {
           List<InternalDistributedMember> coords = view.getPreferredCoordinators(Collections.<InternalDistributedMember>emptySet(), localAddress, 5);
-          logger.debug("JoinLeave sending a leave request to {}", coords);
+          logger.debug("Sending my leave request to {}", coords);
           LeaveRequestMessage m = new LeaveRequestMessage(coords, this.localAddress, "this member is shutting down");
           services.getMessenger().send(m);
         } // view.size

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/33ceb371/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java
index 3c08e33..e0f4515 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java
@@ -41,6 +41,14 @@ public class HeartbeatRequestMessage extends HighPriorityDistributionMessage{
   public InternalDistributedMember getTarget() {
     return target;
   }
+
+  /**
+   * If no response is desired the requestId can be reset by invoking
+   * this method
+   */
+  public void clearRequestId() {
+    requestId = -1;
+  }
   
   @Override
   public int getDSFID() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/33ceb371/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
index 367d4a7..739028c 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
@@ -16,6 +16,8 @@
  */
 package com.gemstone.gemfire.internal;
 
+import static javafx.scene.input.KeyCode.L;
+
 import com.gemstone.gemfire.GemFireConfigException;
 import com.gemstone.gemfire.SystemConnectException;
 import com.gemstone.gemfire.SystemFailure;
@@ -765,16 +767,22 @@ public class SocketCreator {
    *  SSL configuration is left up to JSSE properties in java.security file.
    */
   public ServerSocket createServerSocket( int nport, int backlog, InetAddress bindAddr ) throws IOException {
-    return createServerSocket( nport, backlog, bindAddr, -1 );
+    return createServerSocket( nport, backlog, bindAddr, -1, useSSL);
   }
 
   public ServerSocket createServerSocket(int nport, int backlog,
-      InetAddress bindAddr, int socketBufferSize)
+                                         InetAddress bindAddr, int socketBufferSize)
+    throws IOException {
+    return createServerSocket(nport, backlog, bindAddr, socketBufferSize, useSSL);
+  }
+  
+  private ServerSocket createServerSocket(int nport, int backlog,
+      InetAddress bindAddr, int socketBufferSize, boolean sslConnection)
       throws IOException {
     //       rw.readLock().lockInterruptibly();
 //       try {
         printConfig();
-        if ( this.useSSL ) {
+        if ( sslConnection ) {
           if (this.sslContext == null) {
             throw new GemFireConfigException("SSL not configured correctly, Please look at previous error");
           }
@@ -830,7 +838,23 @@ public class SocketCreator {
   public ServerSocket createServerSocketUsingPortRange(InetAddress ba, int backlog,
       boolean isBindAddress, boolean useNIO, int tcpBufferSize, int[] tcpPortRange)
       throws IOException {
-    
+    return createServerSocketUsingPortRange(ba, backlog, isBindAddress, useNIO, tcpBufferSize, tcpPortRange, this.useSSL);
+  }
+
+    /**
+     * Creates or bind server socket to a random port selected
+     * from tcp-port-range which is same as membership-port-range.
+     * @param ba
+     * @param backlog
+     * @param isBindAddress
+     * @param tcpBufferSize
+     * @param sslConnection whether to connect using SSL
+     * @return Returns the new server socket.
+     * @throws IOException
+     */
+    public ServerSocket createServerSocketUsingPortRange(InetAddress ba, int backlog,
+    boolean isBindAddress, boolean useNIO, int tcpBufferSize, int[] tcpPortRange, boolean sslConnection)
+    throws IOException {
     ServerSocket socket = null;
     int localPort = 0;
     int startingPort = 0;
@@ -862,7 +886,8 @@ public class SocketCreator {
           InetSocketAddress addr = new InetSocketAddress(isBindAddress ? ba : null, localPort);
           socket.bind(addr, backlog);
         } else {
-          socket = SocketCreator.getDefaultInstance().createServerSocket(localPort, backlog, isBindAddress? ba : null, tcpBufferSize);
+          socket = SocketCreator.getDefaultInstance()
+                                .createServerSocket(localPort, backlog, isBindAddress? ba : null, tcpBufferSize, sslConnection);
         }
         break;
       } catch (java.net.SocketException ex) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/33ceb371/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
index 85e3511..6528877 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@ -569,7 +569,9 @@ public class Connection implements Runnable {
     if (isSocketClosed()) {
       return true;
     }
-    if (isSocketInUse()) {
+    if (isSocketInUse()
+        || (this.sharedResource && !this.preserveOrder)) { // shared/unordered connections are used for failure-detection
+                                                           // and are not subject to idle-timeout
       return false;
     }
     boolean isIdle = !this.accessed;