You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/10/12 00:54:49 UTC
incubator-geode git commit: GEODE-1874: Changed setNextNeighbor to not create HashMap for every p2p invocation
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-1874 [created] edaa462eb
GEODE-1874: Changed setNextNeighbor to not create HashMap for every p2p invocation
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/edaa462e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/edaa462e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/edaa462e
Branch: refs/heads/feature/GEODE-1874
Commit: edaa462eb41c9380d9fd4eca889ccc269fcdbdf3
Parents: 280d2d8
Author: Udo Kohlmeyer <uk...@pivotal.io>
Authored: Wed Oct 12 11:54:33 2016 +1100
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Wed Oct 12 11:54:33 2016 +1100
----------------------------------------------------------------------
.../membership/gms/fd/GMSHealthMonitor.java | 394 ++++++++++---------
1 file changed, 210 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/edaa462e/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index aafb498..97a413c 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -16,9 +16,7 @@
*/
package org.apache.geode.distributed.internal.membership.gms.fd;
-import static org.apache.geode.internal.DataSerializableFixedID.HEARTBEAT_REQUEST;
-import static org.apache.geode.internal.DataSerializableFixedID.HEARTBEAT_RESPONSE;
-import static org.apache.geode.internal.DataSerializableFixedID.SUSPECT_MEMBERS_MESSAGE;
+import static org.apache.geode.internal.DataSerializableFixedID.*;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -29,7 +27,19 @@ import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -40,7 +50,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.*;
+import java.util.stream.Collectors;
import org.apache.logging.log4j.Logger;
import org.jgroups.util.UUID;
@@ -69,7 +79,7 @@ import org.apache.geode.internal.security.SecurableCommunicationChannel;
/**
* Failure Detection
- *
+ * <p>
* This class make sure that each member is alive and communicating to this member.
* To make sure that we create the ring of members based on current view. On this
* ring, each member make sure that next-member in ring is communicating with it.
@@ -77,17 +87,16 @@ import org.apache.geode.internal.security.SecurableCommunicationChannel;
* member has not communicated in last period(member-timeout) then we check whether
* this member is still alive or not. Based on that we informed probable coordinators
* to remove that member from view.
- *
+ * <p>
* It has {@link #suspect(InternalDistributedMember, String)} api, which can be used
* to initiate suspect processing for any member. First is checks whether the member is
* responding or not. Then it informs probable coordinators to remove that member from
* view.
- *
+ * <p>
* It has {@link #checkIfAvailable(DistributedMember, String, boolean)} api to see
* if that member is alive. Then based on removal flag it initiates the suspect processing
* for that member.
- *
- * */
+ */
@SuppressWarnings({ "SynchronizationOnLocalVariableOrMethodParameter", "NullableProblems" })
public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
@@ -99,9 +108,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
volatile private boolean isStopping = false;
private final AtomicInteger requestId = new AtomicInteger();
- /** membership logger */
+ /**
+ * membership logger
+ */
private static final Logger logger = Services.getLogger();
-
+
/**
* The number of recipients of periodic heartbeats. The recipients will
* be selected from the members that are likely to be monitoring this member.
@@ -115,24 +126,28 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
*/
public static final int LOGICAL_INTERVAL = Integer.getInteger("geode.logical-message-received-interval", 2);
- /** stall time to wait for members leaving concurrently */
+ /**
+ * stall time to wait for members leaving concurrently
+ */
public static final long MEMBER_SUSPECT_COLLECTION_INTERVAL = Long.getLong("geode.suspect-member-collection-interval", 200);
private volatile long currentTimeStamp;
-
- /** this member's ID */
+
+ /**
+ * this member's ID
+ */
private InternalDistributedMember localAddress;
/**
* Timestamp at which we last had contact from a member
*/
final ConcurrentMap<InternalDistributedMember, TimeStamp> memberTimeStamps = new ConcurrentHashMap<>();
-
+
/**
* Members currently being suspected and the view they were suspected in
*/
final private ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberInView = new ConcurrentHashMap<>();
-
+
/**
* Members undergoing final checks
*/
@@ -142,7 +157,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* Replies to messages
*/
final private Map<Integer, Response> requestIdVsResponse = new ConcurrentHashMap<>();
-
+
/**
* Members suspected in a particular view
*/
@@ -156,29 +171,36 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* to stop check scheduler
*/
private ScheduledFuture<?> monitorFuture;
-
- /** test hook */
+
+ /**
+ * test hook
+ */
private volatile boolean playingDead = false;
- /** test hook */
+ /**
+ * test hook
+ */
private volatile boolean beingSick = false;
-
+
// For TCP check
private ExecutorService serverSocketExecutor;
static final int OK = 0x7B;
- static final int ERROR = 0x00;
+ static final int ERROR = 0x00;
private volatile int socketPort;
private volatile ServerSocket serverSocket;
-
- /** Statistics about health monitor */
+
+ /**
+ * Statistics about health monitor
+ */
private DMStats stats;
/**
* this class is to avoid garbage
*/
private static class TimeStamp {
+
private volatile long timeStamp;
-
+
TimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
@@ -196,11 +218,12 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* This class sets start interval timestamp to record the activity of all members.
* That is used by {@link GMSHealthMonitor#contactedBy(InternalDistributedMember)} to
* record the activity of member.
- *
+ *
* It initiates the suspect processing for next neighbour if it doesn't see any activity from that
* member in last interval(member-timeout)
*/
private class Monitor implements Runnable {
+
final long memberTimeoutInMillis;
public Monitor(long memberTimeout) {
@@ -213,25 +236,25 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
if (GMSHealthMonitor.this.isStopping) {
return;
}
-
+
InternalDistributedMember neighbour = nextNeighbor;
-
+
long currentTime = System.currentTimeMillis();
//this is the start of interval to record member activity
GMSHealthMonitor.this.currentTimeStamp = currentTime;
if (neighbour != null) {
TimeStamp nextNeighborTS;
- synchronized(GMSHealthMonitor.this) {
+ synchronized (GMSHealthMonitor.this) {
nextNeighborTS = GMSHealthMonitor.this.memberTimeStamps.get(neighbour);
}
-
+
if (nextNeighborTS == null) {
TimeStamp customTS = new TimeStamp(currentTime);
memberTimeStamps.put(neighbour, customTS);
return;
}
-
+
long interval = memberTimeoutInMillis / GMSHealthMonitor.LOGICAL_INTERVAL;
long lastTS = currentTime - nextNeighborTS.getTime();
if (lastTS + interval >= memberTimeoutInMillis) {
@@ -249,6 +272,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* notify waiting thread.
*/
private class Response {
+
private DistributionMessage responseMsg;
public DistributionMessage getResponseMsg() {
@@ -276,7 +300,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
OutputStream out = socket.getOutputStream();
@SuppressWarnings("UnusedAssignment")
short version = in.readShort();
- int vmViewId = in.readInt();
+ int vmViewId = in.readInt();
long uuidLSBs = in.readLong();
long uuidMSBs = in.readLong();
GMSHealthMonitor.this.stats.incFinalCheckRequestsReceived();
@@ -288,9 +312,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
int myVmViewId = gmbr.getVmViewId();
if (playingDead) {
logger.debug("HealthMonitor: simulating sick member in health check");
- } else if (uuidLSBs == myUUID.getLeastSignificantBits()
- && uuidMSBs == myUUID.getMostSignificantBits()
- && vmViewId == myVmViewId) {
+ } else if (uuidLSBs == myUUID.getLeastSignificantBits() && uuidMSBs == myUUID.getMostSignificantBits() && vmViewId == myVmViewId) {
logger.debug("HealthMonitor: sending OK reply");
out.write(OK);
out.flush();
@@ -300,12 +322,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
logger.debug("HealthMonitor: server replied OK.");
} else {
if (logger.isDebugEnabled()) {
- logger.debug("HealthMonitor: sending ERROR reply - my UUID is {},{} received is {},{}. My viewID is {} received is {}",
- Long.toHexString(myUUID.getMostSignificantBits()),
- Long.toHexString(myUUID.getLeastSignificantBits()),
- Long.toHexString(uuidMSBs),
- Long.toHexString(uuidLSBs),
- myVmViewId, vmViewId);
+ logger.debug("HealthMonitor: sending ERROR reply - my UUID is {},{} received is {},{}. My viewID is {} received is {}", Long.toHexString(myUUID.getMostSignificantBits()), Long.toHexString(myUUID.getLeastSignificantBits()), Long.toHexString(uuidMSBs), Long.toHexString(uuidLSBs), myVmViewId, vmViewId);
}
out.write(ERROR);
out.flush();
@@ -350,13 +367,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
public void contactedBy(InternalDistributedMember sender) {
contactedBy(sender, currentTimeStamp);
}
-
-
+
+
/**
* Record member activity at a specified time
*/
private void contactedBy(InternalDistributedMember sender, long timeStamp) {
TimeStamp cTS = new TimeStamp(timeStamp);
+ //TODO Udo: why putIfAbsent. Surely only put is required
cTS = memberTimeStamps.putIfAbsent(sender, cTS);
if (cTS != null && cTS.getTime() < timeStamp) {
cTS.setTime(timeStamp);
@@ -367,7 +385,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
setNextNeighbor(currentView, null);
}
-
+
private HeartbeatRequestMessage constructHeartbeatRequestMessage(final InternalDistributedMember mbr) {
final int reqId = requestId.getAndIncrement();
final HeartbeatRequestMessage hrm = new HeartbeatRequestMessage(mbr, reqId);
@@ -390,7 +408,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
} catch (CancelException e) {
return;
}
-
+
if (!pinged) {
suspectedMemberInView.put(mbr, currentView);
String reason = "Member isn't responding to heartbeat requests";
@@ -442,7 +460,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(member)) {
// member is not part of current view.
logger.trace("Member {} is not part of current view.", member);
- } else if (waitForResponse){
+ } else if (waitForResponse) {
synchronized (pingResp) {
if (pingResp.getResponseMsg() == null) {
pingResp.wait(memberTimeout);
@@ -470,7 +488,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
} catch (InterruptedException e) {
logger.debug("GMSHealthMonitor checking thread interrupted, while waiting for response from member: {} .", member);
} finally {
- if(waitForResponse) {
+ if (waitForResponse) {
requestIdVsResponse.remove(hrm.getRequestId());
}
}
@@ -480,8 +498,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
/**
* During final check, establish TCP connection between current member and suspect member.
* And exchange PING/PONG message to see if the suspect member is still alive.
- *
+ *
* @param suspectMember member that does not respond to HeartbeatRequestMessage
+ *
* @return true if successfully exchanged PING/PONG with TCP connection, otherwise false.
*/
boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
@@ -489,21 +508,17 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
InternalDistributedSystem internalDistributedSystem = InternalDistributedSystem.getConnectedInstance();
try {
logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember, suspectMember.getInetAddress(), port);
- clientSocket = SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).connect(suspectMember.getInetAddress(), port,
- (int)memberTimeout, new ConnectTimeoutTask(services.getTimer(), memberTimeout), false, -1, false);
+ clientSocket = SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).connect(suspectMember.getInetAddress(), port, (int) memberTimeout, new ConnectTimeoutTask(services.getTimer(), memberTimeout), false, -1, false);
clientSocket.setTcpNoDelay(true);
return doTCPCheckMember(suspectMember, clientSocket);
- }
- catch (IOException e) {
+ } catch (IOException e) {
// this is expected if it is a connection-timeout or other failure
// to connect
- }
- catch (IllegalStateException e) {
+ } catch (IllegalStateException e) {
if (!isStopping) {
logger.trace("Unexpected exception", e);
}
- }
- finally {
+ } finally {
try {
if (clientSocket != null) {
clientSocket.setSoLinger(true, 0); // abort the connection
@@ -554,10 +569,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
return false;
} catch (IOException e) {
logger.trace("Unexpected exception", e);
- }
+ }
return false;
}
-
+
void writeMemberToStream(GMSMember gmbr, DataOutputStream out) throws IOException {
out.writeShort(Version.CURRENT_ORDINAL);
out.writeInt(gmbr.getVmViewId());
@@ -565,24 +580,24 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
out.writeLong(gmbr.getUuidMSBs());
out.flush();
}
-
+
@Override
public void suspect(InternalDistributedMember mbr, String reason) {
initiateSuspicion(mbr, reason);
// Background suspect-collecting thread is currently disabled - it takes too long
-// synchronized (suspectRequests) {
-// SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, reason);
-// if (!suspectRequests.contains(sr)) {
-// logger.info("Suspecting member {}. Reason= {}.", mbr, reason);
-// suspectRequests.add(sr);
-// suspectRequests.notify();
-// }
-// }
+ // synchronized (suspectRequests) {
+ // SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, reason);
+ // if (!suspectRequests.contains(sr)) {
+ // logger.info("Suspecting member {}. Reason= {}.", mbr, reason);
+ // suspectRequests.add(sr);
+ // suspectRequests.notify();
+ // }
+ // }
}
@Override
public boolean checkIfAvailable(DistributedMember mbr, String reason, boolean initiateRemoval) {
- return inlineCheckIfAvailable(localAddress, currentView, initiateRemoval, (InternalDistributedMember)mbr, reason);
+ return inlineCheckIfAvailable(localAddress, currentView, initiateRemoval, (InternalDistributedMember) mbr, reason);
}
public void start() {
@@ -607,17 +622,17 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
long delay = memberTimeout / LOGICAL_INTERVAL;
monitorFuture = scheduler.scheduleAtFixedRate(m, delay, delay, TimeUnit.MILLISECONDS);
-// suspectRequestCollectorThread = this.new RequestCollector<SuspectRequest>("Geode Suspect Message Collector", Services.getThreadGroup(), suspectRequests,
-// new Callback<SuspectRequest>() {
-// @Override
-// public void process(List<SuspectRequest> requests) {
-// GMSHealthMonitor.this.sendSuspectRequest(requests);
-//
-// }
-// }, MEMBER_SUSPECT_COLLECTION_INTERVAL);
-// suspectRequestCollectorThread.setDaemon(true);
-// suspectRequestCollectorThread.start()
-
+ // suspectRequestCollectorThread = this.new RequestCollector<SuspectRequest>("Geode Suspect Message Collector", Services.getThreadGroup(), suspectRequests,
+ // new Callback<SuspectRequest>() {
+ // @Override
+ // public void process(List<SuspectRequest> requests) {
+ // GMSHealthMonitor.this.sendSuspectRequest(requests);
+ //
+ // }
+ // }, MEMBER_SUSPECT_COLLECTION_INTERVAL);
+ // suspectRequestCollectorThread.setDaemon(true);
+ // suspectRequestCollectorThread.start()
+
serverSocketExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
final AtomicInteger threadIdx = new AtomicInteger();
@@ -635,15 +650,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
ServerSocket createServerSocket(InetAddress socketAddress, int[] portRange) {
ServerSocket serverSocket;
try {
- serverSocket = SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).createServerSocketUsingPortRange(socketAddress, 50/*backlog*/,
- true/*isBindAddress*/, false/*useNIO*/, 65536/*tcpBufferSize*/, portRange, false);
+ serverSocket = SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).createServerSocketUsingPortRange(socketAddress, 50/*backlog*/, true/*isBindAddress*/, false/*useNIO*/, 65536/*tcpBufferSize*/, portRange, false);
socketPort = serverSocket.getLocalPort();
} catch (IOException | SystemConnectException e) {
throw new GemFireConfigException("Unable to allocate a failure detection port in the membership-port range", e);
}
return serverSocket;
}
-
+
/**
* start the thread that listens for tcp/ip connections and responds
* to connection attempts
@@ -656,15 +670,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
logger.info("Started failure detection server thread on {}:{}.", ssocket.getInetAddress(), socketPort);
Socket socket = null;
try {
- while (!services.getCancelCriterion().isCancelInProgress()
- && !GMSHealthMonitor.this.isStopping) {
+ while (!services.getCancelCriterion().isCancelInProgress() && !GMSHealthMonitor.this.isStopping) {
try {
socket = ssocket.accept();
if (GMSHealthMonitor.this.playingDead) {
continue;
}
serverSocketExecutor.execute(new ClientSocketHandler(socket)); //start(); [bruce] I'm seeing a lot of failures due to this thread not being created fast enough, sometimes as long as 30 seconds
-
+
} catch (RejectedExecutionException e) {
// this can happen during shutdown
@@ -696,7 +709,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
});
}
-
+
/**
* start the thread that periodically sends a message to processes
* that might be watching this process
@@ -707,10 +720,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
Thread.currentThread().setName("Geode Heartbeat Sender");
sendPeriodicHeartbeats();
}
+
private void sendPeriodicHeartbeats() {
while (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
try {
- Thread.sleep(memberTimeout/LOGICAL_INTERVAL);
+ Thread.sleep(memberTimeout / LOGICAL_INTERVAL);
} catch (InterruptedException e) {
return;
}
@@ -727,7 +741,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
}
}
-
+
private void sendHeartbeats(List<InternalDistributedMember> mbrs, int startIndex) {
InternalDistributedMember coordinator = currentView.getCoordinator();
if (coordinator != null && !coordinator.equals(localAddress)) {
@@ -746,10 +760,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
int index = startIndex;
int numSent = 0;
- for (;;) {
+ for (; ; ) {
index--;
if (index < 0) {
- index = mbrs.size()-1;
+ index = mbrs.size() - 1;
}
InternalDistributedMember mbr = mbrs.get(index);
if (mbr.equals(localAddress)) {
@@ -782,32 +796,32 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
synchronized (viewVsSuspectedMembers) {
viewVsSuspectedMembers.clear();
}
- for (Iterator<InternalDistributedMember> it=memberTimeStamps.keySet().iterator(); it.hasNext(); ) {
+ for (Iterator<InternalDistributedMember> it = memberTimeStamps.keySet().iterator(); it.hasNext(); ) {
if (!newView.contains(it.next())) {
it.remove();
}
}
- for (Iterator<InternalDistributedMember> it=suspectedMemberInView.keySet().iterator(); it.hasNext(); ) {
+ for (Iterator<InternalDistributedMember> it = suspectedMemberInView.keySet().iterator(); it.hasNext(); ) {
if (!newView.contains(it.next())) {
it.remove();
}
}
-// for (InternalDistributedMember mbr: newView.getMembers()) {
-// if (!memberVsLastMsgTS.containsKey(mbr)) {
-// CustomTimeStamp customTS = new CustomTimeStamp(System.currentTimeMillis());
-// memberVsLastMsgTS.put(mbr, customTS);
-// }
-// }
+ // for (InternalDistributedMember mbr: newView.getMembers()) {
+ // if (!memberVsLastMsgTS.containsKey(mbr)) {
+ // CustomTimeStamp customTS = new CustomTimeStamp(System.currentTimeMillis());
+ // memberVsLastMsgTS.put(mbr, customTS);
+ // }
+ // }
currentView = newView;
setNextNeighbor(newView, null);
}
/***
* This method sets next neighbour which it needs to watch in current view.
- *
+ *
* if nextTo == null
* then it watches member next to it.
- *
+ *
* It becomes null when we suspect current neighbour, during that time it watches
* member next to suspect member.
*/
@@ -820,16 +834,34 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
List<InternalDistributedMember> allMembers = newView.getMembers();
-
- Set<InternalDistributedMember> checkAllSuspected = new HashSet<>(allMembers);
- checkAllSuspected.removeAll(suspectedMemberInView.keySet());
- checkAllSuspected.remove(localAddress);
- if (checkAllSuspected.isEmpty() && allMembers.size() > 1) {
- logger.info("All other members are suspect at this point");
- nextNeighbor = null;
- return;
+
+ // Set<InternalDistributedMember> checkAllSuspected = new HashSet<>(allMembers);
+ // checkAllSuspected.removeAll(suspectedMemberInView.keySet());
+ // checkAllSuspected.remove(localAddress);
+ // if (checkAllSuspected.isEmpty() && allMembers.size() > 1) {
+ // logger.info("All other members are suspect at this point");
+ // nextNeighbor = null;
+ // return;
+ // }
+
+ if (allMembers.size() > 1 && suspectedMemberInView.size() >= allMembers.size() - 1) {
+ boolean nonSuspectFound = false;
+ for (InternalDistributedMember member : allMembers) {
+ if (member.equals(localAddress)) {
+ continue;
+ }
+ if (!suspectedMemberInView.containsKey(member)) {
+ nonSuspectFound = true;
+ break;
+ }
+ }
+ if (!nonSuspectFound) {
+ logger.info("All other members are suspect at this point");
+ nextNeighbor = null;
+ return;
+ }
}
-
+
int index = allMembers.indexOf(nextTo);
if (index != -1) {
int nextNeighborIndex = (index + 1) % allMembers.size();
@@ -844,7 +876,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
nextNeighbor = newNeighbor;
}
}
-
+
if (nextNeighbor != null && nextNeighbor.equals(localAddress)) {
nextNeighbor = null;
}
@@ -866,10 +898,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
services.getMessenger().addHandler(HeartbeatMessage.class, this);
services.getMessenger().addHandler(SuspectMembersMessage.class, this);
}
-
+
@Override
public void started() {
- setLocalAddress( services.getMessenger().getMemberID());
+ setLocalAddress(services.getMessenger().getMemberID());
serverSocket = createServerSocket(localAddress.getInetAddress(), services.getConfig().getMembershipPortRange());
startTcpServer(serverSocket);
startHeartbeatThread();
@@ -907,11 +939,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
serverSocket.close();
serverSocket = null;
logger.info("GMSHealthMonitor server socket is closed in stopServices().");
- }
- catch (IOException e) {
+ } catch (IOException e) {
logger.trace("Unexpected exception", e);
}
- }
+ }
serverSocketExecutor.shutdownNow();
try {
serverSocketExecutor.awaitTermination(2000, TimeUnit.MILLISECONDS);
@@ -920,10 +951,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
logger.info("GMSHealthMonitor serverSocketExecutor is " + (serverSocketExecutor.isTerminated() ? "terminated" : "not terminated"));
}
-
-// if (suspectRequestCollectorThread != null) {
-// suspectRequestCollectorThread.shutdown();
-// }
+
+ // if (suspectRequestCollectorThread != null) {
+ // suspectRequestCollectorThread.shutdown();
+ // }
}
/***
@@ -969,7 +1000,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
public void emergencyClose() {
stopServices();
}
-
+
void setLocalAddress(InternalDistributedMember idm) {
this.localAddress = idm;
}
@@ -983,44 +1014,44 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
logger.trace("processing {}", m);
switch (m.getDSFID()) {
- case HEARTBEAT_REQUEST:
- if (beingSick || playingDead) {
- logger.debug("sick member is ignoring check request");
- } else {
- processHeartbeatRequest((HeartbeatRequestMessage) m);
- }
- break;
- case HEARTBEAT_RESPONSE:
- if (beingSick || playingDead) {
- logger.debug("sick member is ignoring check response");
- } else {
- processHeartbeat((HeartbeatMessage) m);
- }
- break;
- case SUSPECT_MEMBERS_MESSAGE:
- if (beingSick || playingDead) {
- logger.debug("sick member is ignoring suspect message");
- } else {
- processSuspectMembersRequest((SuspectMembersMessage) m);
- }
- break;
- default:
- throw new IllegalArgumentException("unknown message type: " + m);
+ case HEARTBEAT_REQUEST:
+ if (beingSick || playingDead) {
+ logger.debug("sick member is ignoring check request");
+ } else {
+ processHeartbeatRequest((HeartbeatRequestMessage) m);
+ }
+ break;
+ case HEARTBEAT_RESPONSE:
+ if (beingSick || playingDead) {
+ logger.debug("sick member is ignoring check response");
+ } else {
+ processHeartbeat((HeartbeatMessage) m);
+ }
+ break;
+ case SUSPECT_MEMBERS_MESSAGE:
+ if (beingSick || playingDead) {
+ logger.debug("sick member is ignoring suspect message");
+ } else {
+ processSuspectMembersRequest((SuspectMembersMessage) m);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("unknown message type: " + m);
}
}
private void processHeartbeatRequest(HeartbeatRequestMessage m) {
-
+
this.stats.incHeartbeatRequestsReceived();
-
+
if (this.isStopping || this.playingDead) {
return;
}
-
+
// only respond if the intended recipient is this member
InternalDistributedMember me = localAddress;
- if (me.getVmViewId() >= 0 && m.getTarget().equals(me)) {
+ if (me.getVmViewId() >= 0 && m.getTarget().equals(me)) {
HeartbeatMessage hm = new HeartbeatMessage(m.getRequestId());
hm.setRecipient(m.getSender());
Set<InternalDistributedMember> membersNotReceivedMsg = services.getMessenger().send(hm);
@@ -1059,9 +1090,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* for that member
*/
private void processSuspectMembersRequest(SuspectMembersMessage incomingRequest) {
-
+
this.stats.incSuspectsReceived();
-
+
NetView cv = currentView;
if (cv == null) {
@@ -1096,11 +1127,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
}
-
+
if (cv.getCoordinator().equals(localAddress)) {
- for (SuspectRequest req: incomingRequest.getMembers()) {
- logger.info("received suspect message from {} for {}: {}",
- sender, req.getSuspectMember(), req.getReason());
+ for (SuspectRequest req : incomingRequest.getMembers()) {
+ logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(), req.getReason());
}
checkIfAvailable(sender, sMembers, cv);
}// coordinator ends
@@ -1120,9 +1150,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
InternalDistributedMember coordinator = check.getCoordinator();
if (coordinator != null && coordinator.equals(localAddress)) {
// new coordinator
- for (SuspectRequest req: incomingRequest.getMembers()) {
- logger.info("received suspect message from {} for {}: {}",
- sender, req.getSuspectMember(), req.getReason());
+ for (SuspectRequest req : incomingRequest.getMembers()) {
+ logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(), req.getReason());
}
checkIfAvailable(sender, smbr, cv);
} else {
@@ -1145,7 +1174,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
viewVsMembers = new HashSet<>();
viewVsSuspectedMembers.put(cv, viewVsMembers);
}
- for (SuspectRequest sr: sMembers) {
+ for (SuspectRequest sr : sMembers) {
viewVsMembers.add(sr);
}
}
@@ -1157,8 +1186,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* we attempt to connect to its socket and ask if it's the expected member.
* Otherwise we send a heartbeat request and wait for a reply.
*/
- private void checkIfAvailable(final InternalDistributedMember initiator,
- List<SuspectRequest> sMembers, final NetView cv) {
+ private void checkIfAvailable(final InternalDistributedMember initiator, List<SuspectRequest> sMembers, final NetView cv) {
for (final SuspectRequest sr : sMembers) {
final InternalDistributedMember mbr = sr.getSuspectMember();
@@ -1198,10 +1226,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
}
- private boolean inlineCheckIfAvailable(
- final InternalDistributedMember initiator, final NetView cv,
- boolean initiateRemoval,
- final InternalDistributedMember mbr, final String reason) {
+ private boolean inlineCheckIfAvailable(final InternalDistributedMember initiator, final NetView cv, boolean initiateRemoval, final InternalDistributedMember mbr, final String reason) {
if (services.getJoinLeave().isMemberLeaving(mbr)) {
return false;
@@ -1216,7 +1241,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
// for some reason we used to update the timestamp for the member
// with the startTime, but we don't want to do that because it looks
// like a heartbeat has been received
-
+
logger.info("Performing final check for suspect member {} reason={}", mbr, reason);
boolean pinged;
int port = cv.getFailureDetectionPort(mbr);
@@ -1239,7 +1264,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
doCheckMember(mbr, false);
pinged = doTCPCheckMember(mbr, port);
}
-
+
if (!pinged && !isStopping) {
TimeStamp ts = memberTimeStamps.get(mbr);
if (ts == null || ts.getTime() < startTime) {
@@ -1263,11 +1288,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
return !failed;
}
-
+
@Override
public void memberShutdown(DistributedMember mbr, String reason) {
}
-
+
@Override
public int getFailureDetectionPort() {
return this.socketPort;
@@ -1275,21 +1300,21 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
private void sendSuspectRequest(final List<SuspectRequest> requests) {
// the background suspect-collector thread is currently disabled
-// synchronized (suspectRequests) {
-// if (suspectRequests.size() > 0) {
-// for (SuspectRequest sr: suspectRequests) {
-// if (!requests.contains(sr)) {
-// requests.add(sr);
-// }
-// }
-// suspectRequests.clear();
-// }
-// }
+ // synchronized (suspectRequests) {
+ // if (suspectRequests.size() > 0) {
+ // for (SuspectRequest sr: suspectRequests) {
+ // if (!requests.contains(sr)) {
+ // requests.add(sr);
+ // }
+ // }
+ // suspectRequests.clear();
+ // }
+ // }
logger.debug("Sending suspect request for members {}", requests);
List<InternalDistributedMember> recipients;
if (currentView.size() > 4) {
HashSet<InternalDistributedMember> filter = new HashSet<>();
- for (Enumeration<InternalDistributedMember> e = suspectedMemberInView.keys(); e.hasMoreElements();) {
+ for (Enumeration<InternalDistributedMember> e = suspectedMemberInView.keys(); e.hasMoreElements(); ) {
filter.add(e.nextElement());
}
filter.addAll(requests.stream().map(SuspectRequest::getSuspectMember).collect(Collectors.toList()));
@@ -1313,15 +1338,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
private static class ConnectTimeoutTask extends TimerTask implements ConnectionWatcher {
+
final Timer scheduler;
Socket socket;
final long timeout;
-
+
ConnectTimeoutTask(Timer scheduler, long timeout) {
this.scheduler = scheduler;
this.timeout = timeout;
}
-
+
@Override
public void beforeConnect(Socket socket) {
this.socket = socket;
@@ -1332,7 +1358,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
public void afterConnect(Socket socket) {
cancel();
}
-
+
@Override
public void run() {
try {
@@ -1343,9 +1369,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
// ignored - nothing useful to do here
}
}
-
+
}
-
+
public DMStats getStats() {
return this.stats;
}