You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2016/06/14 17:48:10 UTC
incubator-geode git commit: GEODE-1542 shared/unordered tcp/ip
connection times out, initiating suspicion
Repository: incubator-geode
Updated Branches:
refs/heads/develop 4afc5b153 -> 33ceb3715
GEODE-1542 shared/unordered tcp/ip connection times out, initiating suspicion
This disables timing out of shared/unordered TcpConduit connections. We don't
want them to time out because we are using them to initiate suspect processing
on other members.
The ticket also pointed out a problem with the "final check" mechanism in
the health monitor. I tracked that down to improper use of SocketCreator
to create the server-socket in GMSHealthMonitor. It was creating an SSL
socket if SSL is enabled but the client-side of the check uses non-SSL
sockets. I changed the server to use non-SSL sockets as well since no
useful information is sent over the final-check TCP/IP connections & they
need to be lightweight and fast.
While looking at logs I also found that the heartbeat request sent at the
beginning of a final-check had a request-ID even though it's not waiting
for a response. That causes processing of the response to do more work
than necessary so I changed it to remove the request-ID from the message.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/33ceb371
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/33ceb371
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/33ceb371
Branch: refs/heads/develop
Commit: 33ceb371554a13c7643ddaf9488ffa83963de1e7
Parents: 4afc5b1
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Tue Jun 14 10:46:16 2016 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Tue Jun 14 10:47:53 2016 -0700
----------------------------------------------------------------------
.../membership/gms/fd/GMSHealthMonitor.java | 75 ++++++++++----------
.../membership/gms/membership/GMSJoinLeave.java | 8 +--
.../gms/messages/HeartbeatRequestMessage.java | 8 +++
.../gemfire/internal/SocketCreator.java | 35 +++++++--
.../gemfire/internal/tcp/Connection.java | 4 +-
5 files changed, 81 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/33ceb371/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index f27e0b8..203d9ce 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -19,6 +19,7 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.fd;
import static com.gemstone.gemfire.internal.DataSerializableFixedID.HEARTBEAT_REQUEST;
import static com.gemstone.gemfire.internal.DataSerializableFixedID.HEARTBEAT_RESPONSE;
import static com.gemstone.gemfire.internal.DataSerializableFixedID.SUSPECT_MEMBERS_MESSAGE;
+import static com.sun.corba.se.impl.naming.cosnaming.NamingUtils.debug;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -195,7 +196,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
return timeStamp;
}
- public void setTimeStamp(long timeStamp) {
+ public void setTime(long timeStamp) {
this.timeStamp = timeStamp;
}
}
@@ -289,48 +290,41 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
long uuidMSBs = in.readLong();
GMSHealthMonitor.this.stats.incFinalCheckRequestsReceived();
GMSHealthMonitor.this.stats.incTcpFinalCheckRequestsReceived();
- boolean debug = logger.isDebugEnabled();
GMSMember gmbr = (GMSMember) GMSHealthMonitor.this.localAddress.getNetMember();
UUID myUUID = gmbr.getUUID();
// during reconnect or rapid restart we will have a zero viewId but there may still
// be an old ID in the membership view that we do not want to respond to
int myVmViewId = gmbr.getVmViewId();
- if (debug) {
- if (playingDead) {
- logger.debug("simulating sick member in health check");
- } else if (vmViewId == myVmViewId
- && uuidLSBs == myUUID.getLeastSignificantBits()
- && uuidMSBs == myUUID.getMostSignificantBits()) {
- logger.debug("UUID matches my own - sending OK reply");
- } else {
- logger.debug("GMSHealthMonitor my UUID is {},{} received is {},{}. My viewID is {} received is {}",
- Long.toHexString(myUUID.getMostSignificantBits()),
- Long.toHexString(myUUID.getLeastSignificantBits()),
- Long.toHexString(uuidMSBs), Long.toHexString(uuidLSBs),
- myVmViewId, vmViewId);
- }
- }
- if (!playingDead
- && uuidLSBs == myUUID.getLeastSignificantBits()
- && uuidMSBs == myUUID.getMostSignificantBits()
- && vmViewId == myVmViewId) {
+ if (playingDead) {
+ logger.debug("HealthMonitor: simulating sick member in health check");
+ } else if (uuidLSBs == myUUID.getLeastSignificantBits()
+ && uuidMSBs == myUUID.getMostSignificantBits()
+ && vmViewId == myVmViewId) {
+ logger.debug("HealthMonitor: sending OK reply");
out.write(OK);
out.flush();
socket.shutdownOutput();
GMSHealthMonitor.this.stats.incFinalCheckResponsesSent();
GMSHealthMonitor.this.stats.incTcpFinalCheckResponsesSent();
if (debug) {
- logger.debug("GMSHealthMonitor server socket replied OK.");
+ logger.debug("HealthMonitor: server replied OK.");
+ }
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("HealthMonitor: sending ERROR reply - my UUID is {},{} received is {},{}. My viewID is {} received is {}",
+ Long.toHexString(myUUID.getMostSignificantBits()),
+ Long.toHexString(myUUID.getLeastSignificantBits()),
+ Long.toHexString(uuidMSBs),
+ Long.toHexString(uuidLSBs),
+ myVmViewId, vmViewId);
}
- }
- else {
out.write(ERROR);
out.flush();
socket.shutdownOutput();
GMSHealthMonitor.this.stats.incFinalCheckResponsesSent();
GMSHealthMonitor.this.stats.incTcpFinalCheckResponsesSent();
if (debug) {
- logger.debug("GMSHealthMonitor server socket replied ERROR.");
+ logger.debug("HealthMonitor: server replied ERROR.");
}
}
} catch (IOException e) {
@@ -376,8 +370,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
private void contactedBy(InternalDistributedMember sender, long timeStamp) {
TimeStamp cTS = new TimeStamp(timeStamp);
cTS = memberTimeStamps.putIfAbsent(sender, cTS);
- if (cTS != null) {
- cTS.setTimeStamp(timeStamp);
+ if (cTS != null && cTS.getTime() < timeStamp) {
+ cTS.setTime(timeStamp);
}
if (suspectedMemberInView.remove(sender) != null) {
logger.info("No longer suspecting {}", sender);
@@ -405,8 +399,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
@Override
public void run() {
- // TODO GemFire used the tcp/ip connection but this is using heartbeats
-
boolean pinged = false;
try {
pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
@@ -455,9 +447,12 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
long startTime = System.currentTimeMillis();
logger.trace("Checking member {}", member);
final HeartbeatRequestMessage hrm = constructHeartbeatRequestMessage(member);
- final Response pingResp = new Response();
- if(waitForResponse) {
+ Response pingResp = null;
+ if (waitForResponse) {
+ pingResp = new Response();
requestIdVsResponse.put(hrm.getRequestId(), pingResp);
+ } else {
+ hrm.clearRequestId();
}
try {
Set<InternalDistributedMember> membersNotReceivedMsg = this.services.getMessenger().send(hrm);
@@ -484,7 +479,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
logger.trace("received heartbeat from {}", member);
this.stats.incHeartbeatsReceived();
if (ts != null) {
- ts.setTimeStamp(System.currentTimeMillis());
+ ts.setTime(System.currentTimeMillis());
}
return true;
}
@@ -547,12 +542,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
DataOutputStream out = new DataOutputStream(clientSocket.getOutputStream());
GMSMember gmbr = (GMSMember) suspectMember.getNetMember();
writeMemberToStream(gmbr, out);
- clientSocket.shutdownOutput();
this.stats.incFinalCheckRequestsSent();
this.stats.incTcpFinalCheckRequestsSent();
- logger.debug("Connected - reading response from suspect member {}", suspectMember);
+ logger.debug("Connected to suspect member - reading response");
int b = in.read();
- logger.debug("Received {}", (b == OK ? "OK" : (b == ERROR ? "ERROR" : b)), suspectMember);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received {}", (b == OK ? "OK" : (b == ERROR ? "ERROR" : "unknown response: " + b)));
+ }
if (b >= 0) {
this.stats.incFinalCheckResponsesReceived();
this.stats.incTcpFinalCheckResponsesReceived();
@@ -560,7 +556,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
if (b == OK) {
TimeStamp ts = memberTimeStamps.get(suspectMember);
if (ts != null) {
- ts.setTimeStamp(System.currentTimeMillis());
+ ts.setTime(System.currentTimeMillis());
}
return true;
} else {
@@ -659,7 +655,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
ServerSocket createServerSocket(InetAddress socketAddress, int[] portRange) {
ServerSocket serverSocket = null;
try {
- serverSocket = SocketCreator.getDefaultInstance().createServerSocketUsingPortRange(socketAddress, 50/*backlog*/, true/*isBindAddress*/, false/*useNIO*/, 65536/*tcpBufferSize*/, portRange);
+ serverSocket = SocketCreator.getDefaultInstance().createServerSocketUsingPortRange(socketAddress, 50/*backlog*/,
+ true/*isBindAddress*/, false/*useNIO*/, 65536/*tcpBufferSize*/, portRange, false);
socketPort = serverSocket.getLocalPort();
} catch (IOException e) {
throw new GemFireConfigException("Unable to allocate a failure detection port in the membership-port range", e);
@@ -1285,8 +1282,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
//this will just send heartbeat request, it will not wait for response
//if we will get heartbeat then it will change the timestamp, which we are
//checking below in case of tcp check failure..
- GMSHealthMonitor.this.doCheckMember(mbr, false);
- pinged = GMSHealthMonitor.this.doTCPCheckMember(mbr, port);
+ doCheckMember(mbr, false);
+ pinged = doTCPCheckMember(mbr, port);
}
if (!pinged && !isStopping) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/33ceb371/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 8dce1a5..87fac53 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -533,7 +533,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
if (!isCoordinator && !isStopping && !services.getCancelCriterion().isCancelInProgress()) {
- logger.debug("JoinLeave is checking to see if I should become coordinator");
+ logger.debug("Checking to see if I should become coordinator");
NetView check = new NetView(v, v.getViewId() + 1);
check.remove(incomingRequest.getMemberID());
synchronized (removedMembers) {
@@ -600,7 +600,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
if (!isCoordinator && !isStopping && !services.getCancelCriterion().isCancelInProgress()) {
- logger.debug("JoinLeave is checking to see if I should become coordinator");
+ logger.debug("Checking to see if I should become coordinator");
NetView check = new NetView(v, v.getViewId() + 1);
synchronized (removedMembers) {
removedMembers.add(mbr);
@@ -630,7 +630,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
}
private void recordViewRequest(DistributionMessage request) {
- logger.debug("JoinLeave is recording the request to be processed in the next membership view");
+ logger.debug("Recording the request to be processed in the next membership view");
synchronized (viewRequests) {
viewRequests.add(request);
viewRequests.notifyAll();
@@ -1441,7 +1441,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
if (view != null) {
if (view.size() > 1) {
List<InternalDistributedMember> coords = view.getPreferredCoordinators(Collections.<InternalDistributedMember>emptySet(), localAddress, 5);
- logger.debug("JoinLeave sending a leave request to {}", coords);
+ logger.debug("Sending my leave request to {}", coords);
LeaveRequestMessage m = new LeaveRequestMessage(coords, this.localAddress, "this member is shutting down");
services.getMessenger().send(m);
} // view.size
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/33ceb371/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java
index 3c08e33..e0f4515 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java
@@ -41,6 +41,14 @@ public class HeartbeatRequestMessage extends HighPriorityDistributionMessage{
public InternalDistributedMember getTarget() {
return target;
}
+
+ /**
+ * If no response is desired the requestId can be reset by invoking
+ * this method
+ */
+ public void clearRequestId() {
+ requestId = -1;
+ }
@Override
public int getDSFID() {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/33ceb371/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
index 367d4a7..739028c 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
@@ -16,6 +16,8 @@
*/
package com.gemstone.gemfire.internal;
+import static javafx.scene.input.KeyCode.L;
+
import com.gemstone.gemfire.GemFireConfigException;
import com.gemstone.gemfire.SystemConnectException;
import com.gemstone.gemfire.SystemFailure;
@@ -765,16 +767,22 @@ public class SocketCreator {
* SSL configuration is left up to JSSE properties in java.security file.
*/
public ServerSocket createServerSocket( int nport, int backlog, InetAddress bindAddr ) throws IOException {
- return createServerSocket( nport, backlog, bindAddr, -1 );
+ return createServerSocket( nport, backlog, bindAddr, -1, useSSL);
}
public ServerSocket createServerSocket(int nport, int backlog,
- InetAddress bindAddr, int socketBufferSize)
+ InetAddress bindAddr, int socketBufferSize)
+ throws IOException {
+ return createServerSocket(nport, backlog, bindAddr, socketBufferSize, useSSL);
+ }
+
+ private ServerSocket createServerSocket(int nport, int backlog,
+ InetAddress bindAddr, int socketBufferSize, boolean sslConnection)
throws IOException {
// rw.readLock().lockInterruptibly();
// try {
printConfig();
- if ( this.useSSL ) {
+ if ( sslConnection ) {
if (this.sslContext == null) {
throw new GemFireConfigException("SSL not configured correctly, Please look at previous error");
}
@@ -830,7 +838,23 @@ public class SocketCreator {
public ServerSocket createServerSocketUsingPortRange(InetAddress ba, int backlog,
boolean isBindAddress, boolean useNIO, int tcpBufferSize, int[] tcpPortRange)
throws IOException {
-
+ return createServerSocketUsingPortRange(ba, backlog, isBindAddress, useNIO, tcpBufferSize, tcpPortRange, this.useSSL);
+ }
+
+ /**
+ * Creates or bind server socket to a random port selected
+ * from tcp-port-range which is same as membership-port-range.
+ * @param ba
+ * @param backlog
+ * @param isBindAddress
+ * @param tcpBufferSize
+ * @param sslConnection whether to connect using SSL
+ * @return Returns the new server socket.
+ * @throws IOException
+ */
+ public ServerSocket createServerSocketUsingPortRange(InetAddress ba, int backlog,
+ boolean isBindAddress, boolean useNIO, int tcpBufferSize, int[] tcpPortRange, boolean sslConnection)
+ throws IOException {
ServerSocket socket = null;
int localPort = 0;
int startingPort = 0;
@@ -862,7 +886,8 @@ public class SocketCreator {
InetSocketAddress addr = new InetSocketAddress(isBindAddress ? ba : null, localPort);
socket.bind(addr, backlog);
} else {
- socket = SocketCreator.getDefaultInstance().createServerSocket(localPort, backlog, isBindAddress? ba : null, tcpBufferSize);
+ socket = SocketCreator.getDefaultInstance()
+ .createServerSocket(localPort, backlog, isBindAddress? ba : null, tcpBufferSize, sslConnection);
}
break;
} catch (java.net.SocketException ex) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/33ceb371/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
index 85e3511..6528877 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@ -569,7 +569,9 @@ public class Connection implements Runnable {
if (isSocketClosed()) {
return true;
}
- if (isSocketInUse()) {
+ if (isSocketInUse()
+ || (this.sharedResource && !this.preserveOrder)) { // shared/unordered connections are used for failure-detection
+ // and are not subject to idle-timeout
return false;
}
boolean isIdle = !this.accessed;