You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@geode.apache.org by kl...@apache.org on 2016/03/01 01:09:26 UTC
[17/34] incubator-geode git commit: GEODE-1004 request heartbeat request with tcp check
GEODE-1004 request heartbeat request with tcp check
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/9a8b3199
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/9a8b3199
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/9a8b3199
Branch: refs/heads/feature/GEODE-949-2
Commit: 9a8b319961e79ffb15caa5292ae3cad5e72de8f0
Parents: 3dda21e
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Fri Feb 26 10:41:42 2016 -0800
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Fri Feb 26 10:41:42 2016 -0800
----------------------------------------------------------------------
.../membership/gms/fd/GMSHealthMonitor.java | 24 ++++++---
.../gms/fd/GMSHealthMonitorJUnitTest.java | 57 ++++++++++++++++++--
2 files changed, 69 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9a8b3199/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index beb781d..536e26e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -406,7 +406,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
@Override
public void run() {
// TODO GemFire used the tcp/ip connection but this is using heartbeats
- boolean pinged = GMSHealthMonitor.this.doCheckMember(mbr);
+ boolean pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
if (!pinged) {
suspectedMemberInView.put(mbr, currentView);
String reason = "Member isn't responding to heartbeat requests";
@@ -438,7 +438,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* @param member
* @return
*/
- private boolean doCheckMember(InternalDistributedMember member) {
+ private boolean doCheckMember(InternalDistributedMember member, boolean waitForResponse) {
if (playingDead || beingSick) {
// a member playingDead should not be sending messages to other
// members, so we avoid sending heartbeat requests or suspect
@@ -449,14 +449,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
logger.trace("Checking member {}", member);
final HeartbeatRequestMessage hrm = constructHeartbeatRequestMessage(member);
final Response pingResp = new Response();
- requestIdVsResponse.put(hrm.getRequestId(), pingResp);
+ if(waitForResponse) {
+ requestIdVsResponse.put(hrm.getRequestId(), pingResp);
+ }
try {
Set<InternalDistributedMember> membersNotReceivedMsg = this.services.getMessenger().send(hrm);
this.stats.incHeartbeatRequestsSent();
if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(member)) {
// member is not part of current view.
logger.trace("Member {} is not part of current view.", member);
- } else {
+ } else if (waitForResponse){
synchronized (pingResp) {
if (pingResp.getResponseMsg() == null) {
pingResp.wait(memberTimeout);
@@ -484,7 +486,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
} catch (InterruptedException e) {
logger.debug("GMSHealthMonitor checking thread interrupted, while waiting for response from member: {} .", member);
} finally {
- requestIdVsResponse.remove(hrm.getRequestId());
+ if(waitForResponse) {
+ requestIdVsResponse.remove(hrm.getRequestId());
+ }
}
return false;
}
@@ -496,7 +500,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* @param suspectMember member that does not respond to HeartbeatRequestMessage
* @return true if successfully exchanged PING/PONG with TCP connection, otherwise false.
*/
- private boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
+ boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
Socket clientSocket = null;
try {
logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember, suspectMember.getInetAddress(), port);
@@ -1060,6 +1064,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
resp.notify();
}
}
+ //we got heartbeat lets update timestamp
+ contactedBy(m.getSender(), System.currentTimeMillis());
}
}
@@ -1255,7 +1261,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
if (logger.isDebugEnabled()) {
logger.debug("\ncurrent view: {}\nports: {}", cv, Arrays.toString(cv.getFailureDetectionPorts()));
}
- pinged = GMSHealthMonitor.this.doCheckMember(mbr);
+ pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
GMSHealthMonitor.this.stats.incFinalCheckRequestsSent();
GMSHealthMonitor.this.stats.incUdpFinalCheckRequestsSent();
if (pinged) {
@@ -1263,6 +1269,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
GMSHealthMonitor.this.stats.incUdpFinalCheckResponsesReceived();
}
} else {
+ //this will just send heartbeat request, it will not wait for response
+ //if we will get heartbeat then it will change the timestamp, which we are
+ //checking below in case of tcp check failure..
+ GMSHealthMonitor.this.doCheckMember(mbr, false);
pinged = GMSHealthMonitor.this.doTCPCheckMember(mbr, port);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9a8b3199/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index 9562e41..a96b546 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -81,7 +81,8 @@ public class GMSHealthMonitorJUnitTest {
private long statsId = 123;
final long memberTimeout = 1000l;
private int[] portRange= new int[]{0, 65535};
-
+ private boolean useGMSHealthMonitorTestClass = false;
+
@Before
public void initMocks() throws UnknownHostException {
//System.setProperty("gemfire.bind-address", "localhost");
@@ -131,7 +132,7 @@ public class GMSHealthMonitorJUnitTest {
}
when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3));
when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
- gmsHealthMonitor = new GMSHealthMonitor();
+ gmsHealthMonitor = new GMSHealthMonitorTest();
gmsHealthMonitor.init(services);
gmsHealthMonitor.start();
}
@@ -228,7 +229,7 @@ public class GMSHealthMonitorJUnitTest {
Assert.assertTrue(gmsHealthMonitor.getStats().getSuspectsSent() > 0);
}
- private void installAView() {
+ private NetView installAView() {
System.out.println("testSuspectMembersCalledThroughMemberCheckThread starting");
NetView v = new NetView(mockMembers.get(0), 2, mockMembers);
@@ -237,6 +238,17 @@ public class GMSHealthMonitorJUnitTest {
gmsHealthMonitor.started();
gmsHealthMonitor.installView(v);
+
+ return v;
+ }
+
+ private void setFailureDetectionPorts(NetView v) {
+ java.util.Iterator<InternalDistributedMember> itr = mockMembers.iterator();
+
+ int port = 7899;
+ while(itr.hasNext()) {
+ v.setFailureDetectionPort(itr.next(), port++);
+ }
}
/***
@@ -414,8 +426,8 @@ public class GMSHealthMonitorJUnitTest {
@Test
public void testCheckIfAvailableWithSimulatedHeartBeat() {
- installAView();
-
+ NetView v = installAView();
+
InternalDistributedMember memberToCheck = mockMembers.get(1);
HeartbeatMessage fakeHeartbeat = new HeartbeatMessage();
fakeHeartbeat.setSender(memberToCheck);
@@ -431,6 +443,33 @@ public class GMSHealthMonitorJUnitTest {
assertTrue("CheckIfAvailable should have return true", retVal);
}
+ @Test
+ public void testCheckIfAvailableWithSimulatedHeartBeatWithTcpCheck() {
+ useGMSHealthMonitorTestClass = true;
+
+ try {
+ NetView v = installAView();
+
+ setFailureDetectionPorts(v);
+
+ InternalDistributedMember memberToCheck = mockMembers.get(1);
+ HeartbeatMessage fakeHeartbeat = new HeartbeatMessage();
+ fakeHeartbeat.setSender(memberToCheck);
+ when(messenger.send(any(HeartbeatRequestMessage.class))).then(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ gmsHealthMonitor.processMessage(fakeHeartbeat);
+ return null;
+ }
+ });
+
+ boolean retVal = gmsHealthMonitor.checkIfAvailable(memberToCheck, "Not responding", true);
+ assertTrue("CheckIfAvailable should have return true", retVal);
+ }finally {
+ useGMSHealthMonitorTestClass = false;
+ }
+ }
+
@Test
public void testShutdown() {
@@ -635,4 +674,12 @@ public class GMSHealthMonitorJUnitTest {
return baos.toByteArray();
}
+ public class GMSHealthMonitorTest extends GMSHealthMonitor {
+ @Override
+ boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
+ if(useGMSHealthMonitorTestClass)
+ return false;
+ return super.doTCPCheckMember(suspectMember, port);
+ }
+ }
}