You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ma...@apache.org on 2015/11/23 21:48:35 UTC

[05/50] [abbrv] incubator-geode git commit: GEODE-77: members unfairly forced out of the distributed system

GEODE-77: members unfairly forced out of the distributed system

In integration testing, members under heavy load were being forced out
of the distributed system in spite of the new tcp/ip final-checks.

This change set adds a recent-activity check to the doTCPCheckMember method
of GMSHealthMonitor.  It also corrects this method to use SocketCreator with
a background timeout task in case the tcp/ip socket timeout fails to work
(as it sometimes does).

During testing I also saw that GMSJoinLeave was not initiating a forced-
disconnect when it received a membership view showing that the local member
was no longer in the distributed system, so I modified it to do so.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/28d273cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/28d273cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/28d273cb

Branch: refs/heads/feature/GEODE-53
Commit: 28d273cb0f19ff2d6b2b506b1d82b91b5ce2ce36
Parents: 026d149
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Mon Nov 9 08:44:17 2015 -0800
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Mon Nov 9 08:44:17 2015 -0800

----------------------------------------------------------------------
 .../membership/gms/fd/GMSHealthMonitor.java     | 107 ++++++++++++++-----
 .../membership/gms/membership/GMSJoinLeave.java |   6 +-
 .../gms/membership/GMSJoinLeaveJUnitTest.java   |  19 +++-
 3 files changed, 99 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/28d273cb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 0307f5b..9e410e2 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -24,6 +24,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -54,6 +56,8 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.messages.Heartbe
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectRequest;
 import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.ConnectionWatcher;
+import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.Version;
 
 /**
@@ -262,6 +266,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       try {
         DataInputStream in = new DataInputStream(socket.getInputStream());
         OutputStream out = socket.getOutputStream();
+        @SuppressWarnings("unused")
         short version = in.readShort();
         int  vmViewId = in.readInt();
         long uuidLSBs = in.readLong();
@@ -309,7 +314,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
           }
         }
       } catch (IOException e) {
-        logger.trace("Unexpected exception", e);
+        logger.debug("Unexpected exception", e);
+      } catch (RuntimeException e) {
+        logger.debug("Unexpected runtime exception", e);
+        throw e;
+      } catch (Error e) {
+        logger.debug("Unexpected error", e);
+        throw e;
       } finally {
         if (socket != null) {
           try {
@@ -369,7 +380,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         if (!pinged) {
           suspectedMemberInView.put(mbr, currentView);
           String reason = "Member isn't responding to heartbeat requests";
-          GMSHealthMonitor.this.sendSuspectMessage(mbr, reason);
+          GMSHealthMonitor.this.initiateSuspicion(mbr, reason);
         } else {
           logger.trace("Setting next neighbor as member {} has responded.", mbr);
           suspectedMemberInView.remove(mbr);
@@ -381,7 +392,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
   }
 
-  private void sendSuspectMessage(InternalDistributedMember mbr, String reason) {
+  private void initiateSuspicion(InternalDistributedMember mbr, String reason) {
     SuspectRequest sr = new SuspectRequest(mbr, reason);
     List<SuspectRequest> sl = new ArrayList<SuspectRequest>();
     sl.add(sr);
@@ -418,16 +429,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
           TimeStamp ts = memberTimeStamps.get(member);
           if (pingResp.getResponseMsg() == null) {
             // double check the activity map
-            long now = System.currentTimeMillis();
             if (isStopping) {
               return true;
             }
-            if (ts != null && (now - ts.getTime()) <= memberTimeout) {
-              logger.trace("detected message traffic from member {}ms ago.  member-timeout is {}", now - ts.getTime(),
-                  memberTimeout);
+            if (checkRecentActivity(member)) {
               return true;
             }
-            logger.trace("no heartbeat response received from {}", member);
+            logger.trace("no heartbeat response received from {} and no recent activity", member);
             return false;
           } else {
             logger.trace("received heartbeat from {}", member);
@@ -447,6 +455,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   }
 
   /**
+   * Check for recent messaging activity from the given member
+   * @param suspectMember
+   * @return whether there has been activity within memberTimeout ms
+   */
+  private boolean checkRecentActivity(InternalDistributedMember suspectMember) {
+    TimeStamp ts = memberTimeStamps.get(suspectMember);
+    return (ts != null && (System.currentTimeMillis() - ts.getTime()) <= memberTimeout);
+  }
+
+  /**
    * During final check, establish TCP connection between current member and suspect member.
    * And exchange PING/PONG message to see if the suspect member is still alive.
    * 
@@ -454,15 +472,19 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * @return true if successfully exchanged PING/PONG with TCP connection, otherwise false.
    */
   private boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
-    Socket clientSocket = new Socket();
+    Socket clientSocket = null;
+    // first check for a recent timestamp
+    if (checkRecentActivity(suspectMember)) {
+      return true;
+    }
     try {
       // establish TCP connection
 //      for (Map.Entry<InternalDistributedMember, InetSocketAddress> entry : socketInfo.entrySet()) {
 //        logger.info("socketInfo member:" + entry.getKey() + " port:" + entry.getValue().getPort());
 //      }
       logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember, suspectMember.getInetAddress(), port);
-      InetSocketAddress addr = new InetSocketAddress(suspectMember.getInetAddress(), port);
-      clientSocket.connect(addr, (int) services.getConfig().getMemberTimeout());
+      clientSocket = SocketCreator.getDefaultInstance().connect(suspectMember.getInetAddress(), port,
+          (int)memberTimeout, new ConnectTimeoutTask(services.getTimer(), memberTimeout), false, -1, false);
       if (clientSocket.isConnected()) {
         clientSocket.setSoTimeout((int) services.getConfig().getMemberTimeout());
         InputStream in = clientSocket.getInputStream();
@@ -485,15 +507,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
           return true;
         } else {
           //received ERROR
-          return false;
+          return checkRecentActivity(suspectMember);
         }
       } else {// cannot establish TCP connection with suspect member
-        return false;
+        return checkRecentActivity(suspectMember);
       }
     } catch (SocketTimeoutException e) {
+      logger.debug("tcp/ip connection timed out");
       return false;
     } catch (IOException e) {
-      logger.trace("Unexpected exception", e);
+      logger.debug("Unexpected exception", e);
     } finally {
       try {
         if (clientSocket != null) {
@@ -515,7 +538,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    */
   @Override
   public void suspect(InternalDistributedMember mbr, String reason) {
-    sendSuspectMessage(mbr, reason);
+    initiateSuspicion(mbr, reason);
     // Background suspect-collecting thread is currently disabled - it takes too long
 //    synchronized (suspectRequests) {
 //      SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, reason);
@@ -898,13 +921,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   @Override
   public void beSick() {
     this.beingSick = true;
-    sendSuspectMessage(localAddress, "beSick invoked on GMSHealthMonitor");
+    initiateSuspicion(localAddress, "beSick invoked on GMSHealthMonitor");
   }
 
   @Override
   public void playDead() {
     this.playingDead = true;
-    sendSuspectMessage(localAddress, "playDead invoked on GMSHealthMonitor");
+    initiateSuspicion(localAddress, "playDead invoked on GMSHealthMonitor");
   }
 
   @Override
@@ -1163,11 +1186,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
               }
               if (!failed) {
                 logger.info("Final check passed");
+                contactedBy(mbr);
               }
               // whether it's alive or not, at this point we allow it to
               // be watched again
               suspectedMemberInView.remove(mbr);
-              contactedBy(mbr);
             } catch (DistributedSystemDisconnectedException e) {
               return;
             } catch (Exception e) {
@@ -1183,6 +1206,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       membersInFinalCheck.removeAll(membersChecked);
     }
   }
+  @Override
+  public void memberShutdown(DistributedMember mbr, String reason) {
+  }
+  
+  @Override
+  public int getFailureDetectionPort() {
+    return this.socketPort;
+  }
 
   interface Callback<T> {
     public void process(List<T> requests);
@@ -1318,13 +1349,37 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     }
   }
 
-  @Override
-  public void memberShutdown(DistributedMember mbr, String reason) {
-  }
-  
-  @Override
-  public int getFailureDetectionPort() {
-    return this.socketPort;
+  private static class ConnectTimeoutTask extends TimerTask implements ConnectionWatcher {
+    Timer scheduler;
+    Socket socket;
+    long timeout;
+    
+    ConnectTimeoutTask(Timer scheduler, long timeout) {
+      this.scheduler = scheduler;
+      this.timeout = timeout;
+    }
+    
+    @Override
+    public void beforeConnect(Socket socket) {
+      this.socket = socket;
+      scheduler.schedule(this, timeout);
+    }
+
+    @Override
+    public void afterConnect(Socket socket) {
+      cancel();
+    }
+    
+    @Override
+    public void run() {
+      try {
+        if (socket != null) {
+          socket.close();
+        }
+      } catch (IOException e) {
+        // ignored - nothing useful to do here
+      }
+    }
+    
   }
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/28d273cb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 4b1993f..a6121dd 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -782,10 +782,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         }
       }
     } else { // !preparing
-      if (currentView != null && !view.contains(this.localAddress)) {
-        if (quorumRequired) {
-          forceDisconnect("This node is no longer in the membership view");
-        }
+      if (isJoined && currentView != null && !view.contains(this.localAddress)) {
+        forceDisconnect("This node is no longer in the membership view");
       } else {
         if (!m.isRebroadcast()) { // no need to ack a rebroadcast view
           ackView(m);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/28d273cb/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index 89282c0..562fd70 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -1,5 +1,6 @@
 package com.gemstone.gemfire.distributed.internal.membership.gms.membership;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -7,12 +8,12 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
 
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -170,7 +171,19 @@ public class GMSJoinLeaveJUnitTest {
     assertTrue("JoinRequest should not have been added to view request", gmsJoinLeave.getViewRequests().size() == 0);
     verify(messenger).send(any(JoinResponseMessage.class));
   }
-  
+  @Test
+  public void testViewWithoutMemberInitiatesForcedDisconnect() throws Exception {
+    initMocks();
+    gmsJoinLeave.becomeCoordinatorForTest();
+    List<InternalDistributedMember> members = Arrays.asList(mockMembers);
+    Set<InternalDistributedMember> empty = Collections.<InternalDistributedMember>emptySet();
+    NetView v = new NetView(mockMembers[0], 2, members, empty, empty);
+    InstallViewMessage message = new InstallViewMessage(v, null);
+    gmsJoinLeave.processMessage(message);
+    verify(manager).forceDisconnect(any(String.class));
+  }
+
+
   @Test
   public void testProcessJoinMessageWithBadAuthentication() throws IOException {
     initMocks();