You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/02/08 22:58:29 UTC

[02/50] [abbrv] incubator-geode git commit: ViewCreator thread sends another message when any member doesn't ack back to prepared view. And then it waits on future as this can happen for multiple members. In this case, If those members are not responsive

The ViewCreator thread sends another message when any member doesn't ack the
prepared view. It then waits on a future, since this can happen for multiple members.
If those members are unresponsive and another thread has already determined that,
then we don't need to wait for those members. The ViewCreator thread therefore now
checks for RemoveMember messages for those members while waiting for a response.
This commit also includes another minor fix in the same area
and adds unit tests for the new behavior.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b11113fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b11113fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b11113fb

Branch: refs/heads/feature/GEODE-773-2
Commit: b11113fb78c43803bcc0e155b7ada15e290078a4
Parents: 43a2f41
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Wed Jan 27 15:51:27 2016 -0800
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Wed Feb 3 09:18:22 2016 -0800

----------------------------------------------------------------------
 .../membership/gms/fd/GMSHealthMonitor.java     |   7 +-
 .../membership/gms/membership/GMSJoinLeave.java | 139 +++++++++++--------
 .../gms/membership/GMSJoinLeaveJUnitTest.java   | 139 ++++++++++++++++++-
 3 files changed, 224 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b11113fb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index dc549bf..172926b 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -1149,7 +1149,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   }
 
   private void doFinalCheck(final InternalDistributedMember initiator,
-      List<SuspectRequest> sMembers, NetView cv, InternalDistributedMember localAddress) {
+      List<SuspectRequest> sMembers, final NetView cv, InternalDistributedMember localAddress) {
 
 //    List<InternalDistributedMember> membersChecked = new ArrayList<>(10);
     try {
@@ -1192,12 +1192,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
               logger.info("Performing final check for suspect member {} reason={}", mbr, reason);
               boolean pinged;
-              NetView view = currentView;
-              int port = view.getFailureDetectionPort(mbr);
+              int port = cv.getFailureDetectionPort(mbr);
               if (port <= 0) {
                 logger.info("Unable to locate failure detection port - requesting a heartbeat");
                 if (logger.isDebugEnabled()) {
-                  logger.debug("\ncurrent view: {}\nports: {}", view, Arrays.toString(view.getFailureDetectionPorts()));
+                  logger.debug("\ncurrent view: {}\nports: {}", cv, Arrays.toString(cv.getFailureDetectionPorts()));
                 }
                 pinged = GMSHealthMonitor.this.doCheckMember(mbr);
                 GMSHealthMonitor.this.stats.incFinalCheckRequestsSent();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b11113fb/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index e4925d7..0b0cfa0 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -632,7 +632,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       installView(newView);
       isJoined = true;
       if (viewCreator == null || viewCreator.isShutdown()) {
-        viewCreator = new ViewCreator("Geode Membership View Creator", Services.getThreadGroup());
+        createViewCreator();
         viewCreator.setDaemon(true);
         viewCreator.start();
         startViewBroadcaster();
@@ -669,7 +669,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         newView.setFailureDetectionPort(this.localAddress, services.getHealthMonitor().getFailureDetectionPort());
       }
       if (viewCreator == null || viewCreator.isShutdown()) {
-        viewCreator = new ViewCreator("Geode Membership View Creator", Services.getThreadGroup());
+        createViewCreator();
         viewCreator.setInitialView(newView, newView.getNewMembers(), leaving, removals);
         viewCreator.setDaemon(true);
         viewCreator.start();
@@ -678,6 +678,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     }
   }
 
+  protected void createViewCreator() {
+    viewCreator = new ViewCreator("Geode Membership View Creator", Services.getThreadGroup());
+  }
+
   private void sendRemoveMessages(List<InternalDistributedMember> removals, List<String> reasons, NetView newView) {
     Iterator<String> reason = reasons.iterator();
     for (InternalDistributedMember mbr : removals) {
@@ -1686,6 +1690,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
   class ViewCreator extends Thread {
     boolean shutdown = false;
     volatile boolean waiting = false;
+    volatile boolean testFlagForRemovalRequest = false;
 
     /**
      * initial view to install.  guarded by synch on ViewCreator
@@ -2146,50 +2151,29 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
      * 
      * @param mbrs
      */
-    private void removeHealthyMembers(Collection<InternalDistributedMember> mbrs) throws InterruptedException {
+    private void removeHealthyMembers(final Collection<InternalDistributedMember> mbrs) throws InterruptedException {
       List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(mbrs.size());
 
       Set<InternalDistributedMember> newRemovals = new HashSet<>();
       Set<InternalDistributedMember> newLeaves = new HashSet<>();
 
-      synchronized (viewRequests) {
-        for (DistributionMessage msg : viewRequests) {
-          switch (msg.getDSFID()) {
-          case LEAVE_REQUEST_MESSAGE:
-            newLeaves.add(((LeaveRequestMessage) msg).getMemberID());
-            break;
-          case REMOVE_MEMBER_REQUEST:
-            newRemovals.add(((RemoveMemberMessage) msg).getMemberID());
-            break;
-          default:
-            break;
-          }
-        }
-      }
-
+      filterMembers(mbrs, newRemovals, REMOVE_MEMBER_REQUEST);
+      filterMembers(mbrs, newLeaves, LEAVE_REQUEST_MESSAGE);   
+      
       for (InternalDistributedMember mbr : mbrs) {
-        if (newRemovals.contains(mbr)) {
-          // no need to do a health check on a member who is already leaving
-          logger.info("member {} is already scheduled for removal", mbr);
-          continue;
-        }
-        if (newLeaves.contains(mbr)) {
-          // no need to do a health check on a member that is declared crashed
-          logger.info("member {} has already sent a leave-request", mbr);
-          continue;
-        }
         final InternalDistributedMember fmbr = mbr;
         checkers.add(new Callable<InternalDistributedMember>() {
           @Override
           public InternalDistributedMember call() throws Exception {
             // return the member id if it fails health checks
-            logger.info("checking state of member " + fmbr);
-            if (services.getHealthMonitor().checkIfAvailable(fmbr, "Member failed to acknowledge a membership view", false)) {
-              logger.info("member " + fmbr + " passed availability check");
-              return fmbr;
+            InternalDistributedMember mbr = GMSJoinLeave.this.checkIfAvailable(fmbr);
+            
+            synchronized (viewRequests) {
+              if(mbr != null)
+                mbrs.remove(mbr);
+              viewRequests.notifyAll();
             }
-            logger.info("member " + fmbr + " failed availability check");
-            return null;
+            return mbr;
           }
         });
       }
@@ -2199,7 +2183,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       if (mbrs.isEmpty()) {
         return;
       }
-
+      
       ExecutorService svc = Executors.newFixedThreadPool(mbrs.size(), new ThreadFactory() {
         AtomicInteger i = new AtomicInteger();
 
@@ -2211,35 +2195,78 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       });
 
       try {
-        List<Future<InternalDistributedMember>> futures;
-        futures = svc.invokeAll(checkers);
         long giveUpTime = System.currentTimeMillis() + viewAckTimeout;
-        for (Future<InternalDistributedMember> future: futures) {
-          long now = System.currentTimeMillis();
-          try {
-            InternalDistributedMember mbr = null;
-            long timeToWait = giveUpTime - now;
-            if (timeToWait <= 0) {
-              // TODO if timeToWait==0 is future.get() guaranteed to return immediately?
-              // It looks like some code paths invoke Object.wait(0), which waits forever.
-              timeToWait = 1;
-            }
-            mbr = future.get(timeToWait, TimeUnit.MILLISECONDS);
-            if (mbr != null) {
-              mbrs.remove(mbr);
+        List<Future<InternalDistributedMember>> futures;
+        futures = submitAll(svc, checkers);
+        long waitTime = giveUpTime - System.currentTimeMillis();
+        synchronized (viewRequests) {
+          while(waitTime>0 ) {
+            logger.debug("removeHealthyMembers: mbrs" + mbrs.size());
+            
+            filterMembers(mbrs, newRemovals, REMOVE_MEMBER_REQUEST);
+            filterMembers(mbrs, newLeaves, LEAVE_REQUEST_MESSAGE);   
+            
+            if(mbrs.isEmpty()) {
+              break;
             }
-          } catch (java.util.concurrent.TimeoutException e) {
-            // timeout - member didn't pass the final check and will not be removed
-            // from the collection of members
-          } catch (ExecutionException e) {
-            logger.info("unexpected exception caught during member verification", e);
+            
+            viewRequests.wait(waitTime);
+            waitTime = giveUpTime - System.currentTimeMillis();
           }
         }
+        
+        //we have waited for all members, now check if we considered any removeRequest;
+        //add them back to create new view
+        if(!newRemovals.isEmpty()) {
+          newRemovals.removeAll(newLeaves);
+          mbrs.addAll(newRemovals);
+        }
+        
       } finally {
         svc.shutdownNow();
       }
     }
+
+    protected void filterMembers(Collection<InternalDistributedMember> mbrs, Set<InternalDistributedMember> removalRequestForMembers, short requestType) {
+      Set<InternalDistributedMember> gotRemovalRequests = getPendingRequestIDs(requestType);
+      
+      if(!gotRemovalRequests.isEmpty()) {
+        logger.debug("removeHealthyMembers: gotRemovalRequests " + gotRemovalRequests.size());
+        Iterator<InternalDistributedMember> itr = gotRemovalRequests.iterator();
+        while(itr.hasNext()) {
+          InternalDistributedMember removeMember = itr.next();
+          if(mbrs.contains(removeMember)) {
+            testFlagForRemovalRequest = true;
+            removalRequestForMembers.add(removeMember);
+            mbrs.remove(removeMember);
+          }
+        }
+      }
+    }
+    
+    private <T> List<Future<T>> submitAll ( ExecutorService executor, Collection<? extends Callable<T> > tasks ) {
+      List<Future<T>> result = new ArrayList<Future<T>>( tasks.size() );
+
+      for ( Callable<T> task : tasks ) {
+        result.add(executor.submit(task));
+      }
+
+      return result;
+    }
+    
+    boolean getTestFlageForRemovalRequest() {
+      return testFlagForRemovalRequest;
+    }
   }
   
-  
+  InternalDistributedMember checkIfAvailable(InternalDistributedMember fmbr) {
+ // return the member id if it fails health checks
+    logger.info("checking state of member " + fmbr);
+    if (services.getHealthMonitor().checkIfAvailable(fmbr, "Member failed to acknowledge a membership view", false)) {
+      logger.info("member " + fmbr + " passed availability check");
+      return fmbr;
+    }
+    logger.info("member " + fmbr + " failed availability check");
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b11113fb/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index 5b53290..5becc6a 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -86,12 +86,18 @@ public class GMSJoinLeaveJUnitTest {
   private GMSJoinLeave gmsJoinLeave;
   private Manager manager;
   private Stopper stopper;
+  private InternalDistributedMember removeMember = null;
+  private InternalDistributedMember leaveMember = null;
   
   public void initMocks() throws IOException {
     initMocks(false);
   }
   
   public void initMocks(boolean enableNetworkPartition) throws UnknownHostException {
+    initMocks(enableNetworkPartition, false);
+  }
+  
+  public void initMocks(boolean enableNetworkPartition, boolean useTestGMSJoinLeave) throws UnknownHostException {
     mockDistConfig = mock(DistributionConfig.class);
     when(mockDistConfig.getEnableNetworkPartitionDetection()).thenReturn(enableNetworkPartition);
     when(mockDistConfig.getLocators()).thenReturn("localhost[8888]");
@@ -133,7 +139,10 @@ public class GMSJoinLeaveJUnitTest {
     }
     mockOldMember = new InternalDistributedMember("localhost", 8700, Version.GFE_56);
 
-    gmsJoinLeave = new GMSJoinLeave();
+    if(useTestGMSJoinLeave)
+      gmsJoinLeave = new GMSJoinLeaveTest();
+    else
+      gmsJoinLeave = new GMSJoinLeave();
     gmsJoinLeave.init(services);
     gmsJoinLeave.start();
     gmsJoinLeave.started();
@@ -1074,5 +1083,133 @@ public class GMSJoinLeaveJUnitTest {
       
     }   
   }
+  
+  private void waitForViewAndFinalCheckInProgress(int viewId) throws InterruptedException {
+    // wait for the view processing thread to collect and process the requests
+    int sleeps = 0;
+    while( !gmsJoinLeave.isStopping() 
+        && ( gmsJoinLeave.getView().getViewId() == viewId) ) {
+      if (sleeps++ > 20) {
+        System.out.println("view requests: " + gmsJoinLeave.getViewRequests());
+        System.out.println("current view: " + gmsJoinLeave.getView());
+        throw new RuntimeException("timeout waiting for view #" + viewId);
+      }
+       
+      Thread.sleep(1000);
+      System.out.println("Empty sleeps " + sleeps +  " stoppping: " + gmsJoinLeave.isStopping() );
+    }
+  }
+  
+  class GMSJoinLeaveTest extends GMSJoinLeave {
+    public GMSJoinLeaveTest() {
+      super();
+    }
+    @Override
+    InternalDistributedMember checkIfAvailable(InternalDistributedMember fmbr) {
+      if(removeMember != null) {
+        try {
+          if(removeMember.equals(fmbr)) {
+            GMSJoinLeaveJUnitTest.this.processRemoveMessage(fmbr);
+            Thread.sleep(1000000);
+          }
+        } catch (InterruptedException e) {
+        }
+        return fmbr;
+      }else if(leaveMember != null) {
+        try {
+          if(leaveMember.equals(fmbr)) {
+            GMSJoinLeaveJUnitTest.this.processLeaveMessage(fmbr);
+            Thread.sleep(1000000);
+          }
+        } catch (InterruptedException e) {
+        }
+        return fmbr;
+      }else {
+        return super.checkIfAvailable(fmbr);
+      }
+    }
+  }
+  
+  @Test
+  public void testRemoveRequestWhileWaitingForFinalResponse() throws Exception {
+    String reason = "testing";
+    initMocks(true, true);
+    
+    gmsJoinLeave.becomeCoordinatorForTest();
+    
+    installView();
+    
+    int viewId = gmsJoinLeave.getView().getViewId();
+    System.out.println("Current viewid " + viewId);
+    
+    this.removeMember = mockMembers[0]; 
+    
+    processJoinMessage(gmsJoinLeave.getMemberID(), mockMembers[2], 98989);
+    
+    waitForViewAndFinalCheckInProgress(viewId);
+    
+    this.removeMember = null;
+    
+    assertTrue("testFlagForRemovalRequest should be true", gmsJoinLeave.getViewCreator().getTestFlageForRemovalRequest());
+  }
+  
+  @Test
+  public void testLeaveRequestWhileWaitingForFinalResponse() throws Exception {
+    String reason = "testing";
+    initMocks(true, true);
+    
+    gmsJoinLeave.becomeCoordinatorForTest();
+    
+    installView();
+    
+    int viewId = gmsJoinLeave.getView().getViewId();
+    System.out.println("Current viewid " + viewId);
+    
+    this.leaveMember = mockMembers[0]; 
+    
+    processJoinMessage(gmsJoinLeave.getMemberID(), mockMembers[2], 98989);
+    
+    waitForViewAndFinalCheckInProgress(viewId);
+    
+    this.leaveMember = null;
+    
+    assertTrue("testFlagForRemovalRequest should be true", gmsJoinLeave.getViewCreator().getTestFlageForRemovalRequest());
+  }
+  
+  private void installView() throws Exception{
+    final int viewInstallationTime = 15000;
+    
+    NetView oldView = null;
+    long giveup = System.currentTimeMillis() + viewInstallationTime;
+    while (System.currentTimeMillis() < giveup  &&  oldView == null) {
+      Thread.sleep(500);
+      oldView = gmsJoinLeave.getView();
+    }
+    assertTrue(oldView != null);  // it should have become coordinator and installed a view
+    
+    NetView newView = new NetView(oldView, oldView.getViewId()+1);
+    newView.add(mockMembers[0]);
+    newView.add(mockMembers[1]);
+    gmsJoinLeave.installView(newView);
+  }
+  
+  private void processJoinMessage(InternalDistributedMember coordinator, InternalDistributedMember newMember, int port) {
+    JoinRequestMessage reqMsg = new JoinRequestMessage(coordinator, newMember, null, port);
+    gmsJoinLeave.processMessage(reqMsg);
+  }
+  
+  private void processRemoveMessage( InternalDistributedMember rMember) {
+    RemoveMemberMessage msg = new RemoveMemberMessage(gmsJoinLeave.getMemberID(), rMember, "testing");
+    msg.setSender(gmsJoinLeave.getMemberID());
+    
+    gmsJoinLeave.processMessage(msg);
+  }
+  
+  private void processLeaveMessage( InternalDistributedMember rMember) {
+    LeaveRequestMessage msg = new LeaveRequestMessage(gmsJoinLeave.getMemberID(), rMember, "testing");
+    msg.setSender(rMember);
+    
+    gmsJoinLeave.processMessage(msg);
+  }
 }