You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/11/20 22:02:14 UTC
[20/50] [abbrv] incubator-geode git commit: GEODE-77: moving failure
detection sockets into the membership view
GEODE-77: moving failure detection sockets into the membership view
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/c152e20b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/c152e20b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/c152e20b
Branch: refs/heads/develop
Commit: c152e20b01518aa2dd19d46060e34de1ab2fa91a
Parents: 74d4ed3
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Fri Oct 30 09:02:45 2015 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Fri Oct 30 10:20:14 2015 -0700
----------------------------------------------------------------------
.../internal/membership/NetView.java | 69 +++++++-
.../internal/membership/gms/GMSUtil.java | 20 +++
.../membership/gms/fd/GMSHealthMonitor.java | 60 +++----
.../gms/interfaces/HealthMonitor.java | 12 +-
.../membership/gms/membership/GMSJoinLeave.java | 165 +++++++------------
.../gms/messages/InstallViewMessage.java | 43 -----
.../gms/messages/JoinRequestMessage.java | 19 +--
.../gms/messages/JoinResponseMessage.java | 43 -----
.../gms/messenger/JGroupsMessenger.java | 14 +-
.../membership/gms/messenger/StatRecorder.java | 13 ++
.../internal/InternalDataSerializer.java | 41 +++++
.../gemfire/internal/util/PluckStacks.java | 4 +-
.../gemfire/distributed/LocatorDUnitTest.java | 1 +
.../gms/membership/GMSJoinLeaveJUnitTest.java | 10 +-
.../membership/gms/messenger/InterceptUDP.java | 8 +
.../messenger/JGroupsMessengerJUnitTest.java | 91 +++++++++-
.../sanctionedDataSerializables.txt | 20 +--
17 files changed, 364 insertions(+), 269 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c152e20b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
index b33b5fc..76588a7 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
@@ -11,6 +11,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -23,7 +24,6 @@ import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
-import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
import com.gemstone.gemfire.internal.DataSerializableFixedID;
import com.gemstone.gemfire.internal.InternalDataSerializer;
import com.gemstone.gemfire.internal.Version;
@@ -36,16 +36,17 @@ import com.gemstone.gemfire.internal.Version;
* @since 5.5
*/
public class NetView implements DataSerializableFixedID {
- private static final long serialVersionUID = -8888347937416039434L;
private int viewId;
private List<InternalDistributedMember> members;
+ private int[] failureDetectionPorts = new int[10];
private Set<InternalDistributedMember> shutdownMembers;
private Set<InternalDistributedMember> crashedMembers;
private InternalDistributedMember creator;
private Set<InternalDistributedMember> hashedMembers;
static public final Random RANDOM = new Random();
+
public NetView() {
viewId = 0;
members = new ArrayList<InternalDistributedMember>(4);
@@ -53,27 +54,29 @@ public class NetView implements DataSerializableFixedID {
shutdownMembers = Collections.emptySet();
crashedMembers = new HashSet<>();
creator = null;
+ Arrays.fill(failureDetectionPorts, -1);
}
public NetView(InternalDistributedMember creator) {
viewId = 0;
members = new ArrayList<InternalDistributedMember>(4);
members.add(creator);
- this.hashedMembers = new HashSet<>(members);
+ hashedMembers = new HashSet<>(members);
shutdownMembers = new HashSet<>();
crashedMembers = Collections.emptySet();
this.creator = creator;
- int seed = creator.hashCode() + (int) System.currentTimeMillis();
+ Arrays.fill(failureDetectionPorts, -1);
}
// legacy method for JGMM
public NetView(int size, long viewId) {
this.viewId = (int) viewId;
members = new ArrayList<InternalDistributedMember>(size);
- this.hashedMembers = new HashSet<InternalDistributedMember>(members);
+ this.hashedMembers = new HashSet<InternalDistributedMember>();
shutdownMembers = new HashSet<>();
crashedMembers = Collections.emptySet();
creator = null;
+ Arrays.fill(failureDetectionPorts, -1);
}
public NetView(NetView other, int viewId) {
@@ -81,6 +84,8 @@ public class NetView implements DataSerializableFixedID {
this.viewId = viewId;
this.members = new ArrayList<InternalDistributedMember>(other.members);
this.hashedMembers = new HashSet<InternalDistributedMember>(other.members);
+ this.failureDetectionPorts = new int[other.failureDetectionPorts.length];
+ System.arraycopy(other.failureDetectionPorts, 0, this.failureDetectionPorts, 0, other.failureDetectionPorts.length);
this.shutdownMembers = new HashSet<InternalDistributedMember>(other.shutdownMembers);
this.crashedMembers = new HashSet<InternalDistributedMember>(other.crashedMembers);
}
@@ -93,6 +98,8 @@ public class NetView implements DataSerializableFixedID {
this.hashedMembers = new HashSet<InternalDistributedMember>(mbrs);
this.shutdownMembers = shutdowns;
this.crashedMembers = crashes;
+ this.failureDetectionPorts = new int[mbrs.size()+10];
+ Arrays.fill(this.failureDetectionPorts, -1);
}
public int getViewId() {
@@ -106,7 +113,45 @@ public class NetView implements DataSerializableFixedID {
public void setCreator(InternalDistributedMember creator) {
this.creator = creator;
}
+
+ public int[] getFailureDetectionPorts() {
+ return this.failureDetectionPorts;
+ }
+
+ /**
+ * Looks up the failure detection port recorded for a member of this view.
+ *
+ * @param mbr the member whose port is wanted
+ * @return the recorded port, or -1 if the member is not in the members
+ * list or the ports array has no slot for it
+ */
+ public int getFailureDetectionPort(InternalDistributedMember mbr) {
+ int position = members.indexOf(mbr);
+ int[] ports = failureDetectionPorts;
+ boolean hasSlot = position >= 0 && ports != null && position < ports.length;
+ return hasSlot ? ports[position] : -1;
+ }
+
+ /**
+ * Records the failure detection port of a member of this view, growing the
+ * ports array when it has no slot for that member yet.
+ *
+ * @param mbr the member whose port is being recorded
+ * @param port the port to record
+ * @throws IllegalArgumentException if mbr is not in the members list
+ */
+ public void setFailureDetectionPort(InternalDistributedMember mbr, int port) {
+ final int slot = members.indexOf(mbr);
+ if (slot >= 0) {
+ ensureFDCapacity(slot);
+ failureDetectionPorts[slot] = port;
+ return;
+ }
+ throw new IllegalArgumentException("element not found in members list:" + mbr);
+ }
+ /**
+ * Ensures that failureDetectionPorts has a slot at idx to store an int,
+ * allocating the array or growing it to idx+10 entries when needed. Every
+ * slot created here is initialized to -1, the value used throughout this
+ * class for "no port known".
+ *
+ * @param idx index that must be addressable after this call
+ */
+ private void ensureFDCapacity(int idx) {
+ if (failureDetectionPorts == null) {
+ failureDetectionPorts = new int[idx+10];
+ Arrays.fill(failureDetectionPorts, -1);
+ } else if (idx >= failureDetectionPorts.length) {
+ int oldLength = failureDetectionPorts.length;
+ int[] p = Arrays.copyOf(failureDetectionPorts, idx+10);
+ // Mark every new slot as unknown. Filling only [idx, idx+9) would leave
+ // zeros in the last slot and in any slots between oldLength and idx.
+ Arrays.fill(p, oldLength, p.length, -1);
+ failureDetectionPorts = p;
+ }
+ }
+
public List<InternalDistributedMember> getMembers() {
return Collections.unmodifiableList(this.members);
}
@@ -140,6 +185,9 @@ public class NetView implements DataSerializableFixedID {
public void add(InternalDistributedMember mbr) {
this.hashedMembers.add(mbr);
this.members.add(mbr);
+ int idx = members.size()-1;
+ ensureFDCapacity(idx);
+ this.failureDetectionPorts[idx] = -1;
}
public void addCrashedMembers(Set<InternalDistributedMember> mbr) {
@@ -148,12 +196,19 @@ public class NetView implements DataSerializableFixedID {
public boolean remove(InternalDistributedMember mbr) {
this.hashedMembers.remove(mbr);
+ int idx = this.members.indexOf(mbr);
+ // Keep the ports array aligned with the members list by closing the gap
+ // left by the departing member. The array may be absent or shorter than
+ // the members list (getFailureDetectionPort tolerates both), so guard the
+ // shift rather than fail with an NPE or IndexOutOfBoundsException.
+ if (idx >= 0 && failureDetectionPorts != null && idx < failureDetectionPorts.length) {
+ System.arraycopy(failureDetectionPorts, idx+1, failureDetectionPorts, idx, failureDetectionPorts.length-idx-1);
+ // the slot vacated at the end no longer belongs to any member
+ failureDetectionPorts[failureDetectionPorts.length-1] = -1;
+ }
return this.members.remove(mbr);
}
public void removeAll(Collection<InternalDistributedMember> ids) {
this.hashedMembers.removeAll(ids);
- this.members.removeAll(ids);
+ for (InternalDistributedMember mbr: ids) {
+ remove(mbr);
+ }
}
public boolean contains(InternalDistributedMember mbr) {
@@ -442,6 +497,7 @@ public class NetView implements DataSerializableFixedID {
writeAsArrayList(members, out);
InternalDataSerializer.writeSet(shutdownMembers, out);
InternalDataSerializer.writeSet(crashedMembers, out);
+ DataSerializer.writeIntArray(failureDetectionPorts, out);
}
@Override
@@ -452,6 +508,7 @@ public class NetView implements DataSerializableFixedID {
this.hashedMembers = new HashSet<InternalDistributedMember>(members);
shutdownMembers = InternalDataSerializer.readHashSet(in);
crashedMembers = InternalDataSerializer.readHashSet(in);
+ failureDetectionPorts = DataSerializer.readIntArray(in);
}
/** this will deserialize as an ArrayList */
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c152e20b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
index 17a6721..e1041f2 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
@@ -98,6 +98,26 @@ public class GMSUtil {
return sb.toString();
}
+
+ /**
+ * Formats the bytes in a buffer into two-digit lowercase hex octets, each
+ * followed by a space, with a newline after every 50th octet.
+ *
+ * @param buf the bytes to format
+ * @return the formatted text; empty when buf contains no bytes
+ */
+ private static String formatBytes(byte[] buf) {
+ // three characters per octet plus one newline per 50 octets
+ StringBuilder w = new StringBuilder(buf.length * 3 + buf.length / 50);
+ for (int i=0; i<buf.length; i++) {
+ int octet = buf[i]&0xff;
+ if (octet < 0x10) {
+ // toHexString drops the leading zero of single-digit values
+ w.append('0');
+ }
+ w.append(Integer.toHexString(octet)).append(' ');
+ if ( (i%50) == 49 ) {
+ w.append("\n");
+ }
+ }
+ return w.toString();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c152e20b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index f79888e..3f5db38 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -14,6 +14,7 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -137,10 +138,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
private ExecutorService serverSocketExecutor;
private static final int OK = 0x01;
private static final int ERROR = 0x02;
- private InetAddress ip;
+ private InetAddress socketAddress;
private volatile int socketPort;
private volatile ServerSocket serverSocket;
- private Map<InternalDistributedMember, InetSocketAddress> socketInfo = new ConcurrentHashMap<InternalDistributedMember, InetSocketAddress>();
public GMSHealthMonitor() {
@@ -309,14 +309,12 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
// messages by returning true.
return true;
}
- //TODO: need to some tcp check
logger.trace("Checking member {}", pingMember);
final CheckRequestMessage prm = constructCheckRequestMessage(pingMember);
final Response pingResp = new Response();
requestIdVsResponse.put(prm.getRequestId(), pingResp);
try {
Set<InternalDistributedMember> membersNotReceivedMsg = this.services.getMessenger().send(prm);
- // TODO: send is throwing exception
if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(pingMember)) {
// member is not part of current view.
logger.trace("Member {} is not part of current view.", pingMember);
@@ -359,7 +357,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* @param suspectMember member that does not respond to CheckRequestMessage
* @return true if successfully exchanged PING/PONG with TCP connection, otherwise false.
*/
- private boolean doTCPCheckMember(InternalDistributedMember suspectMember, InetSocketAddress addr) {
+ private boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
logger.trace("Checking member {} with TCP socket connection.", suspectMember);
Socket clientSocket = new Socket();
try {
@@ -367,7 +365,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
// for (Map.Entry<InternalDistributedMember, InetSocketAddress> entry : socketInfo.entrySet()) {
// logger.info("socketInfo member:" + entry.getKey() + " port:" + entry.getValue().getPort());
// }
- logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember, addr.getAddress(), addr.getPort());
+ logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember, suspectMember.getInetAddress(), port);
+ InetSocketAddress addr = new InetSocketAddress(suspectMember.getInetAddress(), port);
clientSocket.connect(addr, (int) services.getConfig().getMemberTimeout());
if (clientSocket.isConnected()) {
clientSocket.setSoTimeout((int) services.getConfig().getMemberTimeout());
@@ -442,7 +441,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
- Thread th = new Thread(Services.getThreadGroup(), r, "GemFire Member-Check Scheduler ");
+ Thread th = new Thread(Services.getThreadGroup(), r, "GemFire Failure Detection Scheduler");
th.setDaemon(true);
return th;
}
@@ -455,7 +454,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
@Override
public Thread newThread(Runnable r) {
int id = threadIdx.getAndIncrement();
- Thread th = new Thread(Services.getThreadGroup(), r, "GemFire Member-Check Thread " + id);
+ Thread th = new Thread(Services.getThreadGroup(), r, "GemFire Failure Detection thread " + id);
th.setDaemon(true);
return th;
}
@@ -487,7 +486,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
@Override
public Thread newThread(Runnable r) {
int id = threadIdx.getAndIncrement();
- Thread th = new Thread(Services.getThreadGroup(), r, "TCP Check ServerSocket Thread " + id);
+ Thread th = new Thread(Services.getThreadGroup(), r, "GemFire Failure Detection Server thread " + id);
th.setDaemon(true);
return th;
}
@@ -501,16 +500,15 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
// start server socket for TCP check
if (serverSocket == null) {
localAddress = services.getMessenger().getMemberID();
- ip = localAddress.getInetAddress();
+ socketAddress = localAddress.getInetAddress();
int[] portRange = services.getConfig().getMembershipPortRange();
socketPort = AvailablePort.getAvailablePortInRange(portRange[0], portRange[1], AvailablePort.SOCKET);
if (socketPort == -1) {
throw new SystemConnectException("Unable to find a free port in the membership port range");
}
serverSocket = new ServerSocket();
- serverSocket.bind(new InetSocketAddress(ip, socketPort));
- logger.info("GMSHealthMonitor started server socket on {}:{}.", ip, socketPort);
- socketInfo.put(localAddress, new InetSocketAddress(ip, socketPort));
+ serverSocket.bind(new InetSocketAddress(socketAddress, socketPort));
+ logger.info("Started failure detection server thread on {}:{}.", socketAddress, socketPort);
while (!services.getCancelCriterion().isCancelInProgress()
&& !GMSHealthMonitor.this.isStopping) {
try {
@@ -521,7 +519,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
socket.setSoTimeout((int) services.getConfig().getMemberTimeout());
new ClientSocketHandler(socket).start();
} catch (IOException e) {
- logger.trace("Unexpected exception", e);
+ if (!isStopping) {
+ logger.trace("Unexpected exception", e);
+ }
try {
if (socket != null) {
socket.close();
@@ -531,7 +531,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
}
}
- logger.info("GMSHealthMonitor server socket has done its jobs.");
+ logger.info("GMSHealthMonitor server thread exiting");
}
} catch (IOException e) {
logger.trace("Unexpected exception", e);
@@ -988,12 +988,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
logger.info("Performing final check for suspect member {} reason={}", mbr, reason);
boolean pinged;
- InetSocketAddress addr = socketInfo.get(mbr);
- if (addr == null || addr.getPort() < 0) {
+ NetView view = currentView;
+ int port = view.getFailureDetectionPort(mbr);
+ if (port <= 0) {
logger.info("Unable to locate failure detection port - requesting a heartbeat");
+ if (logger.isDebugEnabled()) {
+ logger.debug("\ncurrent view: {}\nports: {}", view, Arrays.toString(view.getFailureDetectionPorts()));
+ }
pinged = GMSHealthMonitor.this.doCheckMember(mbr);
} else {
- pinged = GMSHealthMonitor.this.doTCPCheckMember(mbr, addr);
+ pinged = GMSHealthMonitor.this.doTCPCheckMember(mbr, port);
}
logger.info("Final check {}", pinged? "succeeded" : "failed");
@@ -1154,23 +1158,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
@Override
public void memberShutdown(DistributedMember mbr, String reason) {
- // TODO Auto-generated method stub
-
}
- public Map<InternalDistributedMember, InetSocketAddress> getSocketInfo() {
- return this.socketInfo;
- }
-
- public void installSocketInfo(List<InternalDistributedMember> members, List<Integer> portsForMembers) {
- logger.debug("installSocketInfo members=" + members + " portsForMembers=" + portsForMembers);
- for (int i = 0; i < members.size(); i++) {
- if (portsForMembers.get(i).intValue() == -1) {
- continue;
- }
- InetSocketAddress addr = new InetSocketAddress(members.get(i).getInetAddress(), portsForMembers.get(i).intValue());
- socketInfo.put(members.get(i), addr);
- }
+ @Override
+ public int getFailureDetectionPort() {
+ return this.socketPort;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c152e20b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java
index 628e416..c55f1d7 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/HealthMonitor.java
@@ -41,15 +41,9 @@ public interface HealthMonitor extends Service {
public void memberShutdown(DistributedMember mbr, String reason);
/**
- * Returns a map that describes the members and their server sockets
+ * Returns the failure detection port for this member, or -1 if
+ * there is no such port
*/
- public Map<InternalDistributedMember, InetSocketAddress> getSocketInfo();
+ public int getFailureDetectionPort();
- /**
- * Update the information of the members and their server sockets
- *
- * @param members
- * @param portsForMembers List of socket ports for each member
- */
- public void installSocketInfo(List<InternalDistributedMember> members, List<Integer> portsForMembers);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c152e20b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 7b5dfc5..0b2abe3 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -15,6 +15,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -23,7 +24,6 @@ import java.util.Map;
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -305,14 +305,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
// send a join request to the coordinator and wait for a response
InternalDistributedMember coord = state.possibleCoordinator;
if (state.alreadyTried.contains(coord)) {
- logger.info("Probably coordinator is still {} - waiting for a join-response");
+ logger.info("Probable coordinator is still {} - waiting for a join-response", coord);
} else {
logger.info("Attempting to join the distributed system through coordinator " + coord + " using address " + this.localAddress);
- JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, services.getAuthenticator().getCredentials(coord));
- // add server socket port in the join request
- if (services.getHealthMonitor().getSocketInfo().get(localAddress) != null) {
- req.setSocketPort(services.getHealthMonitor().getSocketInfo().get(localAddress).getPort());
- }
+ int port = services.getHealthMonitor().getFailureDetectionPort();
+ JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, services.getAuthenticator().getCredentials(coord), port);
services.getMessenger().send(req);
}
@@ -356,7 +353,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
this.localAddress.setVmViewId(this.birthViewId);
GMSMember me = (GMSMember) this.localAddress.getNetMember();
me.setBirthViewId(birthViewId);
- services.getHealthMonitor().installSocketInfo(response.getCurrentView().getMembers(), response.getPortsForMembers());
installView(response.getCurrentView());
}
@@ -408,18 +404,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (!this.localAddress.getNetMember().preferredForCoordinator() &&
incomingRequest.getMemberID().getNetMember().preferredForCoordinator()) {
JoinResponseMessage m = new JoinResponseMessage(incomingRequest.getMemberID(), currentView, true);
- // add socket ports of all members to join response
- List<Integer> portsForMembers = new ArrayList<Integer>(currentView.size());
- Map<InternalDistributedMember, InetSocketAddress> socketInfo = services.getHealthMonitor().getSocketInfo();
- for (InternalDistributedMember mbr : currentView.getMembers()) {
- InetSocketAddress addr = socketInfo.get(mbr);
- if (addr != null) {
- portsForMembers.add(Integer.valueOf(addr.getPort()));
- } else {
- portsForMembers.add(Integer.valueOf(-1));
- }
- }
- m.setPortsForMembers(portsForMembers);
services.getMessenger().send(m);
return;
}
@@ -646,7 +630,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
mbrs.removeAll(leaving);
newView = new NetView(this.localAddress, viewNumber, mbrs, leaving,
removals);
-
+ newView.setFailureDetectionPort(this.localAddress, services.getHealthMonitor().getFailureDetectionPort());
}
if (viewCreator == null || viewCreator.isShutdown()) {
viewCreator = new ViewCreator("GemFire Membership View Creator", Services.getThreadGroup());
@@ -658,10 +642,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
- private void sendJoinResponses(List<InternalDistributedMember> newMbrs, NetView newView, List<Integer> portsForMembers) {
+ private void sendJoinResponses(List<InternalDistributedMember> newMbrs, NetView newView) {
for (InternalDistributedMember mbr : newMbrs) {
JoinResponseMessage response = new JoinResponseMessage(mbr, newView);
- response.setPortsForMembers(portsForMembers);
services.getMessenger().send(response);
}
}
@@ -674,55 +657,18 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
}
- boolean prepareView(NetView view, List<InternalDistributedMember> newMembers, List<DistributionMessage> requests) {
- return sendView(view, newMembers, true, this.prepareProcessor, requests);
+ boolean prepareView(NetView view, List<InternalDistributedMember> newMembers) {
+ return sendView(view, newMembers, true, this.prepareProcessor);
}
- void sendView(NetView view, List<InternalDistributedMember> newMembers, List<DistributionMessage> requests) {
- sendView(view, newMembers, false, this.viewProcessor, requests);
+ void sendView(NetView view, List<InternalDistributedMember> newMembers) {
+ sendView(view, newMembers, false, this.viewProcessor);
}
- /**
- * Build a list of socket ports for messages, e.g. InstallViewMessage, JoinResponseMessage
- * @param view
- * @param requests
- * @return
- */
- private void addPorts(NetView view, List<DistributionMessage> requests, List<Integer> portsForMembers) {
- Map<InternalDistributedMember, InetSocketAddress> socketInfo = services.getHealthMonitor().getSocketInfo();
- Map<InternalDistributedMember, Integer> portMap = new ConcurrentHashMap<InternalDistributedMember, Integer>();
- for (DistributionMessage req : requests) {
- if (req.getDSFID() == JOIN_REQUEST) {
- JoinRequestMessage joinReq = (JoinRequestMessage) req;
- portMap.put(joinReq.getMemberID(), Integer.valueOf(joinReq.getSocketPort()));
- }
- }
- for (InternalDistributedMember mbr : view.getMembers()) {
- InetSocketAddress addr = socketInfo.get(mbr);
- if (addr != null) {
- portsForMembers.add(Integer.valueOf(addr.getPort()));
- } else {
- Integer port = portMap.get(mbr);
- if (port != null) {
- portsForMembers.add(port);
- } else {
- portsForMembers.add(Integer.valueOf(-1));
- }
- }
- }
- }
-
- boolean sendView(NetView view, List<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor rp,
- List<DistributionMessage> requests) {
+ boolean sendView(NetView view, List<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor rp) {
int id = view.getViewId();
InstallViewMessage msg = new InstallViewMessage(view, services.getAuthenticator().getCredentials(this.localAddress), preparing);
Set<InternalDistributedMember> recips = new HashSet<>(view.getMembers());
- // add socket ports of all members to InstallViewMessage
- List<Integer> portsForMembers = new ArrayList<Integer>(view.size());
- if (requests != null) {
- addPorts(view, requests, portsForMembers);
- msg.setPortsForMembers(portsForMembers);
- }
// a recent member was seen not to receive a new view - I think this is why
// recips.removeAll(newMembers); // new members get the view in a JoinResponseMessage
@@ -736,9 +682,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (preparing) {
this.preparedView = view;
- if (requests != null) {
- services.getHealthMonitor().installSocketInfo(view.getMembers(), portsForMembers);
- }
} else {
installView(view);
}
@@ -765,8 +708,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
// a prepared view announcing the new member
if (!(isNetworkPartition(view) && quorumRequired)) {
List<Integer> newPorts = new ArrayList<Integer>(view.size());
- addPorts(view, requests, newPorts);
- sendJoinResponses(newMembers, view, portsForMembers);
+ sendJoinResponses(newMembers, view);
}
logger.debug("waiting for view responses");
@@ -809,9 +751,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
services.getMessenger().send(new ViewAckMessage(m.getSender(), this.preparedView));
} else {
this.preparedView = view;
- if (!m.getPortsForMembers().isEmpty()) {
- services.getHealthMonitor().installSocketInfo(view.getMembers(), m.getPortsForMembers());
- }
ackView(m);
}
} else { // !preparing
@@ -1102,14 +1041,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
public void installView(NetView newView) {
- logger.info("received new view: {}\nold view is: {}", newView, currentView);
-
synchronized (viewInstallationLock) {
if (currentView != null && currentView.getViewId() >= newView.getViewId()) {
// old view - ignore it
return;
}
+ logger.info("received new view: {}\nold view is: {}", newView, currentView);
+
if (currentView == null && !this.isJoined) {
for (InternalDistributedMember mbr : newView.getMembers()) {
if (this.localAddress.equals(mbr)) {
@@ -1117,6 +1056,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
this.localAddress.setVmViewId(this.birthViewId);
GMSMember me = (GMSMember) this.localAddress.getNetMember();
me.setBirthViewId(birthViewId);
+ me.setSplitBrainEnabled(mbr.getNetMember().splitBrainEnabled());
isJoined = true;
break;
}
@@ -1633,13 +1573,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
Collection<InternalDistributedMember> recips = new ArrayList<>(v.size() + v.getCrashedMembers().size());
recips.addAll(v.getMembers());
recips.addAll(v.getCrashedMembers());
- List<Integer> ports = new ArrayList<>(v.size());
- for (InternalDistributedMember mbr: v.getMembers()) {
- InetSocketAddress addr = services.getHealthMonitor().getSocketInfo().get(mbr);
- int port = addr==null? -1 : addr.getPort();
- ports.add(Integer.valueOf(port));
- }
- msg.setPortsForMembers(ports);
msg.setRecipients(recips);
services.getMessenger().send(msg);
}
@@ -1694,7 +1627,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (initialView != null) {
try {
prepareAndSendView(initialView, Collections.<InternalDistributedMember>emptyList(), initialLeaving,
- initialRemovals, Collections.<DistributionMessage>emptyList());
+ initialRemovals);
} finally {
this.initialView = null;
this.initialLeaving = null;
@@ -1773,10 +1706,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
* Returns false if the view cannot be prepared successfully, true otherwise
*/
void createAndSendView(List<DistributionMessage> requests) {
- List<InternalDistributedMember> joinReqs = new ArrayList<>();
- Set<InternalDistributedMember> leaveReqs = new HashSet<>();
- List<InternalDistributedMember> removalReqs = new ArrayList<>();
- List<String> removalReasons = new ArrayList<String>();
+ List<InternalDistributedMember> joinReqs = new ArrayList<>(10);
+ Map<InternalDistributedMember, Integer> joinPorts = new HashMap<>(10);
+ Set<InternalDistributedMember> leaveReqs = new HashSet<>(10);
+ List<InternalDistributedMember> removalReqs = new ArrayList<>(10);
+ List<String> removalReasons = new ArrayList<String>(10);
NetView oldView = currentView;
List<InternalDistributedMember> oldMembers;
@@ -1793,7 +1727,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
InternalDistributedMember mbr = null;
switch (msg.getDSFID()) {
case JOIN_REQUEST:
- mbr = ((JoinRequestMessage) msg).getMemberID();
+ JoinRequestMessage jmsg = (JoinRequestMessage)msg;
+ mbr = jmsg.getMemberID();
+ int port = jmsg.getFailureDetectionPort();
// see if an old member ID is being reused. If
// so we'll remove it from the new view
for (InternalDistributedMember m : oldMembers) {
@@ -1804,6 +1740,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
if (!joinReqs.contains(mbr)) {
joinReqs.add(mbr);
+ joinPorts.put(mbr, port);
}
break;
case LEAVE_REQUEST_MESSAGE:
@@ -1857,15 +1794,43 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
int viewNumber = 0;
List<InternalDistributedMember> mbrs;
if (currentView == null) {
- mbrs = new ArrayList<InternalDistributedMember>(joinReqs.size());
+ mbrs = new ArrayList<InternalDistributedMember>(joinReqs);
} else {
viewNumber = currentView.getViewId() + 1;
mbrs = new ArrayList<InternalDistributedMember>(oldMembers);
+ mbrs.addAll(joinReqs);
}
- mbrs.addAll(joinReqs);
mbrs.removeAll(leaveReqs);
mbrs.removeAll(removalReqs);
newView = new NetView(localAddress, viewNumber, mbrs, leaveReqs, new HashSet<InternalDistributedMember>(removalReqs));
+ int size = joinReqs.size();
+ for (InternalDistributedMember mbr: joinReqs) {
+ if (mbrs.contains(mbr)) {
+ newView.setFailureDetectionPort(mbr, joinPorts.get(mbr));
+ }
+ }
+ if (currentView != null) {
+ int[] ports = currentView.getFailureDetectionPorts();
+ if (ports != null) {
+ int idx = 0;
+ int portsSize = ports.length;
+ for (InternalDistributedMember mbr: currentView.getMembers()) {
+ if (newView.contains(mbr)) {
+ // unit tests create views w/o failure detection ports, so we must check the length
+ // of the array
+ if (idx < portsSize) {
+ newView.setFailureDetectionPort(mbr, ports[idx]);
+ } else {
+ newView.setFailureDetectionPort(mbr, -1);
+ }
+ }
+ idx += 1;
+ }
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Established failure detection ports for new view: {}", newView.getFailureDetectionPorts());
+ }
}
// if there are no membership changes then abort creation of
@@ -1883,7 +1848,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
// getting messages from members that have been kicked out
sendRemoveMessages(removalReqs, removalReasons, newView);
- prepareAndSendView(newView, joinReqs, leaveReqs, newView.getCrashedMembers(), requests);
+ prepareAndSendView(newView, joinReqs, leaveReqs, newView.getCrashedMembers());
return;
}
@@ -1892,7 +1857,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
* This handles the 2-phase installation of the view
*/
void prepareAndSendView(NetView newView, List<InternalDistributedMember> joinReqs, Set<InternalDistributedMember> leaveReqs,
- Set<InternalDistributedMember> removalReqs, List<DistributionMessage> requests) {
+ Set<InternalDistributedMember> removalReqs) {
boolean prepared = false;
do {
if (this.shutdown || Thread.currentThread().isInterrupted()) {
@@ -1914,7 +1879,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
return;
}
- prepared = prepareView(newView, joinReqs, requests);
+ prepared = prepareView(newView, joinReqs);
logger.debug("view preparation phase completed. prepared={}", prepared);
NetView conflictingView = prepareProcessor.getConflictingView();
@@ -1944,18 +1909,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
logger.info("adding these crashed members from a conflicting view to the crash-set for the next view: {}\nconflicting view: {}", unresponsive,
conflictingView);
failures.addAll(conflictingView.getCrashedMembers());
- List<InternalDistributedMember> newMembers = prepareProcessor.getConflictingView().getNewMembers();
+ List<InternalDistributedMember> newMembers = conflictingView.getNewMembers();
if (!newMembers.isEmpty()) {
logger.info("adding these new members from a conflicting view to the new view: {}", newMembers);
for (InternalDistributedMember mbr: newMembers) {
- InetSocketAddress addr = services.getHealthMonitor().getSocketInfo().get(mbr);
- // TODO: re-factor health monitor ports to be in the NetView so we don't need
- // to create a fake JoinRequestMessage here
- int port = addr==null? -1 : addr.getPort();
- JoinRequestMessage msg = new JoinRequestMessage(localAddress, mbr, null);
- msg.setSocketPort(port);
- requests.add(msg);
+ int port = conflictingView.getFailureDetectionPort(mbr);
+ JoinRequestMessage msg = new JoinRequestMessage(localAddress, mbr, null, port);
newView.add(mbr);
+ newView.setFailureDetectionPort(mbr, port);
joinReqs.add(mbr);
}
}
@@ -1995,7 +1956,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
lastConflictingView = null;
- sendView(newView, joinReqs, requests);
+ sendView(newView, joinReqs);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c152e20b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
index 1afdf40..a5be893 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
@@ -18,7 +18,6 @@ public class InstallViewMessage extends HighPriorityDistributionMessage {
private NetView view;
private Object credentials;
private boolean preparing;
- private List<Integer> portsForMembers = Collections.<Integer>emptyList();
public InstallViewMessage(NetView view, Object credentials) {
this.view = view;
@@ -58,45 +57,12 @@ public class InstallViewMessage extends HighPriorityDistributionMessage {
throw new IllegalStateException("this message is not intended to execute in a thread pool");
}
- private void writeListOfInteger(List<Integer> list, DataOutput out) throws IOException {
- int size;
- if (list == null) {
- size = -1;
- } else {
- size = list.size();
- }
- InternalDataSerializer.writeArrayLength(size, out);
- if (size > 0) {
- for (int i = 0; i < size; i++) {
- out.writeInt(list.get(i).intValue());
- }
- }
- }
-
- private List<Integer> readListOfInteger(DataInput in) throws IOException {
- int size = InternalDataSerializer.readArrayLength(in);
- if (size > 0) {
- List<Integer> list = new ArrayList<Integer>(size);
- for (int i = 0; i < size; i++) {
- list.add(Integer.valueOf(in.readInt()));
- }
- return list;
- }
- else if (size == 0) {
- return Collections.<Integer>emptyList();
- }
- else {
- return null;
- }
- }
-
@Override
public void toData(DataOutput out) throws IOException {
super.toData(out);
DataSerializer.writeObject(this.view, out);
DataSerializer.writeObject(this.credentials, out);
out.writeBoolean(preparing);
- writeListOfInteger(portsForMembers, out);
}
@Override
@@ -105,22 +71,13 @@ public class InstallViewMessage extends HighPriorityDistributionMessage {
this.view = DataSerializer.readObject(in);
this.credentials = DataSerializer.readObject(in);
this.preparing = in.readBoolean();
- this.portsForMembers = readListOfInteger(in);
}
@Override
public String toString() {
return "InstallViewMessage(preparing="+this.preparing+"; "+this.view
+"; cred="+(credentials==null?"null": "not null")
- + "portsForMembers: " + portsForMembers
+")";
}
- public List<Integer> getPortsForMembers() {
- return this.portsForMembers;
- }
-
- public void setPortsForMembers(List<Integer> portsForMembers) {
- this.portsForMembers = portsForMembers;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c152e20b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
index 952b20e..3caf9ae 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java
@@ -13,14 +13,15 @@ import com.gemstone.gemfire.internal.Version;
public class JoinRequestMessage extends HighPriorityDistributionMessage {
private InternalDistributedMember memberID;
private Object credentials;
- private int socketPort = -1;
+ private int failureDetectionPort = -1;
public JoinRequestMessage(InternalDistributedMember coord,
- InternalDistributedMember id, Object credentials) {
+ InternalDistributedMember id, Object credentials, int fdPort) {
super();
setRecipient(coord);
this.memberID = id;
this.credentials = credentials;
+ this.failureDetectionPort = fdPort;
}
public JoinRequestMessage() {
@@ -47,7 +48,7 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
@Override
public String toString() {
- return getShortClassName() + "(" + memberID + (credentials==null? ")" : "; with credentials)") + " socketPort:" + socketPort;
+ return getShortClassName() + "(" + memberID + (credentials==null? ")" : "; with credentials)") + " failureDetectionPort:" + failureDetectionPort;
}
@Override
@@ -59,7 +60,7 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
public void toData(DataOutput out) throws IOException {
DataSerializer.writeObject(memberID, out);
DataSerializer.writeObject(credentials, out);
- DataSerializer.writePrimitiveInt(socketPort, out);
+ DataSerializer.writePrimitiveInt(failureDetectionPort, out);
// preserve the multicast setting so the receiver can tell
// if this is a mcast join request
out.writeBoolean(getMulticast());
@@ -69,16 +70,12 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage {
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
memberID = DataSerializer.readObject(in);
credentials = DataSerializer.readObject(in);
- socketPort = DataSerializer.readPrimitiveInt(in);
+ failureDetectionPort = DataSerializer.readPrimitiveInt(in);
setMulticast(in.readBoolean());
}
- public int getSocketPort() {
- return socketPort;
+ public int getFailureDetectionPort() {
+ return failureDetectionPort;
}
- public void setSocketPort(int socketPort) {
- this.socketPort = socketPort;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c152e20b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
index 77b72c3..bafd63e 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java
@@ -22,7 +22,6 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
private InternalDistributedMember memberID;
private Object messengerData;
private boolean becomeCoordinator;
- private List<Integer> portsForMembers = Collections.<Integer>emptyList();
public JoinResponseMessage(InternalDistributedMember memberID, NetView view) {
this.currentView = view;
@@ -78,7 +77,6 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
public String toString() {
return getShortClassName() + "("+memberID + "; "
+ (currentView==null? "" : currentView.toString())
- + "portsForMembers: " + portsForMembers
+ (rejectionMessage==null? "" : ("; "+rejectionMessage))
+ (becomeCoordinator? "; becomeCoordinator" : "")
+ ")";
@@ -94,43 +92,10 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
return JOIN_RESPONSE;
}
- private void writeListOfInteger(List<Integer> list, DataOutput out) throws IOException {
- int size;
- if (list == null) {
- size = -1;
- } else {
- size = list.size();
- }
- InternalDataSerializer.writeArrayLength(size, out);
- if (size > 0) {
- for (int i = 0; i < size; i++) {
- out.writeInt(list.get(i).intValue());
- }
- }
- }
-
- private List<Integer> readListOfInteger(DataInput in) throws IOException {
- int size = InternalDataSerializer.readArrayLength(in);
- if (size > 0) {
- List<Integer> list = new ArrayList<Integer>(size);
- for (int i = 0; i < size; i++) {
- list.add(Integer.valueOf(in.readInt()));
- }
- return list;
- }
- else if (size == 0) {
- return Collections.<Integer>emptyList();
- }
- else {
- return null;
- }
- }
-
@Override
public void toData(DataOutput out) throws IOException {
DataSerializer.writeObject(currentView, out);
DataSerializer.writeObject(memberID, out);
- writeListOfInteger(portsForMembers, out);
out.writeBoolean(becomeCoordinator);
DataSerializer.writeString(rejectionMessage, out);
DataSerializer.writeObject(messengerData, out);
@@ -140,17 +105,9 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage {
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
currentView = DataSerializer.readObject(in);
memberID = DataSerializer.readObject(in);
- portsForMembers = readListOfInteger(in);
becomeCoordinator = in.readBoolean();
rejectionMessage = DataSerializer.readString(in);
messengerData = DataSerializer.readObject(in);
}
- public void setPortsForMembers(List<Integer> portsForMembers) {
- this.portsForMembers = portsForMembers;
- }
-
- public List<Integer> getPortsForMembers() {
- return this.portsForMembers;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c152e20b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index ae7ee16..bd21629 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -116,7 +116,7 @@ public class JGroupsMessenger implements Messenger {
Object nakackDigest;
- private NetView view;
+ private volatile NetView view;
private GMSPingPonger pingPonger = new GMSPingPonger();
@@ -863,8 +863,18 @@ public class JGroupsMessenger implements Messenger {
}
public QuorumChecker getQuorumChecker() {
+ NetView view = this.view;
+ if (view == null) {
+ view = services.getJoinLeave().getView();
+ if (view == null) {
+ view = services.getJoinLeave().getPreviousView();
+ if (view == null) {
+ return null;
+ }
+ }
+ }
GMSQuorumChecker qc = new GMSQuorumChecker(
- services.getJoinLeave().getPreviousView(), services.getConfig().getLossThreshold(),
+ view, services.getConfig().getLossThreshold(),
this.myChannel);
qc.initialize();
return qc;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c152e20b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
index 8b09d24..7431fe7 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
@@ -4,6 +4,8 @@ import org.apache.logging.log4j.Logger;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.conf.ClassConfigurator;
+import org.jgroups.protocols.FRAG2;
+import org.jgroups.protocols.FragHeader;
import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.pbcast.NakAckHeader2;
@@ -30,6 +32,7 @@ public class StatRecorder extends Protocol {
private final short nakackHeaderId = ClassConfigurator.getProtocolId(NAKACK2.class);
private final short unicastHeaderId = ClassConfigurator.getProtocolId(UNICAST3.class);
+ private final short frag2HeaderId = ClassConfigurator.getProtocolId(FRAG2.class);
/**
* set the statistics object to modify when events are detected
@@ -46,6 +49,7 @@ public class StatRecorder extends Protocol {
Message msg = (Message)evt.getArg();
processForMulticast(msg, INCOMING);
processForUnicast(msg, INCOMING);
+ filter(msg, INCOMING);
}
return up_prot.up(evt);
}
@@ -57,6 +61,7 @@ public class StatRecorder extends Protocol {
Message msg = (Message)evt.getArg();
processForMulticast(msg, OUTGOING);
processForUnicast(msg, OUTGOING);
+ filter(msg, OUTGOING);
break;
}
return down_prot.down(evt);
@@ -106,4 +111,12 @@ public class StatRecorder extends Protocol {
}
}
}
+
+  /**
+   * Logs, at debug level, each message that carries a FRAG2 fragment header.
+   * Despite its name, this method does not drop or modify the message; it
+   * only reads the header, offset and length for the log entry.
+   *
+   * @param msg the JGroups message being inspected
+   * @param direction OUTGOING for a message being sent; any other value is
+   *        reported as "receiving"
+   */
+  private void filter(Message msg, int direction) {
+    // This is invoked from the message paths of both up() and down(), so skip
+    // the header lookup entirely unless the result would actually be logged.
+    if (!logger.isDebugEnabled()) {
+      return;
+    }
+    FragHeader hdr = (FragHeader)msg.getHeader(frag2HeaderId);
+    if (hdr != null) {
+      String str = direction == OUTGOING? "sending" : "receiving";
+      logger.debug("{} fragment {} msg offset {} msg size {}", str, hdr, msg.getOffset(), msg.getLength());
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c152e20b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java
index 8e864c9..6c3003e 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java
@@ -33,6 +33,7 @@ import java.net.InetAddress;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -2661,6 +2662,46 @@ public abstract class InternalDataSerializer extends DataSerializer implements D
}
/**
+   * Serializes a list of Integers. The argument may be null. Deserialize with
+   * readListOfIntegers().
+   * <p>
+   * Wire format: the element count written with writeArrayLength (-1 when the
+   * list is null), followed by one int per element written with
+   * DataOutput.writeInt.
+   * <p>
+   * This helper uses no instance state, so it is static like the
+   * writeArrayLength helper it delegates to.
+   *
+   * @param list the list to serialize; may be null or empty, but must not
+   *        contain null elements (each element is unboxed)
+   * @param out the stream to write to
+   * @throws IOException if writing to <code>out</code> fails
+   */
+  public static void writeListOfIntegers(List<Integer> list, DataOutput out) throws IOException {
+    if (list == null) {
+      // -1 marks a null list so that readListOfIntegers can return null
+      InternalDataSerializer.writeArrayLength(-1, out);
+      return;
+    }
+    InternalDataSerializer.writeArrayLength(list.size(), out);
+    // iterate instead of calling get(i) so that lists without random access
+    // are not re-walked for every element
+    for (Integer value : list) {
+      out.writeInt(value.intValue());
+    }
+  }
+
+  /**
+   * Reads a list of integers serialized by writeListOfIntegers. This
+   * will return null if the object serialized by writeListOfIntegers was null.
+   * <p>
+   * This helper uses no instance state, so it is static like the
+   * readArrayLength helper it delegates to.
+   *
+   * @param in the stream to read from
+   * @return null when the serialized length is negative, the shared immutable
+   *         Collections.emptyList() when it is zero, otherwise a new ArrayList
+   *         holding the values in the order they were read
+   * @throws IOException if reading from <code>in</code> fails
+   */
+  public static List<Integer> readListOfIntegers(DataInput in) throws IOException {
+    int size = InternalDataSerializer.readArrayLength(in);
+    if (size < 0) {
+      // a negative length is how a null list is represented on the wire
+      return null;
+    }
+    if (size == 0) {
+      return Collections.<Integer>emptyList();
+    }
+    List<Integer> list = new ArrayList<Integer>(size);
+    for (int i = 0; i < size; i++) {
+      list.add(Integer.valueOf(in.readInt()));
+    }
+    return list;
+  }
+
+ /**
* Reads and discards an array of <code>byte</code>s from a
* <code>DataInput</code>.
*
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c152e20b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/PluckStacks.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/PluckStacks.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/PluckStacks.java
index 15a64d7..d599bc9 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/PluckStacks.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/PluckStacks.java
@@ -191,7 +191,9 @@ public class PluckStacks {
}
if (threadName.startsWith("GemFire Membership Timer")) {
// System.out.println("gf timer stack size = " + stackSize + "; frame = " + thread.get(1));
- return stackSize < 9 && thread.get(1).contains("Thread.State: WAITING");
+ return stackSize < 9 &&
+ (thread.get(1).contains("Thread.State: WAITING")
+ || thread.get(1).contains("Thread.State: TIMED_WAITING"));
}
if (threadName.startsWith("GemFire Membership View Creator")) {
// System.out.println("gf view creator stack size = " + stackSize + "; frame = " + thread.get(1));
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c152e20b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
index 8779c6d..d2c6441 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/LocatorDUnitTest.java
@@ -1336,6 +1336,7 @@ public class LocatorDUnitTest extends DistributedTestCase {
dsProps.setProperty("locators", locators);
dsProps.setProperty("mcast-port", "0");
dsProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+// dsProps.setProperty("log-level", "finest");
final String uniqueName = getUniqueName();
vm0.invoke(new SerializableRunnable("Start locator on " + port1) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c152e20b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index adf925e..8dee00a 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -165,7 +165,7 @@ public class GMSJoinLeaveJUnitTest {
public void testProcessJoinMessageRejectOldMemberVersion() throws IOException {
initMocks();
- gmsJoinLeave.processMessage(new JoinRequestMessage(mockOldMember, mockOldMember, null));
+ gmsJoinLeave.processMessage(new JoinRequestMessage(mockOldMember, mockOldMember, null, -1));
assertTrue("JoinRequest should not have been added to view request", gmsJoinLeave.getViewRequests().size() == 0);
verify(messenger).send(any(JoinResponseMessage.class));
}
@@ -177,7 +177,7 @@ public class GMSJoinLeaveJUnitTest {
when(authenticator.authenticate(mockMembers[0], credentials)).thenThrow(new AuthenticationFailedException("we want to fail auth here"));
when(services.getMessenger()).thenReturn(messenger);
- gmsJoinLeave.processMessage(new JoinRequestMessage(mockMembers[0], mockMembers[0], credentials));
+ gmsJoinLeave.processMessage(new JoinRequestMessage(mockMembers[0], mockMembers[0], credentials, -1));
assertTrue("JoinRequest should not have been added to view request", gmsJoinLeave.getViewRequests().size() == 0);
verify(messenger).send(any(JoinResponseMessage.class));
}
@@ -189,7 +189,7 @@ public class GMSJoinLeaveJUnitTest {
when(authenticator.authenticate(mockMembers[0], null)).thenThrow(new AuthenticationFailedException("we want to fail auth here"));
when(services.getMessenger()).thenReturn(messenger);
- gmsJoinLeave.processMessage(new JoinRequestMessage(mockMembers[0], mockMembers[0], null));
+ gmsJoinLeave.processMessage(new JoinRequestMessage(mockMembers[0], mockMembers[0], null, -1));
assertTrue("JoinRequest should not have been added to view request", gmsJoinLeave.getViewRequests().size() == 0);
verify(messenger).send(any(JoinResponseMessage.class));
}
@@ -399,10 +399,10 @@ public class GMSJoinLeaveJUnitTest {
gmsJoinLeave.getView().add(gmsJoinLeaveMemberId);
gmsJoinLeave.getView().add(mockMembers[1]);
gmsJoinLeave.becomeCoordinatorForTest();
- JoinRequestMessage msg = new JoinRequestMessage(gmsJoinLeaveMemberId, mockMembers[2], null);
+ JoinRequestMessage msg = new JoinRequestMessage(gmsJoinLeaveMemberId, mockMembers[2], null, -1);
msg.setSender(mockMembers[2]);
gmsJoinLeave.processMessage(msg);
- msg = new JoinRequestMessage(gmsJoinLeaveMemberId, mockMembers[2], null);
+ msg = new JoinRequestMessage(gmsJoinLeaveMemberId, mockMembers[2], null, -1);
msg.setSender(mockMembers[2]);
gmsJoinLeave.processMessage(msg);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c152e20b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/InterceptUDP.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/InterceptUDP.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/InterceptUDP.java
index 4cf4237..635d91d 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/InterceptUDP.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/InterceptUDP.java
@@ -2,6 +2,8 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
import java.net.UnknownHostException;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import org.jgroups.Address;
@@ -34,6 +36,9 @@ public class InterceptUDP extends Protocol {
int unicastSentDataMessages;
int mcastSentDataMessages;
+ boolean collectMessages = false;
+ List<Message> collectedMessages = new LinkedList<>();
+
public InterceptUDP() {
// uuid = new UUID();
// try {
@@ -64,6 +69,9 @@ public class InterceptUDP extends Protocol {
}
private void handleMessage(Message msg) {
+ if (collectMessages) {
+ collectedMessages.add(msg);
+ }
Object o = msg.getHeader(nakackHeaderId);
if (o != null) {
mcastSentDataMessages++;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c152e20b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 2b9d451..dfd7779 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -13,6 +13,7 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
+import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -20,6 +21,8 @@ import junit.framework.Assert;
import org.jgroups.Event;
import org.jgroups.Message;
+import org.jgroups.conf.ClassConfigurator;
+import org.jgroups.protocols.UNICAST3;
import org.jgroups.util.UUID;
import org.junit.After;
import org.junit.Test;
@@ -151,7 +154,7 @@ public class JGroupsMessengerJUnitTest {
when(joinLeave.getView()).thenReturn(v);
InternalDistributedMember sender = createAddress(8888);
- JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null);
+ JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null, -1);
Message jmsg = messenger.createJGMessage(msg, messenger.jgAddress, Version.CURRENT_ORDINAL);
interceptor.up(new Event(Event.MSG, jmsg));
@@ -191,7 +194,7 @@ public class JGroupsMessengerJUnitTest {
NetView v = new NetView(sender);
when(joinLeave.getView()).thenReturn(v);
messenger.installView(v);
- JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null);
+ JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, null, -1);
if (mcastMsg) {
msg.setMulticast(true);
}
@@ -203,13 +206,95 @@ public class JGroupsMessengerJUnitTest {
sentMessages == 1);
// send a big message and expect fragmentation
- msg = new JoinRequestMessage(messenger.localAddress, sender, new byte[(int)(services.getConfig().getDistributionConfig().getUdpFragmentSize()*(1.5))]);
+ msg = new JoinRequestMessage(messenger.localAddress, sender, new byte[(int)(services.getConfig().getDistributionConfig().getUdpFragmentSize()*(1.5))], -1);
+
+ // configure an incoming message handler for JoinRequestMessage
+ final DistributionMessage[] messageReceived = new DistributionMessage[1];
+ MessageHandler handler = new MessageHandler() {
+ @Override
+ public void processMessage(DistributionMessage m) {
+ messageReceived[0] = m;
+ }
+ };
+ messenger.addHandler(JoinRequestMessage.class, handler);
+
+ // configure the outgoing message interceptor
+ interceptor.unicastSentDataMessages = 0;
+ interceptor.collectMessages = true;
+ interceptor.collectedMessages.clear();
+
+ messenger.send(msg);
+
+ assertTrue("expected 2 messages to be sent but found "+ interceptor.unicastSentDataMessages,
+ interceptor.unicastSentDataMessages == 2);
+
+ List<Message> messages = new ArrayList<>(interceptor.collectedMessages);
+ UUID fakeMember = new UUID(50, 50);
+ short unicastHeaderId = ClassConfigurator.getProtocolId(UNICAST3.class);
+ int seqno = 1;
+ for (Message m: messages) {
+ m.setSrc(fakeMember);
+ UNICAST3.Header oldHeader = (UNICAST3.Header)m.getHeader(unicastHeaderId);
+ if (oldHeader == null) continue;
+ UNICAST3.Header newHeader = UNICAST3.Header.createDataHeader(seqno, oldHeader.connId(), seqno==1);
+ seqno += 1;
+ m.putHeader(unicastHeaderId, newHeader);
+ interceptor.up(new Event(Event.MSG, m));
+ }
+ Thread.sleep(5000);
+ System.out.println("received message = " + messageReceived[0]);
+ }
+
+ @Test
+ public void testDefragmentation() throws Exception {
+ initMocks(false);
+ MessageHandler mh = mock(MessageHandler.class);
+ messenger.addHandler(JoinRequestMessage.class, mh);
+
+ InternalDistributedMember sender = messenger.getMemberID();
+ NetView v = new NetView(sender);
+ when(joinLeave.getView()).thenReturn(v);
+ messenger.installView(v);
+
+ // configure an incoming message handler for JoinRequestMessage
+ final DistributionMessage[] messageReceived = new DistributionMessage[1];
+ MessageHandler handler = new MessageHandler() {
+ @Override
+ public void processMessage(DistributionMessage m) {
+ messageReceived[0] = m;
+ }
+ };
+ messenger.addHandler(JoinRequestMessage.class, handler);
+ // configure the outgoing message interceptor
interceptor.unicastSentDataMessages = 0;
+ interceptor.collectMessages = true;
+ interceptor.collectedMessages.clear();
+
+ JoinRequestMessage msg = new JoinRequestMessage(messenger.localAddress, sender, new byte[(int)(services.getConfig().getDistributionConfig().getUdpFragmentSize()*(1.5))], -1);
messenger.send(msg);
+
assertTrue("expected 2 messages to be sent but found "+ interceptor.unicastSentDataMessages,
interceptor.unicastSentDataMessages == 2);
+ // take the fragments and mess with them so they are coming from a new
+ // "fakeMember", feeding them back up the JGroups stack so that the messenger
+ // will receive them
+ List<Message> messages = new ArrayList<>(interceptor.collectedMessages);
+ UUID fakeMember = new UUID(50, 50);
+ short unicastHeaderId = ClassConfigurator.getProtocolId(UNICAST3.class);
+ int seqno = 1;
+ for (Message m: messages) {
+ m.setSrc(fakeMember);
+ UNICAST3.Header oldHeader = (UNICAST3.Header)m.getHeader(unicastHeaderId);
+ if (oldHeader == null) continue;
+ UNICAST3.Header newHeader = UNICAST3.Header.createDataHeader(seqno, oldHeader.connId(), seqno==1);
+ seqno += 1;
+ m.putHeader(unicastHeaderId, newHeader);
+ interceptor.up(new Event(Event.MSG, m));
+ }
+ Thread.sleep(5000);
+ System.out.println("received message = " + messageReceived[0]);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c152e20b/gemfire-core/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt b/gemfire-core/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt
index 9fea5d8..52f022e 100644
--- a/gemfire-core/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt
+++ b/gemfire-core/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt
@@ -341,20 +341,20 @@ fromData,73,2a2bb900270100b500022a2bb900270100b500032a2bb900270100b500042a2bb800
toData,97,2b2ab40002b9002002002b2ab40003b9002002002b2ab40004b9002002002ab400092bb800212ab400072bb800222ab4000ac700081208a7000a2ab4000ab600232bb800212ab4000ac7000911012ca7000a2ab4000ab60024b800252bb80026b1
com/gemstone/gemfire/distributed/internal/membership/NetView,2
-fromData,61,2a2bb8004ec00018b5000d2a2bb9004f0100b500022a2bb80050b500052abb0006592ab40005b70007b500082a2bb80051b5000a2a2bb80051b5000cb1
-toData,44,2ab4000d2bb8004a2b2ab40002b9004b02002a2ab400052bb7004c2ab4000a2bb8004d2ab4000c2bb8004db1
+fromData,69,2a2bb80058c00025b5000e2a2bb900590100b500032a2bb8005ab500062abb0007592ab40006b70008b500092a2bb8005bb5000b2a2bb8005bb5000d2a2bb8005cb50002b1
+toData,52,2ab4000e2bb800532b2ab40003b9005402002a2ab400062bb700552ab4000b2bb800562ab4000d2bb800562ab400022bb80057b1
com/gemstone/gemfire/distributed/internal/membership/gms/GMSMember,2
-fromData,150,2a2bb80043b500182bb9004401003d2a1c047e99000704a7000403b500152a1c057e99000704a7000403b500142a2bb80045b500172a2bb900440100b500022a2bb900440100b500032a2bb900440100b500052a2bb900460100b500162a2bb900440100b500062a2bb900440100b500072a2bb80047b500082a2bb80048b500092a2bb900490100b5001a2a2bb900490100b50019b1
-toData,145,2b2ab4001804b8003c033d2ab400159900071c04803d2ab400149900071c05803d2b1cb9003d02002ab400172bb8003e2b2ab40002b9003d02002b2ab40003b9003d02002b2ab40005b9003d02002b2ab40016b9003f02002b2ab40006b9003d02002b2ab40007b9003d02002ab400082bb800402ab400092bb800412b2ab4001ab9004203002b2ab40019b900420300b1
+fromData,150,2a2bb80044b500182bb9004501003d2a1c047e99000704a7000403b500152a1c057e99000704a7000403b500142a2bb80046b500172a2bb900470100b500022a2bb900470100b500032a2bb900470100b500052a2bb900480100b500162a2bb900480100b500072a2bb900470100b500062a2bb80049b500082a2bb8004ab500092a2bb9004b0100b5001a2a2bb9004b0100b50019b1
+toData,145,2b2ab4001804b8003c033d2ab400159900071c04803d2ab400149900071c05803d2b1cb9003d02002ab400172bb8003e2b2ab40002b9003f02002b2ab40003b9003f02002b2ab40005b9003f02002b2ab40016b9004002002b2ab40007b9004002002b2ab40006b9003f02002ab400082bb800412ab400092bb800422b2ab4001ab9004303002b2ab40019b900430300b1
com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest,2
fromData,71,2a2bb80016c00015b500022bb9001701003d2abb0018591cb70019b50003033e1d1ca2001a2ab400032bb80016c00015b9001a020057840301a7ffe72a2bb900170100b50004b1
toData,88,2ab400022bb8000f2ab40003c6003a2b2ab40003b900100100b9001102002ab40003b9001201004d2cb9001301009900152cb900140100c000154e2d2bb8000fa7ffe8a7000a2b03b9001102002b2ab40004b900110200b1
com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse,2
-fromData,89,2a2bb8001fc00020b500022a2bb8001fc00020b500032a2bb900210100b500092ab400099a00342a2bb900210100b500042a2bb900210100b500072a2bb900210100b500082a2bb8001fc00022b500052a2bb80023b50006b1
-toData,73,2ab400022bb8001c2ab400032bb8001c2b2ab40009b9001d02002b2ab40004b9001d02002b2ab40007b9001d02002b2ab40008b9001d02002ab400052bb8001c2ab400062bb8001eb1
+fromData,89,2a2bb80020c00021b500022a2bb80020c00021b500032a2bb900220100b500092ab400099a00342a2bb900220100b500042a2bb900220100b500072a2bb900220100b500082a2bb80020c00023b500052a2bb80024b50006b1
+toData,73,2ab400022bb8001d2ab400032bb8001d2b2ab40009b9001e02002b2ab40004b9001e02002b2ab40007b9001e02002b2ab40008b9001e02002ab400052bb8001d2ab400062bb8001fb1
com/gemstone/gemfire/distributed/internal/membership/gms/locator/GetViewRequest,2
fromData,1,b1
@@ -373,16 +373,16 @@ fromData,11,2a2bb9000e0100b50002b1
toData,11,2b2ab40002b9000d0200b1
com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage,2
-fromData,44,2a2bb7001a2a2bb8001bc0001cb500042a2bb8001bb500062a2bb9001d0100b500052a2a2bb7001eb50003b1
-toData,41,2a2bb700162ab400042bb800172ab400062bb800172b2ab40005b9001802002a2ab400032bb70019b1
+fromData,35,2a2bb7000b2a2bb8000cc0000db500022a2bb8000cb500042a2bb9000e0100b50003b1
+toData,32,2a2bb700082ab400022bb800092ab400042bb800092b2ab40003b9000a0200b1
com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage,2
fromData,38,2a2bb80018c00019b500042a2bb80018b500052a2bb8001ab500022a2bb9001b0100b6001cb1
toData,35,2ab400042bb800142ab400052bb800142ab400022bb800152b2ab60016b900170200b1
com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage,2
-fromData,58,2a2bb8002ac0002bb500042a2bb8002ac0002cb500052a2a2bb7002db500032a2bb9002e0100b500072a2bb8002fb500082a2bb8002ab50009b1
-toData,52,2ab400042bb800262ab400052bb800262a2ab400032bb700272b2ab40007b9002802002ab400082bb800292ab400092bb80026b1
+fromData,49,2a2bb8001ac0001bb500022a2bb8001ac0001cb500032a2bb9001d0100b500052a2bb8001eb500062a2bb8001ab50007b1
+toData,43,2ab400022bb800172ab400032bb800172b2ab40005b9001802002ab400062bb800192ab400072bb80017b1
com/gemstone/gemfire/distributed/internal/membership/gms/messages/LeaveRequestMessage,2
fromData,20,2a2bb8000bc0000cb500032a2bb8000db50004b1