You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/10/26 19:28:52 UTC

incubator-geode git commit: GEODE-77: send Remove messages to members that are kicked out during view installation

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-77 63802dab6 -> 2847235c5


GEODE-77: send Remove messages to members that are kicked out during view installation

This also changes view preparation so that preparing a view is attempted only once.
If the initial preparation fails, we create a new view that kicks out the
unresponsive members and send it out along with the Remove messages.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/2847235c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/2847235c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/2847235c

Branch: refs/heads/feature/GEODE-77
Commit: 2847235c5d1574031f431bae0c4eff8ed636f31e
Parents: 63802da
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Mon Oct 26 11:28:37 2015 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Mon Oct 26 11:28:37 2015 -0700

----------------------------------------------------------------------
 .../membership/gms/fd/GMSHealthMonitor.java     | 21 ++++++-------
 .../gms/locator/FindCoordinatorResponse.java    |  1 +
 .../membership/gms/locator/GMSLocator.java      |  4 +++
 .../membership/gms/membership/GMSJoinLeave.java | 32 ++++++++++++++++----
 .../gms/messenger/JGroupsMessenger.java         | 11 -------
 5 files changed, 41 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2847235c/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 774ab37..7e11655 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.logging.log4j.Logger;
+import org.jgroups.util.UUID;
 
 import com.gemstone.gemfire.SystemConnectException;
 import com.gemstone.gemfire.distributed.DistributedMember;
@@ -363,9 +364,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     Socket clientSocket = new Socket();
     try {
       // establish TCP connection
-      for (Map.Entry<InternalDistributedMember, InetSocketAddress> entry : socketInfo.entrySet()) {
-        logger.info("socketInfo member:" + entry.getKey() + " port:" + entry.getValue().getPort());
-      }
+//      for (Map.Entry<InternalDistributedMember, InetSocketAddress> entry : socketInfo.entrySet()) {
+//        logger.info("socketInfo member:" + entry.getKey() + " port:" + entry.getValue().getPort());
+//      }
       logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember, addr.getAddress(), addr.getPort());
       clientSocket.connect(addr, (int) services.getConfig().getMemberTimeout());
       if (clientSocket.isConnected()) {
@@ -568,10 +569,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         short version = in.readShort();
         long uuidLSBs = in.readLong();
         long uuidMSBs = in.readLong();
-        logger.debug("GMSHealthMonitor server socket received {} and {}.", uuidMSBs, uuidLSBs);
-        logger.debug("GMSHealthMonitor member uuid is {}", ((GMSMember) GMSHealthMonitor.this.localAddress.getNetMember()).getUUID());
-        if (uuidLSBs == ((GMSMember) GMSHealthMonitor.this.localAddress.getNetMember()).getUuidLSBs()
-            && uuidMSBs == ((GMSMember) GMSHealthMonitor.this.localAddress.getNetMember()).getUuidMSBs()) {
+        logger.debug("GMSHealthMonitor received health check UUID {}:{}.", uuidMSBs, uuidLSBs);
+        UUID myUUID = ((GMSMember) GMSHealthMonitor.this.localAddress.getNetMember()).getUUID();
+        logger.debug("GMSHealthMonitor my UUID is {}:{}", myUUID.getMostSignificantBits(), myUUID.getLeastSignificantBits());
+        if (uuidLSBs == myUUID.getLeastSignificantBits()
+            && uuidMSBs == myUUID.getMostSignificantBits()) {
           out.write(OK);
           out.flush();
           socket.shutdownOutput();
@@ -967,6 +969,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
               boolean pinged;
               InetSocketAddress addr = socketInfo.get(mbr);
               if (addr == null || addr.getPort() < 0) {
+                logger.info("Unable to locate failure detection port - requesting a heartbeat");
                 pinged = GMSHealthMonitor.this.doCheckMember(mbr);
               } else {
                 pinged = GMSHealthMonitor.this.doTCPCheckMember(mbr, addr);
@@ -1090,10 +1093,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   }
 
   private void sendSuspectRequest(final List<SuspectRequest> requests) {
-    if (beingSick || playingDead) {
-      logger.debug("sick member is not sending suspect request");
-      return;
-    }
     logger.debug("Sending suspect request for members {}", requests);
     synchronized (suspectRequests) {
       if (suspectRequests.size() > 0) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2847235c/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
index 0d2ba68..3a12654 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
@@ -94,6 +94,7 @@ public class FindCoordinatorResponse  extends HighPriorityDistributionMessage
     } else {
       return "FindCoordinatorResponse(coordinator="+coordinator+", fromView="+fromView+", viewId="+(view==null? "nul" : view.getViewId())
         +", registrants=" + (registrants == null? 0 : registrants.size())
+        +", senderId=" + senderId
         +", network partition detection enabled="+this.networkPartitionDetectionEnabled
         +", locators preferred as coordinators="+this.usePreferredCoordinators+")";
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2847235c/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
index 89f4d97..26d406a 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java
@@ -141,6 +141,10 @@ public class GMSLocator implements Locator, NetLocator {
       logger.debug("Peer locator processing {}", request);
     }
     
+    if (localAddress == null && services != null) {
+      localAddress = services.getMessenger().getMemberID();
+    }
+    
     if (request instanceof GetViewRequest) {
       if (view != null) {
         response = new GetViewResponse(view);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2847235c/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 6d39a6a..af050be 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -153,7 +153,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
   /** a collection used to detect unit testing */
   Set<String> unitTesting = new HashSet<>();
-
+  
+  /** a test hook to make this member unresponsive */
+  private volatile boolean playingDead;
+  
   /** the view where quorum was most recently lost */
   NetView quorumLostView;
 
@@ -778,7 +781,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
   }
 
   private void ackView(InstallViewMessage m) {
-    if (m.getView().contains(m.getView().getCreator())) {
+    if (!playingDead && m.getView().contains(m.getView().getCreator())) {
       services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()));
     }
   }
@@ -830,12 +833,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
               addr.getAddress(), addr.getPort(), request, connectTimeout, 
               true);
           FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse)o : null;
-          // TODO we don't want to give up on the locators if we receive
-          // a response from a locator that's joined the system.  Otherwise
-          // we'll give up and cause a split-brain
           if (response != null) {
             state.locatorsContacted++;
             if (response.getSenderId() != null && response.getSenderId().getVmViewId() >= 0) {
+              logger.debug("Locator's address indicates it is part of a distributed system "
+                  + "so I will not become membership coordinator on this attempt to join");
               state.hasContactedAJoinedLocator = true;
             }
             if (response.getCoordinator() != null) {
@@ -1170,7 +1172,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     int oldWeight = currentView.memberWeight();
     int failedWeight = newView.getCrashedMemberWeight(currentView);
     if (failedWeight > 0) {
-      if (logger.isInfoEnabled() && !newView.getCreator().equals(localAddress)) { // view-creator logs this
+      if (logger.isInfoEnabled()
+          && newView.getCreator().equals(localAddress)) { // view-creator logs this
         newView.logCrashedMemberWeights(currentView, logger);
       }
       int failurePoint = (int) (Math.round(51 * oldWeight) / 100.0);
@@ -1205,9 +1208,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
   }
 
   public void playDead() {
+    playingDead = true;
   }
 
   public void beHealthy() {
+    playingDead = false;
   }
 
   @Override
@@ -1849,7 +1854,22 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
           List<InternalDistributedMember> newMembers = new ArrayList<>(newView.getMembers());
           newMembers.removeAll(removalReqs);
           newView = new NetView(localAddress, newView.getViewId() + 1, newMembers, leaveReqs, removalReqs);
+          int size = failures.size();
+          List<String> reasons = new ArrayList<>(size);
+          for (int i=0; i<size; i++) {
+            reasons.add("Failed to acknowledge a new membership view and then failed tcp/ip connection attempt");
+          }
+          sendRemoveMessages(failures, reasons, newView);
+        }
+        
+        // if there is no conflicting view then we can count
+        // the current state as being prepared.  All members
+        // who are going to ack have already done so or passed
+        // a liveness test
+        if (conflictingView == null) {
+          prepared = true;
         }
+        
       } while (!prepared);
 
       lastConflictingView = null;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2847235c/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 14dfd28..ba941f8 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -477,17 +477,6 @@ public class JGroupsMessenger implements Messenger {
       throw new DistributedSystemDisconnectedException("Distributed System is shutting down");
     }
     
-    if (playingDead) {
-      Set result = new HashSet<>();
-      InternalDistributedMember[] rec = msg.getRecipients();
-      if (rec != null) {
-        for (int i=0; i<rec.length; i++) {
-          result.add(rec[i]);
-        }
-      }
-      return result;
-    }
-    
     filterOutgoingMessage(msg);
     
     InternalDistributedMember[] destinations = msg.getRecipients();