You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/11/20 22:02:10 UTC
[16/50] [abbrv] incubator-geode git commit: [GEODE-77] TCP check for final check in health monitor
[GEODE-77] TCP check for final check in health monitor
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/63802dab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/63802dab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/63802dab
Branch: refs/heads/develop
Commit: 63802dab6090eef2721620afe518e1ad0b65df5d
Parents: 5feab82
Author: Jianxia Chen <jc...@pivotal.io>
Authored: Fri Oct 23 13:14:59 2015 -0700
Committer: Jianxia Chen <jc...@pivotal.io>
Committed: Fri Oct 23 13:32:38 2015 -0700
----------------------------------------------------------------------
.../membership/gms/fd/GMSHealthMonitor.java | 240 ++++++-
.../gms/interfaces/HealthMonitor.java | 18 +
.../membership/gms/membership/GMSJoinLeave.java | 698 +++++++++----------
.../gms/messages/InstallViewMessage.java | 48 +-
.../gms/messages/JoinRequestMessage.java | 14 +-
.../gms/messages/JoinResponseMessage.java | 48 +-
.../membership/GMSHealthMonitorJUnitTest.java | 14 +-
.../sanctionedDataSerializables.txt | 12 +-
8 files changed, 725 insertions(+), 367 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/63802dab/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 1ca206f..774ab37 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -4,6 +4,15 @@ import static com.gemstone.gemfire.internal.DataSerializableFixedID.CHECK_REQUES
import static com.gemstone.gemfire.internal.DataSerializableFixedID.CHECK_RESPONSE;
import static com.gemstone.gemfire.internal.DataSerializableFixedID.SUSPECT_MEMBERS_MESSAGE;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -24,11 +33,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.Logger;
+import com.gemstone.gemfire.SystemConnectException;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.NetView;
+import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.HealthMonitor;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
@@ -36,6 +47,8 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.messages.CheckRe
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.CheckResponseMessage;
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage;
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectRequest;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet;
/**
@@ -118,6 +131,15 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
/** test hook */
boolean beingSick = false;
+
+ // For TCP check
+ private ExecutorService serverSocketExecutor;
+ private static final int OK = 0x01;
+ private static final int ERROR = 0x02;
+ private InetAddress ip;
+ private volatile int socketPort;
+ private volatile ServerSocket serverSocket;
+ private Map<InternalDistributedMember, InetSocketAddress> socketInfo = new ConcurrentHashMap<InternalDistributedMember, InetSocketAddress>();
public GMSHealthMonitor() {
@@ -329,6 +351,64 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
return false;
}
+ /**
+ * During final check, establish TCP connection between current member and suspect member.
+ * And exchange PING/PONG message to see if the suspect member is still alive.
+ *
+ * @param suspectMember member that does not respond to CheckRequestMessage
+ * @return true if successfully exchanged PING/PONG with TCP connection, otherwise false.
+ */
+ private boolean doTCPCheckMember(InternalDistributedMember suspectMember, InetSocketAddress addr) {
+ logger.trace("Checking member {} with TCP socket connection.", suspectMember);
+ Socket clientSocket = new Socket();
+ try {
+ // establish TCP connection
+ for (Map.Entry<InternalDistributedMember, InetSocketAddress> entry : socketInfo.entrySet()) {
+ logger.info("socketInfo member:" + entry.getKey() + " port:" + entry.getValue().getPort());
+ }
+ logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember, addr.getAddress(), addr.getPort());
+ clientSocket.connect(addr, (int) services.getConfig().getMemberTimeout());
+ if (clientSocket.isConnected()) {
+ clientSocket.setSoTimeout((int) services.getConfig().getMemberTimeout());
+ InputStream in = clientSocket.getInputStream();
+ DataOutputStream out = new DataOutputStream(clientSocket.getOutputStream());
+ logger.info("TCP check: suspect member uuid: " + ((GMSMember) suspectMember.getNetMember()).getUUID());
+ out.writeShort(Version.CURRENT_ORDINAL);
+ out.writeLong(((GMSMember) suspectMember.getNetMember()).getUuidLSBs());
+ out.writeLong(((GMSMember) suspectMember.getNetMember()).getUuidMSBs());
+ out.flush();
+ clientSocket.shutdownOutput();
+ logger.debug("Send suspect member uuid to member {} with TCP socket connection.", suspectMember);
+ int b = in.read();
+ logger.debug("Received {} from member {} with TCP socket connection.", (b == OK ? "OK" : (b == ERROR ? "ERROR" : b)), suspectMember);
+ if (b == OK) {
+ CustomTimeStamp ts = memberVsLastMsgTS.get(suspectMember);
+ if (ts != null) {
+ ts.setTimeStamp(System.currentTimeMillis());
+ }
+ return true;
+ } else {
+ //received ERROR
+ return false;
+ }
+ } else {// cannot establish TCP connection with suspect member
+ return false;
+ }
+ } catch (IOException e) {
+ logger.trace("Unexpected exception", e);
+ } finally {
+ try {
+ if (clientSocket != null) {
+ clientSocket.close();
+ }
+ } catch (IOException e) {
+ logger.trace("Unexpected exception", e);
+ }
+ }
+
+ return false;
+ }
+
/*
* (non-Javadoc)
*
@@ -357,7 +437,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
public void start() {
- {
+ {
scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@@ -398,6 +478,123 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
suspectRequestCollectorThread.setDaemon(true);
suspectRequestCollectorThread.start();
}
+
+ {
+ serverSocketExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
+ AtomicInteger threadIdx = new AtomicInteger();
+
+ @Override
+ public Thread newThread(Runnable r) {
+ int id = threadIdx.getAndIncrement();
+ Thread th = new Thread(Services.getThreadGroup(), r, "TCP Check ServerSocket Thread " + id);
+ th.setDaemon(true);
+ return th;
+ }
+ });
+
+ serverSocketExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ Socket socket = null;
+ try {
+ // start server socket for TCP check
+ if (serverSocket == null) {
+ localAddress = services.getMessenger().getMemberID();
+ ip = localAddress.getInetAddress();
+ int[] portRange = services.getConfig().getMembershipPortRange();
+ socketPort = AvailablePort.getAvailablePortInRange(portRange[0], portRange[1], AvailablePort.SOCKET);
+ if (socketPort == -1) {
+ throw new SystemConnectException("Unable to find a free port in the membership port range");
+ }
+ serverSocket = new ServerSocket();
+ serverSocket.bind(new InetSocketAddress(ip, socketPort));
+ logger.info("GMSHealthMonitor started server socket on {}:{}.", ip, socketPort);
+ socketInfo.put(localAddress, new InetSocketAddress(ip, socketPort));
+ while (!services.getCancelCriterion().isCancelInProgress()
+ && !GMSHealthMonitor.this.isStopping) {
+ try {
+ socket = serverSocket.accept();
+ if (GMSHealthMonitor.this.playingDead) {
+ continue;
+ }
+ socket.setSoTimeout((int) services.getConfig().getMemberTimeout());
+ new ClientSocketHandler(socket).start();
+ } catch (IOException e) {
+ logger.trace("Unexpected exception", e);
+ try {
+ if (socket != null) {
+ socket.close();
+ }
+ } catch (IOException ioe) {
+ logger.trace("Unexpected exception", ioe);
+ }
+ }
+ }
+ logger.info("GMSHealthMonitor server socket has done its jobs.");
+ }
+ } catch (IOException e) {
+ logger.trace("Unexpected exception", e);
+ } finally {
+ // close the server socket
+ if (serverSocket != null && !serverSocket.isClosed()) {
+ try {
+ serverSocket.close();
+ serverSocket = null;
+ logger.info("GMSHealthMonitor server socket closed.");
+ } catch (IOException e) {
+ logger.debug("Unexpected exception", e);
+ }
+ }
+ }
+ }
+ });
+ }
+ }
+
+ class ClientSocketHandler extends Thread {
+
+ private Socket socket;
+
+ public ClientSocketHandler(Socket socket) {
+ super(services.getThreadGroup(), "ClientSocketHandler");
+ this.socket = socket;
+ setDaemon(true);
+ }
+
+ public void run() {
+ try {
+ DataInputStream in = new DataInputStream(socket.getInputStream());
+ OutputStream out = socket.getOutputStream();
+ short version = in.readShort();
+ long uuidLSBs = in.readLong();
+ long uuidMSBs = in.readLong();
+ logger.debug("GMSHealthMonitor server socket received {} and {}.", uuidMSBs, uuidLSBs);
+ logger.debug("GMSHealthMonitor member uuid is {}", ((GMSMember) GMSHealthMonitor.this.localAddress.getNetMember()).getUUID());
+ if (uuidLSBs == ((GMSMember) GMSHealthMonitor.this.localAddress.getNetMember()).getUuidLSBs()
+ && uuidMSBs == ((GMSMember) GMSHealthMonitor.this.localAddress.getNetMember()).getUuidMSBs()) {
+ out.write(OK);
+ out.flush();
+ socket.shutdownOutput();
+ logger.debug("GMSHealthMonitor server socket replied OK.");
+ }
+ else {
+ out.write(ERROR);
+ out.flush();
+ socket.shutdownOutput();
+ logger.debug("GMSHealthMonitor server socket replied ERROR.");
+ }
+ } catch (IOException e) {
+ logger.trace("Unexpected exception", e);
+ } finally {
+ if (socket != null) {
+ try {
+ socket.close();
+ } catch (IOException e) {
+ logger.info("Unexpected exception", e);
+ }
+ }
+ }
+ }
}
public synchronized void installView(NetView newView) {
@@ -521,6 +718,21 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
checkExecutor.shutdown();
}
+ if (serverSocketExecutor != null) {
+ if (serverSocket != null && !serverSocket.isClosed()) {
+ try {
+ serverSocket.close();
+ serverSocket = null;
+ logger.info("GMSHealthMonitor server socket is closed in stopServices().");
+ }
+ catch (IOException e) {
+ logger.trace("Unexpected exception", e);
+ }
+ }
+ serverSocketExecutor.shutdownNow();
+ logger.info("GMSHealthMonitor serverSocketExecutor is " + (serverSocketExecutor.isTerminated() ? "terminated" : "not terminated"));
+ }
+
if (suspectRequestCollectorThread != null) {
suspectRequestCollectorThread.shutdown();
}
@@ -530,7 +742,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* test method
*/
public boolean isShutdown() {
- return scheduler.isShutdown() && checkExecutor.isShutdown() && !suspectRequestCollectorThread.isAlive();
+ return scheduler.isShutdown() && checkExecutor.isShutdown() && serverSocketExecutor.isShutdown() && !suspectRequestCollectorThread.isAlive();
}
@Override
@@ -752,7 +964,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
memberVsLastMsgTS.put(mbr, ts);
logger.info("Performing final check for suspect member {} reason={}", mbr, reason);
- boolean pinged = GMSHealthMonitor.this.doCheckMember(mbr);
+ boolean pinged;
+ InetSocketAddress addr = socketInfo.get(mbr);
+ if (addr == null || addr.getPort() < 0) {
+ pinged = GMSHealthMonitor.this.doCheckMember(mbr);
+ } else {
+ pinged = GMSHealthMonitor.this.doTCPCheckMember(mbr, addr);
+ }
logger.info("Final check {}", pinged? "succeeded" : "failed");
if (!pinged && !isStopping) {
@@ -919,4 +1137,20 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
// TODO Auto-generated method stub
}
+
+ public Map<InternalDistributedMember, InetSocketAddress> getSocketInfo() {
+ return this.socketInfo;
+ }
+
+ public void installSocketInfo(List<InternalDistributedMember> members, List<Integer> portsForMembers) {
+ logger.debug("installSocketInfo members=" + members + " portsForMembers=" + portsForMembers);
+ for (int i = 0; i < members.size(); i++) {
+ if (portsForMembers.get(i).intValue() == -1) {
+ continue;
+ }
+ InetSocketAddress addr = new InetSocketAddress(members.get(i).getInetAddress(), portsForMembers.get(i).intValue());
+ socketInfo.put(members.get(i), addr);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/63802dab/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java
index 9ace2be..628e416 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java
@@ -1,7 +1,12 @@
package com.gemstone.gemfire.distributed.internal.membership.gms.interfaces;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.NetMember;
public interface HealthMonitor extends Service {
@@ -34,4 +39,17 @@ public interface HealthMonitor extends Service {
* ShutdownMessage has been received from the given member
*/
public void memberShutdown(DistributedMember mbr, String reason);
+
+ /**
+ * Returns a map that describes the members and their server sockets
+ */
+ public Map<InternalDistributedMember, InetSocketAddress> getSocketInfo();
+
+ /**
+ * Update the information of the members and their server sockets
+ *
+ * @param members
+ * @param portsForMembers List of socket ports for each member
+ */
+ public void installSocketInfo(List<InternalDistributedMember> members, List<Integer> portsForMembers);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/63802dab/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 5a792eb..6d39a6a 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -19,8 +19,10 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -28,8 +30,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.logging.log4j.Logger;
@@ -72,26 +72,25 @@ import com.gemstone.gemfire.security.AuthenticationFailedException;
public class GMSJoinLeave implements JoinLeave, MessageHandler {
public static String BYPASS_DISCOVERY = "gemfire.bypass-discovery";
-
+
/** amount of time to wait for responses to FindCoordinatorRequests */
private static final int DISCOVERY_TIMEOUT = Integer.getInteger("gemfire.discovery-timeout", 3000);
/** amount of time to sleep before trying to join after a failed attempt */
private static final int JOIN_RETRY_SLEEP = Integer.getInteger("gemfire.join-retry-sleep", 1000);
-
+
/** stall time to wait for concurrent join/leave/remove requests to be received */
public static final long MEMBER_REQUEST_COLLECTION_INTERVAL = Long.getLong("gemfire.member-request-collection-interval", 500);
/** time to wait for a leave request to be transmitted by jgroups */
private static final long LEAVE_MESSAGE_SLEEP_TIME = Long.getLong("gemfire.leave-message-sleep-time", 1000);
-
+
/** if the locators don't know who the coordinator is we send find-coord requests to this many nodes */
private static final int MAX_DISCOVERY_NODES = Integer.getInteger("gemfire.max-discovery-nodes", 30);
-
+
/** membership logger */
private static final Logger logger = Services.getLogger();
-
/** the view ID where I entered into membership */
private int birthViewId;
@@ -99,65 +98,65 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
private InternalDistributedMember localAddress;
private Services services;
-
+
/** have I connected to the distributed system? */
private volatile boolean isJoined;
/** guarded by viewInstallationLock */
private boolean isCoordinator;
-
+
/** a synch object that guards view installation */
private final Object viewInstallationLock = new Object();
/** the currently installed view. Guarded by viewInstallationLock */
private volatile NetView currentView;
-
+
/** the previous view **/
private volatile NetView previousView;
-
+
private final Set<InternalDistributedMember> removedMembers = new HashSet<>();
-
+
/** a new view being installed */
private NetView preparedView;
-
+
/** the last view that conflicted with view preparation */
private NetView lastConflictingView;
-
+
private List<InetSocketAddress> locators;
-
+
/** a list of join/leave/crashes */
private final List<DistributionMessage> viewRequests = new LinkedList<DistributionMessage>();
/** collects the response to a join request */
private JoinResponseMessage[] joinResponse = new JoinResponseMessage[1];
-
+
/** collects responses to new views */
private ViewReplyProcessor viewProcessor = new ViewReplyProcessor(false);
-
+
/** collects responses to view preparation messages */
private ViewReplyProcessor prepareProcessor = new ViewReplyProcessor(true);
/** whether quorum checks can cause a forced-disconnect */
private boolean quorumRequired = false;
-
+
/** timeout in receiving view acknowledgement */
private int viewAckTimeout;
/** background thread that creates new membership views */
private ViewCreator viewCreator;
-
+
/** am I shutting down? */
private volatile boolean isStopping;
/** state of collected artifacts during discovery */
final SearchState searchState = new SearchState();
-
+
/** a collection used to detect unit testing */
Set<String> unitTesting = new HashSet<>();
-
+
/** the view where quorum was most recently lost */
NetView quorumLostView;
-
+
static class SearchState {
Set<InternalDistributedMember> alreadyTried = new HashSet<>();
Set<InternalDistributedMember> registrants = new HashSet<>();
@@ -167,12 +166,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
boolean hasContactedAJoinedLocator;
NetView view;
Set<FindCoordinatorResponse> responses = new HashSet<>();
-
+
void cleanup() {
alreadyTried.clear();
possibleCoordinator = null;
view = null;
- synchronized(responses) {
+ synchronized (responses) {
responses.clear();
}
}
@@ -191,7 +190,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
* @return true if successful, false if not
*/
public boolean join() {
-
+
try {
if (Boolean.getBoolean(BYPASS_DISCOVERY)) {
synchronized(viewInstallationLock) {
@@ -199,7 +198,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
return true;
}
-
+
SearchState state = searchState;
long locatorWaitTime = services.getConfig().getLocatorWaitTime() * 1000;
@@ -209,7 +208,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
long startTime = System.currentTimeMillis();
long locatorGiveUpTime = startTime + locatorWaitTime;
long giveupTime = startTime + timeout;
-
+
for (int tries=0; !this.isJoined; tries++) {
logger.debug("searching for the membership coordinator");
boolean found = findCoordinator();
@@ -256,18 +255,18 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return false;
}
} // for
-
+
if (!this.isJoined) {
logger.debug("giving up attempting to join the distributed system after " + (System.currentTimeMillis() - startTime) + "ms");
}
-
+
// to preserve old behavior we need to throw a SystemConnectException if
// unable to contact any of the locators
if (!this.isJoined && state.hasContactedAJoinedLocator) {
throw new SystemConnectException("Unable to join the distributed system in "
+ (System.currentTimeMillis()-startTime) + "ms");
}
-
+
return this.isJoined;
} finally {
// notify anyone waiting on the address to be completed
@@ -282,29 +281,31 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
/**
* send a join request and wait for a reply. Process the reply.
* This may throw a SystemConnectException or an AuthenticationFailedException
+ *
* @param coord
* @return true if the attempt succeeded, false if it timed out
*/
private boolean attemptToJoin() {
SearchState state = searchState;
-
+
// send a join request to the coordinator and wait for a response
InternalDistributedMember coord = state.possibleCoordinator;
logger.info("Attempting to join the distributed system through coordinator " + coord + " using address " + this.localAddress);
- JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress,
- services.getAuthenticator().getCredentials(coord));
-
+ JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, services.getAuthenticator().getCredentials(coord));
+ // add server socket port in the join request
+ if (services.getHealthMonitor().getSocketInfo().get(localAddress) != null) {
+ req.setSocketPort(services.getHealthMonitor().getSocketInfo().get(localAddress).getPort());
+ }
services.getMessenger().send(req);
-
+
JoinResponseMessage response = null;
- synchronized(joinResponse) {
+ synchronized (joinResponse) {
if (joinResponse[0] == null) {
try {
// Note that if we give up waiting but a response is on
// the way we will get the new view and join that way.
// See installView()
- long timeout = Math.max(services.getConfig().getMemberTimeout(),
- services.getConfig().getJoinTimeout()/5);
+ long timeout = Math.max(services.getConfig().getMemberTimeout(), services.getConfig().getJoinTimeout() / 5);
joinResponse.wait(timeout);
} catch (InterruptedException e) {
logger.debug("join attempt was interrupted");
@@ -319,7 +320,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
joinResponse[0] = null;
String failReason = response.getRejectionMessage();
if (failReason != null) {
- if (failReason.contains("Rejecting the attempt of a member using an older version")
+ if (failReason.contains("Rejecting the attempt of a member using an older version")
|| failReason.contains("15806")) {
throw new SystemConnectException(failReason);
}
@@ -335,8 +336,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
} else {
this.birthViewId = response.getMemberID().getVmViewId();
this.localAddress.setVmViewId(this.birthViewId);
- GMSMember me = (GMSMember)this.localAddress.getNetMember();
+ GMSMember me = (GMSMember) this.localAddress.getNetMember();
me.setBirthViewId(birthViewId);
+ services.getHealthMonitor().installSocketInfo(response.getCurrentView().getMembers(), response.getPortsForMembers());
installView(response.getCurrentView());
}
@@ -350,13 +352,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
return false;
}
-
-
+
/**
- * process a join request from another member. If this is the coordinator
+ * process a join request from another member. If this is the coordinator
* this method will enqueue the request for processing in another thread.
* If this is not the coordinator but the coordinator is known, the message
* is forwarded to the coordinator.
+ *
* @param incomingRequest
*/
private void processJoinRequest(JoinRequestMessage incomingRequest) {
@@ -364,8 +366,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
logger.info("received join request from {}", incomingRequest.getMemberID());
if (incomingRequest.getMemberID().getVersionObject().compareTo(Version.CURRENT) < 0) {
- logger.warn("detected an attempt to start a peer using an older version of the product {}",
- incomingRequest.getMemberID());
+ logger.warn("detected an attempt to start a peer using an older version of the product {}", incomingRequest.getMemberID());
JoinResponseMessage m = new JoinResponseMessage("Rejecting the attempt of a member using an older version");
m.setRecipient(incomingRequest.getMemberID());
services.getMessenger().send(m);
@@ -379,26 +380,37 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
rejection = e.getMessage();
e.printStackTrace();
}
- if (rejection != null && rejection.length() > 0) {
+ if (rejection != null && rejection.length() > 0) {
JoinResponseMessage m = new JoinResponseMessage(rejection);
m.setRecipient(incomingRequest.getMemberID());
services.getMessenger().send(m);
return;
}
-
- if (!this.localAddress.getNetMember().preferredForCoordinator() &&
+
+ if (!this.localAddress.getNetMember().preferredForCoordinator() &&
incomingRequest.getMemberID().getNetMember().preferredForCoordinator()) {
JoinResponseMessage m = new JoinResponseMessage(incomingRequest.getMemberID(), currentView, true);
+ // add socket ports of all members to join response
+ List<Integer> portsForMembers = new ArrayList<Integer>(currentView.size());
+ Map<InternalDistributedMember, InetSocketAddress> socketInfo = services.getHealthMonitor().getSocketInfo();
+ for (InternalDistributedMember mbr : currentView.getMembers()) {
+ InetSocketAddress addr = socketInfo.get(mbr);
+ if (addr != null) {
+ portsForMembers.add(Integer.valueOf(addr.getPort()));
+ } else {
+ portsForMembers.add(Integer.valueOf(-1));
+ }
+ }
+ m.setPortsForMembers(portsForMembers);
services.getMessenger().send(m);
return;
}
recordViewRequest(incomingRequest);
}
-
-
+
/**
- * Process a Leave request from another member. This may cause this member
- * to become the new membership coordinator. If this is the coordinator
+ * Process a Leave request from another member. This may cause this member
+ * to become the new membership coordinator. If this is the coordinator
* a new view will be triggered.
*
* @param incomingRequest
@@ -406,31 +418,30 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
private void processLeaveRequest(LeaveRequestMessage incomingRequest) {
logger.info("received leave request from {} for {}", incomingRequest.getSender(), incomingRequest.getMemberID());
-
-
+
NetView v = currentView;
InternalDistributedMember mbr = incomingRequest.getMemberID();
-
+
if (logger.isDebugEnabled()) {
- logger.debug("JoinLeave.processLeaveRequest invoked. isCoordinator="+isCoordinator+ "; isStopping="+isStopping
- +"; cancelInProgress="+services.getCancelCriterion().isCancelInProgress());
+ logger.debug("JoinLeave.processLeaveRequest invoked. isCoordinator="+isCoordinator+ "; isStopping="+isStopping
+ +"; cancelInProgress="+ services.getCancelCriterion().isCancelInProgress());
}
if (!v.contains(mbr) && mbr.getVmViewId() < v.getViewId()) {
logger.debug("ignoring leave request from old member");
return;
}
-
+
if (incomingRequest.getMemberID().equals(this.localAddress)) {
logger.info("I am being told to leave the distributed system");
forceDisconnect(incomingRequest.getReason());
}
-
+
if (!isCoordinator && !isStopping && !services.getCancelCriterion().isCancelInProgress()) {
logger.debug("JoinLeave is checking to see if I should become coordinator");
- NetView check = new NetView(v, v.getViewId()+1);
+ NetView check = new NetView(v, v.getViewId() + 1);
check.remove(incomingRequest.getMemberID());
- synchronized(removedMembers) {
+ synchronized (removedMembers) {
check.removeAll(removedMembers);
check.addCrashedMembers(removedMembers);
}
@@ -439,8 +450,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
becomeCoordinator(incomingRequest.getMemberID());
}
}
- }
- else {
+ } else {
if (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
recordViewRequest(incomingRequest);
this.viewProcessor.processLeaveRequest(incomingRequest.getMemberID());
@@ -448,11 +458,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
}
-
-
+
/**
- * Process a Remove request from another member. This may cause this member
- * to become the new membership coordinator. If this is the coordinator
+ * Process a Remove request from another member. This may cause this member
+ * to become the new membership coordinator. If this is the coordinator
* a new view will be triggered.
*
* @param incomingRequest
@@ -462,13 +471,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
InternalDistributedMember mbr = incomingRequest.getMemberID();
- if (v != null && !v.contains(incomingRequest.getSender())) {
+ if (v != null && !v.contains(incomingRequest.getSender())) {
logger.info("Membership ignoring removal request for " + mbr + " from non-member " + incomingRequest.getSender());
return;
}
-
+
logger.info("Membership received a request to remove " + mbr
- + " from " + incomingRequest.getSender()
+ + " from " + incomingRequest.getSender()
+ " reason="+incomingRequest.getReason());
if (mbr.equals(this.localAddress)) {
@@ -476,16 +485,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
forceDisconnect(incomingRequest.getReason());
return;
}
-
+
if (getPendingRequestIDs(REMOVE_MEMBER_REQUEST).contains(mbr)) {
logger.debug("ignoring request as I already have a removal request for this member");
return;
}
-
+
if (!isCoordinator && !isStopping && !services.getCancelCriterion().isCancelInProgress()) {
logger.debug("JoinLeave is checking to see if I should become coordinator");
- NetView check = new NetView(v, v.getViewId()+1);
- synchronized(removedMembers) {
+ NetView check = new NetView(v, v.getViewId() + 1);
+ synchronized (removedMembers) {
removedMembers.add(mbr);
check = new NetView(v, v.getViewId());
check.addCrashedMembers(removedMembers);
@@ -496,8 +505,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
becomeCoordinator(mbr);
}
}
- }
- else {
+ } else {
if (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
recordViewRequest(incomingRequest);
this.viewProcessor.processRemoveRequest(mbr);
@@ -505,42 +513,41 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
}
-
-
+
private void recordViewRequest(DistributionMessage request) {
logger.debug("JoinLeave is recording the request to be processed in the next membership view");
- synchronized(viewRequests) {
+ synchronized (viewRequests) {
viewRequests.add(request);
viewRequests.notify();
}
}
-
+
// for testing purposes, returns a copy of the view requests for verification
List<DistributionMessage> getViewRequests() {
- synchronized(viewRequests) {
+ synchronized (viewRequests) {
return new LinkedList<DistributionMessage>(viewRequests);
}
}
-
+
// for testing purposes, returns the view-creation thread
ViewCreator getViewCreator() {
return viewCreator;
}
-
+
/**
* Yippeee - I get to be the coordinator
*/
void becomeCoordinator() { // package access for unit testing
becomeCoordinator(null);
}
-
-
+
public void becomeCoordinatorForTest() {
synchronized(viewInstallationLock) {
becomeCoordinator();
}
}
+
/**
* Transitions this member into the coordinator role. This must
* be invoked under a synch on viewInstallationLock that was held
@@ -581,7 +588,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (this.localAddress.getVmViewId() < 0) {
this.localAddress.setVmViewId(viewNumber);
}
-
List<InternalDistributedMember> mbrs = new ArrayList<>(currentView.getMembers());
if (!mbrs.contains(localAddress)) {
mbrs.add(localAddress);
@@ -605,40 +611,75 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
}
-
- private void sendJoinResponses(List<InternalDistributedMember> newMbrs, NetView newView) {
- for (InternalDistributedMember mbr: newMbrs) {
+
+ private void sendJoinResponses(List<InternalDistributedMember> newMbrs, NetView newView, List<Integer> portsForMembers) {
+ for (InternalDistributedMember mbr : newMbrs) {
JoinResponseMessage response = new JoinResponseMessage(mbr, newView);
+ response.setPortsForMembers(portsForMembers);
services.getMessenger().send(response);
}
}
-
- private void sendRemoveMessages(List<InternalDistributedMember> removals,
- List<String> reasons, NetView newView) {
+
+ private void sendRemoveMessages(List<InternalDistributedMember> removals, List<String> reasons, NetView newView) {
Iterator<String> reason = reasons.iterator();
- for (InternalDistributedMember mbr: removals) {
+ for (InternalDistributedMember mbr : removals) {
RemoveMemberMessage response = new RemoveMemberMessage(mbr, mbr, reason.next());
services.getMessenger().send(response);
}
}
-
-
- boolean prepareView(NetView view, Collection<InternalDistributedMember> newMembers) {
- return sendView(view, newMembers, true, this.prepareProcessor);
+
+ boolean prepareView(NetView view, Collection<InternalDistributedMember> newMembers, List<DistributionMessage> requests) {
+ return sendView(view, newMembers, true, this.prepareProcessor, requests);
}
-
- void sendView(NetView view, Collection<InternalDistributedMember> newMembers) {
- sendView(view, newMembers, false, this.viewProcessor);
+
+ void sendView(NetView view, Collection<InternalDistributedMember> newMembers, List<DistributionMessage> requests) {
+ sendView(view, newMembers, false, this.viewProcessor, requests);
+ }
+
+ /**
+ * Build a list of socket ports for messages, e.g. InstallViewMessage, JoinResponseMessage
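+ * @param view the view whose members' ports are appended, in view-member order
+ * @param requests pending requests; JoinRequestMessages supply ports for members without socket info
+ * @param portsForMembers output list; one port per view member is appended (-1 if unknown)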
+ */
+ private void addPorts(NetView view, List<DistributionMessage> requests, List<Integer> portsForMembers) {
+ Map<InternalDistributedMember, InetSocketAddress> socketInfo = services.getHealthMonitor().getSocketInfo();
+ Map<InternalDistributedMember, Integer> portMap = new ConcurrentHashMap<InternalDistributedMember, Integer>();
+ for (DistributionMessage req : requests) {
+ if (req.getDSFID() == JOIN_REQUEST) {
+ JoinRequestMessage joinReq = (JoinRequestMessage) req;
+ portMap.put(joinReq.getMemberID(), Integer.valueOf(joinReq.getSocketPort()));
+ }
+ }
+ for (InternalDistributedMember mbr : view.getMembers()) {
+ InetSocketAddress addr = socketInfo.get(mbr);
+ if (addr != null) {
+ portsForMembers.add(Integer.valueOf(addr.getPort()));
+ } else {
+ Integer port = portMap.get(mbr);
+ if (port != null) {
+ portsForMembers.add(port);
+ } else {
+ portsForMembers.add(Integer.valueOf(-1));
+ }
+ }
+ }
}
-
- boolean sendView(NetView view, Collection<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor rp) {
+ boolean sendView(NetView view, Collection<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor rp,
+ List<DistributionMessage> requests) {
int id = view.getViewId();
InstallViewMessage msg = new InstallViewMessage(view, services.getAuthenticator().getCredentials(this.localAddress), preparing);
Set<InternalDistributedMember> recips = new HashSet<>(view.getMembers());
+ // add socket ports of all members to InstallViewMessage
+ List<Integer> portsForMembers = new ArrayList<Integer>(view.size());
+ if (requests != null) {
+ addPorts(view, requests, portsForMembers);
+ msg.setPortsForMembers(portsForMembers);
+ }
// a recent member was seen not to receive a new view - I think this is why
-// recips.removeAll(newMembers); // new members get the view in a JoinResponseMessage
+ // recips.removeAll(newMembers); // new members get the view in a JoinResponseMessage
recips.remove(this.localAddress); // no need to send it to ourselves
Set<InternalDistributedMember> responders = recips;
@@ -649,19 +690,22 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (preparing) {
this.preparedView = view;
+ if (requests != null) {
+ services.getHealthMonitor().installSocketInfo(view.getMembers(), portsForMembers);
+ }
} else {
installView(view);
}
-
+
if (recips.isEmpty()) {
logger.info("no recipients for new view aside from myself");
return true;
}
-
- logger.info((preparing? "preparing" : "sending") + " new view " + view);
+
+ logger.info((preparing ? "preparing" : "sending") + " new view " + view);
msg.setRecipients(recips);
-
+
Set<InternalDistributedMember> pendingLeaves = getPendingRequestIDs(LEAVE_REQUEST_MESSAGE);
Set<InternalDistributedMember> pendingRemovals = getPendingRequestIDs(REMOVE_MEMBER_REQUEST);
pendingRemovals.removeAll(view.getCrashedMembers());
@@ -672,78 +716,73 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
// only wait for responses during preparation
if (preparing) {
logger.debug("waiting for view responses");
-
+
Set<InternalDistributedMember> failedToRespond = rp.waitForResponses();
logger.info("finished waiting for responses to view preparation");
-
+
InternalDistributedMember conflictingViewSender = rp.getConflictingViewSender();
NetView conflictingView = rp.getConflictingView();
if (conflictingView != null) {
- logger.warn("received a conflicting membership view from " + conflictingViewSender
+ logger.warn("received a conflicting membership view from " + conflictingViewSender
+ " during preparation: " + conflictingView);
return false;
}
-
- if (!failedToRespond.isEmpty() && (services.getCancelCriterion().cancelInProgress() == null)) {
+
+ if (!failedToRespond.isEmpty() && (services.getCancelCriterion().cancelInProgress() == null)) {
logger.warn("these members failed to respond to the view change: " + failedToRespond);
return false;
}
}
-
+
return true;
}
-
-
private void processViewMessage(InstallViewMessage m) {
-
+
logger.debug("Membership: processing {}", m);
-
+
NetView view = m.getView();
-
- if (currentView != null && view.getViewId() < currentView.getViewId()) {
+
+ if (currentView != null && view.getViewId() < currentView.getViewId()) {
// ignore old views
ackView(m);
return;
}
-
-
+
if (m.isPreparing()) {
if (this.preparedView != null && this.preparedView.getViewId() >= view.getViewId()) {
services.getMessenger().send(new ViewAckMessage(m.getSender(), this.preparedView));
- }
- else {
+ } else {
this.preparedView = view;
+ if (!m.getPortsForMembers().isEmpty()) {
+ services.getHealthMonitor().installSocketInfo(view.getMembers(), m.getPortsForMembers());
+ }
ackView(m);
}
- }
- else { // !preparing
- if (currentView != null && !view.contains(this.localAddress)) {
+ } else { // !preparing
+ if (currentView != null && !view.contains(this.localAddress)) {
if (quorumRequired) {
forceDisconnect("This node is no longer in the membership view");
}
- }
- else {
+ } else {
ackView(m);
installView(view);
}
}
}
-
+
private void forceDisconnect(String reason) {
this.isStopping = true;
services.getManager().forceDisconnect(reason);
}
-
private void ackView(InstallViewMessage m) {
if (m.getView().contains(m.getView().getCreator())) {
services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()));
}
}
-
-
+
private void processViewAckMessage(ViewAckMessage m) {
if (m.isPrepareAck()) {
this.prepareProcessor.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView());
@@ -751,15 +790,15 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
this.viewProcessor.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView());
}
}
-
+
/**
* This contacts the locators to find out who the current coordinator is.
- * All locators are contacted. If they don't agree then we choose the oldest
+ * All locators are contacted. If they don't agree then we choose the oldest
* coordinator and return it.
*/
private boolean findCoordinator() {
SearchState state = searchState;
-
+
assert this.localAddress != null;
// If we've already tried to bootstrap from locators that
@@ -769,7 +808,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if ( !state.hasContactedAJoinedLocator && state.view != null) {
return findCoordinatorFromView();
}
-
+
FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress, state.alreadyTried, state.viewId);
Set<InternalDistributedMember> coordinators = new HashSet<InternalDistributedMember>();
@@ -778,14 +817,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
int connectTimeout = (int)services.getConfig().getMemberTimeout();
boolean anyResponses = false;
boolean flagsSet = false;
-
+
logger.debug("sending {} to {}", request, locators);
state.hasContactedAJoinedLocator = false;
state.locatorsContacted = 0;
do {
- for (InetSocketAddress addr: locators) {
+ for (InetSocketAddress addr : locators) {
try {
Object o = TcpClient.requestToServer(
addr.getAddress(), addr.getPort(), request, connectTimeout,
@@ -846,7 +885,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
InternalDistributedMember coord = null;
boolean coordIsNoob = true;
- for (; it.hasNext(); ) {
+ for (; it.hasNext();) {
InternalDistributedMember mbr = it.next();
if (!state.alreadyTried.contains(mbr)) {
boolean mbrIsNoob = (mbr.getVmViewId() < 0);
@@ -866,7 +905,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
return true;
}
-
+
boolean findCoordinatorFromView() {
ArrayList<FindCoordinatorResponse> result;
SearchState state = searchState;
@@ -883,8 +922,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId);
req.setRecipients(v.getMembers());
- boolean testing = unitTesting.contains("findCoordinatorFromView");
- synchronized(state.responses) {
+ boolean testing = unitTesting.contains("findCoordinatorFromView");
+ synchronized (state.responses) {
if (!testing) {
state.responses.clear();
}
@@ -900,7 +939,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
result = new ArrayList<>(state.responses);
state.responses.clear();
}
-
+
InternalDistributedMember coord = null;
if (localAddress.getNetMember().preferredForCoordinator()) {
// it's possible that all other potential coordinators are gone
@@ -908,13 +947,13 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
coord = localAddress;
}
boolean coordIsNoob = true;
- for (FindCoordinatorResponse resp: result) {
+ for (FindCoordinatorResponse resp : result) {
InternalDistributedMember mbr = resp.getCoordinator();
if (!state.alreadyTried.contains(mbr)) {
boolean mbrIsNoob = (mbr.getVmViewId() < 0);
if (mbrIsNoob) {
// member has not yet joined
- if (coordIsNoob && (coord == null || coord.compareTo(mbr,false) > 0)) {
+ if (coordIsNoob && (coord == null || coord.compareTo(mbr, false) > 0)) {
coord = mbr;
}
} else {
@@ -926,11 +965,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
}
-
+
state.possibleCoordinator = coord;
return coord != null;
}
-
+
/**
* Some settings are gleaned from locator responses and set into the local
* configuration
@@ -938,11 +977,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
private void inheritSettingsFromLocator(InetSocketAddress addr, FindCoordinatorResponse response) {
boolean enabled = response.isNetworkPartitionDetectionEnabled();
if (!enabled && services.getConfig().isNetworkPartitionDetectionEnabled()) {
- throw new GemFireConfigException("locator at "+addr
+ throw new GemFireConfigException("locator at "+addr
+" does not have network-partition-detection enabled but my configuration has it enabled");
}
- GMSMember mbr = (GMSMember)this.localAddress.getNetMember();
+ GMSMember mbr = (GMSMember) this.localAddress.getNetMember();
mbr.setSplitBrainEnabled(enabled);
services.getConfig().setNetworkPartitionDetectionEnabled(enabled);
services.getConfig().getDistributionConfig().setEnableNetworkPartitionDetection(enabled);
@@ -950,28 +989,29 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (response.isUsePreferredCoordinators()) {
this.quorumRequired = true;
logger.debug("The locator indicates that all locators should be preferred as coordinators");
- if (services.getLocator() != null
- || Locator.hasLocator()
+ if (services.getLocator() != null
+ || Locator.hasLocator()
|| !services.getConfig().getDistributionConfig().getStartLocator().isEmpty()
|| localAddress.getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
- ((GMSMember)localAddress.getNetMember()).setPreferredForCoordinator(true);
+ ((GMSMember) localAddress.getNetMember()).setPreferredForCoordinator(true);
}
} else {
- ((GMSMember)localAddress.getNetMember()).setPreferredForCoordinator(true);
+ ((GMSMember) localAddress.getNetMember()).setPreferredForCoordinator(true);
}
}
-
+
/**
* receives a JoinResponse holding a membership view or rejection message
+ *
* @param rsp
*/
private void processJoinResponse(JoinResponseMessage rsp) {
- synchronized(joinResponse) {
+ synchronized (joinResponse) {
joinResponse[0] = rsp;
joinResponse.notify();
}
}
-
+
private void processFindCoordinatorRequest(FindCoordinatorRequest req) {
FindCoordinatorResponse resp;
if (this.isJoined) {
@@ -983,16 +1023,15 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
resp.setRecipient(req.getMemberID());
services.getMessenger().send(resp);
}
-
+
private void processFindCoordinatorResponse(FindCoordinatorResponse resp) {
- synchronized(searchState.responses) {
+ synchronized (searchState.responses) {
searchState.responses.add(resp);
}
}
-
+
private void processNetworkPartitionMessage(NetworkPartitionMessage msg) {
- String str = "Membership coordinator "
- + msg.getSender() + " has declared that a network partition has occurred";
+ String str = "Membership coordinator " + msg.getSender() + " has declared that a network partition has occurred";
forceDisconnect(str);
}
@@ -1000,7 +1039,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
public NetView getView() {
return currentView;
}
-
+
public NetView getPreviousView() {
return previousView;
}
@@ -1009,35 +1048,34 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
public InternalDistributedMember getMemberID() {
return this.localAddress;
}
-
+
public void installView(NetView newView) {
-
+
logger.info("received new view: {}\nold view is: {}", newView, currentView);
-
- synchronized(viewInstallationLock) {
+
+ synchronized (viewInstallationLock) {
if (currentView != null && currentView.getViewId() >= newView.getViewId()) {
// old view - ignore it
return;
}
-
+
if (currentView == null && !this.isJoined) {
- for (InternalDistributedMember mbr: newView.getMembers()) {
+ for (InternalDistributedMember mbr : newView.getMembers()) {
if (this.localAddress.equals(mbr)) {
this.birthViewId = mbr.getVmViewId();
this.localAddress.setVmViewId(this.birthViewId);
- GMSMember me = (GMSMember)this.localAddress.getNetMember();
+ GMSMember me = (GMSMember) this.localAddress.getNetMember();
me.setBirthViewId(birthViewId);
isJoined = true;
break;
}
}
}
-
+
if (isNetworkPartition(newView)) {
if (quorumRequired) {
Set<InternalDistributedMember> crashes = newView.getActualCrashedMembers(currentView);
- forceDisconnect(
- LocalizedStrings.Network_partition_detected.toLocalizedString(crashes.size(), crashes));
+ forceDisconnect(LocalizedStrings.Network_partition_detected.toLocalizedString(crashes.size(), crashes));
return;
}
}
@@ -1046,7 +1084,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
preparedView = null;
lastConflictingView = null;
services.installView(newView);
-
+
if (!newView.getCreator().equals(this.localAddress)) {
if (newView.shouldBeCoordinator(this.localAddress)) {
becomeCoordinator();
@@ -1057,20 +1095,20 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
if (!this.isCoordinator) {
- // get rid of outdated requests. It's possible some requests are
+ // get rid of outdated requests. It's possible some requests are
// newer than the view just processed - the senders will have to
// resend these
- synchronized(viewRequests) {
- for (Iterator<DistributionMessage> it = viewRequests.iterator(); it.hasNext(); ) {
+ synchronized (viewRequests) {
+ for (Iterator<DistributionMessage> it = viewRequests.iterator(); it.hasNext();) {
DistributionMessage m = it.next();
if (m instanceof JoinRequestMessage) {
it.remove();
} else if (m instanceof LeaveRequestMessage) {
- if (!currentView.contains(((LeaveRequestMessage)m).getMemberID())) {
+ if (!currentView.contains(((LeaveRequestMessage) m).getMemberID())) {
it.remove();
}
} else if (m instanceof RemoveMemberMessage) {
- if (!currentView.contains(((RemoveMemberMessage)m).getMemberID())) {
+ if (!currentView.contains(((RemoveMemberMessage) m).getMemberID())) {
it.remove();
}
}
@@ -1078,11 +1116,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
}
- synchronized(removedMembers) {
+ synchronized (removedMembers) {
removedMembers.clear();
}
}
-
+
/**
* Sends a message declaring a network partition to the
* members of the given view via Messenger
@@ -1099,8 +1137,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
logger.debug("unable to send network partition message - continuing", e);
}
}
-
-
+
/**
* returns true if this member thinks it is the membership coordinator
* for the distributed system
@@ -1108,21 +1145,20 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
public boolean isCoordinator() {
return this.isCoordinator;
}
-
+
/**
* return true if we're stopping or are stopped
*/
public boolean isStopping() {
return this.isStopping;
}
-
+
/**
* returns the currently prepared view, if any
*/
public NetView getPreparedView() {
return this.preparedView;
}
-
/**
* check to see if the new view shows a drop of 51% or more
@@ -1134,32 +1170,29 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
int oldWeight = currentView.memberWeight();
int failedWeight = newView.getCrashedMemberWeight(currentView);
if (failedWeight > 0) {
- if (logger.isInfoEnabled()
- && !newView.getCreator().equals(localAddress)) { // view-creator logs this
+ if (logger.isInfoEnabled() && !newView.getCreator().equals(localAddress)) { // view-creator logs this
newView.logCrashedMemberWeights(currentView, logger);
}
- int failurePoint = (int)(Math.round(51 * oldWeight) / 100.0);
+ int failurePoint = (int) (Math.round(51 * oldWeight) / 100.0);
if (failedWeight > failurePoint && quorumLostView != newView) {
quorumLostView = newView;
- logger.warn("total weight lost in this view change is {} of {}. Quorum has been lost!",
- failedWeight, oldWeight);
+ logger.warn("total weight lost in this view change is {} of {}. Quorum has been lost!", failedWeight, oldWeight);
services.getManager().quorumLost(newView.getActualCrashedMembers(currentView), currentView);
return true;
}
}
return false;
}
-
-
+
private void stopCoordinatorServices() {
if (viewCreator != null && !viewCreator.isShutdown()) {
viewCreator.shutdown();
}
}
-
+
public static void loadEmergencyClasses() {
}
-
+
@Override
public void emergencyClose() {
isStopping = true;
@@ -1177,29 +1210,21 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
public void beHealthy() {
}
-
-
@Override
public void start() {
}
-
-
@Override
public void started() {
this.localAddress = services.getMessenger().getMemberID();
}
-
-
@Override
public void stop() {
logger.debug("JoinLeave stopping");
leave();
}
-
-
@Override
public void stopped() {
}
@@ -1210,12 +1235,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
viewProcessor.memberSuspected(initiator, suspect);
}
-
-
@Override
public void leave() {
boolean waitForProcessing = false;
- synchronized(viewInstallationLock) {
+ synchronized (viewInstallationLock) {
NetView view = currentView;
isStopping = true;
stopCoordinatorServices();
@@ -1223,15 +1246,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (view.size() > 1) {
if (this.isCoordinator) {
logger.debug("JoinLeave stopping coordination services");
- NetView newView = new NetView(view, view.getViewId()+1);
+ NetView newView = new NetView(view, view.getViewId() + 1);
newView.remove(localAddress);
InstallViewMessage m = new InstallViewMessage(newView, services.getAuthenticator().getCredentials(this.localAddress));
m.setRecipients(newView.getMembers());
services.getMessenger().send(m);
waitForProcessing = true;
- }
- else {
- List<InternalDistributedMember> coords = view.getPreferredCoordinators(Collections.<InternalDistributedMember>emptySet(), localAddress, 5);
+ } else {
+ List<InternalDistributedMember> coords = view.getPreferredCoordinators(Collections.<InternalDistributedMember> emptySet(), localAddress, 5);
logger.debug("JoinLeave sending a leave request to {}", coords);
LeaveRequestMessage m = new LeaveRequestMessage(coords, this.localAddress, "this member is shutting down");
@@ -1239,38 +1261,32 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
waitForProcessing = true;
}
} // view.size
- }// view != null
+ } // view != null
}
if (waitForProcessing) {
try {
Thread.sleep(LEAVE_MESSAGE_SLEEP_TIME);
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
-
-
@Override
public void remove(InternalDistributedMember m, String reason) {
NetView v = this.currentView;
-
+
services.getCancelCriterion().checkCancelInProgress(null);
-
+
if (v != null && v.contains(m)) {
Set<InternalDistributedMember> filter = new HashSet<>();
filter.add(m);
- RemoveMemberMessage msg = new RemoveMemberMessage(v.getPreferredCoordinators(filter, getMemberID(), 5),
- m,
- reason);
+ RemoveMemberMessage msg = new RemoveMemberMessage(v.getPreferredCoordinators(filter, getMemberID(), 5), m, reason);
msg.setSender(this.localAddress);
processRemoveRequest(msg);
if (!this.isCoordinator) {
msg.resetRecipients();
- msg.setRecipients(v.getPreferredCoordinators(Collections.<InternalDistributedMember>emptySet(),
- localAddress, 10));
+ msg.setRecipients(v.getPreferredCoordinators(Collections.<InternalDistributedMember> emptySet(), localAddress, 10));
services.getMessenger().send(msg);
}
}
@@ -1278,32 +1294,29 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
@Override
public void memberShutdown(DistributedMember mbr, String reason) {
-
+
if (this.isCoordinator) {
- LeaveRequestMessage msg = new LeaveRequestMessage(Collections.singleton(this.localAddress), (InternalDistributedMember)mbr, reason);
+ LeaveRequestMessage msg = new LeaveRequestMessage(Collections.singleton(this.localAddress), (InternalDistributedMember) mbr, reason);
recordViewRequest(msg);
}
}
-
@Override
public void disableDisconnectOnQuorumLossForTesting() {
this.quorumRequired = false;
}
-
+
@Override
public void init(Services s) {
this.services = s;
-
+
DistributionConfig dc = services.getConfig().getDistributionConfig();
- if (dc.getMcastPort() != 0
- && dc.getLocators().trim().isEmpty()
- && dc.getStartLocator().trim().isEmpty()) {
- throw new GemFireConfigException("Multicast cannot be configured for a non-distributed cache."
- + " Please configure the locator services for this cache using "+DistributionConfig.LOCATORS_NAME
- + " or " + DistributionConfig.START_LOCATOR_NAME+".");
+ if (dc.getMcastPort() != 0 && dc.getLocators().trim().isEmpty() && dc.getStartLocator().trim().isEmpty()) {
+ throw new GemFireConfigException(
+ "Multicast cannot be configured for a non-distributed cache." + " Please configure the locator services for this cache using "
+ + DistributionConfig.LOCATORS_NAME + " or " + DistributionConfig.START_LOCATOR_NAME + ".");
}
-
+
services.getMessenger().addHandler(JoinRequestMessage.class, this);
services.getMessenger().addHandler(JoinResponseMessage.class, this);
services.getMessenger().addHandler(InstallViewMessage.class, this);
@@ -1324,9 +1337,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
ackCollectionTimeout = Integer.getInteger("gemfire.VIEW_ACK_TIMEOUT", ackCollectionTimeout).intValue();
this.viewAckTimeout = ackCollectionTimeout;
-
+
this.quorumRequired = services.getConfig().getDistributionConfig().getEnableNetworkPartitionDetection();
-
+
DistributionConfig dconfig = services.getConfig().getDistributionConfig();
String bindAddr = dconfig.getBindAddress();
locators = GMSUtil.parseLocators(dconfig.getLocators(), bindAddr);
@@ -1340,37 +1353,36 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
logger.debug("JoinLeave processing {}", m);
switch (m.getDSFID()) {
case JOIN_REQUEST:
- processJoinRequest((JoinRequestMessage)m);
+ processJoinRequest((JoinRequestMessage) m);
break;
case JOIN_RESPONSE:
- processJoinResponse((JoinResponseMessage)m);
+ processJoinResponse((JoinResponseMessage) m);
break;
case INSTALL_VIEW_MESSAGE:
- processViewMessage((InstallViewMessage)m);
+ processViewMessage((InstallViewMessage) m);
break;
case VIEW_ACK_MESSAGE:
- processViewAckMessage((ViewAckMessage)m);
+ processViewAckMessage((ViewAckMessage) m);
break;
case LEAVE_REQUEST_MESSAGE:
- processLeaveRequest((LeaveRequestMessage)m);
+ processLeaveRequest((LeaveRequestMessage) m);
break;
case REMOVE_MEMBER_REQUEST:
- processRemoveRequest((RemoveMemberMessage)m);
+ processRemoveRequest((RemoveMemberMessage) m);
break;
case FIND_COORDINATOR_REQ:
- processFindCoordinatorRequest((FindCoordinatorRequest)m);
+ processFindCoordinatorRequest((FindCoordinatorRequest) m);
break;
case FIND_COORDINATOR_RESP:
- processFindCoordinatorResponse((FindCoordinatorResponse)m);
+ processFindCoordinatorResponse((FindCoordinatorResponse) m);
break;
case NETWORK_PARTITION_MESSAGE:
- processNetworkPartitionMessage((NetworkPartitionMessage)m);
+ processNetworkPartitionMessage((NetworkPartitionMessage) m);
break;
default:
throw new IllegalArgumentException("unknown message type: " + m);
}
}
-
/**
* returns the member IDs of the pending requests having the given
@@ -1378,17 +1390,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
*/
Set<InternalDistributedMember> getPendingRequestIDs(int theDSFID) {
Set<InternalDistributedMember> result = new HashSet<>();
- synchronized(viewRequests) {
- for (DistributionMessage msg: viewRequests) {
+ synchronized (viewRequests) {
+ for (DistributionMessage msg : viewRequests) {
if (msg.getDSFID() == theDSFID) {
- result.add(((HasMemberID)msg).getMemberID());
+ result.add(((HasMemberID) msg).getMemberID());
}
}
}
return result;
}
-
-
+
class ViewReplyProcessor {
volatile int viewId = -1;
final Set<InternalDistributedMember> notRepliedYet = new HashSet<>();
@@ -1397,11 +1408,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
boolean waiting;
final boolean isPrepareViewProcessor;
final Set<InternalDistributedMember> pendingRemovals = new HashSet<>();
-
+
ViewReplyProcessor(boolean forPreparation) {
this.isPrepareViewProcessor = forPreparation;
}
-
+
synchronized void initialize(int viewId, Set<InternalDistributedMember> recips) {
waiting = true;
this.viewId = viewId;
@@ -1410,26 +1421,24 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
conflictingView = null;
pendingRemovals.clear();
}
-
- synchronized void processPendingRequests(Set<InternalDistributedMember> pendingLeaves,
- Set<InternalDistributedMember> pendingRemovals) {
+
+ synchronized void processPendingRequests(Set<InternalDistributedMember> pendingLeaves, Set<InternalDistributedMember> pendingRemovals) {
// there's no point in waiting for members who have already
// requested to leave or who have been declared crashed.
// We don't want to mix the two because pending removals
// aren't reflected as having crashed in the current view
// and need to cause a new view to be generated
- for (InternalDistributedMember mbr: pendingLeaves) {
+ for (InternalDistributedMember mbr : pendingLeaves) {
notRepliedYet.remove(mbr);
}
- for (InternalDistributedMember mbr: pendingRemovals) {
+ for (InternalDistributedMember mbr : pendingRemovals) {
if (this.notRepliedYet.contains(mbr)) {
this.pendingRemovals.add(mbr);
}
}
}
-
- synchronized void memberSuspected(InternalDistributedMember initiator,
- InternalDistributedMember suspect) {
+
+ synchronized void memberSuspected(InternalDistributedMember initiator, InternalDistributedMember suspect) {
if (waiting) {
// we will do a final check on this member if it hasn't already
// been done, so stop waiting for it now
@@ -1440,14 +1449,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
}
-
+
synchronized void processLeaveRequest(InternalDistributedMember mbr) {
if (waiting) {
logger.debug("view response processor recording leave request for {}", mbr);
stopWaitingFor(mbr);
}
}
-
+
synchronized void processRemoveRequest(InternalDistributedMember mbr) {
if (waiting) {
logger.debug("view response processor recording remove request for {}", mbr);
@@ -1455,12 +1464,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
checkIfDone();
}
}
-
+
synchronized void processViewResponse(int viewId, InternalDistributedMember sender, NetView conflictingView) {
if (!waiting) {
return;
}
-
+
if (viewId == this.viewId) {
if (conflictingView != null) {
this.conflictingViewSender = sender;
@@ -1477,11 +1486,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
notRepliedYet.remove(mbr);
checkIfDone();
}
-
+
/** call with synchronized(this) */
private void checkIfDone() {
- if (notRepliedYet.isEmpty() ||
- (pendingRemovals != null && pendingRemovals.containsAll(notRepliedYet))) {
+ if (notRepliedYet.isEmpty() || (pendingRemovals != null && pendingRemovals.containsAll(notRepliedYet))) {
logger.debug("All anticipated view responses received - notifying waiting thread");
waiting = false;
notify();
@@ -1489,15 +1497,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
logger.debug("Still waiting for these view replies: {}", notRepliedYet);
}
}
-
+
Set<InternalDistributedMember> waitForResponses() {
Set<InternalDistributedMember> result = this.notRepliedYet;
long endOfWait = System.currentTimeMillis() + viewAckTimeout;
try {
- while (System.currentTimeMillis() < endOfWait
- && (services.getCancelCriterion().cancelInProgress() == null)) {
+ while (System.currentTimeMillis() < endOfWait && (services.getCancelCriterion().cancelInProgress() == null)) {
try {
- synchronized(this) {
+ synchronized (this) {
if (!waiting || result.isEmpty() || this.conflictingView != null) {
break;
}
@@ -1522,56 +1529,53 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
return result;
}
-
+
NetView getConflictingView() {
return this.conflictingView;
}
-
+
InternalDistributedMember getConflictingViewSender() {
return this.conflictingViewSender;
}
-
+
Set<InternalDistributedMember> getUnresponsiveMembers() {
return this.notRepliedYet;
}
}
-
-
-
-
class ViewCreator extends Thread {
boolean shutdown = false;
volatile boolean waiting = false;
-
+
NetView initialView;
Set<InternalDistributedMember> initialLeaving;
Set<InternalDistributedMember> initialRemovals;
-
+
ViewCreator(String name, ThreadGroup tg) {
super(tg, name);
}
-
+
void shutdown() {
shutdown = true;
- synchronized(viewRequests) {
+ synchronized (viewRequests) {
viewRequests.notify();
interrupt();
}
}
-
+
boolean isShutdown() {
return shutdown;
}
-
+
boolean isWaiting() {
return waiting;
}
-
+
/**
* All views should be sent by the ViewCreator thread, so
* if this member becomes coordinator it may have an initial
* view to transmit that announces the removal of the former coordinator to
+ *
* @param newView
* @param leaving - members leaving in this view
* @param removals - members crashed in this view
@@ -1581,12 +1585,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
this.initialLeaving = leaving;
this.initialRemovals = removals;
}
-
+
private void sendInitialView() {
if (initialView != null) {
try {
- prepareAndSendView(initialView, Collections.<InternalDistributedMember>emptyList(),
- initialLeaving, initialRemovals);
+ prepareAndSendView(initialView, Collections.<InternalDistributedMember> emptyList(), initialLeaving, initialRemovals, null);
} finally {
this.initialView = null;
this.initialLeaving = null;
@@ -1603,7 +1606,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
long okayToCreateView = System.currentTimeMillis() + MEMBER_REQUEST_COLLECTION_INTERVAL;
try {
for (;;) {
- synchronized(viewRequests) {
+ synchronized (viewRequests) {
if (shutdown) {
return;
}
@@ -1645,7 +1648,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
} // synchronized
- if (requests != null && !requests.isEmpty()) {
+ if (requests != null && !requests.isEmpty()) {
logger.info("View Creator is processing {} requests for the next membership view", requests.size());
try {
createAndSendView(requests);
@@ -1659,7 +1662,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
shutdown = true;
}
}
-
+
/**
* Create a new membership view and send it to members (including crashed members).
* Returns false if the view cannot be prepared successfully, true otherwise
@@ -1678,17 +1681,17 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
oldMembers = Collections.emptyList();
}
Set<InternalDistributedMember> oldIDs = new HashSet<>();
-
- for (DistributionMessage msg: requests) {
+
+ for (DistributionMessage msg : requests) {
logger.debug("processing request {}", msg);
InternalDistributedMember mbr = null;
switch (msg.getDSFID()) {
case JOIN_REQUEST:
- mbr = ((JoinRequestMessage)msg).getMemberID();
- // see if an old member ID is being reused. If
+ mbr = ((JoinRequestMessage) msg).getMemberID();
+ // see if an old member ID is being reused. If
// so we'll remove it from the new view
- for (InternalDistributedMember m: oldMembers) {
+ for (InternalDistributedMember m : oldMembers) {
if (mbr.compareTo(m, false) == 0) {
oldIDs.add(m);
break;
@@ -1710,84 +1713,82 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
removalReqs.add(mbr);
removalReasons.add(((RemoveMemberMessage) msg).getReason());
} else {
- sendRemoveMessages(Collections.<InternalDistributedMember>singletonList(mbr),
- Collections.<String>singletonList(((RemoveMemberMessage)msg).getReason()),
- currentView);
+ sendRemoveMessages(Collections.<InternalDistributedMember> singletonList(mbr),
+ Collections.<String> singletonList(((RemoveMemberMessage) msg).getReason()), currentView);
}
break;
- default:
+ default:
logger.warn("Unknown membership request encountered: {}", msg);
break;
}
}
-
- for (InternalDistributedMember mbr: oldIDs) {
+
+ for (InternalDistributedMember mbr : oldIDs) {
if (!leaveReqs.contains(mbr) && !removalReqs.contains(mbr)) {
removalReqs.add(mbr);
removalReasons.add("Removal of old ID that has been reused");
}
}
-
+
if (removalReqs.isEmpty() && leaveReqs.isEmpty() && joinReqs.isEmpty()) {
return;
}
-
+
NetView newView;
- synchronized(viewInstallationLock) {
+ synchronized (viewInstallationLock) {
int viewNumber = 0;
List<InternalDistributedMember> mbrs;
if (currentView == null) {
mbrs = new ArrayList<InternalDistributedMember>(joinReqs.size());
} else {
- viewNumber = currentView.getViewId()+1;
+ viewNumber = currentView.getViewId() + 1;
mbrs = new ArrayList<InternalDistributedMember>(oldMembers);
}
mbrs.addAll(joinReqs);
mbrs.removeAll(leaveReqs);
mbrs.removeAll(removalReqs);
- newView = new NetView(localAddress, viewNumber, mbrs, leaveReqs,
- new HashSet<InternalDistributedMember>(removalReqs));
+ newView = new NetView(localAddress, viewNumber, mbrs, leaveReqs, new HashSet<InternalDistributedMember>(removalReqs));
}
-
+
// if there are no membership changes then abort creation of
// the new view
if (newView.getMembers().equals(currentView.getMembers())) {
logger.info("membership hasn't changed - aborting new view {}", newView);
return;
}
-
- for (InternalDistributedMember mbr: joinReqs) {
+
+ for (InternalDistributedMember mbr : joinReqs) {
mbr.setVmViewId(newView.getViewId());
mbr.getNetMember().setSplitBrainEnabled(services.getConfig().isNetworkPartitionDetectionEnabled());
}
// send removal messages before installing the view so we stop
// getting messages from members that have been kicked out
sendRemoveMessages(removalReqs, removalReasons, newView);
-
+
// we want to always check for quorum loss but don't act on it
// unless network-partition-detection is enabled
- if ( !(isNetworkPartition(newView) && quorumRequired) ) {
- sendJoinResponses(joinReqs, newView);
+ if (!(isNetworkPartition(newView) && quorumRequired)) {
+ // add socket ports of all members to join response
+ List<Integer> portsForMembers = new ArrayList<Integer>(newView.size());
+ addPorts(newView, requests, portsForMembers);
+ sendJoinResponses(joinReqs, newView, portsForMembers);
}
- prepareAndSendView(newView, joinReqs, leaveReqs, newView.getCrashedMembers());
+ prepareAndSendView(newView, joinReqs, leaveReqs, newView.getCrashedMembers(), requests);
return;
}
-
-
+
/**
* This handles the 2-phase installation of the view
*/
- void prepareAndSendView(NetView newView,
- List<InternalDistributedMember> joinReqs,
- Set<InternalDistributedMember> leaveReqs,
- Set<InternalDistributedMember> removalReqs) {
+ void prepareAndSendView(NetView newView, List<InternalDistributedMember> joinReqs, Set<InternalDistributedMember> leaveReqs,
+ Set<InternalDistributedMember> removalReqs, List<DistributionMessage> requests) {
boolean prepared = false;
do {
if (this.shutdown || Thread.currentThread().isInterrupted()) {
return;
}
-
+
if (quorumRequired && isNetworkPartition(newView)) {
sendNetworkPartitionMessage(newView);
try {
@@ -1798,13 +1799,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return;
}
Set<InternalDistributedMember> crashes = newView.getActualCrashedMembers(currentView);
- forceDisconnect(
- LocalizedStrings.Network_partition_detected.toLocalizedString(crashes.size(), crashes));
+ forceDisconnect(LocalizedStrings.Network_partition_detected.toLocalizedString(crashes.size(), crashes));
shutdown = true;
return;
}
- prepared = prepareView(newView, joinReqs);
+ prepared = prepareView(newView, joinReqs, requests);
logger.debug("view preparation phase completed. prepared={}", prepared);
if (prepared) {
@@ -1827,12 +1827,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
List<InternalDistributedMember> failures = new ArrayList<>(currentView.getCrashedMembers().size() + unresponsive.size());
NetView conflictingView = prepareProcessor.getConflictingView();
- if (conflictingView != null
- && !conflictingView.getCreator().equals(localAddress)
- && conflictingView.getViewId() > newView.getViewId()
+ if (conflictingView != null && !conflictingView.getCreator().equals(localAddress) && conflictingView.getViewId() > newView.getViewId()
&& (lastConflictingView == null || conflictingView.getViewId() > lastConflictingView.getViewId())) {
lastConflictingView = conflictingView;
- logger.info("adding these crashed members from a conflicting view to the crash-set for the next view: {}\nconflicting view: {}", unresponsive, conflictingView);
+ logger.info("adding these crashed members from a conflicting view to the crash-set for the next view: {}\nconflicting view: {}", unresponsive,
+ conflictingView);
failures.addAll(conflictingView.getCrashedMembers());
}
@@ -1849,43 +1848,43 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
removalReqs.addAll(failures);
List<InternalDistributedMember> newMembers = new ArrayList<>(newView.getMembers());
newMembers.removeAll(removalReqs);
- newView = new NetView(localAddress, newView.getViewId()+1, newMembers, leaveReqs,
- removalReqs);
+ newView = new NetView(localAddress, newView.getViewId() + 1, newMembers, leaveReqs, removalReqs);
}
} while (!prepared);
-
+
lastConflictingView = null;
-
- sendView(newView, joinReqs);
+
+ sendView(newView, joinReqs, requests);
}
-
+
/**
* performs health checks on the collection of members, removing any that
* are found to be healthy
+ *
* @param mbrs
*/
private void removeHealthyMembers(Collection<InternalDistributedMember> mbrs) throws InterruptedException {
- List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(mbrs.size());
-
+ List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(mbrs.size());
+
Set<InternalDistributedMember> newRemovals = new HashSet<>();
Set<InternalDistributedMember> newLeaves = new HashSet<>();
-
- synchronized(viewRequests) {
- for (DistributionMessage msg: viewRequests) {
+
+ synchronized (viewRequests) {
+ for (DistributionMessage msg : viewRequests) {
switch (msg.getDSFID()) {
case LEAVE_REQUEST_MESSAGE:
- newLeaves.add(((LeaveRequestMessage)msg).getMemberID());
+ newLeaves.add(((LeaveRequestMessage) msg).getMemberID());
break;
case REMOVE_MEMBER_REQUEST:
- newRemovals.add(((RemoveMemberMessage)msg).getMemberID());
+ newRemovals.add(((RemoveMemberMessage) msg).getMemberID());
break;
default:
break;
}
}
}
-
- for (InternalDistributedMember mbr: mbrs) {
+
+ for (InternalDistributedMember mbr : mbrs) {
if (newRemovals.contains(mbr)) {
// no need to do a health check on a member who is already leaving
logger.info("member {} is already scheduled for removal", mbr);
@@ -1911,22 +1910,23 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
});
}
-
+
mbrs.removeAll(newLeaves);
-
+
if (mbrs.isEmpty()) {
return;
}
-
+
ExecutorService svc = Executors.newFixedThreadPool(mbrs.size(), new ThreadFactory() {
AtomicInteger i = new AtomicInteger();
+
@Override
public Thread newThread(Runnable r) {
return new Thread(Services.getThreadGroup(), r,
"GemFire View Creator verification thread " + i.incrementAndGet());
}
});
-
+
try {
List<Future<InternalDistributedMember>> futures;
futures = svc.invokeAll(checkers);
@@ -1957,5 +1957,5 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
}
-
+
}