You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/03/01 01:09:26 UTC

[17/34] incubator-geode git commit: GEODE-1004 request heartbeat request with tcp check

GEODE-1004 request heartbeat request with tcp check


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/9a8b3199
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/9a8b3199
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/9a8b3199

Branch: refs/heads/feature/GEODE-949-2
Commit: 9a8b319961e79ffb15caa5292ae3cad5e72de8f0
Parents: 3dda21e
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Fri Feb 26 10:41:42 2016 -0800
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Fri Feb 26 10:41:42 2016 -0800

----------------------------------------------------------------------
 .../membership/gms/fd/GMSHealthMonitor.java     | 24 ++++++---
 .../gms/fd/GMSHealthMonitorJUnitTest.java       | 57 ++++++++++++++++++--
 2 files changed, 69 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9a8b3199/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index beb781d..536e26e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -406,7 +406,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       @Override
       public void run() {
         // TODO GemFire used the tcp/ip connection but this is using heartbeats
-        boolean pinged = GMSHealthMonitor.this.doCheckMember(mbr);
+        boolean pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
         if (!pinged) {
           suspectedMemberInView.put(mbr, currentView);
           String reason = "Member isn't responding to heartbeat requests";
@@ -438,7 +438,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * @param member
    * @return
    */
-  private boolean doCheckMember(InternalDistributedMember member) {
+  private boolean doCheckMember(InternalDistributedMember member, boolean waitForResponse) {
     if (playingDead || beingSick) {
       // a member playingDead should not be sending messages to other
       // members, so we avoid sending heartbeat requests or suspect
@@ -449,14 +449,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     logger.trace("Checking member {}", member);
     final HeartbeatRequestMessage hrm = constructHeartbeatRequestMessage(member);
     final Response pingResp = new Response();
-    requestIdVsResponse.put(hrm.getRequestId(), pingResp);
+    if(waitForResponse) {
+      requestIdVsResponse.put(hrm.getRequestId(), pingResp);
+    }
     try {
       Set<InternalDistributedMember> membersNotReceivedMsg = this.services.getMessenger().send(hrm);
       this.stats.incHeartbeatRequestsSent();
       if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(member)) {
         // member is not part of current view.
         logger.trace("Member {} is not part of current view.", member);
-      } else {
+      } else if (waitForResponse){
         synchronized (pingResp) {
           if (pingResp.getResponseMsg() == null) {
             pingResp.wait(memberTimeout);
@@ -484,7 +486,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     } catch (InterruptedException e) {
       logger.debug("GMSHealthMonitor checking thread interrupted, while waiting for response from member: {} .", member);
     } finally {
-      requestIdVsResponse.remove(hrm.getRequestId());
+      if(waitForResponse) {
+        requestIdVsResponse.remove(hrm.getRequestId());
+      }
     }
     return false;
   }
@@ -496,7 +500,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * @param suspectMember member that does not respond to HeartbeatRequestMessage
    * @return true if successfully exchanged PING/PONG with TCP connection, otherwise false.
    */
-  private boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
+  boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
     Socket clientSocket = null;
     try {
       logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember, suspectMember.getInetAddress(), port);
@@ -1060,6 +1064,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
           resp.notify();
         }
       }
+      // We received a heartbeat, so update the sender's last-contact timestamp.
+      contactedBy(m.getSender(), System.currentTimeMillis());
     }
   }
 
@@ -1255,7 +1261,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         if (logger.isDebugEnabled()) {
           logger.debug("\ncurrent view: {}\nports: {}", cv, Arrays.toString(cv.getFailureDetectionPorts()));
         }
-        pinged = GMSHealthMonitor.this.doCheckMember(mbr);
+        pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
         GMSHealthMonitor.this.stats.incFinalCheckRequestsSent();
         GMSHealthMonitor.this.stats.incUdpFinalCheckRequestsSent();
         if (pinged) {
@@ -1263,6 +1269,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
           GMSHealthMonitor.this.stats.incUdpFinalCheckResponsesReceived();
         }
       } else {
+        // This only sends a heartbeat request; it does not wait for a response.
+        // If a heartbeat arrives, it updates the member's timestamp, which is
+        // checked below in case the TCP check fails.
+        GMSHealthMonitor.this.doCheckMember(mbr, false);
         pinged = GMSHealthMonitor.this.doTCPCheckMember(mbr, port);
       }
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9a8b3199/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index 9562e41..a96b546 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -81,7 +81,8 @@ public class GMSHealthMonitorJUnitTest {
   private long statsId = 123;
   final long memberTimeout = 1000l;
   private int[] portRange= new int[]{0, 65535};
-
+  private boolean useGMSHealthMonitorTestClass = false;
+  
   @Before
   public void initMocks() throws UnknownHostException {
     //System.setProperty("gemfire.bind-address", "localhost");
@@ -131,7 +132,7 @@ public class GMSHealthMonitorJUnitTest {
     }
     when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3));
     when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
-    gmsHealthMonitor = new GMSHealthMonitor();
+    gmsHealthMonitor = new GMSHealthMonitorTest();
     gmsHealthMonitor.init(services);
     gmsHealthMonitor.start();
   }
@@ -228,7 +229,7 @@ public class GMSHealthMonitorJUnitTest {
     Assert.assertTrue(gmsHealthMonitor.getStats().getSuspectsSent() > 0);
   }
 
-  private void installAView() {
+  private NetView installAView() {
     System.out.println("testSuspectMembersCalledThroughMemberCheckThread starting");
     NetView v = new NetView(mockMembers.get(0), 2, mockMembers);
 
@@ -237,6 +238,17 @@ public class GMSHealthMonitorJUnitTest {
     gmsHealthMonitor.started();
 
     gmsHealthMonitor.installView(v);
+    
+    return v;
+  }
+  
+  private void setFailureDetectionPorts(NetView v) {
+    java.util.Iterator<InternalDistributedMember> itr = mockMembers.iterator();
+    
+    int port = 7899;
+    while(itr.hasNext()) {
+      v.setFailureDetectionPort(itr.next(), port++);
+    }
   }
 
   /***
@@ -414,8 +426,8 @@ public class GMSHealthMonitorJUnitTest {
 
   @Test
   public void testCheckIfAvailableWithSimulatedHeartBeat() {
-    installAView();
-    
+    NetView v = installAView();
+   
     InternalDistributedMember memberToCheck = mockMembers.get(1);
     HeartbeatMessage fakeHeartbeat = new HeartbeatMessage();
     fakeHeartbeat.setSender(memberToCheck);
@@ -431,6 +443,33 @@ public class GMSHealthMonitorJUnitTest {
     assertTrue("CheckIfAvailable should have return true", retVal);
   }
   
+  @Test
+  public void testCheckIfAvailableWithSimulatedHeartBeatWithTcpCheck() {
+    useGMSHealthMonitorTestClass = true;
+    
+    try {
+      NetView v = installAView();
+      
+      setFailureDetectionPorts(v);
+      
+      InternalDistributedMember memberToCheck = mockMembers.get(1);
+      HeartbeatMessage fakeHeartbeat = new HeartbeatMessage();
+      fakeHeartbeat.setSender(memberToCheck);
+      when(messenger.send(any(HeartbeatRequestMessage.class))).then(new Answer() {
+        @Override
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+          gmsHealthMonitor.processMessage(fakeHeartbeat);
+          return null;
+        }
+      });
+      
+      boolean retVal = gmsHealthMonitor.checkIfAvailable(memberToCheck, "Not responding", true);
+      assertTrue("CheckIfAvailable should have return true", retVal);
+    }finally {
+      useGMSHealthMonitorTestClass = false;
+    }    
+  }
+  
   
   @Test
   public void testShutdown() {
@@ -635,4 +674,12 @@ public class GMSHealthMonitorJUnitTest {
     return baos.toByteArray();
   }
 
+  public class GMSHealthMonitorTest extends GMSHealthMonitor {
+    @Override
+    boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
+      if(useGMSHealthMonitorTestClass)
+        return false;
+      return super.doTCPCheckMember(suspectMember, port);
+    }
+  }
 }