You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/02/26 19:47:21 UTC

[1/2] incubator-geode git commit: GEODE-1003 added dunit to verify de-serialization failure in peer node

Repository: incubator-geode
Updated Branches:
  refs/heads/develop e40962cf6 -> 9a8b31996


GEODE-1003 added dunit to verify de-serialization failure in peer node


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3dda21e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3dda21e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3dda21e5

Branch: refs/heads/develop
Commit: 3dda21e5b65673920cd22fbbf9e8f5a4d2468a7c
Parents: e40962c
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Fri Feb 26 10:31:49 2016 -0800
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Fri Feb 26 10:31:49 2016 -0800

----------------------------------------------------------------------
 .../DistributedMulticastRegionDUnitTest.java    | 105 +++++++++++++++++++
 1 file changed, 105 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3dda21e5/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
index 77e8253..72b7941 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
@@ -31,10 +31,15 @@ import com.gemstone.gemfire.cache.Scope;
 import com.gemstone.gemfire.distributed.Locator;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.InternalLocator;
+import com.gemstone.gemfire.distributed.internal.ReplyException;
 import com.gemstone.gemfire.distributed.internal.SharedConfiguration;
 import com.gemstone.gemfire.internal.AvailablePortHelper;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.OffHeapTestUtil;
+import com.gemstone.gemfire.pdx.PdxReader;
+import com.gemstone.gemfire.pdx.PdxSerializable;
+import com.gemstone.gemfire.pdx.PdxSerializationException;
+import com.gemstone.gemfire.pdx.PdxWriter;
 import com.gemstone.gemfire.test.dunit.DistributedTestCase;
 import com.gemstone.gemfire.test.dunit.Host;
 import com.gemstone.gemfire.test.dunit.Invoke;
@@ -128,6 +133,106 @@ public class DistributedMulticastRegionDUnitTest extends CacheTestCase {
       closeLocator();      
   }
   
+  public class TestObjectThrowsException implements PdxSerializable {
+    String name = "TestObjectThrowsException";
+
+    @Override
+    public void toData(PdxWriter writer) {
+      writer.writeString("name", name);      
+    }
+
+    @Override
+    public void fromData(PdxReader reader) {
+      throw new RuntimeException("Unable to desrialize message ");
+      //name = reader.readString("name");
+    }
+    
+  }
+  
+  public void testMulticastWithRegionOpsException() {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+
+    try {
+      final String name = "mcastRegion";
+      
+      SerializableRunnable setSysProp = new CacheSerializableRunnable("Create Region") {
+        public void run2() throws CacheException {
+          System.setProperty("gemfire.STORE_ALL_VALUE_FORMS", "true");
+        }
+      };
+      
+      
+      SerializableRunnable create =
+        new CacheSerializableRunnable("Create Region") {
+            public void run2() throws CacheException {
+              createRegion(name, getRegionAttributes());
+            }
+          };
+  
+      locatorPort = startLocator();
+      
+      vm0.invoke(setSysProp);
+      vm1.invoke(setSysProp);
+      
+      //1. create the region in both VMs (the locator was already started above)
+      vm0.invoke(create);
+      vm1.invoke(create);
+      
+      SerializableRunnable validateMulticastBeforeRegionOps =
+          new CacheSerializableRunnable("validateMulticast before region ops") {
+              public void run2() throws CacheException {
+                validateMulticastOpsBeforeRegionOps();
+              }
+          };
+        
+      vm0.invoke(validateMulticastBeforeRegionOps);
+      vm1.invoke(validateMulticastBeforeRegionOps);
+      
+      SerializableRunnable doPuts =
+        new CacheSerializableRunnable("do put") {
+            public void run2() throws CacheException {
+              final Region region =
+                  getRootRegion().getSubregion(name);
+              boolean gotReplyException = false;
+              for(int i =0 ; i < 1; i++) {
+                try {
+                  region.put(i, new TestObjectThrowsException());
+                }catch(PdxSerializationException e) {
+                  gotReplyException = true;
+                }catch(Exception e ) {
+                  region.getCache().getLogger().info("Got exception of type " + e.getClass().toString());
+                } 
+              }
+              assertTrue("We should have got ReplyEception ", gotReplyException);
+            }
+        };
+        
+      vm0.invoke(doPuts);
+      
+      SerializableRunnable validateMulticastAfterRegionOps =
+        new CacheSerializableRunnable("validateMulticast after region ops") {
+            public void run2() throws CacheException {
+              validateMulticastOpsAfterRegionOps();
+            }
+        };
+      
+        vm0.invoke(validateMulticastAfterRegionOps);
+        vm1.invoke(validateMulticastAfterRegionOps);
+     
+        closeLocator();
+    }finally {
+      SerializableRunnable unsetSysProp = new CacheSerializableRunnable("Create Region") {
+        public void run2() throws CacheException {
+          System.setProperty("gemfire.STORE_ALL_VALUE_FORMS", "false");          
+        }
+      };
+      vm0.invoke(unsetSysProp);
+      vm1.invoke(unsetSysProp);
+    }
+  }
+  
   protected RegionAttributes getRegionAttributes() {
     AttributesFactory factory = new AttributesFactory();
     factory.setScope(Scope.DISTRIBUTED_ACK);


[2/2] incubator-geode git commit: GEODE-1004 request heartbeat request with tcp check

Posted by hi...@apache.org.
GEODE-1004 request heartbeat request with tcp check


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/9a8b3199
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/9a8b3199
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/9a8b3199

Branch: refs/heads/develop
Commit: 9a8b319961e79ffb15caa5292ae3cad5e72de8f0
Parents: 3dda21e
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Fri Feb 26 10:41:42 2016 -0800
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Fri Feb 26 10:41:42 2016 -0800

----------------------------------------------------------------------
 .../membership/gms/fd/GMSHealthMonitor.java     | 24 ++++++---
 .../gms/fd/GMSHealthMonitorJUnitTest.java       | 57 ++++++++++++++++++--
 2 files changed, 69 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9a8b3199/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index beb781d..536e26e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -406,7 +406,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       @Override
       public void run() {
         // TODO GemFire used the tcp/ip connection but this is using heartbeats
-        boolean pinged = GMSHealthMonitor.this.doCheckMember(mbr);
+        boolean pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
         if (!pinged) {
           suspectedMemberInView.put(mbr, currentView);
           String reason = "Member isn't responding to heartbeat requests";
@@ -438,7 +438,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * @param member
    * @return
    */
-  private boolean doCheckMember(InternalDistributedMember member) {
+  private boolean doCheckMember(InternalDistributedMember member, boolean waitForResponse) {
     if (playingDead || beingSick) {
       // a member playingDead should not be sending messages to other
       // members, so we avoid sending heartbeat requests or suspect
@@ -449,14 +449,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     logger.trace("Checking member {}", member);
     final HeartbeatRequestMessage hrm = constructHeartbeatRequestMessage(member);
     final Response pingResp = new Response();
-    requestIdVsResponse.put(hrm.getRequestId(), pingResp);
+    if(waitForResponse) {
+      requestIdVsResponse.put(hrm.getRequestId(), pingResp);
+    }
     try {
       Set<InternalDistributedMember> membersNotReceivedMsg = this.services.getMessenger().send(hrm);
       this.stats.incHeartbeatRequestsSent();
       if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(member)) {
         // member is not part of current view.
         logger.trace("Member {} is not part of current view.", member);
-      } else {
+      } else if (waitForResponse){
         synchronized (pingResp) {
           if (pingResp.getResponseMsg() == null) {
             pingResp.wait(memberTimeout);
@@ -484,7 +486,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     } catch (InterruptedException e) {
       logger.debug("GMSHealthMonitor checking thread interrupted, while waiting for response from member: {} .", member);
     } finally {
-      requestIdVsResponse.remove(hrm.getRequestId());
+      if(waitForResponse) {
+        requestIdVsResponse.remove(hrm.getRequestId());
+      }
     }
     return false;
   }
@@ -496,7 +500,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * @param suspectMember member that does not respond to HeartbeatRequestMessage
    * @return true if successfully exchanged PING/PONG with TCP connection, otherwise false.
    */
-  private boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
+  boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
     Socket clientSocket = null;
     try {
       logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember, suspectMember.getInetAddress(), port);
@@ -1060,6 +1064,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
           resp.notify();
         }
       }
+      // we received a heartbeat, so update the sender's timestamp
+      contactedBy(m.getSender(), System.currentTimeMillis());
     }
   }
 
@@ -1255,7 +1261,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         if (logger.isDebugEnabled()) {
           logger.debug("\ncurrent view: {}\nports: {}", cv, Arrays.toString(cv.getFailureDetectionPorts()));
         }
-        pinged = GMSHealthMonitor.this.doCheckMember(mbr);
+        pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
         GMSHealthMonitor.this.stats.incFinalCheckRequestsSent();
         GMSHealthMonitor.this.stats.incUdpFinalCheckRequestsSent();
         if (pinged) {
@@ -1263,6 +1269,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
           GMSHealthMonitor.this.stats.incUdpFinalCheckResponsesReceived();
         }
       } else {
+        // This only sends a heartbeat request; it does not wait for a response.
+        // If a heartbeat arrives, it updates the member's timestamp, which is
+        // checked below in case the TCP check fails.
+        GMSHealthMonitor.this.doCheckMember(mbr, false);
         pinged = GMSHealthMonitor.this.doTCPCheckMember(mbr, port);
       }
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9a8b3199/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index 9562e41..a96b546 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -81,7 +81,8 @@ public class GMSHealthMonitorJUnitTest {
   private long statsId = 123;
   final long memberTimeout = 1000l;
   private int[] portRange= new int[]{0, 65535};
-
+  private boolean useGMSHealthMonitorTestClass = false;
+  
   @Before
   public void initMocks() throws UnknownHostException {
     //System.setProperty("gemfire.bind-address", "localhost");
@@ -131,7 +132,7 @@ public class GMSHealthMonitorJUnitTest {
     }
     when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3));
     when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
-    gmsHealthMonitor = new GMSHealthMonitor();
+    gmsHealthMonitor = new GMSHealthMonitorTest();
     gmsHealthMonitor.init(services);
     gmsHealthMonitor.start();
   }
@@ -228,7 +229,7 @@ public class GMSHealthMonitorJUnitTest {
     Assert.assertTrue(gmsHealthMonitor.getStats().getSuspectsSent() > 0);
   }
 
-  private void installAView() {
+  private NetView installAView() {
     System.out.println("testSuspectMembersCalledThroughMemberCheckThread starting");
     NetView v = new NetView(mockMembers.get(0), 2, mockMembers);
 
@@ -237,6 +238,17 @@ public class GMSHealthMonitorJUnitTest {
     gmsHealthMonitor.started();
 
     gmsHealthMonitor.installView(v);
+    
+    return v;
+  }
+  
+  private void setFailureDetectionPorts(NetView v) {
+    java.util.Iterator<InternalDistributedMember> itr = mockMembers.iterator();
+    
+    int port = 7899;
+    while(itr.hasNext()) {
+      v.setFailureDetectionPort(itr.next(), port++);
+    }
   }
 
   /***
@@ -414,8 +426,8 @@ public class GMSHealthMonitorJUnitTest {
 
   @Test
   public void testCheckIfAvailableWithSimulatedHeartBeat() {
-    installAView();
-    
+    NetView v = installAView();
+   
     InternalDistributedMember memberToCheck = mockMembers.get(1);
     HeartbeatMessage fakeHeartbeat = new HeartbeatMessage();
     fakeHeartbeat.setSender(memberToCheck);
@@ -431,6 +443,33 @@ public class GMSHealthMonitorJUnitTest {
     assertTrue("CheckIfAvailable should have return true", retVal);
   }
   
+  @Test
+  public void testCheckIfAvailableWithSimulatedHeartBeatWithTcpCheck() {
+    useGMSHealthMonitorTestClass = true;
+    
+    try {
+      NetView v = installAView();
+      
+      setFailureDetectionPorts(v);
+      
+      InternalDistributedMember memberToCheck = mockMembers.get(1);
+      HeartbeatMessage fakeHeartbeat = new HeartbeatMessage();
+      fakeHeartbeat.setSender(memberToCheck);
+      when(messenger.send(any(HeartbeatRequestMessage.class))).then(new Answer() {
+        @Override
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+          gmsHealthMonitor.processMessage(fakeHeartbeat);
+          return null;
+        }
+      });
+      
+      boolean retVal = gmsHealthMonitor.checkIfAvailable(memberToCheck, "Not responding", true);
+      assertTrue("CheckIfAvailable should have return true", retVal);
+    }finally {
+      useGMSHealthMonitorTestClass = false;
+    }    
+  }
+  
   
   @Test
   public void testShutdown() {
@@ -635,4 +674,12 @@ public class GMSHealthMonitorJUnitTest {
     return baos.toByteArray();
   }
 
+  public class GMSHealthMonitorTest extends GMSHealthMonitor {
+    @Override
+    boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
+      if(useGMSHealthMonitorTestClass)
+        return false;
+      return super.doTCPCheckMember(suspectMember, port);
+    }
+  }
 }