You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2015/12/11 22:22:57 UTC

[23/50] [abbrv] incubator-geode git commit: new unit tests and code clean-up

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index fbdcdf5..4b9c01f 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -16,26 +16,22 @@
  */
 package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
+
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
-import junit.framework.Assert;
-
+import org.apache.commons.lang.SerializationException;
 import org.jgroups.Event;
+import org.jgroups.JChannel;
 import org.jgroups.Message;
 import org.jgroups.conf.ClassConfigurator;
 import org.jgroups.protocols.UNICAST3;
@@ -43,9 +39,10 @@ import org.jgroups.util.UUID;
 import org.junit.After;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
 
 import com.gemstone.gemfire.ForcedDisconnectException;
+import com.gemstone.gemfire.GemFireIOException;
+import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
 import com.gemstone.gemfire.distributed.internal.DMStats;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
@@ -58,15 +55,21 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
 import com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig;
 import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
 import com.gemstone.gemfire.distributed.internal.membership.gms.Services.Stopper;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.HealthMonitor;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.JGroupsMessenger.JGroupsReceiver;
 import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
+import com.gemstone.gemfire.internal.cache.DistributedCacheOperation;
 import com.gemstone.gemfire.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -76,6 +79,7 @@ public class JGroupsMessengerJUnitTest {
   private JoinLeave joinLeave;
   private Manager manager;
   private Stopper stopper;
+  private HealthMonitor healthMonitor;
   private InterceptUDP interceptor;
 
 
@@ -83,6 +87,10 @@ public class JGroupsMessengerJUnitTest {
    * Create stub and mock objects
    */
   private void initMocks(boolean enableMcast) throws Exception {
+    if (messenger != null) {
+      messenger.stop();
+      messenger = null;
+    }
     Properties nonDefault = new Properties();
     nonDefault.put(DistributionConfig.DISABLE_TCP_NAME, "true");
     nonDefault.put(DistributionConfig.MCAST_PORT_NAME, enableMcast? ""+AvailablePortHelper.getRandomAvailableUDPPort() : "0");
@@ -100,6 +108,8 @@ public class JGroupsMessengerJUnitTest {
     manager = mock(Manager.class);
     when(manager.isMulticastAllowed()).thenReturn(enableMcast);
     
+    healthMonitor = mock(HealthMonitor.class);
+    
     joinLeave = mock(JoinLeave.class);
     
     ServiceConfig serviceConfig = new ServiceConfig(tconfig, config);
@@ -107,6 +117,7 @@ public class JGroupsMessengerJUnitTest {
     services = mock(Services.class);
     when(services.getConfig()).thenReturn(serviceConfig);
     when(services.getCancelCriterion()).thenReturn(stopper);
+    when(services.getHealthMonitor()).thenReturn(healthMonitor);
     when(services.getManager()).thenReturn(manager);
     when(services.getJoinLeave()).thenReturn(joinLeave);
     when(services.getStatistics()).thenReturn(mock(DMStats.class));
@@ -121,7 +132,7 @@ public class JGroupsMessengerJUnitTest {
         "<"+InterceptUDP.class.getName()+"/>" +
         jgroupsConfig.substring(insertIdx);
     messenger.setJGroupsStackConfigForTesting(jgroupsConfig);
-    System.out.println("jgroups config: " + jgroupsConfig);
+//    System.out.println("jgroups config: " + jgroupsConfig);
     
     messenger.start();
     messenger.started();
@@ -141,13 +152,195 @@ public class JGroupsMessengerJUnitTest {
   @Test
   public void testMemberWeightIsSerialized() throws Exception {
     HeapDataOutputStream out = new HeapDataOutputStream(500, Version.CURRENT);
-    InternalDistributedMember m = new InternalDistributedMember("localhost", 8888);
-    ((GMSMember)m.getNetMember()).setMemberWeight((byte)40);
-    m.toData(out);
+    InternalDistributedMember mbr = createAddress(8888);
+    ((GMSMember)mbr.getNetMember()).setMemberWeight((byte)40);
+    mbr.toData(out);
     DataInputStream in = new DataInputStream(new ByteArrayInputStream(out.toByteArray()));
-    m = new InternalDistributedMember();
-    m.fromData(in);
-    assertEquals(40, m.getNetMember().getMemberWeight());
+    mbr = new InternalDistributedMember();
+    mbr.fromData(in);
+    assertEquals(40, mbr.getNetMember().getMemberWeight());
+  }
+  
+  @Test
+  public void testSerializationError() throws Exception {
+    for (int i=0; i<2 ; i++) {
+      boolean enableMcast = (i==1);
+      initMocks(enableMcast);
+      InternalDistributedMember mbr = createAddress(8888);
+      DistributedCacheOperation.CacheOperationMessage msg = mock(DistributedCacheOperation.CacheOperationMessage.class);
+      when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
+      when(msg.getMulticast()).thenReturn(enableMcast);
+      if (!enableMcast) {
+        // for non-mcast we send a message with a reply-processor
+        when(msg.getProcessorId()).thenReturn(1234);
+      } else {
+        // for mcast we send a direct-ack message and expect the messenger
+        // to register it
+        stub(msg.isDirectAck()).toReturn(true);
+      }
+      when(msg.getDSFID()).thenReturn((int)DataSerializableFixedID.PUT_ALL_MESSAGE);
+      
+      // for code coverage we need to test with both a SerializationException and
+      // an IOException.  The former is wrapped in a GemfireIOException while the
+      // latter is not
+      doThrow(new SerializationException()).when(msg).toData(any(DataOutput.class));
+      try {
+        messenger.send(msg);
+        fail("expected a failure");
+      } catch (GemFireIOException e) {
+        // success
+      }
+      if (enableMcast) {
+        verify(msg, atLeastOnce()).registerProcessor();
+      }
+      doThrow(new IOException()).when(msg).toData(any(DataOutput.class));
+      try {
+        messenger.send(msg);
+        fail("expected a failure");
+      } catch (GemFireIOException e) {
+        // success
+      }
+    }
+  }
+  
+  @Test
+  public void testJChannelError() throws Exception {
+    for (int i=0; i<2 ; i++) {
+      boolean enableMcast = (i==1);
+      initMocks(enableMcast);
+      JChannel mockChannel = mock(JChannel.class);
+      when(mockChannel.isConnected()).thenReturn(true);
+      doThrow(new RuntimeException()).when(mockChannel).send(any(Message.class));
+      JChannel realChannel = messenger.myChannel;
+      messenger.myChannel = mockChannel;
+      try {
+        InternalDistributedMember mbr = createAddress(8888);
+        DistributedCacheOperation.CacheOperationMessage msg = mock(DistributedCacheOperation.CacheOperationMessage.class);
+        when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
+        when(msg.getMulticast()).thenReturn(enableMcast);
+        when(msg.getProcessorId()).thenReturn(1234);
+        when(msg.getDSFID()).thenReturn((int)DataSerializableFixedID.PUT_ALL_MESSAGE);
+        try {
+          messenger.send(msg);
+          fail("expected a failure");
+        } catch (DistributedSystemDisconnectedException e) {
+          // success
+        }
+        verify(mockChannel).send(isA(Message.class));
+      } finally {
+        messenger.myChannel = realChannel;
+      }
+    }
+  }
+  
+  @Test
+  public void testJChannelErrorDuringDisconnect() throws Exception {
+    for (int i=0; i<4 ; i++) {
+      System.out.println("loop #"+i);
+      boolean enableMcast = (i%2 == 1);
+      initMocks(enableMcast);
+      JChannel mockChannel = mock(JChannel.class);
+      when(mockChannel.isConnected()).thenReturn(true);
+      Exception ex, shutdownCause;
+      if (i < 2) {
+        ex = new RuntimeException("");
+        shutdownCause = new RuntimeException("shutdownCause");
+      } else {
+        shutdownCause = new ForcedDisconnectException("");
+        ex = new RuntimeException("", shutdownCause);
+      }
+      doThrow(ex).when(mockChannel).send(any(Message.class));
+      JChannel realChannel = messenger.myChannel;
+      messenger.myChannel = mockChannel;
+      
+      when(services.getShutdownCause()).thenReturn(shutdownCause);
+      
+      try {
+        InternalDistributedMember mbr = createAddress(8888);
+        DistributedCacheOperation.CacheOperationMessage msg = mock(DistributedCacheOperation.CacheOperationMessage.class);
+        when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
+        when(msg.getMulticast()).thenReturn(enableMcast);
+        when(msg.getProcessorId()).thenReturn(1234);
+        when(msg.getDSFID()).thenReturn((int)DataSerializableFixedID.PUT_ALL_MESSAGE);
+        try {
+          messenger.send(msg);
+          fail("expected a failure");
+        } catch (DistributedSystemDisconnectedException e) {
+          // the ultimate cause should be the shutdownCause returned
+          // by Services.getShutdownCause()
+          Throwable cause = e;
+          while (cause.getCause() != null) {
+            cause = cause.getCause();
+          }
+          assertTrue(cause != e);
+          assertTrue(cause == shutdownCause);
+        }
+        verify(mockChannel).send(isA(Message.class));
+      } finally {
+        messenger.myChannel = realChannel;
+      }
+    }
+  }
+  
+  @Test
+  public void testSendWhenChannelIsClosed() throws Exception {
+    for (int i=0; i<2 ; i++) {
+      initMocks(false);
+      JChannel mockChannel = mock(JChannel.class);
+      when(mockChannel.isConnected()).thenReturn(false);
+      doThrow(new RuntimeException()).when(mockChannel).send(any(Message.class));
+      JChannel realChannel = messenger.myChannel;
+      messenger.myChannel = mockChannel;
+      try {
+        InternalDistributedMember mbr = createAddress(8888);
+        DistributedCacheOperation.CacheOperationMessage msg = mock(DistributedCacheOperation.CacheOperationMessage.class);
+        when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
+        when(msg.getMulticast()).thenReturn(false);
+        when(msg.getProcessorId()).thenReturn(1234);
+        try {
+          messenger.send(msg);
+          fail("expected a failure");
+        } catch (DistributedSystemDisconnectedException e) {
+          // success
+        }
+        verify(mockChannel, never()).send(isA(Message.class));
+      } finally {
+        messenger.myChannel = realChannel;
+      }
+    }
+  }
+
+  @Test
+  public void testSendUnreliably() throws Exception {
+    for (int i=0; i<2 ; i++) {
+      boolean enableMcast = (i==1);
+      initMocks(enableMcast);
+      InternalDistributedMember mbr = createAddress(8888);
+      DistributedCacheOperation.CacheOperationMessage msg = mock(DistributedCacheOperation.CacheOperationMessage.class);
+      when(msg.getRecipients()).thenReturn(new InternalDistributedMember[] {mbr});
+      when(msg.getMulticast()).thenReturn(enableMcast);
+      if (!enableMcast) {
+        // for non-mcast we send a message with a reply-processor
+        when(msg.getProcessorId()).thenReturn(1234);
+      } else {
+        // for mcast we send a direct-ack message and expect the messenger
+        // to register it
+        stub(msg.isDirectAck()).toReturn(true);
+      }
+      when(msg.getDSFID()).thenReturn((int)DataSerializableFixedID.PUT_ALL_MESSAGE);
+      interceptor.collectMessages = true;
+      try {
+        messenger.sendUnreliably(msg);
+      } catch (GemFireIOException e) {
+        fail("expected success");
+      }
+      if (enableMcast) {
+        verify(msg, atLeastOnce()).registerProcessor();
+      }
+      verify(msg).toData(isA(DataOutput.class));
+      assertTrue("expected 1 message but found " + interceptor.collectedMessages, interceptor.collectedMessages.size() == 1);
+      assertTrue(interceptor.collectedMessages.get(0).isFlagSet(Message.Flag.NO_RELIABILITY));
+    }
   }
   
   @Test
@@ -265,7 +458,7 @@ public class JGroupsMessengerJUnitTest {
   public void testSendToMultipleMembers() throws Exception {
     initMocks(false);
     InternalDistributedMember sender = messenger.getMemberID();
-    InternalDistributedMember other = new InternalDistributedMember("localhost", 8888);
+    InternalDistributedMember other = createAddress(8888);
 
     NetView v = new NetView(sender);
     v.add(other);
@@ -285,11 +478,11 @@ public class JGroupsMessengerJUnitTest {
   @Test
   public void testChannelStillConnectedAfterEmergencyCloseAfterForcedDisconnectWithAutoReconnect() throws Exception {
     initMocks(false);
-    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
-    Mockito.doCallRealMethod().when(services).getShutdownCause();
-    Mockito.doCallRealMethod().when(services).emergencyClose();
-    Mockito.doCallRealMethod().when(services).isShutdownDueToForcedDisconnect();
-    Mockito.doCallRealMethod().when(services).isAutoReconnectEnabled();
+    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    doCallRealMethod().when(services).getShutdownCause();
+    doCallRealMethod().when(services).emergencyClose();
+    doCallRealMethod().when(services).isShutdownDueToForcedDisconnect();
+    doCallRealMethod().when(services).isAutoReconnectEnabled();
     services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
     assertTrue(messenger.myChannel.isConnected());
     messenger.emergencyClose();
@@ -299,11 +492,11 @@ public class JGroupsMessengerJUnitTest {
   @Test
   public void testChannelStillConnectedAfterStopAfterForcedDisconnectWithAutoReconnect() throws Exception {
     initMocks(false);
-    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
-    Mockito.doCallRealMethod().when(services).getShutdownCause();
-    Mockito.doCallRealMethod().when(services).emergencyClose();
-    Mockito.doCallRealMethod().when(services).isShutdownDueToForcedDisconnect();
-    Mockito.doCallRealMethod().when(services).isAutoReconnectEnabled();
+    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    doCallRealMethod().when(services).getShutdownCause();
+    doCallRealMethod().when(services).emergencyClose();
+    doCallRealMethod().when(services).isShutdownDueToForcedDisconnect();
+    doCallRealMethod().when(services).isAutoReconnectEnabled();
     services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
     assertTrue(messenger.myChannel.isConnected());
     messenger.stop();
@@ -313,12 +506,12 @@ public class JGroupsMessengerJUnitTest {
   @Test
   public void testChannelStillConnectedAfteremergencyWhileReconnectingDS() throws Exception {
     initMocks(false);
-    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
-    Mockito.doCallRealMethod().when(services).getShutdownCause();
-    Mockito.doCallRealMethod().when(services).emergencyClose();
-    Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect();
-    Mockito.doReturn(false).when(services).isAutoReconnectEnabled();
-    Mockito.doReturn(true).when(manager).isReconnectingDS();
+    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    doCallRealMethod().when(services).getShutdownCause();
+    doCallRealMethod().when(services).emergencyClose();
+    doReturn(false).when(services).isShutdownDueToForcedDisconnect();
+    doReturn(false).when(services).isAutoReconnectEnabled();
+    doReturn(true).when(manager).isReconnectingDS();
     services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
     assertTrue(messenger.myChannel.isConnected());
     messenger.emergencyClose();
@@ -329,12 +522,12 @@ public class JGroupsMessengerJUnitTest {
   @Test
   public void testChannelStillConnectedAfterStopWhileReconnectingDS() throws Exception {
     initMocks(false);
-    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
-    Mockito.doCallRealMethod().when(services).getShutdownCause();
-    Mockito.doCallRealMethod().when(services).emergencyClose();
-    Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect();
-    Mockito.doReturn(false).when(services).isAutoReconnectEnabled();
-    Mockito.doReturn(true).when(manager).isReconnectingDS();
+    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    doCallRealMethod().when(services).getShutdownCause();
+    doCallRealMethod().when(services).emergencyClose();
+    doReturn(false).when(services).isShutdownDueToForcedDisconnect();
+    doReturn(false).when(services).isAutoReconnectEnabled();
+    doReturn(true).when(manager).isReconnectingDS();
     services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
     assertTrue(messenger.myChannel.isConnected());
     messenger.stop();
@@ -344,12 +537,12 @@ public class JGroupsMessengerJUnitTest {
   @Test
   public void testChannelClosedOnEmergencyClose() throws Exception {
     initMocks(false);
-    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
-    Mockito.doCallRealMethod().when(services).getShutdownCause();
-    Mockito.doCallRealMethod().when(services).emergencyClose();
-    Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect();
-    Mockito.doReturn(false).when(services).isAutoReconnectEnabled();
-    Mockito.doReturn(false).when(manager).isReconnectingDS();
+    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    doCallRealMethod().when(services).getShutdownCause();
+    doCallRealMethod().when(services).emergencyClose();
+    doReturn(false).when(services).isShutdownDueToForcedDisconnect();
+    doReturn(false).when(services).isAutoReconnectEnabled();
+    doReturn(false).when(manager).isReconnectingDS();
     services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
     assertTrue(messenger.myChannel.isConnected());
     messenger.emergencyClose();
@@ -359,12 +552,12 @@ public class JGroupsMessengerJUnitTest {
   @Test
   public void testChannelClosedOnStop() throws Exception {
     initMocks(false);
-    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
-    Mockito.doCallRealMethod().when(services).getShutdownCause();
-    Mockito.doCallRealMethod().when(services).emergencyClose();
-    Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect();
-    Mockito.doReturn(false).when(services).isAutoReconnectEnabled();
-    Mockito.doReturn(false).when(manager).isReconnectingDS();
+    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    doCallRealMethod().when(services).getShutdownCause();
+    doCallRealMethod().when(services).emergencyClose();
+    doReturn(false).when(services).isShutdownDueToForcedDisconnect();
+    doReturn(false).when(services).isAutoReconnectEnabled();
+    doReturn(false).when(manager).isReconnectingDS();
     services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
     assertTrue(messenger.myChannel.isConnected());
     messenger.stop();
@@ -374,12 +567,12 @@ public class JGroupsMessengerJUnitTest {
   @Test
   public void testChannelClosedAfterEmergencyCloseForcedDisconnectWithoutAutoReconnect() throws Exception {
     initMocks(false);
-    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
-    Mockito.doCallRealMethod().when(services).getShutdownCause();
-    Mockito.doCallRealMethod().when(services).emergencyClose();
-    Mockito.doReturn(true).when(services).isShutdownDueToForcedDisconnect();
-    Mockito.doReturn(false).when(services).isAutoReconnectEnabled();
-    Mockito.doReturn(false).when(manager).isReconnectingDS();
+    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    doCallRealMethod().when(services).getShutdownCause();
+    doCallRealMethod().when(services).emergencyClose();
+    doReturn(true).when(services).isShutdownDueToForcedDisconnect();
+    doReturn(false).when(services).isAutoReconnectEnabled();
+    doReturn(false).when(manager).isReconnectingDS();
     services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
     assertTrue(messenger.myChannel.isConnected());
     messenger.emergencyClose();
@@ -389,12 +582,12 @@ public class JGroupsMessengerJUnitTest {
   @Test
   public void testChannelStillConnectedStopAfterForcedDisconnectWithoutAutoReconnect() throws Exception {
     initMocks(false);
-    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
-    Mockito.doCallRealMethod().when(services).getShutdownCause();
-    Mockito.doCallRealMethod().when(services).emergencyClose();
-    Mockito.doReturn(true).when(services).isShutdownDueToForcedDisconnect();
-    Mockito.doReturn(false).when(services).isAutoReconnectEnabled();
-    Mockito.doReturn(false).when(manager).isReconnectingDS();
+    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    doCallRealMethod().when(services).getShutdownCause();
+    doCallRealMethod().when(services).emergencyClose();
+    doReturn(true).when(services).isShutdownDueToForcedDisconnect();
+    doReturn(false).when(services).isAutoReconnectEnabled();
+    doReturn(false).when(manager).isReconnectingDS();
     services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
     assertTrue(messenger.myChannel.isConnected());
     messenger.stop();
@@ -404,12 +597,12 @@ public class JGroupsMessengerJUnitTest {
   @Test
   public void testChannelClosedAfterEmergencyCloseNotForcedDisconnectWithAutoReconnect() throws Exception {
     initMocks(false);
-    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
-    Mockito.doCallRealMethod().when(services).getShutdownCause();
-    Mockito.doCallRealMethod().when(services).emergencyClose();
-    Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect();
-    Mockito.doReturn(true).when(services).isAutoReconnectEnabled();
-    Mockito.doReturn(false).when(manager).isReconnectingDS();
+    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    doCallRealMethod().when(services).getShutdownCause();
+    doCallRealMethod().when(services).emergencyClose();
+    doReturn(false).when(services).isShutdownDueToForcedDisconnect();
+    doReturn(true).when(services).isAutoReconnectEnabled();
+    doReturn(false).when(manager).isReconnectingDS();
     services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
     assertTrue(messenger.myChannel.isConnected());
     messenger.emergencyClose();
@@ -419,18 +612,150 @@ public class JGroupsMessengerJUnitTest {
   @Test
   public void testChannelStillConnectedStopNotForcedDisconnectWithAutoReconnect() throws Exception {
     initMocks(false);
-    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
-    Mockito.doCallRealMethod().when(services).getShutdownCause();
-    Mockito.doCallRealMethod().when(services).emergencyClose();
-    Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect();
-    Mockito.doReturn(true).when(services).isAutoReconnectEnabled();
-    Mockito.doReturn(false).when(manager).isReconnectingDS();
+    doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    doCallRealMethod().when(services).getShutdownCause();
+    doCallRealMethod().when(services).emergencyClose();
+    doReturn(false).when(services).isShutdownDueToForcedDisconnect();
+    doReturn(true).when(services).isAutoReconnectEnabled();
+    doReturn(false).when(manager).isReconnectingDS();
     services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
     assertTrue(messenger.myChannel.isConnected());
     messenger.stop();
     assertFalse(messenger.myChannel.isConnected());
   }
   
+  @Test
+  public void testMessageFiltering() throws Exception {
+    initMocks(true);
+    InternalDistributedMember mbr = createAddress(8888);
+    NetView view = new NetView(mbr);
+    
+    // the digest should be set in an outgoing join response
+    JoinResponseMessage joinResponse = new JoinResponseMessage(mbr, view);
+    messenger.filterOutgoingMessage(joinResponse);
+    assertNotNull(joinResponse.getMessengerData());
+    
+    // save the view digest for later
+    byte[] data = joinResponse.getMessengerData();
+    
+    // the digest should be used and the message bytes nulled out in an incoming join response
+    messenger.filterIncomingMessage(joinResponse);
+    assertNull(joinResponse.getMessengerData());
+    
+    // the digest shouldn't be set in an outgoing rejection message
+    joinResponse = new JoinResponseMessage("you can't join my distributed system.  nyah nyah nyah!");
+    messenger.filterOutgoingMessage(joinResponse);
+    assertNull(joinResponse.getMessengerData());
+    
+    // the digest shouldn't be installed from an incoming rejection message
+    joinResponse.setMessengerData(data);
+    messenger.filterIncomingMessage(joinResponse);
+    assertNotNull(joinResponse.getMessengerData());
+  }
+  
+  @Test
+  public void testPingPong() throws Exception {
+    initMocks(false);
+    GMSPingPonger pinger = messenger.getPingPonger();
+    InternalDistributedMember mbr = createAddress(8888);
+    JGAddress addr = new JGAddress(mbr);
+    
+    Message pingMessage = pinger.createPingMessage(null, addr);
+    assertTrue(pinger.isPingMessage(pingMessage.getBuffer()));
+    assertFalse(pinger.isPongMessage(pingMessage.getBuffer()));
+    
+    Message pongMessage = pinger.createPongMessage(null, addr);
+    assertTrue(pinger.isPongMessage(pongMessage.getBuffer()));
+    assertFalse(pinger.isPingMessage(pongMessage.getBuffer()));
+    
+    interceptor.collectMessages = true;
+    pinger.sendPingMessage(messenger.myChannel, null, addr);
+    assertEquals("expected 1 message but found " + interceptor.collectedMessages, interceptor.collectedMessages.size(), 1);
+    pingMessage = interceptor.collectedMessages.get(0);
+    assertTrue(pinger.isPingMessage(pingMessage.getBuffer()));
+    
+    interceptor.collectedMessages.clear();
+    pinger.sendPongMessage(messenger.myChannel, null, addr);
+    assertEquals("expected 1 message but found " + interceptor.collectedMessages, interceptor.collectedMessages.size(), 1);
+    pongMessage = interceptor.collectedMessages.get(0);
+    assertTrue(pinger.isPongMessage(pongMessage.getBuffer()));
+
+    interceptor.collectedMessages.clear();
+    JGroupsReceiver receiver = (JGroupsReceiver)messenger.myChannel.getReceiver();
+    long pongsReceived = messenger.pongsReceived;
+    receiver.receive(pongMessage);
+    assertEquals(pongsReceived+1, messenger.pongsReceived);
+    receiver.receive(pingMessage);
+    assertEquals("expected 1 message but found " + interceptor.collectedMessages, interceptor.collectedMessages.size(), 1);
+    Message m = interceptor.collectedMessages.get(0);
+    assertTrue(pinger.isPongMessage(m.getBuffer()));
+  }
+  
+  @Test
+  public void testJGroupsIOExceptionHandler() throws Exception {
+    initMocks(false);
+    InternalDistributedMember mbr = createAddress(8888);
+    NetView v = new NetView(mbr);
+    v.add(messenger.getMemberID());
+    messenger.installView(v);
+
+    IOException ioe = new IOException("test exception");
+    messenger.handleJGroupsIOException(ioe, new JGAddress(mbr));
+    messenger.handleJGroupsIOException(ioe, new JGAddress(mbr)); // should be ignored
+    verify(healthMonitor).checkIfAvailable(mbr, "Unable to send messages to this member via JGroups", true);
+  }
+  
+  @Test
+  public void testReceiver() throws Exception {
+    initMocks(false);
+    JGroupsReceiver receiver = (JGroupsReceiver)messenger.myChannel.getReceiver();
+    
+    // a zero-length message is ignored
+    Message msg = new Message(new JGAddress(messenger.getMemberID()));
+    Object result = messenger.readJGMessage(msg);
+    assertNull(result);
+    
+    // for code coverage we need to pump this message through the receiver
+    receiver.receive(msg);
+    
+    // for more code coverage we need to actually set a buffer in the message
+    msg.setBuffer(new byte[0]);
+    result = messenger.readJGMessage(msg);
+    assertNull(result);
+    receiver.receive(msg);
+    
+    // now create a view and a real distribution-message
+    InternalDistributedMember myAddress = messenger.getMemberID();
+    InternalDistributedMember other = createAddress(8888);
+    NetView v = new NetView(myAddress);
+    v.add(other);
+    when(joinLeave.getView()).thenReturn(v);
+    messenger.installView(v);
+
+    List<InternalDistributedMember> recipients = v.getMembers();
+    SerialAckedMessage dmsg = new SerialAckedMessage();
+    dmsg.setRecipients(recipients);
+
+    // a message is ignored during manager shutdown
+    msg = messenger.createJGMessage(dmsg, new JGAddress(other), Version.CURRENT_ORDINAL);
+    when(manager.shutdownInProgress()).thenReturn(Boolean.TRUE);
+    receiver.receive(msg);
+    verify(manager, never()).processMessage(isA(DistributionMessage.class));
+  }
+  
+  @Test
+  public void testUseOldJChannel() throws Exception {
+    initMocks(false);
+    JChannel channel = messenger.myChannel;
+    services.getConfig().getTransport().setOldDSMembershipInfo(channel);
+    JGroupsMessenger newMessenger = new JGroupsMessenger();
+    newMessenger.init(services);
+    newMessenger.start();
+    newMessenger.started();
+    newMessenger.stop();
+    assertTrue(newMessenger.myChannel == messenger.myChannel);
+  }
+  
   /**
    * creates an InternalDistributedMember address that can be used
    * with the doctored JGroups channel.  This includes a logical
@@ -439,7 +764,7 @@ public class JGroupsMessengerJUnitTest {
    * @param port the UDP port to use for the new address
    */
   private InternalDistributedMember createAddress(int port) {
-    GMSMember gms = new GMSMember("localhost", 8888);
+    GMSMember gms = new GMSMember("localhost",  port);
     gms.setUUID(UUID.randomUUID());
     gms.setVmKind(DistributionManager.NORMAL_DM_TYPE);
     gms.setVersionOrdinal(Version.CURRENT_ORDINAL);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/dunit/RemoteDUnitVMIF.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/dunit/RemoteDUnitVMIF.java b/gemfire-core/src/test/java/dunit/RemoteDUnitVMIF.java
index 0004246..5dffa47 100644
--- a/gemfire-core/src/test/java/dunit/RemoteDUnitVMIF.java
+++ b/gemfire-core/src/test/java/dunit/RemoteDUnitVMIF.java
@@ -31,4 +31,6 @@ public interface RemoteDUnitVMIF extends Remote {
   MethExecutorResult executeMethodOnClass(String name, String methodName,
       Object[] args) throws RemoteException;
 
+  void shutDownVM() throws RemoteException;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/dunit/standalone/ChildVM.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/dunit/standalone/ChildVM.java b/gemfire-core/src/test/java/dunit/standalone/ChildVM.java
index 67b2710..45a236a 100644
--- a/gemfire-core/src/test/java/dunit/standalone/ChildVM.java
+++ b/gemfire-core/src/test/java/dunit/standalone/ChildVM.java
@@ -34,6 +34,15 @@ import dunit.standalone.DUnitLauncher.MasterRemote;
  */
 public class ChildVM {
   
+  private static boolean stopMainLoop = false;
+  
+  /**
+   * tells the main() loop to exit
+   */
+  public static void stopVM() {
+    stopMainLoop = true;
+  }
+  
   static {
     createHydraLogWriter();
   }
@@ -54,7 +63,7 @@ public class ChildVM {
       Naming.rebind("//localhost:" + namingPort + "/vm" + vmNum, dunitVM);
       holder.signalVMReady();
       //This loop is here so this VM will die even if the master is mean killed.
-      while(true) {
+      while (!stopMainLoop) {
         holder.ping();
         Thread.sleep(1000);
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/dunit/standalone/DUnitLauncher.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/dunit/standalone/DUnitLauncher.java b/gemfire-core/src/test/java/dunit/standalone/DUnitLauncher.java
index f3109f3..72c33d6 100644
--- a/gemfire-core/src/test/java/dunit/standalone/DUnitLauncher.java
+++ b/gemfire-core/src/test/java/dunit/standalone/DUnitLauncher.java
@@ -169,6 +169,30 @@ public class DUnitLauncher {
 
     Runtime.getRuntime().addShutdownHook(new Thread() {
       public void run() {
+//        System.out.println("shutting down DUnit JVMs");
+//        for (int i=0; i<NUM_VMS; i++) {
+//          try {
+//            processManager.getStub(i).shutDownVM();
+//          } catch (Exception e) {
+//            System.out.println("exception shutting down vm_"+i+": " + e);
+//          }
+//        }
+//        // TODO - hasLiveVMs always returns true
+//        System.out.print("waiting for JVMs to exit");
+//        long giveUp = System.currentTimeMillis() + 5000;
+//        while (giveUp > System.currentTimeMillis()) {
+//          if (!processManager.hasLiveVMs()) {
+//            return;
+//          }
+//          System.out.print(".");
+//          System.out.flush();
+//          try {
+//            Thread.sleep(1000);
+//          } catch (InterruptedException e) {
+//            break;
+//          }
+//        }
+//        System.out.println("\nkilling any remaining JVMs");
         processManager.killVMs();
       }
     });

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java b/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java
index 60ac04d..7fc762f 100644
--- a/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java
+++ b/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java
@@ -98,12 +98,20 @@ public class ProcessManager {
   public synchronized void killVMs() {
     for(ProcessHolder process : processes.values()) {
       if(process != null) {
-        //TODO - stop it gracefully? Why bother
         process.kill();
       }
     }
   }
   
+  public synchronized boolean hasLiveVMs() {
+    for(ProcessHolder process : processes.values()) {
+      if(process != null && process.isAlive()) {
+        return true;
+      }
+    }
+    return false;
+  }
+  
   public synchronized void bounce(int vmNum) {
     if(!processes.containsKey(vmNum)) {
       throw new IllegalStateException("No such process " + vmNum);
@@ -240,6 +248,10 @@ public class ProcessManager {
     public boolean isKilled() {
       return killed;
     }
+    
+    public boolean isAlive() {
+      return !killed && process.isAlive();
+    }
   }
 
   public RemoteDUnitVMIF getStub(int i) throws AccessException, RemoteException, NotBoundException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/dunit/standalone/RemoteDUnitVM.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/dunit/standalone/RemoteDUnitVM.java b/gemfire-core/src/test/java/dunit/standalone/RemoteDUnitVM.java
index 15acc2e..742dc55 100644
--- a/gemfire-core/src/test/java/dunit/standalone/RemoteDUnitVM.java
+++ b/gemfire-core/src/test/java/dunit/standalone/RemoteDUnitVM.java
@@ -135,11 +135,10 @@ public class RemoteDUnitVM extends UnicastRemoteObject implements RemoteDUnitVMI
     
   }
 
-  public void shutDownVM(boolean disconnect, boolean runShutdownHook)
-      throws RemoteException {
+  public void shutDownVM() throws RemoteException {
+    ChildVM.stopVM();
   }
 
-  public void disconnectVM()
-  throws RemoteException {
+  public void disconnectVM() throws RemoteException {
   }
 }