You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2015/12/11 22:22:57 UTC
[23/50] [abbrv] incubator-geode git commit: new unit tests and code
clean-up
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index fbdcdf5..4b9c01f 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -16,26 +16,22 @@
*/
package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
+
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
-import junit.framework.Assert;
-
+import org.apache.commons.lang.SerializationException;
import org.jgroups.Event;
+import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.UNICAST3;
@@ -43,9 +39,10 @@ import org.jgroups.util.UUID;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
import com.gemstone.gemfire.ForcedDisconnectException;
+import com.gemstone.gemfire.GemFireIOException;
+import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.internal.DMStats;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
@@ -58,15 +55,21 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
import com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig;
import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
import com.gemstone.gemfire.distributed.internal.membership.gms.Services.Stopper;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.HealthMonitor;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager;
import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage;
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.JGroupsMessenger.JGroupsReceiver;
import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.SocketCreator;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
+import com.gemstone.gemfire.internal.cache.DistributedCacheOperation;
import com.gemstone.gemfire.test.junit.categories.UnitTest;
@Category(UnitTest.class)
@@ -76,6 +79,7 @@ public class JGroupsMessengerJUnitTest {
private JoinLeave joinLeave;
private Manager manager;
private Stopper stopper;
+ private HealthMonitor healthMonitor;
private InterceptUDP interceptor;
@@ -83,6 +87,10 @@ public class JGroupsMessengerJUnitTest {
* Create stub and mock objects
*/
private void initMocks(boolean enableMcast) throws Exception {
+ if (messenger != null) {
+ messenger.stop();
+ messenger = null;
+ }
Properties nonDefault = new Properties();
nonDefault.put(DistributionConfig.DISABLE_TCP_NAME, "true");
nonDefault.put(DistributionConfig.MCAST_PORT_NAME, enableMcast? ""+AvailablePortHelper.getRandomAvailableUDPPort() : "0");
@@ -100,6 +108,8 @@ public class JGroupsMessengerJUnitTest {
manager = mock(Manager.class);
when(manager.isMulticastAllowed()).thenReturn(enableMcast);
+ healthMonitor = mock(HealthMonitor.class);
+
joinLeave = mock(JoinLeave.class);
ServiceConfig serviceConfig = new ServiceConfig(tconfig, config);
@@ -107,6 +117,7 @@ public class JGroupsMessengerJUnitTest {
services = mock(Services.class);
when(services.getConfig()).thenReturn(serviceConfig);
when(services.getCancelCriterion()).thenReturn(stopper);
+ when(services.getHealthMonitor()).thenReturn(healthMonitor);
when(services.getManager()).thenReturn(manager);
when(services.getJoinLeave()).thenReturn(joinLeave);
when(services.getStatistics()).thenReturn(mock(DMStats.class));
@@ -121,7 +132,7 @@ public class JGroupsMessengerJUnitTest {
"<"+InterceptUDP.class.getName()+"/>" +
jgroupsConfig.substring(insertIdx);
messenger.setJGroupsStackConfigForTesting(jgroupsConfig);
- System.out.println("jgroups config: " + jgroupsConfig);
+// System.out.println("jgroups config: " + jgroupsConfig);
messenger.start();
messenger.started();
@@ -141,13 +152,195 @@ public class JGroupsMessengerJUnitTest {
@Test
public void testMemberWeightIsSerialized() throws Exception {
HeapDataOutputStream out = new HeapDataOutputStream(500, Version.CURRENT);
- InternalDistributedMember m = new InternalDistributedMember("localhost", 8888);
- ((GMSMember)m.getNetMember()).setMemberWeight((byte)40);
- m.toData(out);
+ InternalDistributedMember mbr = createAddress(8888);
+ ((GMSMember)mbr.getNetMember()).setMemberWeight((byte)40);
+ mbr.toData(out);
DataInputStream in = new DataInputStream(new ByteArrayInputStream(out.toByteArray()));
- m = new InternalDistributedMember();
- m.fromData(in);
- assertEquals(40, m.getNetMember().getMemberWeight());
+ mbr = new InternalDistributedMember();
+ mbr.fromData(in);
+ assertEquals(40, mbr.getNetMember().getMemberWeight());
+ }
+
+ @Test
+ public void testSerializationError() throws Exception {
+ for (int i=0; i<2 ; i++) {
+ boolean enableMcast = (i==1);
+ initMocks(enableMcast);
+ InternalDistributedMember mbr = createAddress(8888);
+ DistributedCacheOperation.CacheOperationMessage msg = mock(DistributedCacheOperation.CacheOperationMessage.class);
+ when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
+ when(msg.getMulticast()).thenReturn(enableMcast);
+ if (!enableMcast) {
+ // for non-mcast we send a message with a reply-processor
+ when(msg.getProcessorId()).thenReturn(1234);
+ } else {
+ // for mcast we send a direct-ack message and expect the messenger
+ // to register it
+ stub(msg.isDirectAck()).toReturn(true);
+ }
+ when(msg.getDSFID()).thenReturn((int)DataSerializableFixedID.PUT_ALL_MESSAGE);
+
+ // for code coverage we need to test with both a SerializationException and
+ // an IOException. The former is wrapped in a GemfireIOException while the
+ // latter is not
+ doThrow(new SerializationException()).when(msg).toData(any(DataOutput.class));
+ try {
+ messenger.send(msg);
+ fail("expected a failure");
+ } catch (GemFireIOException e) {
+ // success
+ }
+ if (enableMcast) {
+ verify(msg, atLeastOnce()).registerProcessor();
+ }
+ doThrow(new IOException()).when(msg).toData(any(DataOutput.class));
+ try {
+ messenger.send(msg);
+ fail("expected a failure");
+ } catch (GemFireIOException e) {
+ // success
+ }
+ }
+ }
+
+ @Test
+ public void testJChannelError() throws Exception {
+ for (int i=0; i<2 ; i++) {
+ boolean enableMcast = (i==1);
+ initMocks(enableMcast);
+ JChannel mockChannel = mock(JChannel.class);
+ when(mockChannel.isConnected()).thenReturn(true);
+ doThrow(new RuntimeException()).when(mockChannel).send(any(Message.class));
+ JChannel realChannel = messenger.myChannel;
+ messenger.myChannel = mockChannel;
+ try {
+ InternalDistributedMember mbr = createAddress(8888);
+ DistributedCacheOperation.CacheOperationMessage msg = mock(DistributedCacheOperation.CacheOperationMessage.class);
+ when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
+ when(msg.getMulticast()).thenReturn(enableMcast);
+ when(msg.getProcessorId()).thenReturn(1234);
+ when(msg.getDSFID()).thenReturn((int)DataSerializableFixedID.PUT_ALL_MESSAGE);
+ try {
+ messenger.send(msg);
+ fail("expected a failure");
+ } catch (DistributedSystemDisconnectedException e) {
+ // success
+ }
+ verify(mockChannel).send(isA(Message.class));
+ } finally {
+ messenger.myChannel = realChannel;
+ }
+ }
+ }
+
+ @Test
+ public void testJChannelErrorDuringDisconnect() throws Exception {
+ for (int i=0; i<4 ; i++) {
+ System.out.println("loop #"+i);
+ boolean enableMcast = (i%2 == 1);
+ initMocks(enableMcast);
+ JChannel mockChannel = mock(JChannel.class);
+ when(mockChannel.isConnected()).thenReturn(true);
+ Exception ex, shutdownCause;
+ if (i < 2) {
+ ex = new RuntimeException("");
+ shutdownCause = new RuntimeException("shutdownCause");
+ } else {
+ shutdownCause = new ForcedDisconnectException("");
+ ex = new RuntimeException("", shutdownCause);
+ }
+ doThrow(ex).when(mockChannel).send(any(Message.class));
+ JChannel realChannel = messenger.myChannel;
+ messenger.myChannel = mockChannel;
+
+ when(services.getShutdownCause()).thenReturn(shutdownCause);
+
+ try {
+ InternalDistributedMember mbr = createAddress(8888);
+ DistributedCacheOperation.CacheOperationMessage msg = mock(DistributedCacheOperation.CacheOperationMessage.class);
+ when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
+ when(msg.getMulticast()).thenReturn(enableMcast);
+ when(msg.getProcessorId()).thenReturn(1234);
+ when(msg.getDSFID()).thenReturn((int)DataSerializableFixedID.PUT_ALL_MESSAGE);
+ try {
+ messenger.send(msg);
+ fail("expected a failure");
+ } catch (DistributedSystemDisconnectedException e) {
+ // the ultimate cause should be the shutdownCause returned
+ // by Services.getShutdownCause()
+ Throwable cause = e;
+ while (cause.getCause() != null) {
+ cause = cause.getCause();
+ }
+ assertTrue(cause != e);
+ assertTrue(cause == shutdownCause);
+ }
+ verify(mockChannel).send(isA(Message.class));
+ } finally {
+ messenger.myChannel = realChannel;
+ }
+ }
+ }
+
+ @Test
+ public void testSendWhenChannelIsClosed() throws Exception {
+ for (int i=0; i<2 ; i++) {
+ initMocks(false);
+ JChannel mockChannel = mock(JChannel.class);
+ when(mockChannel.isConnected()).thenReturn(false);
+ doThrow(new RuntimeException()).when(mockChannel).send(any(Message.class));
+ JChannel realChannel = messenger.myChannel;
+ messenger.myChannel = mockChannel;
+ try {
+ InternalDistributedMember mbr = createAddress(8888);
+ DistributedCacheOperation.CacheOperationMessage msg = mock(DistributedCacheOperation.CacheOperationMessage.class);
+ when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
+ when(msg.getMulticast()).thenReturn(false);
+ when(msg.getProcessorId()).thenReturn(1234);
+ try {
+ messenger.send(msg);
+ fail("expected a failure");
+ } catch (DistributedSystemDisconnectedException e) {
+ // success
+ }
+ verify(mockChannel, never()).send(isA(Message.class));
+ } finally {
+ messenger.myChannel = realChannel;
+ }
+ }
+ }
+
+ @Test
+ public void testSendUnreliably() throws Exception {
+ for (int i=0; i<2 ; i++) {
+ boolean enableMcast = (i==1);
+ initMocks(enableMcast);
+ InternalDistributedMember mbr = createAddress(8888);
+ DistributedCacheOperation.CacheOperationMessage msg = mock(DistributedCacheOperation.CacheOperationMessage.class);
+ when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
+ when(msg.getMulticast()).thenReturn(enableMcast);
+ if (!enableMcast) {
+ // for non-mcast we send a message with a reply-processor
+ when(msg.getProcessorId()).thenReturn(1234);
+ } else {
+ // for mcast we send a direct-ack message and expect the messenger
+ // to register it
+ stub(msg.isDirectAck()).toReturn(true);
+ }
+ when(msg.getDSFID()).thenReturn((int)DataSerializableFixedID.PUT_ALL_MESSAGE);
+ interceptor.collectMessages = true;
+ try {
+ messenger.sendUnreliably(msg);
+ } catch (GemFireIOException e) {
+ fail("expected success");
+ }
+ if (enableMcast) {
+ verify(msg, atLeastOnce()).registerProcessor();
+ }
+ verify(msg).toData(isA(DataOutput.class));
+ assertTrue("expected 1 message but found " + interceptor.collectedMessages, interceptor.collectedMessages.size() == 1);
+ assertTrue(interceptor.collectedMessages.get(0).isFlagSet(Message.Flag.NO_RELIABILITY));
+ }
}
@Test
@@ -265,7 +458,7 @@ public class JGroupsMessengerJUnitTest {
public void testSendToMultipleMembers() throws Exception {
initMocks(false);
InternalDistributedMember sender = messenger.getMemberID();
- InternalDistributedMember other = new InternalDistributedMember("localhost", 8888);
+ InternalDistributedMember other = createAddress(8888);
NetView v = new NetView(sender);
v.add(other);
@@ -285,11 +478,11 @@ public class JGroupsMessengerJUnitTest {
@Test
public void testChannelStillConnectedAfterEmergencyCloseAfterForcedDisconnectWithAutoReconnect() throws Exception {
initMocks(false);
- Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
- Mockito.doCallRealMethod().when(services).getShutdownCause();
- Mockito.doCallRealMethod().when(services).emergencyClose();
- Mockito.doCallRealMethod().when(services).isShutdownDueToForcedDisconnect();
- Mockito.doCallRealMethod().when(services).isAutoReconnectEnabled();
+ doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+ doCallRealMethod().when(services).getShutdownCause();
+ doCallRealMethod().when(services).emergencyClose();
+ doCallRealMethod().when(services).isShutdownDueToForcedDisconnect();
+ doCallRealMethod().when(services).isAutoReconnectEnabled();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.emergencyClose();
@@ -299,11 +492,11 @@ public class JGroupsMessengerJUnitTest {
@Test
public void testChannelStillConnectedAfterStopAfterForcedDisconnectWithAutoReconnect() throws Exception {
initMocks(false);
- Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
- Mockito.doCallRealMethod().when(services).getShutdownCause();
- Mockito.doCallRealMethod().when(services).emergencyClose();
- Mockito.doCallRealMethod().when(services).isShutdownDueToForcedDisconnect();
- Mockito.doCallRealMethod().when(services).isAutoReconnectEnabled();
+ doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+ doCallRealMethod().when(services).getShutdownCause();
+ doCallRealMethod().when(services).emergencyClose();
+ doCallRealMethod().when(services).isShutdownDueToForcedDisconnect();
+ doCallRealMethod().when(services).isAutoReconnectEnabled();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.stop();
@@ -313,12 +506,12 @@ public class JGroupsMessengerJUnitTest {
@Test
public void testChannelStillConnectedAfteremergencyWhileReconnectingDS() throws Exception {
initMocks(false);
- Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
- Mockito.doCallRealMethod().when(services).getShutdownCause();
- Mockito.doCallRealMethod().when(services).emergencyClose();
- Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect();
- Mockito.doReturn(false).when(services).isAutoReconnectEnabled();
- Mockito.doReturn(true).when(manager).isReconnectingDS();
+ doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+ doCallRealMethod().when(services).getShutdownCause();
+ doCallRealMethod().when(services).emergencyClose();
+ doReturn(false).when(services).isShutdownDueToForcedDisconnect();
+ doReturn(false).when(services).isAutoReconnectEnabled();
+ doReturn(true).when(manager).isReconnectingDS();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.emergencyClose();
@@ -329,12 +522,12 @@ public class JGroupsMessengerJUnitTest {
@Test
public void testChannelStillConnectedAfterStopWhileReconnectingDS() throws Exception {
initMocks(false);
- Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
- Mockito.doCallRealMethod().when(services).getShutdownCause();
- Mockito.doCallRealMethod().when(services).emergencyClose();
- Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect();
- Mockito.doReturn(false).when(services).isAutoReconnectEnabled();
- Mockito.doReturn(true).when(manager).isReconnectingDS();
+ doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+ doCallRealMethod().when(services).getShutdownCause();
+ doCallRealMethod().when(services).emergencyClose();
+ doReturn(false).when(services).isShutdownDueToForcedDisconnect();
+ doReturn(false).when(services).isAutoReconnectEnabled();
+ doReturn(true).when(manager).isReconnectingDS();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.stop();
@@ -344,12 +537,12 @@ public class JGroupsMessengerJUnitTest {
@Test
public void testChannelClosedOnEmergencyClose() throws Exception {
initMocks(false);
- Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
- Mockito.doCallRealMethod().when(services).getShutdownCause();
- Mockito.doCallRealMethod().when(services).emergencyClose();
- Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect();
- Mockito.doReturn(false).when(services).isAutoReconnectEnabled();
- Mockito.doReturn(false).when(manager).isReconnectingDS();
+ doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+ doCallRealMethod().when(services).getShutdownCause();
+ doCallRealMethod().when(services).emergencyClose();
+ doReturn(false).when(services).isShutdownDueToForcedDisconnect();
+ doReturn(false).when(services).isAutoReconnectEnabled();
+ doReturn(false).when(manager).isReconnectingDS();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.emergencyClose();
@@ -359,12 +552,12 @@ public class JGroupsMessengerJUnitTest {
@Test
public void testChannelClosedOnStop() throws Exception {
initMocks(false);
- Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
- Mockito.doCallRealMethod().when(services).getShutdownCause();
- Mockito.doCallRealMethod().when(services).emergencyClose();
- Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect();
- Mockito.doReturn(false).when(services).isAutoReconnectEnabled();
- Mockito.doReturn(false).when(manager).isReconnectingDS();
+ doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+ doCallRealMethod().when(services).getShutdownCause();
+ doCallRealMethod().when(services).emergencyClose();
+ doReturn(false).when(services).isShutdownDueToForcedDisconnect();
+ doReturn(false).when(services).isAutoReconnectEnabled();
+ doReturn(false).when(manager).isReconnectingDS();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.stop();
@@ -374,12 +567,12 @@ public class JGroupsMessengerJUnitTest {
@Test
public void testChannelClosedAfterEmergencyCloseForcedDisconnectWithoutAutoReconnect() throws Exception {
initMocks(false);
- Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
- Mockito.doCallRealMethod().when(services).getShutdownCause();
- Mockito.doCallRealMethod().when(services).emergencyClose();
- Mockito.doReturn(true).when(services).isShutdownDueToForcedDisconnect();
- Mockito.doReturn(false).when(services).isAutoReconnectEnabled();
- Mockito.doReturn(false).when(manager).isReconnectingDS();
+ doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+ doCallRealMethod().when(services).getShutdownCause();
+ doCallRealMethod().when(services).emergencyClose();
+ doReturn(true).when(services).isShutdownDueToForcedDisconnect();
+ doReturn(false).when(services).isAutoReconnectEnabled();
+ doReturn(false).when(manager).isReconnectingDS();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.emergencyClose();
@@ -389,12 +582,12 @@ public class JGroupsMessengerJUnitTest {
@Test
public void testChannelStillConnectedStopAfterForcedDisconnectWithoutAutoReconnect() throws Exception {
initMocks(false);
- Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
- Mockito.doCallRealMethod().when(services).getShutdownCause();
- Mockito.doCallRealMethod().when(services).emergencyClose();
- Mockito.doReturn(true).when(services).isShutdownDueToForcedDisconnect();
- Mockito.doReturn(false).when(services).isAutoReconnectEnabled();
- Mockito.doReturn(false).when(manager).isReconnectingDS();
+ doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+ doCallRealMethod().when(services).getShutdownCause();
+ doCallRealMethod().when(services).emergencyClose();
+ doReturn(true).when(services).isShutdownDueToForcedDisconnect();
+ doReturn(false).when(services).isAutoReconnectEnabled();
+ doReturn(false).when(manager).isReconnectingDS();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.stop();
@@ -404,12 +597,12 @@ public class JGroupsMessengerJUnitTest {
@Test
public void testChannelClosedAfterEmergencyCloseNotForcedDisconnectWithAutoReconnect() throws Exception {
initMocks(false);
- Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
- Mockito.doCallRealMethod().when(services).getShutdownCause();
- Mockito.doCallRealMethod().when(services).emergencyClose();
- Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect();
- Mockito.doReturn(true).when(services).isAutoReconnectEnabled();
- Mockito.doReturn(false).when(manager).isReconnectingDS();
+ doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+ doCallRealMethod().when(services).getShutdownCause();
+ doCallRealMethod().when(services).emergencyClose();
+ doReturn(false).when(services).isShutdownDueToForcedDisconnect();
+ doReturn(true).when(services).isAutoReconnectEnabled();
+ doReturn(false).when(manager).isReconnectingDS();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.emergencyClose();
@@ -419,18 +612,150 @@ public class JGroupsMessengerJUnitTest {
@Test
public void testChannelStillConnectedStopNotForcedDisconnectWithAutoReconnect() throws Exception {
initMocks(false);
- Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
- Mockito.doCallRealMethod().when(services).getShutdownCause();
- Mockito.doCallRealMethod().when(services).emergencyClose();
- Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect();
- Mockito.doReturn(true).when(services).isAutoReconnectEnabled();
- Mockito.doReturn(false).when(manager).isReconnectingDS();
+ doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+ doCallRealMethod().when(services).getShutdownCause();
+ doCallRealMethod().when(services).emergencyClose();
+ doReturn(false).when(services).isShutdownDueToForcedDisconnect();
+ doReturn(true).when(services).isAutoReconnectEnabled();
+ doReturn(false).when(manager).isReconnectingDS();
services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
assertTrue(messenger.myChannel.isConnected());
messenger.stop();
assertFalse(messenger.myChannel.isConnected());
}
+ @Test
+ public void testMessageFiltering() throws Exception {
+ initMocks(true);
+ InternalDistributedMember mbr = createAddress(8888);
+ NetView view = new NetView(mbr);
+
+ // the digest should be set in an outgoing join response
+ JoinResponseMessage joinResponse = new JoinResponseMessage(mbr, view);
+ messenger.filterOutgoingMessage(joinResponse);
+ assertNotNull(joinResponse.getMessengerData());
+
+ // save the view digest for later
+ byte[] data = joinResponse.getMessengerData();
+
+ // the digest should be used and the message bytes nulled out in an incoming join response
+ messenger.filterIncomingMessage(joinResponse);
+ assertNull(joinResponse.getMessengerData());
+
+ // the digest shouldn't be set in an outgoing rejection message
+ joinResponse = new JoinResponseMessage("you can't join my distributed system. nyah nyah nyah!");
+ messenger.filterOutgoingMessage(joinResponse);
+ assertNull(joinResponse.getMessengerData());
+
+ // the digest shouldn't be installed from an incoming rejection message
+ joinResponse.setMessengerData(data);
+ messenger.filterIncomingMessage(joinResponse);
+ assertNotNull(joinResponse.getMessengerData());
+ }
+
+ @Test
+ public void testPingPong() throws Exception {
+ initMocks(false);
+ GMSPingPonger pinger = messenger.getPingPonger();
+ InternalDistributedMember mbr = createAddress(8888);
+ JGAddress addr = new JGAddress(mbr);
+
+ Message pingMessage = pinger.createPingMessage(null, addr);
+ assertTrue(pinger.isPingMessage(pingMessage.getBuffer()));
+ assertFalse(pinger.isPongMessage(pingMessage.getBuffer()));
+
+ Message pongMessage = pinger.createPongMessage(null, addr);
+ assertTrue(pinger.isPongMessage(pongMessage.getBuffer()));
+ assertFalse(pinger.isPingMessage(pongMessage.getBuffer()));
+
+ interceptor.collectMessages = true;
+ pinger.sendPingMessage(messenger.myChannel, null, addr);
+ assertEquals("expected 1 message but found " + interceptor.collectedMessages, interceptor.collectedMessages.size(), 1);
+ pingMessage = interceptor.collectedMessages.get(0);
+ assertTrue(pinger.isPingMessage(pingMessage.getBuffer()));
+
+ interceptor.collectedMessages.clear();
+ pinger.sendPongMessage(messenger.myChannel, null, addr);
+ assertEquals("expected 1 message but found " + interceptor.collectedMessages, interceptor.collectedMessages.size(), 1);
+ pongMessage = interceptor.collectedMessages.get(0);
+ assertTrue(pinger.isPongMessage(pongMessage.getBuffer()));
+
+ interceptor.collectedMessages.clear();
+ JGroupsReceiver receiver = (JGroupsReceiver)messenger.myChannel.getReceiver();
+ long pongsReceived = messenger.pongsReceived;
+ receiver.receive(pongMessage);
+ assertEquals(pongsReceived+1, messenger.pongsReceived);
+ receiver.receive(pingMessage);
+ assertEquals("expected 1 message but found " + interceptor.collectedMessages, interceptor.collectedMessages.size(), 1);
+ Message m = interceptor.collectedMessages.get(0);
+ assertTrue(pinger.isPongMessage(m.getBuffer()));
+ }
+
+ @Test
+ public void testJGroupsIOExceptionHandler() throws Exception {
+ initMocks(false);
+ InternalDistributedMember mbr = createAddress(8888);
+ NetView v = new NetView(mbr);
+ v.add(messenger.getMemberID());
+ messenger.installView(v);
+
+ IOException ioe = new IOException("test exception");
+ messenger.handleJGroupsIOException(ioe, new JGAddress(mbr));
+ messenger.handleJGroupsIOException(ioe, new JGAddress(mbr)); // should be ignored
+ verify(healthMonitor).checkIfAvailable(mbr, "Unable to send messages to this member via JGroups", true);
+ }
+
+ @Test
+ public void testReceiver() throws Exception {
+ initMocks(false);
+ JGroupsReceiver receiver = (JGroupsReceiver)messenger.myChannel.getReceiver();
+
+ // a zero-length message is ignored
+ Message msg = new Message(new JGAddress(messenger.getMemberID()));
+ Object result = messenger.readJGMessage(msg);
+ assertNull(result);
+
+ // for code coverage we need to pump this message through the receiver
+ receiver.receive(msg);
+
+ // for more code coverage we need to actually set a buffer in the message
+ msg.setBuffer(new byte[0]);
+ result = messenger.readJGMessage(msg);
+ assertNull(result);
+ receiver.receive(msg);
+
+ // now create a view and a real distribution-message
+ InternalDistributedMember myAddress = messenger.getMemberID();
+ InternalDistributedMember other = createAddress(8888);
+ NetView v = new NetView(myAddress);
+ v.add(other);
+ when(joinLeave.getView()).thenReturn(v);
+ messenger.installView(v);
+
+ List<InternalDistributedMember> recipients = v.getMembers();
+ SerialAckedMessage dmsg = new SerialAckedMessage();
+ dmsg.setRecipients(recipients);
+
+ // a message is ignored during manager shutdown
+ msg = messenger.createJGMessage(dmsg, new JGAddress(other), Version.CURRENT_ORDINAL);
+ when(manager.shutdownInProgress()).thenReturn(Boolean.TRUE);
+ receiver.receive(msg);
+ verify(manager, never()).processMessage(isA(DistributionMessage.class));
+ }
+
+ @Test
+ public void testUseOldJChannel() throws Exception {
+ initMocks(false);
+ JChannel channel = messenger.myChannel;
+ services.getConfig().getTransport().setOldDSMembershipInfo(channel);
+ JGroupsMessenger newMessenger = new JGroupsMessenger();
+ newMessenger.init(services);
+ newMessenger.start();
+ newMessenger.started();
+ newMessenger.stop();
+ assertTrue(newMessenger.myChannel == messenger.myChannel);
+ }
+
/**
* creates an InternalDistributedMember address that can be used
* with the doctored JGroups channel. This includes a logical
@@ -439,7 +764,7 @@ public class JGroupsMessengerJUnitTest {
* @param port the UDP port to use for the new address
*/
private InternalDistributedMember createAddress(int port) {
- GMSMember gms = new GMSMember("localhost", 8888);
+ GMSMember gms = new GMSMember("localhost", port);
gms.setUUID(UUID.randomUUID());
gms.setVmKind(DistributionManager.NORMAL_DM_TYPE);
gms.setVersionOrdinal(Version.CURRENT_ORDINAL);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/dunit/RemoteDUnitVMIF.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/dunit/RemoteDUnitVMIF.java b/gemfire-core/src/test/java/dunit/RemoteDUnitVMIF.java
index 0004246..5dffa47 100644
--- a/gemfire-core/src/test/java/dunit/RemoteDUnitVMIF.java
+++ b/gemfire-core/src/test/java/dunit/RemoteDUnitVMIF.java
@@ -31,4 +31,6 @@ public interface RemoteDUnitVMIF extends Remote {
MethExecutorResult executeMethodOnClass(String name, String methodName,
Object[] args) throws RemoteException;
+ void shutDownVM() throws RemoteException;
+
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/dunit/standalone/ChildVM.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/dunit/standalone/ChildVM.java b/gemfire-core/src/test/java/dunit/standalone/ChildVM.java
index 67b2710..45a236a 100644
--- a/gemfire-core/src/test/java/dunit/standalone/ChildVM.java
+++ b/gemfire-core/src/test/java/dunit/standalone/ChildVM.java
@@ -34,6 +34,15 @@ import dunit.standalone.DUnitLauncher.MasterRemote;
*/
public class ChildVM {
+ private static boolean stopMainLoop = false;
+
+ /**
+ * tells the main() loop to exit
+ */
+ public static void stopVM() {
+ stopMainLoop = true;
+ }
+
static {
createHydraLogWriter();
}
@@ -54,7 +63,7 @@ public class ChildVM {
Naming.rebind("//localhost:" + namingPort + "/vm" + vmNum, dunitVM);
holder.signalVMReady();
//This loop is here so this VM will die even if the master is mean killed.
- while(true) {
+ while (!stopMainLoop) {
holder.ping();
Thread.sleep(1000);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/dunit/standalone/DUnitLauncher.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/dunit/standalone/DUnitLauncher.java b/gemfire-core/src/test/java/dunit/standalone/DUnitLauncher.java
index f3109f3..72c33d6 100644
--- a/gemfire-core/src/test/java/dunit/standalone/DUnitLauncher.java
+++ b/gemfire-core/src/test/java/dunit/standalone/DUnitLauncher.java
@@ -169,6 +169,30 @@ public class DUnitLauncher {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
+// System.out.println("shutting down DUnit JVMs");
+// for (int i=0; i<NUM_VMS; i++) {
+// try {
+// processManager.getStub(i).shutDownVM();
+// } catch (Exception e) {
+// System.out.println("exception shutting down vm_"+i+": " + e);
+// }
+// }
+// // TODO - hasLiveVMs always returns true
+// System.out.print("waiting for JVMs to exit");
+// long giveUp = System.currentTimeMillis() + 5000;
+// while (giveUp > System.currentTimeMillis()) {
+// if (!processManager.hasLiveVMs()) {
+// return;
+// }
+// System.out.print(".");
+// System.out.flush();
+// try {
+// Thread.sleep(1000);
+// } catch (InterruptedException e) {
+// break;
+// }
+// }
+// System.out.println("\nkilling any remaining JVMs");
processManager.killVMs();
}
});
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java b/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java
index 60ac04d..7fc762f 100644
--- a/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java
+++ b/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java
@@ -98,12 +98,20 @@ public class ProcessManager {
public synchronized void killVMs() {
for(ProcessHolder process : processes.values()) {
if(process != null) {
- //TODO - stop it gracefully? Why bother
process.kill();
}
}
}
+ public synchronized boolean hasLiveVMs() {
+ for(ProcessHolder process : processes.values()) {
+ if(process != null && process.isAlive()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public synchronized void bounce(int vmNum) {
if(!processes.containsKey(vmNum)) {
throw new IllegalStateException("No such process " + vmNum);
@@ -240,6 +248,10 @@ public class ProcessManager {
public boolean isKilled() {
return killed;
}
+
+ public boolean isAlive() {
+ return !killed && process.isAlive();
+ }
}
public RemoteDUnitVMIF getStub(int i) throws AccessException, RemoteException, NotBoundException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/dunit/standalone/RemoteDUnitVM.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/dunit/standalone/RemoteDUnitVM.java b/gemfire-core/src/test/java/dunit/standalone/RemoteDUnitVM.java
index 15acc2e..742dc55 100644
--- a/gemfire-core/src/test/java/dunit/standalone/RemoteDUnitVM.java
+++ b/gemfire-core/src/test/java/dunit/standalone/RemoteDUnitVM.java
@@ -135,11 +135,10 @@ public class RemoteDUnitVM extends UnicastRemoteObject implements RemoteDUnitVMI
}
- public void shutDownVM(boolean disconnect, boolean runShutdownHook)
- throws RemoteException {
+ public void shutDownVM() throws RemoteException {
+ ChildVM.stopVM();
}
- public void disconnectVM()
- throws RemoteException {
+ public void disconnectVM() throws RemoteException {
}
}