You are viewing a plain-text version of this content. (The canonical link for it, shown as "here" on the original page, is not preserved in this plain-text copy.)
Posted to commits@geode.apache.org by bs...@apache.org on 2015/11/20 22:02:08 UTC

[14/50] [abbrv] incubator-geode git commit: GEODE-77: avoids creating a split-brain if joining times out and network partition detection is not enabled

GEODE-77: avoids creating a split-brain if joining times out and network partition detection is not enabled


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/5feab824
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/5feab824
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/5feab824

Branch: refs/heads/develop
Commit: 5feab8241deb507c11152236b152b91d983eeace
Parents: cfba7ae
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Fri Oct 23 13:05:46 2015 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Fri Oct 23 13:06:11 2015 -0700

----------------------------------------------------------------------
 .../membership/gms/fd/GMSHealthMonitor.java     |  4 -
 .../gms/locator/FindCoordinatorResponse.java    |  6 ++
 .../membership/gms/membership/GMSJoinLeave.java | 95 ++++++++++++--------
 .../membership/gms/messenger/Transport.java     | 15 ++++
 .../gemfire/internal/tcp/Connection.java        | 15 +++-
 .../gms/membership/GMSJoinLeaveJUnitTest.java   |  2 +-
 6 files changed, 92 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index ed9f214..1ca206f 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -266,10 +266,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   }
 
   private void sendSuspectMessage(InternalDistributedMember mbr, String reason) {
-    if (beingSick || playingDead) {
-      logger.debug("sick member is not sending suspect message concerning {}", mbr);
-      return;
-    }
     logger.info("Sending suspect request {} reason=\"{}\"", mbr, reason);
     SuspectRequest sr = new SuspectRequest(mbr, reason);
     List<SuspectRequest> sl = new ArrayList<SuspectRequest>();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
index 5f9576b..0d2ba68 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java
@@ -65,6 +65,12 @@ public class FindCoordinatorResponse  extends HighPriorityDistributionMessage
     return coordinator;
   }
   
+  /**
+   * When the response comes from a locator via TcpClient, returns the
+   * locator's member ID.  If the locator has not yet joined, this may be null.
+   *
+   * @return the sending locator's member ID, possibly null if it has not yet joined
+   */
   public InternalDistributedMember getSenderId() {
     return senderId;
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 57611e6..5a792eb 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -163,7 +163,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     Set<InternalDistributedMember> registrants = new HashSet<>();
     InternalDistributedMember possibleCoordinator;
     int viewId = -1;
-    boolean hasContactedALocator;
+    int locatorsContacted = 0;
+    boolean hasContactedAJoinedLocator;
     NetView view;
     Set<FindCoordinatorResponse> responses = new HashSet<>();
     
@@ -201,10 +202,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       
       SearchState state = searchState;
       
+      long locatorWaitTime = services.getConfig().getLocatorWaitTime() * 1000;
       long timeout = services.getConfig().getJoinTimeout();
       logger.debug("join timeout is set to {}", timeout);
       long retrySleep =  JOIN_RETRY_SLEEP;
       long startTime = System.currentTimeMillis();
+      long locatorGiveUpTime = startTime + locatorWaitTime;
       long giveupTime = startTime + timeout;
   
       for (int tries=0; !this.isJoined; tries++) {
@@ -232,7 +235,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
             }
           }
         } else {
-          if (System.currentTimeMillis() > giveupTime) {
+          long now = System.currentTimeMillis();
+          if (state.locatorsContacted <= 0) {
+            if (now > locatorGiveUpTime) {
+              // break out of the loop and return false
+              break;
+            }
+            // reset the tries count and timer since we haven't actually tried to join yet
+            tries = 0;
+            giveupTime = now + timeout;
+          } else if (System.currentTimeMillis() > giveupTime) {
             break;
           }
         }
@@ -251,7 +263,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       
       // to preserve old behavior we need to throw a SystemConnectException if
       // unable to contact any of the locators
-      if (!this.isJoined && state.hasContactedALocator) {
+      if (!this.isJoined && state.hasContactedAJoinedLocator) {
         throw new SystemConnectException("Unable to join the distributed system in "
            + (System.currentTimeMillis()-startTime) + "ms");
       }
@@ -750,24 +762,27 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     
     assert this.localAddress != null;
     
-    // TODO - should we try more than one preferred coordinator
-    // before jumping to asking view-members who the coordinator is?
-    if ( !state.alreadyTried.isEmpty() && state.view != null) {
+    // If we've already tried to bootstrap from locators that
+    // haven't joined the system (e.g., a collocated locator)
+    // then jump to using the membership view to try to find
+    // the coordinator
+    if ( !state.hasContactedAJoinedLocator && state.view != null) {
       return findCoordinatorFromView();
     }
     
     FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress, state.alreadyTried, state.viewId);
     Set<InternalDistributedMember> coordinators = new HashSet<InternalDistributedMember>();
-    long waitTime = services.getConfig().getLocatorWaitTime() * 1000;
-    if (waitTime <= 0) {
-      waitTime = services.getConfig().getMemberTimeout() * 2;
-    }
-    long giveUpTime = System.currentTimeMillis() + waitTime;
+    
+    long giveUpTime = System.currentTimeMillis() + services.getConfig().getLocatorWaitTime() * 1000;
+    
     int connectTimeout = (int)services.getConfig().getMemberTimeout();
     boolean anyResponses = false;
     boolean flagsSet = false;
     
     logger.debug("sending {} to {}", request, locators);
+
+    state.hasContactedAJoinedLocator = false;
+    state.locatorsContacted = 0;
     
     do {
       for (InetSocketAddress addr: locators) { 
@@ -776,42 +791,46 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
               addr.getAddress(), addr.getPort(), request, connectTimeout, 
               true);
           FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse)o : null;
-          if (response != null && response.getCoordinator() != null) {
-            anyResponses = true;
-            NetView v = response.getView();
-            int viewId = v == null? -1 : v.getViewId();
-            if (viewId > state.viewId) {
-              // if the view has changed it is possible that a member
-              // that we already tried to join with will become coordinator
-              state.alreadyTried.clear();
-              state.viewId = viewId;
-              state.view = v;
-              state.registrants.clear();
-              if (response.getRegistrants() != null) {
-                state.registrants.addAll(response.getRegistrants());
-              }
+          // TODO we don't want to give up on the locators if we receive
+          // a response from a locator that's joined the system.  Otherwise
+          // we'll give up and cause a split-brain
+          if (response != null) {
+            state.locatorsContacted++;
+            if (response.getSenderId() != null && response.getSenderId().getVmViewId() >= 0) {
+              state.hasContactedAJoinedLocator = true;
             }
-            coordinators.add(response.getCoordinator());
-            if (!flagsSet) {
-              flagsSet = true;
-              inheritSettingsFromLocator(addr, response);
+            if (response.getCoordinator() != null) {
+              anyResponses = true;
+              NetView v = response.getView();
+              int viewId = v == null? -1 : v.getViewId();
+              if (viewId > state.viewId) {
+                // if the view has changed it is possible that a member
+                // that we already tried to join with will become coordinator
+                state.alreadyTried.clear();
+                state.viewId = viewId;
+                state.view = v;
+                state.registrants.clear();
+                if (response.getRegistrants() != null) {
+                  state.registrants.addAll(response.getRegistrants());
+                }
+              }
+              coordinators.add(response.getCoordinator());
+              if (!flagsSet) {
+                flagsSet = true;
+                inheritSettingsFromLocator(addr, response);
+              }
             }
           }
         } catch (IOException | ClassNotFoundException problem) {
         }
       }
-      if (coordinators.isEmpty()) {
-        return false;
-      }
-      if (!anyResponses) {
-        try { Thread.sleep(1000); } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          return false;
-        }
-      }
     } while (!anyResponses && System.currentTimeMillis() < giveUpTime);
     
     
+    if (coordinators.isEmpty()) {
+      return false;
+    }
+
     Iterator<InternalDistributedMember> it = coordinators.iterator();
     if (coordinators.size() == 1) {
       state.possibleCoordinator = it.next();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java
index adb49b9..9f91a74 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java
@@ -61,4 +61,19 @@ public class Transport extends UDP {
     super.init();
   }
 
+  /*
+   * Overrides org.jgroups.protocols.UDP#stop() to also stop the timer
+   * returned by getTimer() when it is not already shut down, because
+   * JGroups is not terminating its timer on its own.  This was reported
+   * to the jgroups-users email list.
+   */
+  @Override
+  public void stop() {
+    super.stop();
+    if (!getTimer().isShutdown()) {
+      getTimer().stop();
+    }
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
index 2e903f7..9f079fa 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@ -1884,12 +1884,18 @@ public class Connection implements Runnable {
         }
         catch (ClosedChannelException e) {
           this.readerShuttingDown = true;
+          if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) {
+            initiateSuspicionIfShared();
+          }
           try { 
             requestClose(LocalizedStrings.Connection_CLOSEDCHANNELEXCEPTION_IN_CHANNEL_READ_0.toLocalizedString(e));
           } catch (Exception ex) {}
           return;
         }
         catch (IOException e) {
+          if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) {
+            initiateSuspicionIfShared();
+          }
           if (! isSocketClosed()
                 && !"Socket closed".equalsIgnoreCase(e.getMessage()) // needed for Solaris jdk 1.4.2_08
                 ) {
@@ -1902,7 +1908,6 @@ public class Connection implements Runnable {
               }
             }
           }
-          initiateSuspicionIfShared();
           this.readerShuttingDown = true;
           try { 
             requestClose(LocalizedStrings.Connection_IOEXCEPTION_IN_CHANNEL_READ_0.toLocalizedString(e));
@@ -1914,6 +1919,9 @@ public class Connection implements Runnable {
           if (!stopped && ! isSocketClosed() ) {
             logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_0_EXCEPTION_IN_CHANNEL_READ, p2pReaderName()), e);
           }
+          if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) {
+            initiateSuspicionIfShared();
+          }
           this.readerShuttingDown = true;
           try { 
             requestClose(LocalizedStrings.Connection_0_EXCEPTION_IN_CHANNEL_READ.toLocalizedString(e)); 
@@ -2434,6 +2442,9 @@ public class Connection implements Runnable {
         this.stopped = true;
       }
       catch (IOException io) {
+        if (this.owner.getConduit().getCancelCriterion().cancelInProgress() != null) {
+          initiateSuspicionIfShared();
+        }
         boolean closed = isSocketClosed()
                 || "Socket closed".equalsIgnoreCase(io.getMessage()); // needed for Solaris jdk 1.4.2_08
         if (!closed) {
@@ -2441,7 +2452,6 @@ public class Connection implements Runnable {
             logger.debug("{} io exception for {}", p2pReaderName(), this, io);
           }
         }
-        initiateSuspicionIfShared();
         this.readerShuttingDown = true;
         try { 
           requestClose(LocalizedStrings.Connection_IOEXCEPTION_RECEIVED_0.toLocalizedString(io));
@@ -2468,6 +2478,7 @@ public class Connection implements Runnable {
         if (!stopped && !(e instanceof InterruptedException) ) {
           logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_0_EXCEPTION_RECEIVED, p2pReaderName()), e);
         }
+        initiateSuspicionIfShared();
         if (isSocketClosed()) {
           stopped = true;
         }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5feab824/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index 42594cf..41b0df7 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -136,7 +136,7 @@ public class GMSJoinLeaveJUnitTest {
     mockMembers[1].setVmViewId(viewId-1);
     set.add(mockMembers[1]);
     state.alreadyTried = set;
-    state.hasContactedALocator = true;
+    state.hasContactedAJoinedLocator = true;
     
     // simulate a response being received
     InternalDistributedMember sender = mockMembers[2];