You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/02/26 19:47:21 UTC
[1/2] incubator-geode git commit: GEODE-1003 added dunit to verify
de-serialization failure in peer node
Repository: incubator-geode
Updated Branches:
refs/heads/develop e40962cf6 -> 9a8b31996
GEODE-1003 added dunit to verify de-serialization failure in peer node
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3dda21e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3dda21e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3dda21e5
Branch: refs/heads/develop
Commit: 3dda21e5b65673920cd22fbbf9e8f5a4d2468a7c
Parents: e40962c
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Fri Feb 26 10:31:49 2016 -0800
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Fri Feb 26 10:31:49 2016 -0800
----------------------------------------------------------------------
.../DistributedMulticastRegionDUnitTest.java | 105 +++++++++++++++++++
1 file changed, 105 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3dda21e5/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
index 77e8253..72b7941 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java
@@ -31,10 +31,15 @@ import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.distributed.Locator;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.InternalLocator;
+import com.gemstone.gemfire.distributed.internal.ReplyException;
import com.gemstone.gemfire.distributed.internal.SharedConfiguration;
import com.gemstone.gemfire.internal.AvailablePortHelper;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.OffHeapTestUtil;
+import com.gemstone.gemfire.pdx.PdxReader;
+import com.gemstone.gemfire.pdx.PdxSerializable;
+import com.gemstone.gemfire.pdx.PdxSerializationException;
+import com.gemstone.gemfire.pdx.PdxWriter;
import com.gemstone.gemfire.test.dunit.DistributedTestCase;
import com.gemstone.gemfire.test.dunit.Host;
import com.gemstone.gemfire.test.dunit.Invoke;
@@ -128,6 +133,106 @@ public class DistributedMulticastRegionDUnitTest extends CacheTestCase {
closeLocator();
}
+ public class TestObjectThrowsException implements PdxSerializable {
+ String name = "TestObjectThrowsException";
+
+ @Override
+ public void toData(PdxWriter writer) {
+ writer.writeString("name", name);
+ }
+
+ @Override
+ public void fromData(PdxReader reader) {
+ throw new RuntimeException("Unable to desrialize message ");
+ //name = reader.readString("name");
+ }
+
+ }
+
+ public void testMulticastWithRegionOpsException() {
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+
+ try {
+ final String name = "mcastRegion";
+
+ SerializableRunnable setSysProp = new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ System.setProperty("gemfire.STORE_ALL_VALUE_FORMS", "true");
+ }
+ };
+
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ createRegion(name, getRegionAttributes());
+ }
+ };
+
+ locatorPort = startLocator();
+
+ vm0.invoke(setSysProp);
+ vm1.invoke(setSysProp);
+
+ //1. locator (with mcast port) was started above; now create the region in both VMs
+ vm0.invoke(create);
+ vm1.invoke(create);
+
+ SerializableRunnable validateMulticastBeforeRegionOps =
+ new CacheSerializableRunnable("validateMulticast before region ops") {
+ public void run2() throws CacheException {
+ validateMulticastOpsBeforeRegionOps();
+ }
+ };
+
+ vm0.invoke(validateMulticastBeforeRegionOps);
+ vm1.invoke(validateMulticastBeforeRegionOps);
+
+ SerializableRunnable doPuts =
+ new CacheSerializableRunnable("do put") {
+ public void run2() throws CacheException {
+ final Region region =
+ getRootRegion().getSubregion(name);
+ boolean gotReplyException = false;
+ for(int i =0 ; i < 1; i++) {
+ try {
+ region.put(i, new TestObjectThrowsException());
+ }catch(PdxSerializationException e) {
+ gotReplyException = true;
+ }catch(Exception e ) {
+ region.getCache().getLogger().info("Got exception of type " + e.getClass().toString());
+ }
+ }
+ assertTrue("We should have got ReplyEception ", gotReplyException);
+ }
+ };
+
+ vm0.invoke(doPuts);
+
+ SerializableRunnable validateMulticastAfterRegionOps =
+ new CacheSerializableRunnable("validateMulticast after region ops") {
+ public void run2() throws CacheException {
+ validateMulticastOpsAfterRegionOps();
+ }
+ };
+
+ vm0.invoke(validateMulticastAfterRegionOps);
+ vm1.invoke(validateMulticastAfterRegionOps);
+
+ closeLocator();
+ }finally {
+ SerializableRunnable unsetSysProp = new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ System.setProperty("gemfire.STORE_ALL_VALUE_FORMS", "false");
+ }
+ };
+ vm0.invoke(unsetSysProp);
+ vm1.invoke(unsetSysProp);
+ }
+ }
+
protected RegionAttributes getRegionAttributes() {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
[2/2] incubator-geode git commit: GEODE-1004 request heartbeat
request with tcp check
Posted by hi...@apache.org.
GEODE-1004 request heartbeat request with tcp check
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/9a8b3199
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/9a8b3199
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/9a8b3199
Branch: refs/heads/develop
Commit: 9a8b319961e79ffb15caa5292ae3cad5e72de8f0
Parents: 3dda21e
Author: Hitesh Khamesra <hk...@pivotal.io>
Authored: Fri Feb 26 10:41:42 2016 -0800
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Fri Feb 26 10:41:42 2016 -0800
----------------------------------------------------------------------
.../membership/gms/fd/GMSHealthMonitor.java | 24 ++++++---
.../gms/fd/GMSHealthMonitorJUnitTest.java | 57 ++++++++++++++++++--
2 files changed, 69 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9a8b3199/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index beb781d..536e26e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -406,7 +406,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
@Override
public void run() {
// TODO GemFire used the tcp/ip connection but this is using heartbeats
- boolean pinged = GMSHealthMonitor.this.doCheckMember(mbr);
+ boolean pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
if (!pinged) {
suspectedMemberInView.put(mbr, currentView);
String reason = "Member isn't responding to heartbeat requests";
@@ -438,7 +438,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* @param member
* @return
*/
- private boolean doCheckMember(InternalDistributedMember member) {
+ private boolean doCheckMember(InternalDistributedMember member, boolean waitForResponse) {
if (playingDead || beingSick) {
// a member playingDead should not be sending messages to other
// members, so we avoid sending heartbeat requests or suspect
@@ -449,14 +449,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
logger.trace("Checking member {}", member);
final HeartbeatRequestMessage hrm = constructHeartbeatRequestMessage(member);
final Response pingResp = new Response();
- requestIdVsResponse.put(hrm.getRequestId(), pingResp);
+ if(waitForResponse) {
+ requestIdVsResponse.put(hrm.getRequestId(), pingResp);
+ }
try {
Set<InternalDistributedMember> membersNotReceivedMsg = this.services.getMessenger().send(hrm);
this.stats.incHeartbeatRequestsSent();
if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(member)) {
// member is not part of current view.
logger.trace("Member {} is not part of current view.", member);
- } else {
+ } else if (waitForResponse){
synchronized (pingResp) {
if (pingResp.getResponseMsg() == null) {
pingResp.wait(memberTimeout);
@@ -484,7 +486,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
} catch (InterruptedException e) {
logger.debug("GMSHealthMonitor checking thread interrupted, while waiting for response from member: {} .", member);
} finally {
- requestIdVsResponse.remove(hrm.getRequestId());
+ if(waitForResponse) {
+ requestIdVsResponse.remove(hrm.getRequestId());
+ }
}
return false;
}
@@ -496,7 +500,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* @param suspectMember member that does not respond to HeartbeatRequestMessage
* @return true if successfully exchanged PING/PONG with TCP connection, otherwise false.
*/
- private boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
+ boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
Socket clientSocket = null;
try {
logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember, suspectMember.getInetAddress(), port);
@@ -1060,6 +1064,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
resp.notify();
}
}
+ //we got a heartbeat, so let's update the sender's timestamp
+ contactedBy(m.getSender(), System.currentTimeMillis());
}
}
@@ -1255,7 +1261,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
if (logger.isDebugEnabled()) {
logger.debug("\ncurrent view: {}\nports: {}", cv, Arrays.toString(cv.getFailureDetectionPorts()));
}
- pinged = GMSHealthMonitor.this.doCheckMember(mbr);
+ pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
GMSHealthMonitor.this.stats.incFinalCheckRequestsSent();
GMSHealthMonitor.this.stats.incUdpFinalCheckRequestsSent();
if (pinged) {
@@ -1263,6 +1269,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
GMSHealthMonitor.this.stats.incUdpFinalCheckResponsesReceived();
}
} else {
+ //This only sends a heartbeat request; it does not wait for a response.
+ //If a heartbeat arrives, it updates the member's timestamp, which is
+ //checked below in case the tcp check fails.
+ GMSHealthMonitor.this.doCheckMember(mbr, false);
pinged = GMSHealthMonitor.this.doTCPCheckMember(mbr, port);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9a8b3199/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index 9562e41..a96b546 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -81,7 +81,8 @@ public class GMSHealthMonitorJUnitTest {
private long statsId = 123;
final long memberTimeout = 1000l;
private int[] portRange= new int[]{0, 65535};
-
+ private boolean useGMSHealthMonitorTestClass = false;
+
@Before
public void initMocks() throws UnknownHostException {
//System.setProperty("gemfire.bind-address", "localhost");
@@ -131,7 +132,7 @@ public class GMSHealthMonitorJUnitTest {
}
when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3));
when(messenger.getMemberID()).thenReturn(mockMembers.get(3));
- gmsHealthMonitor = new GMSHealthMonitor();
+ gmsHealthMonitor = new GMSHealthMonitorTest();
gmsHealthMonitor.init(services);
gmsHealthMonitor.start();
}
@@ -228,7 +229,7 @@ public class GMSHealthMonitorJUnitTest {
Assert.assertTrue(gmsHealthMonitor.getStats().getSuspectsSent() > 0);
}
- private void installAView() {
+ private NetView installAView() {
System.out.println("testSuspectMembersCalledThroughMemberCheckThread starting");
NetView v = new NetView(mockMembers.get(0), 2, mockMembers);
@@ -237,6 +238,17 @@ public class GMSHealthMonitorJUnitTest {
gmsHealthMonitor.started();
gmsHealthMonitor.installView(v);
+
+ return v;
+ }
+
+ private void setFailureDetectionPorts(NetView v) {
+ java.util.Iterator<InternalDistributedMember> itr = mockMembers.iterator();
+
+ int port = 7899;
+ while(itr.hasNext()) {
+ v.setFailureDetectionPort(itr.next(), port++);
+ }
}
/***
@@ -414,8 +426,8 @@ public class GMSHealthMonitorJUnitTest {
@Test
public void testCheckIfAvailableWithSimulatedHeartBeat() {
- installAView();
-
+ NetView v = installAView();
+
InternalDistributedMember memberToCheck = mockMembers.get(1);
HeartbeatMessage fakeHeartbeat = new HeartbeatMessage();
fakeHeartbeat.setSender(memberToCheck);
@@ -431,6 +443,33 @@ public class GMSHealthMonitorJUnitTest {
assertTrue("CheckIfAvailable should have return true", retVal);
}
+ @Test
+ public void testCheckIfAvailableWithSimulatedHeartBeatWithTcpCheck() {
+ useGMSHealthMonitorTestClass = true;
+
+ try {
+ NetView v = installAView();
+
+ setFailureDetectionPorts(v);
+
+ InternalDistributedMember memberToCheck = mockMembers.get(1);
+ HeartbeatMessage fakeHeartbeat = new HeartbeatMessage();
+ fakeHeartbeat.setSender(memberToCheck);
+ when(messenger.send(any(HeartbeatRequestMessage.class))).then(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ gmsHealthMonitor.processMessage(fakeHeartbeat);
+ return null;
+ }
+ });
+
+ boolean retVal = gmsHealthMonitor.checkIfAvailable(memberToCheck, "Not responding", true);
+ assertTrue("CheckIfAvailable should have return true", retVal);
+ }finally {
+ useGMSHealthMonitorTestClass = false;
+ }
+ }
+
@Test
public void testShutdown() {
@@ -635,4 +674,12 @@ public class GMSHealthMonitorJUnitTest {
return baos.toByteArray();
}
+ public class GMSHealthMonitorTest extends GMSHealthMonitor {
+ @Override
+ boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
+ if(useGMSHealthMonitorTestClass)
+ return false;
+ return super.doTCPCheckMember(suspectMember, port);
+ }
+ }
}