You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2018/08/15 17:18:46 UTC
[geode] branch develop updated: GEODE-3780 suspected member is never watched again after passing final check
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new cd3f372 GEODE-3780 suspected member is never watched again after passing final check
cd3f372 is described below
commit cd3f372906501f0f936ba96d6907d972d5c2d478
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Wed Aug 15 10:17:38 2018 -0700
GEODE-3780 suspected member is never watched again after passing final check
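Consolidated "unsuspect" processing into a new memberUnsuspected() method.
Modified the "final check" method so that it no longer unsuspects a member that fails the check; such a member stays suspect, and failure detection moves on to watching the next member in the view.
Also added ServiceConfig.SMALL_CLUSTER_SIZE, which is used to size the recipient lists for suspect requests and forwarded remove requests, and removed dead code from ServiceConfig.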
---
.../gms/fd/GMSHealthMonitorJUnitTest.java | 37 ++++-
.../internal/membership/gms/ServiceConfig.java | 27 +---
.../membership/gms/fd/GMSHealthMonitor.java | 166 ++++++++++-----------
.../membership/gms/membership/GMSJoinLeave.java | 4 +-
4 files changed, 121 insertions(+), 113 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index 56bd7da..8c42739 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -107,6 +107,7 @@ public class GMSHealthMonitorJUnitTest {
final long memberTimeout = 1000l;
private int[] portRange = new int[] {0, 65535};
private boolean useGMSHealthMonitorTestClass = false;
+ private boolean simulateHeartbeatInGMSHealthMonitorTestClass = true;
private final int myAddressIndex = 3;
@Before
@@ -540,6 +541,34 @@ public class GMSHealthMonitorJUnitTest {
@Test
+ public void testNeighborChangesAfterFailedFinalCheck() {
+ useGMSHealthMonitorTestClass = true;
+ // don't fake a heartbeat from doTCPCheckMember(), so the final check fails
+ simulateHeartbeatInGMSHealthMonitorTestClass = false;
+
+ try {
+ NetView v = installAView();
+
+ setFailureDetectionPorts(v);
+
+ InternalDistributedMember memberToCheck = gmsHealthMonitor.getNextNeighbor();
+
+ boolean retVal = gmsHealthMonitor.checkIfAvailable(memberToCheck, "Not responding", true);
+
+ assertFalse("checkIfAvailable should have returned false", retVal);
+ // the member is still suspect after failing the final check, so we should now be
+ // watching the member that follows it in the view (wrapping at the end of the list,
+ // the same way the health monitor picks its next neighbor)
+ int failedIndex = v.getMembers().indexOf(memberToCheck);
+ InternalDistributedMember expected =
+ v.getMembers().get((failedIndex + 1) % v.getMembers().size());
+ InternalDistributedMember actual = gmsHealthMonitor.getNextNeighbor();
+ assertEquals("neighbor was " + actual + " but expected " + expected, expected, actual);
+
+ } finally {
+ // restore the defaults the other tests rely on
+ useGMSHealthMonitorTestClass = false;
+ simulateHeartbeatInGMSHealthMonitorTestClass = true;
+ }
+ }
+
+
+ @Test
public void testExonerationMessageIsSentAfterSuccessfulFinalCheck() {
useGMSHealthMonitorTestClass = true;
@@ -817,9 +846,11 @@ public class GMSHealthMonitorJUnitTest {
@Override
boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
if (useGMSHealthMonitorTestClass) {
- HeartbeatMessage fakeHeartbeat = new HeartbeatMessage();
- fakeHeartbeat.setSender(suspectMember);
- gmsHealthMonitor.processMessage(fakeHeartbeat);
+ if (simulateHeartbeatInGMSHealthMonitorTestClass) {
+ HeartbeatMessage fakeHeartbeat = new HeartbeatMessage();
+ fakeHeartbeat.setSender(suspectMember);
+ gmsHealthMonitor.processMessage(fakeHeartbeat);
+ }
return false;
}
return super.doTCPCheckMember(suspectMember, port);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/ServiceConfig.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/ServiceConfig.java
index 395e94d..09e2ed8 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/ServiceConfig.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/ServiceConfig.java
@@ -27,11 +27,12 @@ public class ServiceConfig {
public static final long MEMBER_REQUEST_COLLECTION_INTERVAL =
Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "member-request-collection-interval", 300);
+ /** in a small cluster we might want to involve all members in operations */
+ public static final int SMALL_CLUSTER_SIZE = 9;
+
/** various settings from Geode configuration */
private final long joinTimeout;
private final int[] membershipPortRange;
- private final int udpRecvBufferSize;
- private final int udpSendBufferSize;
private final long memberTimeout;
private Integer lossThreshold;
private final Integer memberWeight;
@@ -79,12 +80,8 @@ public class ServiceConfig {
return networkPartitionDetectionEnabled;
}
- public void setNetworkPartitionDetectionEnabled(boolean enabled) {
- this.networkPartitionDetectionEnabled = enabled;
- }
-
public boolean areLocatorsPreferredAsCoordinators() {
- boolean locatorsAreCoordinators = false;
+ boolean locatorsAreCoordinators;
if (networkPartitionDetectionEnabled) {
locatorsAreCoordinators = true;
@@ -139,24 +136,8 @@ public class ServiceConfig {
membershipPortRange = theConfig.getMembershipPortRange();
- udpRecvBufferSize = DistributionConfig.DEFAULT_UDP_RECV_BUFFER_SIZE_REDUCED;
- udpSendBufferSize = theConfig.getUdpSendBufferSize();
-
memberTimeout = theConfig.getMemberTimeout();
- // The default view-ack timeout in 7.0 is 12347 ms but is adjusted based on the member-timeout.
- // We don't want a longer timeout than 12437 because new members will likely time out trying to
- // connect because their join timeouts are set to expect a shorter period
- int ackCollectionTimeout = theConfig.getMemberTimeout() * 2 * 12437 / 10000;
- if (ackCollectionTimeout < 1500) {
- ackCollectionTimeout = 1500;
- } else if (ackCollectionTimeout > 12437) {
- ackCollectionTimeout = 12437;
- }
- ackCollectionTimeout = Integer
- .getInteger(DistributionConfig.GEMFIRE_PREFIX + "VIEW_ACK_TIMEOUT", ackCollectionTimeout)
- .intValue();
-
lossThreshold =
Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "network-partition-threshold", 51);
if (lossThreshold < 51)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 13af6d4..f31a0c3 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -66,6 +66,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.NetView;
import org.apache.geode.distributed.internal.membership.gms.GMSMember;
+import org.apache.geode.distributed.internal.membership.gms.ServiceConfig;
import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
import org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler;
@@ -148,7 +149,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
/**
* Members currently being suspected and the view they were suspected in
*/
- private final ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberInView =
+ private final ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberIds =
new ConcurrentHashMap<>();
/**
@@ -165,7 +166,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
/**
* Members suspected in a particular view
*/
- private final Map<NetView, Set<SuspectRequest>> viewVsSuspectedMembers = new HashMap<>();
+ private final Map<NetView, Set<SuspectRequest>> suspectRequestsInView = new HashMap<>();
private ScheduledExecutorService scheduler;
@@ -387,8 +388,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
if (cTS != null && cTS.getTime() < timeStamp) {
cTS.setTime(timeStamp);
}
- if (suspectedMemberInView.remove(sender) != null) {
- logger.info("No longer suspecting {}", sender);
+ if (suspectedMemberIds.containsKey(sender)) {
+ memberUnsuspected(sender);
setNextNeighbor(currentView, null);
}
}
@@ -419,14 +420,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
if (!pinged) {
- suspectedMemberInView.put(mbr, currentView);
+ suspectedMemberIds.put(mbr, currentView);
String reason = "Member isn't responding to heartbeat requests";
- GMSHealthMonitor.this.initiateSuspicion(mbr, reason);
+ initiateSuspicion(mbr, reason);
} else {
logger.trace("Setting next neighbor as member {} has responded.", mbr);
- suspectedMemberInView.remove(mbr);
+ memberUnsuspected(mbr);
// back to previous one
- setNextNeighbor(GMSHealthMonitor.this.currentView, null);
+ setNextNeighbor(currentView, null);
}
});
@@ -701,14 +702,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
if (GMSHealthMonitor.this.playingDead) {
continue;
}
- serverSocketExecutor.execute(new ClientSocketHandler(socket)); // start(); [bruce] I'm
- // seeing a lot of
- // failures due to this
- // thread not being
- // created fast enough,
- // sometimes as long as
- // 30 seconds
-
+ serverSocketExecutor.execute(new ClientSocketHandler(socket));
} catch (RejectedExecutionException e) {
// this can happen during shutdown
@@ -823,8 +817,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
public synchronized void installView(NetView newView) {
- synchronized (viewVsSuspectedMembers) {
- viewVsSuspectedMembers.clear();
+ synchronized (suspectRequestsInView) {
+ suspectRequestsInView.clear();
}
for (Iterator<InternalDistributedMember> it = memberTimeStamps.keySet().iterator(); it
.hasNext();) {
@@ -832,7 +826,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
it.remove();
}
}
- for (Iterator<InternalDistributedMember> it = suspectedMemberInView.keySet().iterator(); it
+ for (Iterator<InternalDistributedMember> it = suspectedMemberIds.keySet().iterator(); it
.hasNext();) {
if (!newView.contains(it.next())) {
it.remove();
@@ -873,23 +867,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
List<InternalDistributedMember> allMembers = newView.getMembers();
- //
- // Set<InternalDistributedMember> checkAllSuspected = new HashSet<>(allMembers);
- // checkAllSuspected.removeAll(suspectedMemberInView.keySet());
- // checkAllSuspected.remove(localAddress);
- // if (checkAllSuspected.isEmpty() && allMembers.size() > 1) {
- // logger.info("All other members are suspect at this point");
- // nextNeighbor = null;
- // return;
- // }
- if (allMembers.size() > 1 && suspectedMemberInView.size() >= allMembers.size() - 1) {
+ if (allMembers.size() > 1 && suspectedMemberIds.size() >= allMembers.size() - 1) {
boolean nonSuspectFound = false;
for (InternalDistributedMember member : allMembers) {
if (member.equals(localAddress)) {
continue;
}
- if (!suspectedMemberInView.containsKey(member)) {
+ if (!suspectedMemberIds.containsKey(member)) {
nonSuspectFound = true;
break;
}
@@ -905,13 +890,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
if (index != -1) {
int nextNeighborIndex = (index + 1) % allMembers.size();
InternalDistributedMember newNeighbor = allMembers.get(nextNeighborIndex);
- if (suspectedMemberInView.containsKey(newNeighbor)) {
+ if (suspectedMemberIds.containsKey(newNeighbor)) {
setNextNeighbor(newView, newNeighbor);
return;
}
InternalDistributedMember oldNeighbor = nextNeighbor;
if (oldNeighbor != newNeighbor) {
- logger.debug("Failure detection is now watching {}", newNeighbor);
+ logger.info("Failure detection is now watching {}", newNeighbor);
nextNeighbor = newNeighbor;
}
}
@@ -1006,7 +991,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* Test method - check to see if a member is under suspicion
*/
public boolean isSuspectMember(InternalDistributedMember m) {
- return this.suspectedMemberInView.containsKey(m);
+ return suspectedMemberIds.containsKey(m); // membership test against the suspect-id map
}
@Override
@@ -1017,7 +1002,42 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
@Override
public void memberSuspected(InternalDistributedMember initiator,
InternalDistributedMember suspect, String reason) {
- suspectedMemberInView.putIfAbsent(suspect, currentView);
+ // Record the suspect id and exactly one SuspectRequest for it in the current view.
+ // Uses the suspectRequestsInView lock, as memberUnsuspected/recordSuspectRequests do.
+ synchronized (suspectRequestsInView) {
+ suspectedMemberIds.putIfAbsent(suspect, currentView);
+ Collection<SuspectRequest> requests = suspectRequestsInView.get(currentView);
+ if (requests == null) {
+ // register the new set in the map; otherwise the request added below is lost
+ requests = new HashSet<>();
+ suspectRequestsInView.put(currentView, requests);
+ }
+ boolean found = false;
+ for (SuspectRequest request : requests) {
+ if (suspect.equals(request.getSuspectMember())) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ requests.add(new SuspectRequest(suspect, reason));
+ }
+ }
+ }
+
+ private void memberUnsuspected(InternalDistributedMember mbr) {
+ // forget that mbr was suspected and drop its pending requests for the current view
+ synchronized (suspectRequestsInView) {
+ boolean wasSuspect = suspectedMemberIds.remove(mbr) != null;
+ if (wasSuspect) {
+ logger.info("No longer suspecting {}", mbr);
+ }
+ Collection<SuspectRequest> pending = suspectRequestsInView.get(currentView);
+ if (pending != null) {
+ pending.removeIf(request -> mbr.equals(request.getSuspectMember()));
+ }
+ }
}
@Override
@@ -1140,7 +1160,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
return;
}
- List<SuspectRequest> sMembers = incomingRequest.getMembers();
+ List<SuspectRequest> suspectRequests = incomingRequest.getMembers();
InternalDistributedMember sender = incomingRequest.getSender();
int viewId = sender.getVmViewId();
@@ -1175,18 +1195,18 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(),
req.getReason());
}
- checkIfAvailable(sender, sMembers, cv);
+ checkIfAvailable(sender, suspectRequests, cv);
} // coordinator ends
else {
NetView check = new NetView(cv, cv.getViewId() + 1);
- ArrayList<SuspectRequest> smbr = new ArrayList<>();
- synchronized (viewVsSuspectedMembers) {
- recordSuspectRequests(sMembers, cv);
- Set<SuspectRequest> viewVsMembers = viewVsSuspectedMembers.get(cv);
- for (final SuspectRequest sr : viewVsMembers) {
+ ArrayList<SuspectRequest> membersToCheck = new ArrayList<>();
+ synchronized (suspectRequestsInView) {
+ recordSuspectRequests(suspectRequests, cv);
+ Set<SuspectRequest> suspectsInView = suspectRequestsInView.get(cv);
+ for (final SuspectRequest sr : suspectsInView) {
check.remove(sr.getSuspectMember());
- smbr.add(sr);
+ membersToCheck.add(sr);
}
}
@@ -1197,9 +1217,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(),
req.getReason());
}
- checkIfAvailable(sender, smbr, cv);
- } else {
- recordSuspectRequests(sMembers, cv);
+ checkIfAvailable(sender, membersToCheck, cv);
}
}
@@ -1209,18 +1227,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* This method make sure that records suspectRequest. We need to make sure this on preferred
* coordinators, as elder coordinator might be in suspected list next.
*/
- private void recordSuspectRequests(List<SuspectRequest> sMembers, NetView cv) {
+ private void recordSuspectRequests(List<SuspectRequest> suspectRequests, NetView cv) {
// record suspect requests
- Set<SuspectRequest> viewVsMembers;
- synchronized (viewVsSuspectedMembers) {
- viewVsMembers = viewVsSuspectedMembers.get(cv);
- if (viewVsMembers == null) {
- viewVsMembers = new HashSet<>();
- viewVsSuspectedMembers.put(cv, viewVsMembers);
- }
- for (SuspectRequest sr : sMembers) {
- viewVsMembers.add(sr);
- }
+ // look up (or lazily create) the request set for this view, then merge in the new requests
+ synchronized (suspectRequestsInView) {
+ Set<SuspectRequest> requestsForView =
+ suspectRequestsInView.computeIfAbsent(cv, view -> new HashSet<>());
+ requestsForView.addAll(suspectRequests);
}
}
@@ -1243,15 +1259,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
continue;// self
}
- // suspectMemberInView is now set by the heartbeat monitoring code
- // to allow us to move on from watching members we've already
- // suspected. Since that code is updating this collection we
- // cannot use it here as an indication that a member is currently
- // undergoing a final check.
- // NetView view;
- // view = suspectedMemberInView.putIfAbsent(mbr, cv);
-
- // if (view == null || !view.equals(cv)) {
final String reason = sr.getReason();
logger.debug("Scheduling final check for member {}; reason={}", mbr, reason);
// its a coordinator
@@ -1262,11 +1269,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
// shutting down
} catch (Exception e) {
logger.info("Unexpected exception while verifying member", e);
- } finally {
- GMSHealthMonitor.this.suspectedMemberInView.remove(mbr);
}
});
- // }// scheduling for final check and removing it..
}
}
@@ -1280,6 +1284,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
boolean failed = false;
+ logger.info("Performing final check for suspect member {} reason={}", mbr, reason);
membersInFinalCheck.add(mbr);
setNextNeighbor(currentView, mbr);
@@ -1290,7 +1295,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
// with the startTime, but we don't want to do that because it looks
// like a heartbeat has been received
- logger.info("Performing final check for suspect member {} reason={}", mbr, reason);
boolean pinged;
int port = cv.getFailureDetectionPort(mbr);
if (port <= 0) {
@@ -1339,10 +1343,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
logger.info("Final check passed for suspect member " + mbr);
}
} finally {
- // whether it's alive or not, at this point we allow it to
- // be watched again
- suspectedMemberInView.remove(mbr);
- setNextNeighbor(currentView, null);
+ if (!failed) {
+ memberUnsuspected(mbr);
+ setNextNeighbor(currentView, null);
+ }
membersInFinalCheck.remove(mbr);
}
return !failed;
@@ -1357,29 +1361,19 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
private void sendSuspectRequest(final List<SuspectRequest> requests) {
- // the background suspect-collector thread is currently disabled
- // synchronized (suspectRequests) {
- // if (suspectRequests.size() > 0) {
- // for (SuspectRequest sr: suspectRequests) {
- // if (!requests.contains(sr)) {
- // requests.add(sr);
- // }
- // }
- // suspectRequests.clear();
- // }
- // }
logger.debug("Sending suspect request for members {}", requests);
List<InternalDistributedMember> recipients;
- if (currentView.size() > 4) {
+ if (currentView.size() > ServiceConfig.SMALL_CLUSTER_SIZE) {
HashSet<InternalDistributedMember> filter = new HashSet<>();
- for (Enumeration<InternalDistributedMember> e = suspectedMemberInView.keys(); e
+ for (Enumeration<InternalDistributedMember> e = suspectedMemberIds.keys(); e
.hasMoreElements();) {
filter.add(e.nextElement());
}
filter.addAll(
requests.stream().map(SuspectRequest::getSuspectMember).collect(Collectors.toList()));
recipients =
- currentView.getPreferredCoordinators(filter, services.getJoinLeave().getMemberID(), 5);
+ currentView.getPreferredCoordinators(filter, services.getJoinLeave().getMemberID(),
+ ServiceConfig.SMALL_CLUSTER_SIZE + 1);
} else {
recipients = currentView.getMembers();
}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index d364f2c..b6213aa 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -65,6 +65,7 @@ import org.apache.geode.distributed.internal.membership.NetMember;
import org.apache.geode.distributed.internal.membership.NetView;
import org.apache.geode.distributed.internal.membership.gms.GMSMember;
import org.apache.geode.distributed.internal.membership.gms.GMSUtil;
+import org.apache.geode.distributed.internal.membership.gms.ServiceConfig;
import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.membership.gms.interfaces.JoinLeave;
import org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler;
@@ -1670,7 +1671,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
processRemoveRequest(msg);
if (!this.isCoordinator) {
msg.resetRecipients();
- msg.setRecipients(v.getPreferredCoordinators(Collections.emptySet(), localAddress, 10));
+ msg.setRecipients(v.getPreferredCoordinators(Collections.emptySet(), localAddress,
+ ServiceConfig.SMALL_CLUSTER_SIZE + 1));
services.getMessenger().send(msg);
}
} else {