You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@geode.apache.org by bs...@apache.org on 2018/08/30 15:54:11 UTC
[geode] branch develop updated: GEODE-3780 member is not considered suspect after failing final check
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 9ee8cbf GEODE-3780 member is not considered suspect after failing final check
9ee8cbf is described below
commit 9ee8cbfe3000df4db87a7388d0123aa40e42b7ec
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Thu Aug 30 08:52:19 2018 -0700
GEODE-3780 member is not considered suspect after failing final check
Ensure that a member is in the suspects collection after it fails a
final check. This allows processSuspectMessage to know whether it should
perform the duties of the membership coordinator and initiate final
checks based on Suspect messages.
I've also done a little bit of refactoring in processSuspectMessage and
have removed commented-out code.
This closes #2380
---
.../gms/fd/GMSHealthMonitorJUnitTest.java | 16 +++++
.../membership/gms/fd/GMSHealthMonitor.java | 79 +++++++++-------------
2 files changed, 48 insertions(+), 47 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index 8c13813..6f4b86f 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -49,6 +49,7 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.Timer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -145,6 +146,7 @@ public class GMSHealthMonitorJUnitTest {
when(services.getCancelCriterion()).thenReturn(stopper);
when(services.getManager()).thenReturn(manager);
when(services.getStatistics()).thenReturn(new DistributionStats(system, statsId));
+ when(services.getTimer()).thenReturn(new Timer("Geode Membership Timer", true));
when(stopper.isCancelInProgress()).thenReturn(false);
if (mockMembers == null) {
@@ -624,6 +626,20 @@ public class GMSHealthMonitorJUnitTest {
@Test
+ public void testFinalCheckFailureLeavesMemberAsSuspect() {
+ NetView v = installAView();
+
+ setFailureDetectionPorts(v);
+
+ InternalDistributedMember memberToCheck = gmsHealthMonitor.getNextNeighbor();
+ boolean available = gmsHealthMonitor.checkIfAvailable(memberToCheck, "Not responding", true);
+ assertFalse(available);
+ verify(joinLeave).remove(isA(InternalDistributedMember.class), isA(String.class));
+ assertTrue(gmsHealthMonitor.isSuspectMember(memberToCheck));
+ }
+
+
+ @Test
public void testShutdown() {
installAView();
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index f31a0c3..789f8d5 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -263,11 +263,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
long interval = memberTimeoutInMillis / GMSHealthMonitor.LOGICAL_INTERVAL;
long lastTS = currentTime - nextNeighborTS.getTime();
if (lastTS + interval >= memberTimeoutInMillis) {
- logger.trace("Checking member {} ", neighbour);
+ logger.debug("Checking member {} ", neighbour);
// now do check request for this member;
checkMember(neighbour);
}
- setNextNeighbor(currentView, null);
}
}
}
@@ -412,7 +411,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
// we need to check this member
checkExecutor.execute(() -> {
- boolean pinged = false;
+ boolean pinged;
try {
pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
} catch (CancelException e) {
@@ -420,9 +419,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
if (!pinged) {
- suspectedMemberIds.put(mbr, currentView);
String reason = "Member isn't responding to heartbeat requests";
+ memberSuspected(localAddress, mbr, reason);
initiateSuspicion(mbr, reason);
+ setNextNeighbor(currentView, null);
} else {
logger.trace("Setting next neighbor as member {} has responded.", mbr);
memberUnsuspected(mbr);
@@ -455,7 +455,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
return true;
}
long startTime = System.currentTimeMillis();
- logger.trace("Checking member {}", member);
+ logger.debug("Requesting heartbeat from {}", member);
final HeartbeatRequestMessage hrm = constructHeartbeatRequestMessage(member);
Response pingResp = null;
if (waitForResponse) {
@@ -483,7 +483,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
if (isStopping) {
return true;
}
- logger.trace("no heartbeat response received from {} and no recent activity", member);
+ logger.debug("no heartbeat response received from {} and no recent activity", member);
return false;
} else {
logger.trace("received heartbeat from {}", member);
@@ -601,15 +601,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
@Override
public void suspect(InternalDistributedMember mbr, String reason) {
initiateSuspicion(mbr, reason);
- // Background suspect-collecting thread is currently disabled - it takes too long
- // synchronized (suspectRequests) {
- // SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, reason);
- // if (!suspectRequests.contains(sr)) {
- // logger.info("Suspecting member {}. Reason= {}.", mbr, reason);
- // suspectRequests.add(sr);
- // suspectRequests.notify();
- // }
- // }
}
@Override
@@ -641,18 +632,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
long delay = memberTimeout / LOGICAL_INTERVAL;
monitorFuture = scheduler.scheduleAtFixedRate(m, delay, delay, TimeUnit.MILLISECONDS);
- // suspectRequestCollectorThread = this.new RequestCollector<SuspectRequest>("Geode Suspect
- // Message Collector", Services.getThreadGroup(), suspectRequests,
- // new Callback<SuspectRequest>() {
- // @Override
- // public void process(List<SuspectRequest> requests) {
- // GMSHealthMonitor.this.sendSuspectRequest(requests);
- //
- // }
- // }, MEMBER_SUSPECT_COLLECTION_INTERVAL);
- // suspectRequestCollectorThread.setDaemon(true);
- // suspectRequestCollectorThread.start()
-
serverSocketExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
final AtomicInteger threadIdx = new AtomicInteger();
@@ -832,12 +811,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
it.remove();
}
}
- // for (InternalDistributedMember mbr: newView.getMembers()) {
- // if (!memberVsLastMsgTS.containsKey(mbr)) {
- // CustomTimeStamp customTS = new CustomTimeStamp(System.currentTimeMillis());
- // memberVsLastMsgTS.put(mbr, customTS);
- // }
- // }
currentView = newView;
setNextNeighbor(newView, null);
}
@@ -907,7 +880,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
- /*** test method */
+ /** test method */
public InternalDistributedMember getNextNeighbor() {
return nextNeighbor;
}
@@ -984,7 +957,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
*/
public boolean isShutdown() {
return scheduler.isShutdown() && checkExecutor.isShutdown()
- && serverSocketExecutor.isShutdown() /* && !suspectRequestCollectorThread.isAlive() */;
+ && serverSocketExecutor.isShutdown();
}
/**
@@ -1003,7 +976,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
public void memberSuspected(InternalDistributedMember initiator,
InternalDistributedMember suspect, String reason) {
synchronized (suspectRequestsInView) {
- suspectedMemberIds.putIfAbsent(suspect, currentView);
+ suspectedMemberIds.put(suspect, currentView);
Collection<SuspectRequest> requests = suspectRequestsInView.get(currentView);
boolean found = false;
if (requests == null) {
@@ -1190,39 +1163,48 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
}
+ logger.debug("Processing suspect requests {}", suspectRequests);
if (cv.getCoordinator().equals(localAddress)) {
- for (SuspectRequest req : incomingRequest.getMembers()) {
- logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(),
- req.getReason());
- }
+ // This process is the membership coordinator and should perform a final check
+ logSuspectRequests(incomingRequest, sender);
checkIfAvailable(sender, suspectRequests, cv);
- } // coordinator ends
- else {
+ } else {
+ // Another process has raised suspicion - check to see if
+ // this process should become the membership coordinator if
+ // all current suspects are gone
NetView check = new NetView(cv, cv.getViewId() + 1);
ArrayList<SuspectRequest> membersToCheck = new ArrayList<>();
synchronized (suspectRequestsInView) {
recordSuspectRequests(suspectRequests, cv);
Set<SuspectRequest> suspectsInView = suspectRequestsInView.get(cv);
+ logger.debug("Current suspects for view #{} are {}", cv.getViewId(), suspectsInView);
for (final SuspectRequest sr : suspectsInView) {
check.remove(sr.getSuspectMember());
membersToCheck.add(sr);
}
}
+ logger.trace("Trial view with suspects removed is {}\nmy address is {}", check, localAddress);
InternalDistributedMember coordinator = check.getCoordinator();
if (coordinator != null && coordinator.equals(localAddress)) {
// new coordinator
- for (SuspectRequest req : incomingRequest.getMembers()) {
- logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(),
- req.getReason());
- }
+ logSuspectRequests(incomingRequest, sender);
checkIfAvailable(sender, membersToCheck, cv);
}
}
}
+ private void logSuspectRequests(SuspectMembersMessage incomingRequest,
+ InternalDistributedMember sender) {
+ for (SuspectRequest req : incomingRequest.getMembers()) {
+ String who = sender.equals(localAddress) ? "myself" : sender.toString();
+ logger.info("received suspect message from {} for {}: {}", who, req.getSuspectMember(),
+ req.getReason());
+ }
+ }
+
/***
* This method make sure that records suspectRequest. We need to make sure this on preferred
* coordinators, as elder coordinator might be in suspected list next.
@@ -1321,10 +1303,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
if (!pinged && !isStopping) {
TimeStamp ts = memberTimeStamps.get(mbr);
if (ts == null || ts.getTime() < startTime) {
- logger.info("Final check failed - requesting removal of suspect member " + mbr);
+ logger.info("Final check failed for member {}", mbr);
if (initiateRemoval) {
+ logger.info("Requesting removal of suspect member {}", mbr);
services.getJoinLeave().remove(mbr, reason);
}
+ // make sure he's still suspected
+ memberSuspected(localAddress, mbr, reason);
failed = true;
} else {
logger.info(