You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/03/01 01:09:36 UTC

[27/34] incubator-geode git commit: GEODE-870: test case fixes.

GEODE-870: test case fixes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/0af59683
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/0af59683
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/0af59683

Branch: refs/heads/feature/GEODE-949-2
Commit: 0af59683b847ea431e4b653867e74c3d714c8ac1
Parents: 8b24a4f
Author: Udo Kohlmeyer <uk...@pivotal.io>
Authored: Wed Feb 17 14:51:19 2016 +1100
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Mon Feb 29 16:36:08 2016 +1100

----------------------------------------------------------------------
 .../membership/gms/membership/GMSJoinLeave.java | 212 ++++++++++++-------
 .../cache30/CacheSerializableRunnable.java      |  16 +-
 .../test/dunit/SerializableRunnable.java        |   2 +-
 3 files changed, 156 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0af59683/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 8ba4952..dcb2721 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -55,104 +55,166 @@ import static com.gemstone.gemfire.internal.DataSerializableFixedID.*;
  * that Geode formerly used for this purpose.
  */
 public class GMSJoinLeave implements JoinLeave, MessageHandler {
-  
+
   public static final String BYPASS_DISCOVERY_PROPERTY = "gemfire.bypass-discovery";
 
-  /** amount of time to wait for responses to FindCoordinatorRequests */
+  /**
+   * amount of time to wait for responses to FindCoordinatorRequests
+   */
   private static final int DISCOVERY_TIMEOUT = Integer.getInteger("gemfire.discovery-timeout", 3000);
 
-  /** amount of time to sleep before trying to join after a failed attempt */
+  /**
+   * amount of time to sleep before trying to join after a failed attempt
+   */
   private static final int JOIN_RETRY_SLEEP = Integer.getInteger("gemfire.join-retry-sleep", 1000);
 
-  /** time to wait for a broadcast message to be transmitted by jgroups */
+  /**
+   * time to wait for a broadcast message to be transmitted by jgroups
+   */
   private static final long BROADCAST_MESSAGE_SLEEP_TIME = Long.getLong("gemfire.broadcast-message-sleep-time", 1000);
 
-  /** if the locators don't know who the coordinator is we send find-coord requests to this many nodes */
+  /**
+   * if the locators don't know who the coordinator is we send find-coord requests to this many nodes
+   */
   private static final int MAX_DISCOVERY_NODES = Integer.getInteger("gemfire.max-discovery-nodes", 30);
 
-  /** interval for broadcasting the current view to members in case they didn't get it the first time */
+  /**
+   * interval for broadcasting the current view to members in case they didn't get it the first time
+   */
   private static final long VIEW_BROADCAST_INTERVAL = Long.getLong("gemfire.view-broadcast-interval", 60000);
 
-  /** membership logger */
+  /**
+   * membership logger
+   */
   private static final Logger logger = Services.getLogger();
 
-  /** the view ID where I entered into membership */
+  /**
+   * the view ID where I entered into membership
+   */
   private int birthViewId;
 
-  /** my address */
+  /**
+   * my address
+   */
   private InternalDistributedMember localAddress;
 
   private Services services;
 
-  /** have I connected to the distributed system? */
+  /**
+   * have I connected to the distributed system?
+   */
   private volatile boolean isJoined;
 
-  /** guarded by viewInstallationLock */
+  /**
+   * guarded by viewInstallationLock
+   */
   private boolean isCoordinator;
 
-  /** a synch object that guards view installation */
+  /**
+   * a synch object that guards view installation
+   */
   private final Object viewInstallationLock = new Object();
-  
-  /** the currently installed view.  Guarded by viewInstallationLock */
+
+  /**
+   * the currently installed view.  Guarded by viewInstallationLock
+   */
   private volatile NetView currentView;
 
-  /** the previous view **/
+  /**
+   * the previous view
+   **/
   private volatile NetView previousView;
 
-  /** members who we have been declared dead in the current view */
+  /**
+   * members who have been declared dead in the current view
+   */
   private final Set<InternalDistributedMember> removedMembers = new HashSet<>();
 
-  /** members who we've received a leave message from **/
+  /**
+   * members who we've received a leave message from
+   **/
   private final Set<InternalDistributedMember> leftMembers = new HashSet<>();
 
-  /** a new view being installed */
+  /**
+   * a new view being installed
+   */
   private NetView preparedView;
 
-  /** the last view that conflicted with view preparation */
+  /**
+   * the last view that conflicted with view preparation
+   */
   private NetView lastConflictingView;
 
   private List<InetSocketAddress> locators;
 
-  /** a list of join/leave/crashes */
+  /**
+   * a list of join/leave/crashes
+   */
   private final List<DistributionMessage> viewRequests = new LinkedList<DistributionMessage>();
 
-  /** the established request collection jitter.  This can be overridden for testing with delayViewCreationForTest */
+  /**
+   * the established request collection jitter.  This can be overridden for testing with delayViewCreationForTest
+   */
   long requestCollectionInterval = MEMBER_REQUEST_COLLECTION_INTERVAL;
 
-  /** collects the response to a join request */
+  /**
+   * collects the response to a join request
+   */
   private JoinResponseMessage[] joinResponse = new JoinResponseMessage[1];
 
-  /** collects responses to new views */
+  /**
+   * collects responses to new views
+   */
   private ViewReplyProcessor viewProcessor = new ViewReplyProcessor(false);
 
-  /** collects responses to view preparation messages */
+  /**
+   * collects responses to view preparation messages
+   */
   private ViewReplyProcessor prepareProcessor = new ViewReplyProcessor(true);
 
-  /** whether quorum checks can cause a forced-disconnect */
+  /**
+   * whether quorum checks can cause a forced-disconnect
+   */
   private boolean quorumRequired = false;
 
-  /** timeout in receiving view acknowledgement */
+  /**
+   * timeout in receiving view acknowledgement
+   */
   private int viewAckTimeout;
 
-  /** background thread that creates new membership views */
+  /**
+   * background thread that creates new membership views
+   */
   private ViewCreator viewCreator;
 
-  /** am I shutting down? */
+  /**
+   * am I shutting down?
+   */
   private volatile boolean isStopping;
 
-  /** state of collected artifacts during discovery */
+  /**
+   * state of collected artifacts during discovery
+   */
   final SearchState searchState = new SearchState();
 
-  /** a collection used to detect unit testing */
+  /**
+   * a collection used to detect unit testing
+   */
   Set<String> unitTesting = new HashSet<>();
-  
-  /** a test hook to make this member unresponsive */
+
+  /**
+   * a test hook to make this member unresponsive
+   */
   private volatile boolean playingDead;
-  
-  /** the view where quorum was most recently lost */
+
+  /**
+   * the view where quorum was most recently lost
+   */
   NetView quorumLostView;
 
-  /** a flag to mark a coordinator's viewCreator for shutdown */
+  /**
+   * a flag to mark a coordinator's viewCreator for shutdown
+   */
   private boolean markViewCreatorForShutdown = false;
 
   static class SearchState {
@@ -182,13 +244,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
   /**
    * attempt to join the distributed system
    * loop
-   *   send a join request to a locator & get a response
-   *
+   * send a join request to a locator & get a response
+   * <p>
    * If the response indicates there's no coordinator it
    * will contain a set of members that have recently contacted
    * it.  The "oldest" member is selected as the coordinator
    * based on ID sort order.
-   * 
+   *
    * @return true if successful, false if not
    */
   public boolean join() {
@@ -372,12 +434,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         || !currentView.contains(mbr)) {
       return true;
     }
-    synchronized(removedMembers) {
+    synchronized (removedMembers) {
       if (removedMembers.contains(mbr)) {
         return true;
       }
     }
-    synchronized(leftMembers) {
+    synchronized (leftMembers) {
       if (leftMembers.contains(mbr)) {
         return true;
       }
@@ -419,13 +481,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       return;
     }
 
-//      Remove JoinResponseMessage to fix GEODE-870
-//        if (!this.localAddress.getNetMember().preferredForCoordinator() &&
-//            incomingRequest.getMemberID().getNetMember().preferredForCoordinator()){
-//          JoinResponseMessage joinResponseMessage = new JoinResponseMessage(incomingRequest.getMemberID(), currentView, true);
-//          services.getMessenger().send(joinResponseMessage);
-//          return;
-//        }
+    //      Remove JoinResponseMessage to fix GEODE-870
+//    if (!this.localAddress.getNetMember().preferredForCoordinator() &&
+//        incomingRequest.getMemberID().getNetMember().preferredForCoordinator()) {
+//      JoinResponseMessage joinResponseMessage = new JoinResponseMessage(incomingRequest.getMemberID(), currentView, true);
+//      services.getMessenger().send(joinResponseMessage);
+//      return;
+//    }
     recordViewRequest(incomingRequest);
   }
 
@@ -449,8 +511,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     InternalDistributedMember mbr = incomingRequest.getMemberID();
 
     if (logger.isDebugEnabled()) {
-      logger.debug("JoinLeave.processLeaveRequest invoked.  isCoordinator="+isCoordinator+ "; isStopping="+isStopping
-          +"; cancelInProgress="+ services.getCancelCriterion().isCancelInProgress());
+      logger.debug("JoinLeave.processLeaveRequest invoked.  isCoordinator=" + isCoordinator + "; isStopping=" + isStopping
+          + "; cancelInProgress=" + services.getCancelCriterion().isCancelInProgress());
     }
 
     if (!v.contains(mbr) && mbr.getVmViewId() < v.getViewId()) {
@@ -472,12 +534,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         check.removeAll(removedMembers);
         check.addCrashedMembers(removedMembers);
       }
-      synchronized(leftMembers) {
+      synchronized (leftMembers) {
         leftMembers.add(mbr);
         check.removeAll(leftMembers);
       }
       if (check.getCoordinator().equals(localAddress)) {
-        synchronized(viewInstallationLock) {
+        synchronized (viewInstallationLock) {
           becomeCoordinator(incomingRequest.getMemberID());
         }
       }
@@ -516,8 +578,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
     if (!fromMe) {
       logger.info("Membership received a request to remove " + mbr
-        + " from " + incomingRequest.getSender()
-        + " reason="+incomingRequest.getReason());
+          + " from " + incomingRequest.getSender()
+          + " reason=" + incomingRequest.getReason());
     }
 
     if (mbr.equals(this.localAddress)) {
@@ -754,7 +816,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     if (preparing) {
       logger.debug("waiting for view responses");
 
-      Set<InternalDistributedMember> failedToRespond = rp.waitForResponses();
+      Set<InternalDistributedMember> failedToRespond = viewReplyProcessor.waitForResponses();
 
       logger.info("finished waiting for responses to view preparation");
 
@@ -775,7 +837,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     return true;
   }
 
-  private void processViewMessage(final InstallViewMessage installViewMessage) {
+  private void processViewMessage(final InstallViewMessage m) {
 
     NetView view = m.getView();
 
@@ -789,7 +851,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     if (!this.isJoined) {
       // if we're still waiting for a join response and we're in this view we
       // should install the view so join() can finish its work
-      for (InternalDistributedMember mbr: view.getMembers()) {
+      for (InternalDistributedMember mbr : view.getMembers()) {
         if (localAddress.compareTo(mbr) == 0) {
           viewContainsMyUnjoinedAddress = true;
           break;
@@ -940,7 +1002,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     }
     InternalDistributedMember coord = null;
     boolean coordIsNoob = true;
-    for (; it.hasNext();) {
+    for (; it.hasNext(); ) {
       InternalDistributedMember mbr = it.next();
       if (!state.alreadyTried.contains(mbr)) {
         boolean mbrIsNoob = (mbr.getVmViewId() < 0);
@@ -1603,13 +1665,17 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       }
     }
 
-    /** call with synchronized(this) */
+    /**
+     * call with synchronized(this)
+     */
     private void stopWaitingFor(InternalDistributedMember mbr) {
       notRepliedYet.remove(mbr);
       checkIfDone();
     }
 
-    /** call with synchronized(this) */
+    /**
+     * call with synchronized(this)
+     */
     private void checkIfDone() {
       if (notRepliedYet.isEmpty() || (pendingRemovals != null && pendingRemovals.containsAll(notRepliedYet))) {
         logger.debug("All anticipated view responses received - notifying waiting thread");
@@ -1828,7 +1894,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       sendInitialView();
       long okayToCreateView = System.currentTimeMillis() + requestCollectionInterval;
       try {
-        for (;;) {
+        for (; ; ) {
           synchronized (viewRequests) {
             if (shutdown) {
               return;
@@ -2168,8 +2234,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
      *
      * @param mbrs
      */
-    private void removeHealthyMembers(final Collection<InternalDistributedMember> mbrs) throws InterruptedException {
-      List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(mbrs.size());
+    private void removeHealthyMembers(final Set<InternalDistributedMember> suspects) throws InterruptedException {
+      List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(suspects.size());
 
       Set<InternalDistributedMember> newRemovals = new HashSet<>();
       Set<InternalDistributedMember> newLeaves = new HashSet<>();
@@ -2197,6 +2263,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
             }
             return mbr;
           }
+
           @Override
           public String toString() {
             return mbr.toString();
@@ -2228,7 +2295,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         // now wait for the tasks to do their work
         long waitTime = giveUpTime - System.currentTimeMillis();
         synchronized (viewRequests) {
-          while ( waitTime > 0 ) {
+          while (waitTime > 0) {
             logger.debug("removeHealthyMembers: mbrs" + suspects.size());
 
             filterMembers(suspects, newRemovals, REMOVE_MEMBER_REQUEST);
@@ -2237,7 +2304,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
             suspects.removeAll(newLeaves);
 
-            if(suspects.isEmpty() || newRemovals.containsAll(suspects)) {
+            if (suspects.isEmpty() || newRemovals.containsAll(suspects)) {
               break;
             }
 
@@ -2252,19 +2319,20 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
     /**
      * This gets pending requests and returns the IDs of any that are in the given collection
-     * @param mbrs collection of IDs to search for
+     *
+     * @param mbrs            collection of IDs to search for
      * @param matchingMembers collection to store matching IDs in
-     * @param requestType leave/remove/join
+     * @param requestType     leave/remove/join
      */
     protected void filterMembers(Collection<InternalDistributedMember> mbrs, Set<InternalDistributedMember> matchingMembers, short requestType) {
       Set<InternalDistributedMember> requests = getPendingRequestIDs(requestType);
 
-      if(!requests.isEmpty()) {
+      if (!requests.isEmpty()) {
         logger.debug("filterMembers: processing " + requests.size() + " requests for type " + requestType);
         Iterator<InternalDistributedMember> itr = requests.iterator();
-        while(itr.hasNext()) {
+        while (itr.hasNext()) {
           InternalDistributedMember memberID = itr.next();
-          if(mbrs.contains(memberID)) {
+          if (mbrs.contains(memberID)) {
             testFlagForRemovalRequest = true;
             matchingMembers.add(memberID);
           }
@@ -2272,8 +2340,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       }
     }
 
-    private <T> List<Future<T>> submitAll ( ExecutorService executor, Collection<? extends Callable<T> > tasks ) {
-      List<Future<T>> result = new ArrayList<Future<T>>( tasks.size() );
+    private <T> List<Future<T>> submitAll(ExecutorService executor, Collection<? extends Callable<T>> tasks) {
+      List<Future<T>> result = new ArrayList<Future<T>>(tasks.size());
 
       for (Callable<T> task : tasks) {
         result.add(executor.submit(task));
@@ -2288,7 +2356,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
   }
 
   boolean checkIfAvailable(InternalDistributedMember fmbr) {
- // return the member id if it fails health checks
+    // return the member id if it fails health checks
     logger.info("checking state of member " + fmbr);
     if (services.getHealthMonitor().checkIfAvailable(fmbr, "Member failed to acknowledge a membership view", false)) {
       logger.info("member " + fmbr + " passed availability check");

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0af59683/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheSerializableRunnable.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheSerializableRunnable.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheSerializableRunnable.java
index 8b80def..3745928 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheSerializableRunnable.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/CacheSerializableRunnable.java
@@ -48,13 +48,25 @@ public abstract class CacheSerializableRunnable
   }
 
   /**
+   * Creates a new <code>CacheSerializableRunnable</code> with the
+   * given name and arguments
+   */
+  public CacheSerializableRunnable(String name,Object[] args) {
+    super(name);
+    this.args = args;
+  }
+
+  /**
    * Invokes the {@link #run2} method and will wrap any {@link
    * CacheException} thrown by <code>run2</code> in a {@link
    * CacheSerializableRunnableException}. 
    */
   public final void run() {
     try {
-      run2();
+      if(args == null){
+      run2();}else{
+        run3();
+      }
 
     } catch (CacheException ex) {
       String s = "While invoking \"" + this + "\"";
@@ -96,6 +108,8 @@ public abstract class CacheSerializableRunnable
    */
   public abstract void run2() throws CacheException;
 
+  public void run3() throws CacheException{}
+
   /////////////////////////  Inner Classes  /////////////////////////
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0af59683/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java b/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java
index 4caf815..2e24cfd 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/test/dunit/SerializableRunnable.java
@@ -51,7 +51,7 @@ public abstract class SerializableRunnable implements SerializableRunnableIF {
   private static final long serialVersionUID = 7584289978241650456L;
 
   private String name;
-  private Object[] args;
+  protected Object[] args;
 
   public SerializableRunnable() {
     this.name = null;