You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@geode.apache.org by bs...@apache.org on 2018/08/15 17:18:46 UTC

[geode] branch develop updated: GEODE-3780 suspected member is never watched again after passing final check

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new cd3f372  GEODE-3780 suspected member is never watched again after passing final check
cd3f372 is described below

commit cd3f372906501f0f936ba96d6907d972d5c2d478
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Wed Aug 15 10:17:38 2018 -0700

    GEODE-3780 suspected member is never watched again after passing final check
    
    Consolidated "unsuspect" processing into a memberUnsuspected() method.
    Modified "final check" method to not unsuspect a member that fails the check.
---
 .../gms/fd/GMSHealthMonitorJUnitTest.java          |  37 ++++-
 .../internal/membership/gms/ServiceConfig.java     |  27 +---
 .../membership/gms/fd/GMSHealthMonitor.java        | 166 ++++++++++-----------
 .../membership/gms/membership/GMSJoinLeave.java    |   4 +-
 4 files changed, 121 insertions(+), 113 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index 56bd7da..8c42739 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -107,6 +107,7 @@ public class GMSHealthMonitorJUnitTest {
   final long memberTimeout = 1000l;
   private int[] portRange = new int[] {0, 65535};
   private boolean useGMSHealthMonitorTestClass = false;
+  private boolean simulateHeartbeatInGMSHealthMonitorTestClass = true;
   private final int myAddressIndex = 3;
 
   @Before
@@ -540,6 +541,34 @@ public class GMSHealthMonitorJUnitTest {
 
 
   @Test
+  public void testNeighborChangesAfterFailedFinalCheck() {
+    useGMSHealthMonitorTestClass = true;
+    simulateHeartbeatInGMSHealthMonitorTestClass = false;
+
+    try {
+      NetView v = installAView();
+
+      setFailureDetectionPorts(v);
+
+      InternalDistributedMember memberToCheck = gmsHealthMonitor.getNextNeighbor();
+
+      boolean retVal = gmsHealthMonitor.checkIfAvailable(memberToCheck, "Not responding", true);
+
+      assertFalse("checkIfAvailable should have return false", retVal);
+      // we should now be watching the same member
+      int failedIndex = v.getMembers().indexOf(memberToCheck);
+      assertEquals("neighbor was " + gmsHealthMonitor.getNextNeighbor() + " but expected "
+          + mockMembers.get(failedIndex + 1), mockMembers.get(failedIndex + 1),
+          gmsHealthMonitor.getNextNeighbor());
+
+    } finally {
+      useGMSHealthMonitorTestClass = false;
+      simulateHeartbeatInGMSHealthMonitorTestClass = true;
+    }
+  }
+
+
+  @Test
   public void testExonerationMessageIsSentAfterSuccessfulFinalCheck() {
     useGMSHealthMonitorTestClass = true;
 
@@ -817,9 +846,11 @@ public class GMSHealthMonitorJUnitTest {
     @Override
     boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
       if (useGMSHealthMonitorTestClass) {
-        HeartbeatMessage fakeHeartbeat = new HeartbeatMessage();
-        fakeHeartbeat.setSender(suspectMember);
-        gmsHealthMonitor.processMessage(fakeHeartbeat);
+        if (simulateHeartbeatInGMSHealthMonitorTestClass) {
+          HeartbeatMessage fakeHeartbeat = new HeartbeatMessage();
+          fakeHeartbeat.setSender(suspectMember);
+          gmsHealthMonitor.processMessage(fakeHeartbeat);
+        }
         return false;
       }
       return super.doTCPCheckMember(suspectMember, port);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/ServiceConfig.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/ServiceConfig.java
index 395e94d..09e2ed8 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/ServiceConfig.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/ServiceConfig.java
@@ -27,11 +27,12 @@ public class ServiceConfig {
   public static final long MEMBER_REQUEST_COLLECTION_INTERVAL =
       Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "member-request-collection-interval", 300);
 
+  /** in a small cluster we might want to involve all members in operations */
+  public static final int SMALL_CLUSTER_SIZE = 9;
+
   /** various settings from Geode configuration */
   private final long joinTimeout;
   private final int[] membershipPortRange;
-  private final int udpRecvBufferSize;
-  private final int udpSendBufferSize;
   private final long memberTimeout;
   private Integer lossThreshold;
   private final Integer memberWeight;
@@ -79,12 +80,8 @@ public class ServiceConfig {
     return networkPartitionDetectionEnabled;
   }
 
-  public void setNetworkPartitionDetectionEnabled(boolean enabled) {
-    this.networkPartitionDetectionEnabled = enabled;
-  }
-
   public boolean areLocatorsPreferredAsCoordinators() {
-    boolean locatorsAreCoordinators = false;
+    boolean locatorsAreCoordinators;
 
     if (networkPartitionDetectionEnabled) {
       locatorsAreCoordinators = true;
@@ -139,24 +136,8 @@ public class ServiceConfig {
 
     membershipPortRange = theConfig.getMembershipPortRange();
 
-    udpRecvBufferSize = DistributionConfig.DEFAULT_UDP_RECV_BUFFER_SIZE_REDUCED;
-    udpSendBufferSize = theConfig.getUdpSendBufferSize();
-
     memberTimeout = theConfig.getMemberTimeout();
 
-    // The default view-ack timeout in 7.0 is 12347 ms but is adjusted based on the member-timeout.
-    // We don't want a longer timeout than 12437 because new members will likely time out trying to
-    // connect because their join timeouts are set to expect a shorter period
-    int ackCollectionTimeout = theConfig.getMemberTimeout() * 2 * 12437 / 10000;
-    if (ackCollectionTimeout < 1500) {
-      ackCollectionTimeout = 1500;
-    } else if (ackCollectionTimeout > 12437) {
-      ackCollectionTimeout = 12437;
-    }
-    ackCollectionTimeout = Integer
-        .getInteger(DistributionConfig.GEMFIRE_PREFIX + "VIEW_ACK_TIMEOUT", ackCollectionTimeout)
-        .intValue();
-
     lossThreshold =
         Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "network-partition-threshold", 51);
     if (lossThreshold < 51)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 13af6d4..f31a0c3 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -66,6 +66,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.distributed.internal.membership.NetView;
 import org.apache.geode.distributed.internal.membership.gms.GMSMember;
+import org.apache.geode.distributed.internal.membership.gms.ServiceConfig;
 import org.apache.geode.distributed.internal.membership.gms.Services;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler;
@@ -148,7 +149,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   /**
    * Members currently being suspected and the view they were suspected in
    */
-  private final ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberInView =
+  private final ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberIds =
       new ConcurrentHashMap<>();
 
   /**
@@ -165,7 +166,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   /**
    * Members suspected in a particular view
    */
-  private final Map<NetView, Set<SuspectRequest>> viewVsSuspectedMembers = new HashMap<>();
+  private final Map<NetView, Set<SuspectRequest>> suspectRequestsInView = new HashMap<>();
 
   private ScheduledExecutorService scheduler;
 
@@ -387,8 +388,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     if (cTS != null && cTS.getTime() < timeStamp) {
       cTS.setTime(timeStamp);
     }
-    if (suspectedMemberInView.remove(sender) != null) {
-      logger.info("No longer suspecting {}", sender);
+    if (suspectedMemberIds.containsKey(sender)) {
+      memberUnsuspected(sender);
       setNextNeighbor(currentView, null);
     }
   }
@@ -419,14 +420,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       }
 
       if (!pinged) {
-        suspectedMemberInView.put(mbr, currentView);
+        suspectedMemberIds.put(mbr, currentView);
         String reason = "Member isn't responding to heartbeat requests";
-        GMSHealthMonitor.this.initiateSuspicion(mbr, reason);
+        initiateSuspicion(mbr, reason);
       } else {
         logger.trace("Setting next neighbor as member {} has responded.", mbr);
-        suspectedMemberInView.remove(mbr);
+        memberUnsuspected(mbr);
         // back to previous one
-        setNextNeighbor(GMSHealthMonitor.this.currentView, null);
+        setNextNeighbor(currentView, null);
       }
     });
 
@@ -701,14 +702,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
             if (GMSHealthMonitor.this.playingDead) {
               continue;
             }
-            serverSocketExecutor.execute(new ClientSocketHandler(socket)); // start(); [bruce] I'm
-            // seeing a lot of
-            // failures due to this
-            // thread not being
-            // created fast enough,
-            // sometimes as long as
-            // 30 seconds
-
+            serverSocketExecutor.execute(new ClientSocketHandler(socket));
           } catch (RejectedExecutionException e) {
             // this can happen during shutdown
 
@@ -823,8 +817,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   }
 
   public synchronized void installView(NetView newView) {
-    synchronized (viewVsSuspectedMembers) {
-      viewVsSuspectedMembers.clear();
+    synchronized (suspectRequestsInView) {
+      suspectRequestsInView.clear();
     }
     for (Iterator<InternalDistributedMember> it = memberTimeStamps.keySet().iterator(); it
         .hasNext();) {
@@ -832,7 +826,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         it.remove();
       }
     }
-    for (Iterator<InternalDistributedMember> it = suspectedMemberInView.keySet().iterator(); it
+    for (Iterator<InternalDistributedMember> it = suspectedMemberIds.keySet().iterator(); it
         .hasNext();) {
       if (!newView.contains(it.next())) {
         it.remove();
@@ -873,23 +867,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     }
 
     List<InternalDistributedMember> allMembers = newView.getMembers();
-    //
-    // Set<InternalDistributedMember> checkAllSuspected = new HashSet<>(allMembers);
-    // checkAllSuspected.removeAll(suspectedMemberInView.keySet());
-    // checkAllSuspected.remove(localAddress);
-    // if (checkAllSuspected.isEmpty() && allMembers.size() > 1) {
-    // logger.info("All other members are suspect at this point");
-    // nextNeighbor = null;
-    // return;
-    // }
 
-    if (allMembers.size() > 1 && suspectedMemberInView.size() >= allMembers.size() - 1) {
+    if (allMembers.size() > 1 && suspectedMemberIds.size() >= allMembers.size() - 1) {
       boolean nonSuspectFound = false;
       for (InternalDistributedMember member : allMembers) {
         if (member.equals(localAddress)) {
           continue;
         }
-        if (!suspectedMemberInView.containsKey(member)) {
+        if (!suspectedMemberIds.containsKey(member)) {
           nonSuspectFound = true;
           break;
         }
@@ -905,13 +890,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     if (index != -1) {
       int nextNeighborIndex = (index + 1) % allMembers.size();
       InternalDistributedMember newNeighbor = allMembers.get(nextNeighborIndex);
-      if (suspectedMemberInView.containsKey(newNeighbor)) {
+      if (suspectedMemberIds.containsKey(newNeighbor)) {
         setNextNeighbor(newView, newNeighbor);
         return;
       }
       InternalDistributedMember oldNeighbor = nextNeighbor;
       if (oldNeighbor != newNeighbor) {
-        logger.debug("Failure detection is now watching {}", newNeighbor);
+        logger.info("Failure detection is now watching {}", newNeighbor);
         nextNeighbor = newNeighbor;
       }
     }
@@ -1006,7 +991,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * Test method - check to see if a member is under suspicion
    */
   public boolean isSuspectMember(InternalDistributedMember m) {
-    return this.suspectedMemberInView.containsKey(m);
+    return this.suspectedMemberIds.containsKey(m);
   }
 
   @Override
@@ -1017,7 +1002,42 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   @Override
   public void memberSuspected(InternalDistributedMember initiator,
       InternalDistributedMember suspect, String reason) {
-    suspectedMemberInView.putIfAbsent(suspect, currentView);
+    synchronized (suspectRequestsInView) {
+      suspectedMemberIds.putIfAbsent(suspect, currentView);
+      Collection<SuspectRequest> requests = suspectRequestsInView.get(currentView);
+      boolean found = false;
+      if (requests == null) {
+        requests = new HashSet<>();
+        requests.add(new SuspectRequest(suspect, reason));
+      }
+      for (SuspectRequest request : requests) {
+        if (suspect.equals(request.getSuspectMember())) {
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        requests.add(new SuspectRequest(suspect, reason));
+      }
+    }
+  }
+
+  private void memberUnsuspected(InternalDistributedMember mbr) {
+    synchronized (suspectRequestsInView) {
+      if (suspectedMemberIds.remove(mbr) != null) {
+        logger.info("No longer suspecting {}", mbr);
+      }
+      Collection<SuspectRequest> suspectRequests = suspectRequestsInView.get(currentView);
+      if (suspectRequests != null) {
+        Collection<SuspectRequest> removals = new ArrayList<>(suspectRequests.size());
+        for (SuspectRequest suspectRequest : suspectRequests) {
+          if (mbr.equals(suspectRequest.getSuspectMember())) {
+            removals.add(suspectRequest);
+          }
+        }
+        suspectRequests.removeAll(removals);
+      }
+    }
   }
 
   @Override
@@ -1140,7 +1160,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       return;
     }
 
-    List<SuspectRequest> sMembers = incomingRequest.getMembers();
+    List<SuspectRequest> suspectRequests = incomingRequest.getMembers();
 
     InternalDistributedMember sender = incomingRequest.getSender();
     int viewId = sender.getVmViewId();
@@ -1175,18 +1195,18 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(),
             req.getReason());
       }
-      checkIfAvailable(sender, sMembers, cv);
+      checkIfAvailable(sender, suspectRequests, cv);
     } // coordinator ends
     else {
 
       NetView check = new NetView(cv, cv.getViewId() + 1);
-      ArrayList<SuspectRequest> smbr = new ArrayList<>();
-      synchronized (viewVsSuspectedMembers) {
-        recordSuspectRequests(sMembers, cv);
-        Set<SuspectRequest> viewVsMembers = viewVsSuspectedMembers.get(cv);
-        for (final SuspectRequest sr : viewVsMembers) {
+      ArrayList<SuspectRequest> membersToCheck = new ArrayList<>();
+      synchronized (suspectRequestsInView) {
+        recordSuspectRequests(suspectRequests, cv);
+        Set<SuspectRequest> suspectsInView = suspectRequestsInView.get(cv);
+        for (final SuspectRequest sr : suspectsInView) {
           check.remove(sr.getSuspectMember());
-          smbr.add(sr);
+          membersToCheck.add(sr);
         }
       }
 
@@ -1197,9 +1217,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
           logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(),
               req.getReason());
         }
-        checkIfAvailable(sender, smbr, cv);
-      } else {
-        recordSuspectRequests(sMembers, cv);
+        checkIfAvailable(sender, membersToCheck, cv);
       }
     }
 
@@ -1209,18 +1227,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * This method make sure that records suspectRequest. We need to make sure this on preferred
    * coordinators, as elder coordinator might be in suspected list next.
    */
-  private void recordSuspectRequests(List<SuspectRequest> sMembers, NetView cv) {
+  private void recordSuspectRequests(List<SuspectRequest> suspectRequests, NetView cv) {
     // record suspect requests
-    Set<SuspectRequest> viewVsMembers;
-    synchronized (viewVsSuspectedMembers) {
-      viewVsMembers = viewVsSuspectedMembers.get(cv);
-      if (viewVsMembers == null) {
-        viewVsMembers = new HashSet<>();
-        viewVsSuspectedMembers.put(cv, viewVsMembers);
-      }
-      for (SuspectRequest sr : sMembers) {
-        viewVsMembers.add(sr);
+    Set<SuspectRequest> suspectedMembers;
+    synchronized (suspectRequestsInView) {
+      suspectedMembers = suspectRequestsInView.get(cv);
+      if (suspectedMembers == null) {
+        suspectedMembers = new HashSet<>();
+        suspectRequestsInView.put(cv, suspectedMembers);
       }
+      suspectedMembers.addAll(suspectRequests);
     }
   }
 
@@ -1243,15 +1259,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         continue;// self
       }
 
-      // suspectMemberInView is now set by the heartbeat monitoring code
-      // to allow us to move on from watching members we've already
-      // suspected. Since that code is updating this collection we
-      // cannot use it here as an indication that a member is currently
-      // undergoing a final check.
-      // NetView view;
-      // view = suspectedMemberInView.putIfAbsent(mbr, cv);
-
-      // if (view == null || !view.equals(cv)) {
       final String reason = sr.getReason();
       logger.debug("Scheduling final check for member {}; reason={}", mbr, reason);
       // its a coordinator
@@ -1262,11 +1269,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
           // shutting down
         } catch (Exception e) {
           logger.info("Unexpected exception while verifying member", e);
-        } finally {
-          GMSHealthMonitor.this.suspectedMemberInView.remove(mbr);
         }
       });
-      // }// scheduling for final check and removing it..
     }
   }
 
@@ -1280,6 +1284,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
     boolean failed = false;
 
+    logger.info("Performing final check for suspect member {} reason={}", mbr, reason);
     membersInFinalCheck.add(mbr);
     setNextNeighbor(currentView, mbr);
 
@@ -1290,7 +1295,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       // with the startTime, but we don't want to do that because it looks
       // like a heartbeat has been received
 
-      logger.info("Performing final check for suspect member {} reason={}", mbr, reason);
       boolean pinged;
       int port = cv.getFailureDetectionPort(mbr);
       if (port <= 0) {
@@ -1339,10 +1343,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         logger.info("Final check passed for suspect member " + mbr);
       }
     } finally {
-      // whether it's alive or not, at this point we allow it to
-      // be watched again
-      suspectedMemberInView.remove(mbr);
-      setNextNeighbor(currentView, null);
+      if (!failed) {
+        memberUnsuspected(mbr);
+        setNextNeighbor(currentView, null);
+      }
       membersInFinalCheck.remove(mbr);
     }
     return !failed;
@@ -1357,29 +1361,19 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   }
 
   private void sendSuspectRequest(final List<SuspectRequest> requests) {
-    // the background suspect-collector thread is currently disabled
-    // synchronized (suspectRequests) {
-    // if (suspectRequests.size() > 0) {
-    // for (SuspectRequest sr: suspectRequests) {
-    // if (!requests.contains(sr)) {
-    // requests.add(sr);
-    // }
-    // }
-    // suspectRequests.clear();
-    // }
-    // }
     logger.debug("Sending suspect request for members {}", requests);
     List<InternalDistributedMember> recipients;
-    if (currentView.size() > 4) {
+    if (currentView.size() > ServiceConfig.SMALL_CLUSTER_SIZE) {
       HashSet<InternalDistributedMember> filter = new HashSet<>();
-      for (Enumeration<InternalDistributedMember> e = suspectedMemberInView.keys(); e
+      for (Enumeration<InternalDistributedMember> e = suspectedMemberIds.keys(); e
           .hasMoreElements();) {
         filter.add(e.nextElement());
       }
       filter.addAll(
           requests.stream().map(SuspectRequest::getSuspectMember).collect(Collectors.toList()));
       recipients =
-          currentView.getPreferredCoordinators(filter, services.getJoinLeave().getMemberID(), 5);
+          currentView.getPreferredCoordinators(filter, services.getJoinLeave().getMemberID(),
+              ServiceConfig.SMALL_CLUSTER_SIZE + 1);
     } else {
       recipients = currentView.getMembers();
     }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index d364f2c..b6213aa 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -65,6 +65,7 @@ import org.apache.geode.distributed.internal.membership.NetMember;
 import org.apache.geode.distributed.internal.membership.NetView;
 import org.apache.geode.distributed.internal.membership.gms.GMSMember;
 import org.apache.geode.distributed.internal.membership.gms.GMSUtil;
+import org.apache.geode.distributed.internal.membership.gms.ServiceConfig;
 import org.apache.geode.distributed.internal.membership.gms.Services;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
 import org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler;
@@ -1670,7 +1671,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       processRemoveRequest(msg);
       if (!this.isCoordinator) {
         msg.resetRecipients();
-        msg.setRecipients(v.getPreferredCoordinators(Collections.emptySet(), localAddress, 10));
+        msg.setRecipients(v.getPreferredCoordinators(Collections.emptySet(), localAddress,
+            ServiceConfig.SMALL_CLUSTER_SIZE + 1));
         services.getMessenger().send(msg);
       }
     } else {