You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:43:56 UTC

[08/61] [abbrv] incubator-geode git commit: GEODE-37 change package name from com.gemstone.gemfire (for ./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to ./geode-wan/src/test/java/org/apache/geode)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
new file mode 100644
index 0000000..b754254
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import static com.gemstone.gemfire.test.dunit.Wait.*;
+import static com.gemstone.gemfire.test.dunit.IgnoredException.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.junit.categories.FlakyTest;
+
+@Category(DistributedTest.class)
+public class ParallelWANStatsDUnitTest extends WANTestBase{
+  
+  private static final int NUM_PUTS = 100;
+  private static final long serialVersionUID = 1L;
+  
+  private String testName;
+  
+  public ParallelWANStatsDUnitTest() {
+    super();
+  }
+
+  @Override
+  protected final void postSetUpWANTestBase() throws Exception {
+    this.testName = getTestMethodName();
+  }
+  
+  @Test
+  public void testPartitionedRegionParallelPropagation_BeforeDispatch() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createSendersWithConflation(lnPort);
+
+    createSenderPRs(0);
+
+    startPausedSenders();
+    
+    createReceiverPR(vm2, 1);
+    createReceiverPR(vm3, 1);
+
+    putKeyValues();
+
+    ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", NUM_PUTS ));
+    ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", NUM_PUTS ));
+    ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", NUM_PUTS ));
+    ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", NUM_PUTS ));
+    
+    assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
+    assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+    assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+    assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
+    assertEquals(0, v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)); //batches distributed
+    assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
+    
+  }
+
+  @Test
+  public void testPartitionedRegionParallelPropagation_AfterDispatch_NoRedundancy() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createSenders(lnPort);
+
+    createReceiverPR(vm2, 0);
+   
+    createSenderPRs(0);
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.doPuts( testName,
+        NUM_PUTS ));
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        testName, NUM_PUTS ));
+    
+    
+    ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+    
+    assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
+    assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+    assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+    assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
+    assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed
+    assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
+    
+    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS ));
+  }
+  
+  @Test
+  public void testPartitionedRegionParallelPropagation_AfterDispatch_Redundancy_3() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createSenders(lnPort);
+
+    createReceiverPR(vm2, 0);
+
+    createSenderPRs(3);
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.doPuts( testName,
+        NUM_PUTS ));
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        testName, NUM_PUTS ));
+    
+    ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+    
+    assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
+    assertEquals(400, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+    assertEquals(400, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+    assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
+    assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed
+    assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
+    
+    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS ));
+  }
+  
+  @Test
+  public void testWANStatsTwoWanSites_Bug44331() throws Exception {
+    Integer lnPort = createFirstLocatorWithDSId(1);
+    Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+    Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
+
+    createCacheInVMs(nyPort, vm2);
+    createCacheInVMs(tkPort, vm3);
+    createReceiverInVMs(vm2);
+    createReceiverInVMs(vm3);
+
+    vm4.invoke(() -> WANTestBase.createCache(lnPort ));
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln1",
+        2, true, 100, 10, false, false, null, true ));
+  
+    vm4.invoke(() -> WANTestBase.createSender( "ln2",
+        3, true, 100, 10, false, false, null, true ));
+  
+    createReceiverPR(vm2, 0);
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+      testName, null, 0, 10, isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+      testName, "ln1,ln2", 0, 10, isOffHeap() ));
+    
+    vm4.invoke(() -> WANTestBase.startSender( "ln1" ));
+
+    vm4.invoke(() -> WANTestBase.startSender( "ln2" ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( testName,
+        NUM_PUTS ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        testName, NUM_PUTS ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        testName, NUM_PUTS ));
+    
+    ArrayList<Integer> v4Sender1List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln1", 0 ));
+    ArrayList<Integer> v4Sender2List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln2", 0 ));
+    
+    assertEquals(0, v4Sender1List.get(0).intValue()); //queue size
+    assertEquals(NUM_PUTS, v4Sender1List.get(1).intValue()); //eventsReceived
+    assertEquals(NUM_PUTS, v4Sender1List.get(2).intValue()); //events queued
+    assertEquals(NUM_PUTS, v4Sender1List.get(3).intValue()); //events distributed
+    assertTrue(v4Sender1List.get(4).intValue()>=10); //batches distributed
+    assertEquals(0, v4Sender1List.get(5).intValue()); //batches redistributed
+    
+    assertEquals(0, v4Sender2List.get(0).intValue()); //queue size
+    assertEquals(NUM_PUTS, v4Sender2List.get(1).intValue()); //eventsReceived
+    assertEquals(NUM_PUTS, v4Sender2List.get(2).intValue()); //events queued
+    assertEquals(NUM_PUTS, v4Sender2List.get(3).intValue()); //events distributed
+    assertTrue(v4Sender2List.get(4).intValue()>=10); //batches distributed
+    assertEquals(0, v4Sender2List.get(5).intValue()); //batches redistributed
+    
+    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS ));
+    vm3.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS ));
+  }
+  
+  @Test
+  public void testParallelPropagationHA() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createSenders(lnPort);
+    
+    createReceiverPR(vm2, 0);
+    
+    createSenderPRs(3);
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( testName, 1000 ));
+    pause(200);
+    AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
+    inv1.join();
+    inv2.join();
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        testName, 1000 ));
+    
+    ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0));
+    ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0));
+    ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0));
+    
+    assertEquals(0, v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
+    int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1);
+    //We may see a single retried event on all members due to the kill
+    assertTrue("Received " + receivedEvents, 3000 <= receivedEvents && 3003 >= receivedEvents); //eventsReceived
+    int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2);
+    assertTrue("Queued " + queuedEvents, 3000 <= queuedEvents && 3003 >= queuedEvents); //eventsQueued
+    //assertTrue(10000 <= v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed : it's quite possible that vm4 distributed some of the events before it was killed
+    //assertTrue(v5List.get(4) + v6List.get(4) + v7List.get(4) > 1000); //batches distributed : it's quite possible that vm4 distributed some of the batches before it was killed
+    assertEquals(0, v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
+  
+    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStatsHA(NUM_PUTS, 1000, 1000 ));
+  }
+
+  /**
+   * One region and sender are configured on the local site, and one region and a
+   * receiver on the remote site. Puts are done to the local region, and the
+   * remote region is destroyed in the middle of them.
+   *
+   * @throws Exception on any failure while setting up or running the scenario
+   */
+  @Test
+  public void testParallelPropagationWithRemoteRegionDestroy() throws Exception {
+    addIgnoredException("RegionDestroyedException");
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverPR(vm2, 0);
+    createReceiverInVMs(vm2);
+
+    createSenders(lnPort);
+
+    vm2.invoke(() -> WANTestBase.addCacheListenerAndDestroyRegion(
+        testName));
+
+    createSenderPRs(0);
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    //do the puts on the sender-side partitioned region from vm4 (blocking invoke, not invokeAsync)
+    vm4.invoke(() -> WANTestBase.doPuts( testName, 2000 ));
+    
+    //verify that all is well on the local site: all the events should be present in the local region
+    vm4.invoke(() -> WANTestBase.validateRegionSize(
+        testName, 2000 ));
+    
+    ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", -1));
+    ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", -1));
+    ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", -1));
+    ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", -1));
+    
+    
+    assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 1); //batches distributed, summed over all four sender VMs (vm4 included)
+    assertTrue(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5) >= 1); //batches redistributed, summed over all four sender VMs
+  }
+
+  @Test
+  public void testParallelPropagationWithFilter() throws Exception {
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort ));
+
+    createCacheInVMs(nyPort, vm2);
+
+    createReceiverPR(vm2, 1);
+
+    createReceiverInVMs(vm2);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    createSenderPRs(0);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false,
+        new MyGatewayEventFilter(), true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false,
+        new MyGatewayEventFilter(), true ));
+    vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, false,
+      new MyGatewayEventFilter(), true ));
+    vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, false,
+      new MyGatewayEventFilter(), true ));
+  
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+
+    
+    vm4.invoke(() -> WANTestBase.doPuts( testName, 1000 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        testName, 800 ));
+    
+    ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+
+    assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
+    assertEquals(1000, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+    assertEquals(900, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+    assertEquals(800, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
+    assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 80); //batches distributed
+    assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
+    assertEquals(200, v4List.get(6) + v5List.get(6) + v6List.get(6) + v7List.get(6)); //events filtered
+    
+    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(80, 800, 800));
+  }
+  
+  @Test
+  public void testParallelPropagationConflation() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createSendersWithConflation(lnPort);
+
+    createSenderPRs(0);
+
+    startPausedSenders();
+
+    createReceiverPR(vm2, 1);
+
+    Map keyValues = putKeyValues();
+    final Map updateKeyValues = new HashMap();
+    for(int i=0;i<50;i++) {
+      updateKeyValues.put(i, i+"_updated");
+    }
+    
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, updateKeyValues ));
+
+    vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() /*creates aren't conflated*/ ));
+    
+    // Do the puts again. Since these are updates, the previous updates will be conflated.
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, updateKeyValues ));
+
+    vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() /*creates aren't conflated*/ ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        testName, 0 ));
+
+    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 0, 0));
+    
+    vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
+    vm6.invoke(() -> WANTestBase.resumeSender( "ln" ));
+    vm7.invoke(() -> WANTestBase.resumeSender( "ln" ));
+
+    keyValues.putAll(updateKeyValues);
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        testName, keyValues.size() ));
+    
+    vm2.invoke(() -> WANTestBase.validateRegionContents(
+        testName, keyValues ));
+    
+    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 150, NUM_PUTS));
+    
+    vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", 0 ));
+    
+    ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+    
+    assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
+    assertEquals(200, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+    assertEquals(200, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+    assertEquals(150, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
+    assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed
+    assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
+    assertEquals(50, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)); //events conflated 
+    
+  }
+  
+  protected Map putKeyValues() {
+    final Map keyValues = new HashMap();
+    for(int i=0; i< NUM_PUTS; i++) {
+      keyValues.put(i, i);
+    }
+    
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, keyValues ));
+
+    vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() ));
+    
+    return keyValues;
+  }
+
+  protected void createReceiverPR(VM vm, int redundancy) {
+    vm.invoke(() -> WANTestBase.createPartitionedRegion(
+        testName, null, redundancy, 10, isOffHeap() ));
+  }
+  
+  protected void createSenderPRs(int redundancy) {
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        testName, "ln", redundancy, 10, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        testName, "ln", redundancy, 10, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        testName, "ln", redundancy, 10, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        testName, "ln", redundancy, 10, isOffHeap() ));
+  }
+
+  protected void startPausedSenders() {
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+    
+    vm4.invoke(() ->pauseSender( "ln" ));
+    vm5.invoke(() ->pauseSender( "ln" ));
+    vm6.invoke(() ->pauseSender( "ln" ));
+    vm7.invoke(() ->pauseSender( "ln" ));
+  }
+
+  protected void createSendersWithConflation(Integer lnPort) {
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, true, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, true, false, null, true ));
+    vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, true, false, null, true ));
+    vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, true, false, null, true ));
+  }
+
+  protected void createSenders(Integer lnPort) {
+    vm4.invoke(() -> WANTestBase.createCache(lnPort ));
+    vm5.invoke(() -> WANTestBase.createCache(lnPort ));
+    vm6.invoke(() -> WANTestBase.createCache(lnPort ));
+    vm7.invoke(() -> WANTestBase.createCache(lnPort ));
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, false, null, true ));
+    vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, false, null, true ));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java
new file mode 100644
index 0000000..b38d35b
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.CacheTransactionManager;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.execute.*;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.Wait;
+
+//The tests here validate changes introduced because a distributed deadlock
+//was found that caused issues for a production customer.
+//
+//There are 4 tests which use sender gateways with primaries on different
+//JVMs. Two tests use replicated regions and two use partitioned regions, and
+//the tests vary the conserve-sockets setting.
+//
+//The 4th test (PR, conserve-sockets=true) was reported to hang/fail. NOTE(review):
+//it was described as commented out, but it currently carries @Test; confirm intent.
+@Category(DistributedTest.class)
+public class SerialGatewaySenderDistributedDeadlockDUnitTest extends WANTestBase {
+
+    public SerialGatewaySenderDistributedDeadlockDUnitTest() {
+        super();
+    }
+
+    //Uses replicated regions and conserve-sockets=false
+  @Test
+  public void testPrimarySendersOnDifferentVMsReplicated() throws Exception {
+
+        Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1));
+
+        Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+        createCachesWith(Boolean.FALSE, nyPort, lnPort);
+
+        createSerialSenders();
+
+        createReplicatedRegions(nyPort);
+
+        //get one primary sender on vm4 and another primary on vm5
+        //the startup order matters here
+        startSerialSenders();
+
+        //exercise region and gateway operations with different messaging
+        exerciseWANOperations();
+        AsyncInvocation invVM4transaction = vm4.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
+        AsyncInvocation invVM5transaction = vm5.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
+        AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+        AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+
+        exerciseFunctions();
+
+        try {
+            invVM4transaction.join();
+            invVM5transaction.join();
+            invVM4.join();
+            invVM5.join();
+
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+    //Uses partitioned regions and conserve-sockets=false
+  @Test
+  public void testPrimarySendersOnDifferentVMsPR() throws Exception {
+        Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1));
+
+        Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+        createCachesWith(Boolean.FALSE, nyPort, lnPort);
+
+        createSerialSenders();
+
+        createPartitionedRegions(nyPort);
+
+        startSerialSenders();
+
+        exerciseWANOperations();
+        AsyncInvocation invVM4transaction
+                = vm4.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000));
+        AsyncInvocation invVM5transaction
+                = vm5.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000));
+
+        AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+        AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+
+        exerciseFunctions();
+
+        try {
+            invVM4transaction.join();
+            invVM5transaction.join();
+            invVM4.join();
+            invVM5.join();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+    //Uses replicated regions and conserve-sockets=true
+  @Test
+  public void testPrimarySendersOnDifferentVMsReplicatedSocketPolicy() throws Exception {
+
+        Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1));
+
+        Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+        createCachesWith(Boolean.TRUE, nyPort, lnPort);
+
+        createSerialSenders();
+
+        createReplicatedRegions(nyPort);
+
+        //get one primary sender on vm4 and another primary on vm5
+        //the startup order matters here
+        startSerialSenders();
+
+        //exercise region and gateway operations with messaging
+        exerciseWANOperations();
+        AsyncInvocation invVM4transaction = vm4.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
+        AsyncInvocation invVM5transaction = vm5.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
+
+        AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+        AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+
+        exerciseFunctions();
+
+        try {
+            invVM4transaction.join();
+            invVM5transaction.join();
+            invVM4.join();
+            invVM5.join();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+    /**
+     * Runs the deadlock scenario against partitioned regions with conserve-sockets=true:
+     * serial senders are created and started on vm4 and vm5, the WAN operations are
+     * exercised, and then transactional puts, plain puts and function executions are run
+     * concurrently from both sender VMs.
+     *
+     * Original author's note: "this always causes a distributed deadlock".
+     * NOTE(review): presumably that refers to product versions without the fix this test
+     * guards - confirm.
+     *
+     * @throws Exception if any remote invocation fails or the wait for the async
+     *         invocations is interrupted
+     */
+    @Test
+    public void testPrimarySendersOnDifferentVMsPRSocketPolicy() throws Exception {
+        Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1));
+
+        Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+        createCachesWith(Boolean.TRUE, nyPort, lnPort);
+
+        createSerialSenders();
+
+        createPartitionedRegions(nyPort);
+
+        startSerialSenders();
+
+        exerciseWANOperations();
+        AsyncInvocation invVM4transaction
+                = vm4.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000));
+        AsyncInvocation invVM5transaction
+                = vm5.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000));
+
+        AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+        AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+
+        exerciseFunctions();
+
+        // The method already declares "throws Exception", so an interruption while waiting
+        // is allowed to propagate with its stack trace instead of being printed to stderr
+        // and replaced by a message-less fail().
+        invVM4transaction.join();
+        invVM5transaction.join();
+        invVM4.join();
+        invVM5.join();
+    }
+
+    //**************************************************************************
+    //Utility methods used by tests
+    //**************************************************************************
+    /**
+     * Creates the replicated region {@code <test method name>_RR} on the receiving VM (vm2)
+     * and on the two sending VMs (vm4, vm5), and creates a receiver on vm2.
+     *
+     * @param nyPort not referenced in this method body
+     * @throws Exception if a remote invocation fails
+     */
+    private void createReplicatedRegions(Integer nyPort) throws Exception {
+        //receiving side: region created with a null second argument
+        //(presumably "no sender ids" - confirm against WANTestBase), then the receiver
+        vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", null, false));
+        vm2.invoke(() -> WANTestBase.createReceiver());
+
+        //sending side: the same region on vm4 and vm5, created with "ln1,ln2"
+        //(presumably the ids of the senders to attach - confirm)
+        vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln1,ln2", false));
+
+        vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln1,ln2", false));
+    }
+
+    /**
+     * Creates a cache on vm2, vm4 and vm5 through
+     * {@code WANTestBase.createCacheConserveSockets}.
+     *
+     * @param socketPolicy value handed to {@code createCacheConserveSockets} on every VM
+     *        (presumably the conserve-sockets setting - confirm against WANTestBase)
+     * @param nyPort port passed for vm2
+     * @param lnPort port passed for vm4 and vm5
+     */
+    private void createCachesWith(Boolean socketPolicy, Integer nyPort, Integer lnPort) {
+        vm2.invoke(() -> WANTestBase.createCacheConserveSockets(socketPolicy, nyPort));
+
+        vm4.invoke(() -> WANTestBase.createCacheConserveSockets(socketPolicy, lnPort));
+
+        vm5.invoke(() -> WANTestBase.createCacheConserveSockets(socketPolicy, lnPort));
+    }
+
+    private void exerciseFunctions() throws Exception {
+        //do function calls that use a shared connection
+        for (int x = 0; x < 1000; x++) {
+            //setting it to Boolean.TRUE it should pass the test
+            vm4.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.TRUE));
+            vm5.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.TRUE));
+        }
+        for (int x = 0; x < 1000; x++) {
+            //setting the Boolean.FALSE below will cause a deadlock in some GFE versions
+            //setting it to Boolean.TRUE as above it should pass the test
+            //this is similar to the customer found distributed deadlock
+            vm4.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.FALSE));
+            vm5.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.FALSE));
+        }
+    }
+
+    /**
+     * Creates the partitioned region {@code <test method name>_RR} on the receiving VM (vm2)
+     * and on the two sending VMs (vm4, vm5), and creates a receiver on vm2.
+     *
+     * @param nyPort not referenced in this method body
+     * @throws Exception if a remote invocation fails
+     */
+    private void createPartitionedRegions(Integer nyPort) throws Exception {
+        //receiving side: region created with an empty second argument and 0 / 113
+        //(presumably sender ids, redundant copies and total buckets - confirm)
+        vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_RR",
+                                                                                "", 0, 113, false));
+
+        vm2.invoke(() -> WANTestBase.createReceiver());
+
+        //sending side: the same region on vm4 and vm5, created with "ln1,ln2" and 1 / 113
+        vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+          getTestMethodName() + "_RR", "ln1,ln2", 1, 113, false));
+
+        vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+          getTestMethodName() + "_RR", "ln1,ln2", 1, 113, false));
+    }
+
+    /**
+     * Drives a fixed sequence of region operations (puts, destroys, invalidates, putAll and
+     * PDX puts) from the sender VMs vm4/vm5 against region {@code <test method name>_RR},
+     * pausing two seconds after each batch and then validating the region size on a sender
+     * VM and on the receiving VM vm2.
+     *
+     * The statements are strictly sequential and each validation depends on the operations
+     * before it, so the order must not be changed.
+     *
+     * @throws Exception if a remote invocation or validation fails
+     */
+    private void exerciseWANOperations() throws Exception {
+        //note - some of these should be made async to truly exercise the
+        //messaging between the WAN gateways and members
+
+        //puts from both senders, then expect size 100 locally and on the receiver
+        vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
+        vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
+        Wait.pause(2000); //wait for events to propagate
+        vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
+        vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
+        //destroys from vm5, then expect an empty region on both sides
+        vm5.invoke(() -> WANTestBase.doDestroys(getTestMethodName() + "_RR", 100));
+        Wait.pause(2000);//wait for events to propagate
+        vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
+        vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
+        //repopulate with puts from both senders
+        vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
+        vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
+        Wait.pause(2000); //wait for events to propagate
+        vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
+        vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
+        //random invalidates from vm4, then putAll from both senders; size 1000 is expected
+        vm4.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doInvalidates(getTestMethodName() + "_RR", 100, 100));
+        vm4.invoke(() -> WANTestBase.doPutAll(getTestMethodName() + "_RR", 100, 10));
+        vm5.invoke(() -> WANTestBase.doPutAll(getTestMethodName() + "_RR", 100, 10));
+        Wait.pause(2000);//wait for events to propagate
+        vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000));
+        vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000));
+        //destroys from vm4, then expect an empty region on both sides
+        vm4.invoke(() -> WANTestBase.doDestroys(getTestMethodName() + "_RR", 1000));
+        Wait.pause(2000);//wait for events to propagate
+        vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
+        vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
+        //PDX-serializable puts from vm4, validated with the PDX-specific size check
+        vm4.invoke(() -> WANTestBase.doPutsPDXSerializable(getTestMethodName() + "_RR", 100));
+        Wait.pause(2000);
+        vm5.invoke(() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_RR", 100));
+        vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_RR", 100));
+    }
+
+    /**
+     * Starts senders "ln1" and "ln2" on vm4 and vm5 in a fixed order: "ln1" on vm4 first,
+     * "ln2" on vm5 second, then the remaining two.
+     *
+     * Per the original author's note the start-up order matters: it is intended to put
+     * one primary sender on vm4 and the other primary on vm5.
+     *
+     * @throws Exception if a remote invocation fails
+     */
+    private void startSerialSenders() throws Exception {
+        //get one primary sender on vm4 and another primary on vm5
+        //the startup order matters here so that primaries are
+        //on different JVMs
+        vm4.invoke(() -> WANTestBase.startSender("ln1"));
+
+        vm5.invoke(() -> WANTestBase.startSender("ln2"));
+
+        //start secondaries
+        vm5.invoke(() -> WANTestBase.startSender("ln1"));
+
+        vm4.invoke(() -> WANTestBase.startSender("ln2"));
+    }
+
+    /**
+     * Creates senders "ln1" and "ln2" on both vm4 and vm5, all four with the same argument
+     * list passed to {@code WANTestBase.createSender}.
+     *
+     * NOTE(review): the meaning of the positional arguments (2, false, 100, 10, ...) is
+     * defined by WANTestBase.createSender and is not visible here; presumably 2 is the
+     * remote site id and the first {@code false} selects a serial sender - confirm.
+     *
+     * @throws Exception if a remote invocation fails
+     */
+    private void createSerialSenders() throws Exception {
+
+        vm4.invoke(() -> WANTestBase.createSender("ln1", 2,
+            false, 100, 10, false, false, null, true));
+
+        vm5.invoke(() -> WANTestBase.createSender("ln1", 2,
+            false, 100, 10, false, false, null, true));
+
+        vm4.invoke(() -> WANTestBase.createSender("ln2", 2,
+            false, 100, 10, false, false, null, true));
+
+        vm5.invoke(() -> WANTestBase.createSender("ln2", 2,
+            false, 100, 10, false, false, null, true));
+    }
+
+    /**
+     * Registers a {@link TestFunction} and executes it {@code num} times on the named region.
+     *
+     * @param name name of the region, looked up on {@code CacheFactory.getAnyInstance()}
+     * @param num number of function executions to perform
+     * @param useThreadOwnedSocket argument handed to every execution; when {@code true}
+     *        the function calls {@code DistributedSystem.setThreadsSocketPolicy(false)}
+     *        before doing its put
+     * @throws Exception if the lookup, registration or an execution fails
+     */
+    public static void doFunctionPuts(String name, int num, Boolean useThreadOwnedSocket) throws Exception {
+        Region region = CacheFactory.getAnyInstance().getRegion(name);
+        TestFunction function = new TestFunction();
+        FunctionService.registerFunction(function);
+        // Execute by the id the registered function reports rather than by a hard-coded
+        // fully-qualified class name, which silently goes stale when the package is renamed.
+        String functionId = function.getId();
+        Execution exe = FunctionService.onRegion(region);
+        for (int x = 0; x < num; x++) {
+            exe.withArgs(useThreadOwnedSocket).execute(functionId);
+        }
+    }
+
+    /**
+     * Performs {@code numPuts} single-entry transactions on the named region, each putting
+     * a random key in {@code [0, size)} mapped to itself.
+     *
+     * Colocation failures and commit conflicts are expected under concurrent load and are
+     * ignored. A transaction that is still open after a failed put is rolled back so that
+     * the next iteration can begin a new one on this thread.
+     *
+     * @param regionName region name without the leading separator
+     * @param numPuts number of transactions to attempt
+     * @param size exclusive upper bound of the random key range
+     * @throws Exception if any other failure occurs
+     */
+    public static void doTxPutsPR(String regionName, int numPuts, int size) throws Exception {
+        Region r = cache.getRegion(Region.SEPARATOR + regionName);
+        CacheTransactionManager mgr = cache.getCacheTransactionManager();
+        for (int x = 0; x < numPuts; x++) {
+            int temp = (int) (Math.floor(Math.random() * size));
+            try {
+                mgr.begin();
+                r.put(temp, temp);
+                mgr.commit();
+            } catch (com.gemstone.gemfire.cache.TransactionDataNotColocatedException ignored) {
+                //ignore colocation issues or primary bucket issues
+            } catch (com.gemstone.gemfire.cache.CommitConflictException ignored) {
+                //ignore - conflicts are ok and expected
+            } finally {
+                // If put() failed, the transaction begun above is still associated with
+                // this thread; without a rollback the next begin() would fail.
+                if (mgr.exists()) {
+                    mgr.rollback();
+                }
+            }
+        }
+    }
+
+    /**
+     * Makes {@code numInvalidates} attempts to invalidate a randomly chosen key in
+     * {@code [0, size)} of the named region. A key is only invalidated when the region
+     * currently holds a value for it; an entry that is not found is ignored.
+     *
+     * @param regionName region name without the leading separator
+     * @param numInvalidates number of attempts
+     * @param size exclusive upper bound of the random key range
+     */
+    public static void doInvalidates(String regionName, int numInvalidates, int size) throws Exception {
+        final Region target = cache.getRegion(Region.SEPARATOR + regionName);
+        int attempt = 0;
+        while (attempt < numInvalidates) {
+            attempt++;
+            final int key = (int) Math.floor(Math.random() * size);
+            try {
+                if (!target.containsValueForKey(key)) {
+                    continue;
+                }
+                target.invalidate(key);
+            } catch (com.gemstone.gemfire.cache.EntryNotFoundException notThere) {
+                //the entry may legitimately be absent - nothing to do
+            }
+        }
+    }
+
+}
+
+/**
+ * Function that puts one random entry (key in [0, 10), value in [0, 10000)) into the
+ * region it is executed on. When its Boolean argument is true it first calls
+ * {@code DistributedSystem.setThreadsSocketPolicy(false)}.
+ * It produces no result, is not optimized for write and is not HA.
+ */
+class TestFunction implements Function {
+
+    @Override
+    public String getId() {
+        return getClass().getName();
+    }
+
+    @Override
+    public boolean hasResult() {
+        return false;
+    }
+
+    @Override
+    public boolean optimizeForWrite() {
+        return false;
+    }
+
+    @Override
+    public boolean isHA() {
+        return false;
+    }
+
+    @Override
+    public void execute(FunctionContext fc) {
+        final Boolean switchSocketPolicy = (Boolean) fc.getArguments();
+        if (switchSocketPolicy.booleanValue()) {
+            DistributedSystem.setThreadsSocketPolicy(false);
+        }
+        final Region dataSet = ((RegionFunctionContext) fc).getDataSet();
+        //key is drawn before the value
+        final int key = randomBelow(10);
+        final int value = randomBelow(10000);
+        dataSet.put(key, value);
+    }
+
+    /** Returns a random int in [0, bound). */
+    private int randomBelow(int bound) {
+        return (int) Math.floor(Math.random() * bound);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
new file mode 100644
index 0000000..8c255c1
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import com.jayway.awaitility.Awaitility;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.MyGatewaySenderEventListener;
+import com.gemstone.gemfire.internal.cache.wan.MyGatewaySenderEventListener2;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.test.junit.categories.FlakyTest;
+
+@Category(DistributedTest.class)
+public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
+
+  private static final long serialVersionUID = 1L;
+
+  /** Default constructor; relies on the implicit call to the superclass no-arg constructor. */
+  public SerialGatewaySenderEventListenerDUnitTest() {
+  }
+
+  /**
+   * Validates that the listener attached to the sender receives all the events when the
+   * caches are created without a locator.
+   * Currently ignored: this test hangs after Darrel's check-in 36685 and needs follow-up
+   * with Darrel. It is disabled so that the test suite does not hang.
+   */
+  @Ignore
+  @Test
+  public void testGatewaySenderEventListenerInvocationWithoutLocator() {
+    int mPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    vm4.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
+    vm5.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
+    vm6.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
+    vm7.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
+
+    // sender "ln" with a listener on vm4 and vm5
+    vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
+        false, 100, 10, false, false, null, false, true));
+    vm5.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
+        false, 100, 10, false, false, null, false, true));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+    // 1000 entries mapping i -> i
+    final Map keyValues = new HashMap();
+    for(int i=0; i< 1000; i++) {
+      keyValues.put(i, i);
+    }
+    
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
+      keyValues ));
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName() + "_RR", keyValues.size() ));
+    
+    vm5.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName() + "_RR", keyValues.size() ));
+    
+    vm4.invoke(() -> WANTestBase.printEventListenerMap());
+    vm5.invoke(() -> WANTestBase.printEventListenerMap());
+
+    // NOTE(review): this fail() stands in for the commented-out validation below. A
+    // validateReceivedEventsMapSizeListener1(String, Map) method is defined in this class,
+    // so confirm whether the call can be restored once the hang is resolved.
+    fail("tried to invoke missing method");
+//    vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", keyValues ));
+  }
+  
+  /**
+   * Validates that the single listener attached to sender "ln" receives all the events:
+   * 1000 entries are put on vm4, and the events map of the listener on vm4 must end up
+   * matching them, while the regions on the remote-site VMs (vm2, vm3) are expected to
+   * stay empty.
+   */
+  @Test
+  public void testGatewaySenderEventListenerInvocation() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+
+    // remote site: caches and receivers on vm2 and vm3
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    // local site: caches on vm4..vm7, sender "ln" with a listener on vm4 and vm5
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
+        false, 100, 10, false, false, null, false, true));
+    vm5.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
+        false, 100, 10, false, false, null, false, true));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+    // 1000 entries mapping i -> i
+    final Map keyValues = new HashMap();
+    for(int i=0; i< 1000; i++) {
+      keyValues.put(i, i);
+    }
+
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
+      keyValues ));
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName() + "_RR", keyValues.size() ));
+    
+    vm5.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName() + "_RR", keyValues.size() ));
+    
+    // the remote-site regions are expected to hold no entries
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 0 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 0 ));
+
+    vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", keyValues ));
+  }
+
+  /**
+   * Validates that the attached listeners receive all the events when two listeners are
+   * attached to the GatewaySender: 1000 entries are put on vm4, and the events maps of
+   * both listeners on vm4 are then checked against them, while the regions on the
+   * remote-site VMs (vm2, vm3) are expected to stay empty.
+   */
+  @Test
+  public void testGatewaySender2EventListenerInvocation() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    // remote site: caches, regions and receivers on vm2 and vm3
+    createCacheInVMs(nyPort, vm2, vm3);
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+      getTestMethodName() + "_RR", null, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+      getTestMethodName() + "_RR", null, isOffHeap() ));
+
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    // differs from the single-listener test in the second-to-last argument (true here),
+    // presumably "attach two listeners" - confirm against WANTestBase
+    vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
+        false, 100, 10, false, false, null, true, true));
+    vm5.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
+        false, 100, 10, false, false, null, true, true));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+    // 1000 entries mapping i -> i
+    final Map keyValues = new HashMap();
+    for(int i=0; i< 1000; i++) {
+      keyValues.put(i, i);
+    }
+
+    
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
+      keyValues ));
+
+    // the remote-site regions are expected to hold no entries
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 0 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 0 ));
+
+    // TODO: move validateReceivedEventsMapSizeListener2 to a shared util class
+    vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener2("ln", keyValues ));
+  }
+  
+  /**
+   * Validates that no PoolImpl is created for a sender that has a listener attached:
+   * after creating sender "ln" with a listener on vm4, the sender's proxy must be null
+   * (see {@code validateNoPoolCreation}).
+   */
+  @Test
+  public void testGatewaySenderEventListenerPoolImpl() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4);
+
+    // the sender is created but never started in this test
+    vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
+        false, 100, 10, false, false, null, false, false ));
+    
+    vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateNoPoolCreation("ln" ));
+  }
+  
+  // Tests start/stop/restart of the sender with respect to listener invocation.
+  // Currently ignored: this test hangs after Darrel's check-in 36685 and needs follow-up
+  // with Darrel. It is disabled so that the test suite does not hang.
+  // NOTE(review): as written, the first fail() below aborts the method, so none of the
+  // statements after it would run even if @Ignore were removed.
+  @Ignore
+  @Test
+  public void testGatewaySenderEventListener_GatewayOperations() {
+
+    int mPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    vm4.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
+    vm5.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
+    vm6.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
+    vm7.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
+
+    vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
+        false, 100, 10, false, false, null, false, true));
+
+    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+    // phase 1: keys 0..999 put while the sender is running
+    final Map initialKeyValues = new HashMap();
+    for(int i=0; i< 1000; i++) {
+      initialKeyValues.put(i, i);
+    }
+    
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
+      initialKeyValues ));
+
+    fail("tried to invoke missing method");
+//    vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", initialKeyValues ));
+    
+    vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
+    
+    // phase 2: keys 1000..1999 put while the sender is stopped; the commented-out
+    // validation still compares against the phase-1 map
+    final Map keyValues = new HashMap();
+    for(int i=1000; i< 2000; i++) {
+      keyValues.put(i, i);
+    }
+    
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
+      keyValues ));
+    
+    fail("tried to invoke missing method");
+//    vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", initialKeyValues ));
+
+    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
+
+    // phase 3: keys 2000..2999 put after the sender is started again
+    final Map finalKeyValues = new HashMap();
+    for(int i=2000; i< 3000; i++) {
+      finalKeyValues.put(i, i);
+    }
+    
+    vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
+      finalKeyValues ));
+    
+    // expected map for the final commented-out validation: phase-1 plus phase-3 entries
+    finalKeyValues.putAll(initialKeyValues);
+    fail("tried to invoke missing method");
+//    vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", finalKeyValues ));
+    
+  }
+    
+  /**
+   * Asserts that every gateway sender of the cache whose id equals {@code siteId} has a
+   * null proxy. Passes without asserting anything when no sender has that id.
+   *
+   * @param siteId id of the sender(s) to check
+   */
+  public static void validateNoPoolCreation(final String siteId) {
+    final Set<GatewaySender> allSenders = cache.getGatewaySenders();
+    allSenders.stream()
+        .filter(candidate -> candidate.getId().equals(siteId))
+        .map(candidate -> (AbstractGatewaySender) candidate)
+        .forEach(matched -> assertNull(matched.getProxy()));
+  }
+  
+  /**
+   * Waits (up to 60000 ms, polling every 500 ms) until the events map of the single
+   * listener attached to the given sender has the same size as {@code map} and contains
+   * all of its keys and values.
+   * Returns without checking anything when the sender does not have exactly one listener.
+   *
+   * @param senderId id of the sender whose listener is inspected
+   * @param map the expected entries
+   */
+  public static void validateReceivedEventsMapSizeListener1(final String senderId, final Map map) {
+
+    // first sender with the requested id, or null when there is none
+    GatewaySender target = null;
+    for (GatewaySender candidate : cache.getGatewaySenders()) {
+      if (!candidate.getId().equals(senderId)) {
+        continue;
+      }
+      target = candidate;
+      break;
+    }
+
+    final List<AsyncEventListener> attached = ((AbstractGatewaySender) target).getAsyncEventListeners();
+    if (attached.size() != 1) {
+      return;
+    }
+
+    final AsyncEventListener onlyListener = attached.get(0);
+    final WaitCriterion allEventsSeen = new WaitCriterion() {
+      Map listenerMap;
+
+      public boolean done() {
+        listenerMap = ((MyGatewaySenderEventListener) onlyListener).getEventsMap();
+        // non-short-circuit '&' so that all three checks are always evaluated
+        return (map.size() == listenerMap.size())
+            & listenerMap.keySet().containsAll(map.keySet())
+            & listenerMap.values().containsAll(map.values());
+      }
+
+      public String description() {
+        return "Waiting for all sites to get updated, the sizes are " + listenerMap.size() + " and " + map.size();
+      }
+    };
+    Wait.waitForCriterion(allEventsSeen, 60000, 500, true);
+  }
+  
+  /**
+   * Waits up to 60000 ms until the events maps of both listeners attached to the given
+   * sender have the same size as {@code map} and contain all of its keys and values.
+   * The first listener is expected to be a {@code MyGatewaySenderEventListener} and the
+   * second a {@code MyGatewaySenderEventListener2}.
+   * Returns without checking anything when the sender does not have exactly two listeners.
+   *
+   * @param senderId id of the sender whose listeners are inspected
+   * @param map the expected entries
+   */
+  public static void validateReceivedEventsMapSizeListener2(final String senderId, final Map map) {
+
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for(GatewaySender s : senders){
+      if(s.getId().equals(senderId)){
+        sender = s;
+        break;
+      }
+    }
+    
+    final List<AsyncEventListener> listeners = ((AbstractGatewaySender)sender).getAsyncEventListeners();
+    if(listeners.size() == 2) {
+      final AsyncEventListener l1 = listeners.get(0);
+      final AsyncEventListener l2 = listeners.get(1);
+      // the assertion inside the lambda is re-evaluated until it passes or the timeout expires
+      Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS).until(()->{
+        Map listenerMap1 = ((MyGatewaySenderEventListener)l1)
+          .getEventsMap();
+
+        Map listenerMap2 = ((MyGatewaySenderEventListener2)l2)
+          .getEventsMap();
+        int listener1MapSize = listenerMap1.size();
+        // was listenerMap1.size(): the second listener's size was never actually checked
+        int listener2MapSize = listenerMap2.size();
+        int expectedMapSize = map.size();
+        boolean sizeCorrect = expectedMapSize == listener1MapSize;
+        boolean keySetCorrect = listenerMap1.keySet().containsAll(map.keySet());
+        boolean valuesCorrect = listenerMap1.values().containsAll(map.values());
+
+        boolean sizeCorrect2 = expectedMapSize== listener2MapSize;
+        boolean keySetCorrect2 = listenerMap2.keySet().containsAll(map.keySet());
+        boolean valuesCorrect2 = listenerMap2.values().containsAll(map.values());
+
+        assertEquals("Failed while waiting for all sites to get updated with the correct events. \nThe " +
+                     "size of listener 1's map = "+ listener1MapSize + "\n The size of listener 2's map = " +
+                     ""+ listener2MapSize + "\n The expected map size =" + expectedMapSize ,
+          true, sizeCorrect && keySetCorrect && valuesCorrect && sizeCorrect2 && keySetCorrect2 && valuesCorrect2);
+      });
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
new file mode 100644
index 0000000..200e5b6
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
@@ -0,0 +1,665 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionRequest;
+import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionResponse;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.distributed.Locator;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.InternalLocator;
+import com.gemstone.gemfire.distributed.internal.ServerLocator;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.RMIException;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+
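+/**
+ * Distributed tests for lifecycle operations (start, pause, resume, stop and destroy)
+ * on serial gateway senders.
+ */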
+@Category(DistributedTest.class)
+public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Default constructor; relies on the implicit call to the superclass no-arg constructor.
+   */
+  public SerialGatewaySenderOperationsDUnitTest() {
+    // implicit super()
+  }
+
+  /**
+   * Passes each message this suite expects to see to
+   * {@link IgnoredException#addIgnoredException(String)}, followed by the simple name of
+   * {@link RegionDestroyedException}.
+   */
+  @Override
+  protected final void postSetUpWANTestBase() throws Exception {
+    final String[] expectedMessages = {
+        "Broken pipe",
+        "Connection reset",
+        "Unexpected IOException",
+        "Connection refused",
+        "could not get remote locator information"
+    };
+    for (String expectedMessage : expectedMessages) {
+      IgnoredException.addIgnoredException(expectedMessage);
+    }
+
+    // Stopping the gateway closed the region, which causes this exception to get logged.
+    final String regionDestroyedName = RegionDestroyedException.class.getSimpleName();
+    IgnoredException.addIgnoredException(regionDestroyedName);
+  }
+
+  /**
+   * Creates the senders on vm4 and vm5 but never calls startSender, performs puts on vm4 and
+   * then runs {@link #verifyGatewaySenderOperations(String)} on both sender members.
+   */
+  @Test
+  public void testSerialGatewaySenderOperationsWithoutStarting() {
+    Integer localLocatorPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer remoteLocatorPort =
+        (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    // Receiving site.
+    createCacheInVMs(remoteLocatorPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    // Sending site; the senders are created but not started.
+    createSenderCaches(localLocatorPort);
+    createSenderVM4();
+    createSenderVM5();
+
+    createReceiverRegions();
+    createSenderRegions();
+
+    // Two rounds of puts on vm4: first 10, then 100.
+    for (int putCount : new int[] {10, 100}) {
+      vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", putCount));
+    }
+
+    for (VM senderVm : new VM[] {vm4, vm5}) {
+      senderVm.invoke(
+          () -> SerialGatewaySenderOperationsDUnitTest.verifyGatewaySenderOperations("ln"));
+    }
+  }
+
+  /**
+   * Creates the replicated region named after the running test method (suffix "_RR") in
+   * vm4, vm5, vm6 and vm7, in that order, passing "ln" as the sender id.
+   */
+  protected void createSenderRegions() {
+    for (VM senderVm : new VM[] {vm4, vm5, vm6, vm7}) {
+      senderVm.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln", isOffHeap()));
+    }
+  }
+
+  /**
+   * Creates the replicated region named after the running test method (suffix "_RR") in
+   * vm2 and vm3, passing {@code null} as the sender id.
+   */
+  protected void createReceiverRegions() {
+    for (VM receiverVm : new VM[] {vm2, vm3}) {
+      receiverVm.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", null, isOffHeap()));
+    }
+  }
+
+  /**
+   * Creates a cache in vm4, vm5, vm6 and vm7, in that order.
+   *
+   * @param lnPort value handed to {@code WANTestBase.createCache} in every VM
+   */
+  protected void createSenderCaches(Integer lnPort) {
+    for (VM senderVm : new VM[] {vm4, vm5, vm6, vm7}) {
+      senderVm.invoke(() -> WANTestBase.createCache(lnPort));
+    }
+  }
+
+  /**
+   * Creates the "ln" sender in vm5. Protected so subclasses can override it.
+   */
+  protected void createSenderVM5() {
+    // NOTE(review): argument meanings are defined by WANTestBase.createSender - confirm there.
+    vm5.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, true, null, true));
+  }
+
+  /**
+   * Creates the "ln" sender in vm4. Protected so subclasses can override it.
+   */
+  protected void createSenderVM4() {
+    // NOTE(review): argument meanings are defined by WANTestBase.createSender - confirm there.
+    vm4.invoke(
+        () -> WANTestBase.createSender("ln", 2, false, 100, 10, false, true, null, true));
+  }
+
+  
+  /**
+   * Starts the senders on vm4 and vm5, pauses them, launches puts asynchronously while they are
+   * paused, resumes them, and then checks that the queues are validated with size 0 and that
+   * both receivers hold 100 entries.
+   */
+  @Test
+  public void testStartPauseResumeSerialGatewaySender() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createSenderCaches(lnPort);
+
+    createSenderVM4();
+    createSenderVM5();
+
+    createReceiverRegions();
+
+    createSenderRegions();
+
+    // Puts issued before the senders are started.
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        10 ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        100 ));
+
+    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+    vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderPausedState( "ln" ));
+    vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderPausedState( "ln" ));
+
+    // These puts run concurrently with the resume below.
+    AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10 ));
+
+    vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
+
+    vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" ));
+    vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" ));
+
+    try {
+      inv1.join();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag and keep the cause instead of printing the stack trace.
+      Thread.currentThread().interrupt();
+      throw new AssertionError("Interrupted the async invocation.", e);
+    }
+    // join() alone does not report a failure inside the async puts; surface it explicitly.
+    if (inv1.exceptionOccurred()) {
+      throw new AssertionError("Async puts failed in vm4", inv1.getException());
+    }
+
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+    
+    validateQueueContents(vm4, "ln", 0);
+    validateQueueContents(vm5, "ln", 0);
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 100 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 100 ));
+
+  }
+
+  @Test
+  public void testStopSerialGatewaySender() throws Throwable {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createSenderCaches(lnPort);
+
+    createSenderVM4();
+    createSenderVM5();
+
+    createReceiverRegions();
+
+    createSenderRegions();
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        20 ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        20 ));
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 20 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 20 ));
+    
+    vm2.invoke(() -> WANTestBase.stopReceivers());
+    vm3.invoke(() -> WANTestBase.stopReceivers());
+    
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        20 ));
+    
+    vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 ));
+    vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 ));
+    
+    vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
+
+    vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
+    vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
+
+    vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
+    vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
+    /**
+     * Should have no effect on GatewaySenderState
+     */
+    vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
+
+    vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
+    vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
+
+    AsyncInvocation vm4async = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" ));
+    AsyncInvocation vm5async = vm5.invokeAsync(() -> WANTestBase.startSender( "ln" ));
+    int START_WAIT_TIME = 30000;
+    vm4async.getResult(START_WAIT_TIME);
+    vm5async.getResult(START_WAIT_TIME);
+
+    vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 ));
+    vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 ));
+
+    vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+      110 ));
+    
+    vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 130 ));
+    vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 130 ));
+    
+    vm2.invoke(() -> WANTestBase.startReceivers());
+    vm3.invoke(() -> WANTestBase.startReceivers());
+
+    vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" ));
+    vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" ));
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName() + "_RR", 110 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName() + "_RR", 110 ));
+    
+    vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
+    vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
+  }
+
+  /**
+   * Stops only the vm4 sender, keeps putting data, then restarts that sender while more puts
+   * are in flight, validating the receiver region size after each phase.
+   */
+  @Test
+  public void testStopOneSerialGatewaySenderBothPrimary() throws Throwable {
+    Integer localLocatorPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer remoteLocatorPort =
+        (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    final VM[] receiverVms = {vm2, vm3};
+
+    createCacheInVMs(remoteLocatorPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createSenderCaches(localLocatorPort);
+    createSenderVM4();
+    createSenderVM5();
+
+    createReceiverRegions();
+    createSenderRegions();
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
+
+    // Only the vm4 sender is stopped; the vm5 sender is left as started above.
+    vm4.invoke(() -> WANTestBase.stopSender("ln"));
+    vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState("ln"));
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 200));
+
+    for (VM receiverVm : receiverVms) {
+      receiverVm.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 200));
+    }
+
+    // Do some puts while restarting a sender
+    AsyncInvocation putsInFlight =
+        vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 300));
+
+    Thread.sleep(10);
+    vm4.invoke(() -> WANTestBase.startSender("ln"));
+
+    putsInFlight.getResult();
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+    for (VM receiverVm : receiverVms) {
+      receiverVm.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 300));
+    }
+
+    vm4.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
+  }
+
+  /**
+   * Stops only the vm4 sender after an initial batch of puts, performs more puts on vm4 and
+   * validates that both receivers hold 100 entries.
+   */
+  @Test
+  public void testStopOneSerialGatewaySender_PrimarySecondary() {
+    Integer localLocatorPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer remoteLocatorPort =
+        (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    createCacheInVMs(remoteLocatorPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createSenderCaches(localLocatorPort);
+    createSenderVM4();
+    createSenderVM5();
+
+    createReceiverRegions();
+    createSenderRegions();
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 10));
+
+    // Only the vm4 sender is stopped.
+    vm4.invoke(() -> WANTestBase.stopSender("ln"));
+    vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState("ln"));
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
+
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+    for (VM receiverVm : new VM[] {vm2, vm3}) {
+      receiverVm.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
+    }
+  }
+  
+  /**
+   * Runs a sender on vm4, stops it, then brings up a second member (vm5) with its own "ln"
+   * sender and validates that the receiver on vm2 holds 100 entries after puts on vm5.
+   */
+  @Test
+  public void testStopOneSender_StartAnotherSender() {
+    Integer localLocatorPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer remoteLocatorPort =
+        (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    // Receiving site: a single member, vm2.
+    createCacheInVMs(remoteLocatorPort, vm2);
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap()));
+    vm2.invoke(() -> WANTestBase.createReceiver());
+
+    // First sender member: vm4.
+    createCacheInVMs(localLocatorPort, vm4);
+    createSenderVM4();
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()));
+    vm4.invoke(() -> WANTestBase.startSender("ln"));
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 10));
+    vm4.invoke(() -> WANTestBase.stopSender("ln"));
+    vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState("ln"));
+
+    // Second sender member: vm5, created only after vm4's sender has been stopped.
+    vm5.invoke(() -> WANTestBase.createCache(localLocatorPort));
+    createSenderVM5();
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.startSender("ln"));
+
+    vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
+  }
+
+  /**
+   * Alternates between the vm4 and vm5 senders before any receiver exists, validating the queue
+   * contents of each, then creates the receiver on vm2 and validates the region size after each
+   * sender is started in turn (100 after vm5, 110 after vm4).
+   */
+  @Test
+  public void test_Bug44153_StopOneSender_StartAnotherSender_CheckQueueSize() {
+    Integer localLocatorPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer remoteLocatorPort =
+        (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    // vm4: create, start, put 10 entries, check the queue, stop.
+    vm4.invoke(() -> WANTestBase.createCache(localLocatorPort));
+    createSenderVM4();
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()));
+    vm4.invoke(() -> WANTestBase.startSender("ln"));
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 10));
+    validateQueueContents(vm4, "ln", 10);
+    vm4.invoke(() -> WANTestBase.stopSender("ln"));
+
+    vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState("ln"));
+
+    // vm5: create, start, put entries via doPutsFrom(10, 110), check the queue, stop.
+    vm5.invoke(() -> WANTestBase.createCache(localLocatorPort));
+    createSenderVM5();
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()));
+    vm5.invoke(() -> WANTestBase.startSender("ln"));
+
+    vm5.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_RR", 10, 110));
+
+    validateQueueContents(vm5, "ln", 100);
+    vm5.invoke(() -> WANTestBase.stopSender("ln"));
+    vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState("ln"));
+
+    // Restarting vm4's sender: its queue is validated with size 10 again.
+    vm4.invoke(() -> WANTestBase.startSender("ln"));
+    validateQueueContents(vm4, "ln", 10);
+    vm4.invoke(() -> WANTestBase.stopSender("ln"));
+
+    // Start vm5's sender, then bring up the receiving site on vm2.
+    vm5.invoke(() -> WANTestBase.startSender("ln"));
+    createCacheInVMs(remoteLocatorPort, vm2);
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap()));
+    vm2.invoke(() -> WANTestBase.createReceiver());
+
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
+    vm5.invoke(() -> WANTestBase.stopSender("ln"));
+
+    // Finally start vm4's sender again and validate a region size of 110.
+    vm4.invoke(() -> WANTestBase.startSender("ln"));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 110));
+    vm4.invoke(() -> WANTestBase.stopSender("ln"));
+  }
+
+  /**
+   * Runs {@code WANTestBase.validateQueueContents(site, size)} inside the given VM.
+   *
+   * @param vm the VM in which to run the validation
+   * @param site first argument forwarded to the validation (callers here pass the sender id)
+   * @param size second argument forwarded to the validation (the expected size)
+   */
+  private void validateQueueContents(VM vm, String site, int size) {
+    vm.invoke(() -> WANTestBase.validateQueueContents(site, size));
+  }
+
+  /**
+   * Destroys the SerialGatewaySender on every sender node (vm4 and vm5): each sender is
+   * stopped, removed from the region, destroyed, and then verified as destroyed.
+   */
+  @Test
+  public void testDestroySerialGatewaySenderOnAllNodes() {
+    Integer localLocatorPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer remoteLocatorPort =
+        (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    final VM[] senderVms = {vm4, vm5};
+
+    createCacheInVMs(remoteLocatorPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createSenderCaches(localLocatorPort);
+    createSenderVM4();
+    createSenderVM5();
+
+    createReceiverRegions();
+    createSenderRegions();
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 10));
+
+    // before destroying, stop the sender
+    for (VM senderVm : senderVms) {
+      senderVm.invoke(() -> WANTestBase.stopSender("ln"));
+    }
+
+    for (VM senderVm : senderVms) {
+      senderVm.invoke(
+          () -> WANTestBase.removeSenderFromTheRegion("ln", getTestMethodName() + "_RR"));
+    }
+
+    for (VM senderVm : senderVms) {
+      senderVm.invoke(() -> WANTestBase.destroySender("ln"));
+    }
+
+    for (VM senderVm : senderVms) {
+      senderVm.invoke(() -> WANTestBase.verifySenderDestroyed("ln", false));
+    }
+  }
+
+  /**
+   * Destroys the SerialGatewaySender on a single node (vm4) and verifies that the sender on
+   * vm5 is still in the running state.
+   */
+  @Test
+  public void testDestroySerialGatewaySenderOnSingleNode() {
+    Integer localLocatorPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer remoteLocatorPort =
+        (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    createCacheInVMs(remoteLocatorPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createSenderCaches(localLocatorPort);
+    createSenderVM4();
+    createSenderVM5();
+
+    createReceiverRegions();
+    createSenderRegions();
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 10));
+
+    // before destroying, stop the sender
+    vm4.invoke(() -> WANTestBase.stopSender("ln"));
+    vm4.invoke(() -> WANTestBase.removeSenderFromTheRegion("ln", getTestMethodName() + "_RR"));
+    vm4.invoke(() -> WANTestBase.destroySender("ln"));
+
+    // vm4's sender is gone; vm5's sender was never touched.
+    vm4.invoke(() -> WANTestBase.verifySenderDestroyed("ln", false));
+    vm5.invoke(() -> WANTestBase.verifySenderRunningState("ln"));
+  }
+  
+  /**
+   * Since the sender is attached to a region and in use, it can not be destroyed.
+   * Hence, exception is thrown by the sender API.
+   *
+   * The test fails if the destroy call completes without throwing, and otherwise checks that
+   * the remote failure was caused by a {@link GatewaySenderException}.
+   */
+  @Test
+  public void testDestroySerialGatewaySenderExceptionScenario() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createSenderCaches(lnPort);
+
+    createSenderVM4();
+    createSenderVM5();
+
+    createReceiverRegions();
+
+    createSenderRegions();
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        10 ));
+    
+    try {
+      vm4.invoke(() -> WANTestBase.destroySender( "ln" ));
+      // Without this the test would pass silently when destroy unexpectedly succeeds.
+      // fail() throws AssertionError, which the RMIException handler below does not catch.
+      fail("Expected destroySender to fail with a GatewaySenderException while the sender is attached to a region");
+    } catch (RMIException e) {
+      assertTrue("Cause of the exception should be GatewaySenderException", e.getCause() instanceof GatewaySenderException);
+    }
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 10 ));
+  }
+
+  
+  @Test
+  public void testGatewaySenderNotRegisteredAsCacheServer() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, true, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, true, null, true ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    SerializableRunnable check = new SerializableRunnable("assert no cache servers") {
+      public void run() {
+        InternalLocator inl = (InternalLocator)Locator.getLocator();
+        ServerLocator server = inl.getServerLocatorAdvisee();
+        LogWriterUtils.getLogWriter().info("Server load map is " + server.getLoadMap());
+        assertTrue("expected an empty map but found " + server.getLoadMap(),
+            server.getLoadMap().isEmpty());
+        QueueConnectionRequest request = new QueueConnectionRequest(
+            ClientProxyMembershipID.getNewProxyMembership(InternalDistributedSystem.getConnectedInstance()),
+            1, new HashSet<>(), "", false);
+        QueueConnectionResponse response = (QueueConnectionResponse)server.processRequest(request);
+        assertTrue("expected no servers but found " + response.getServers(),
+            response.getServers().isEmpty());
+      }
+    };
+    vm0.invoke(check);
+    vm1.invoke(check);
+    
+  }
+  
+
+  
+  /**
+   * Asserts that the gateway sender with the given id in this member's cache is paused.
+   * Fails with a descriptive message when no sender with that id exists.
+   *
+   * @param senderId id to match against the senders returned by {@code cache.getGatewaySenders()}
+   */
+  public static void verifySenderPausedState(String senderId) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    AbstractGatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = (AbstractGatewaySender)s;
+        break;
+      }
+    }
+    // Report a missing sender as an assertion failure rather than a NullPointerException.
+    assertNotNull("No gateway sender found with id " + senderId, sender);
+    assertTrue(sender.isPaused());
+  }
+
+  /**
+   * Asserts that the gateway sender with the given id in this member's cache is running and
+   * not paused. Fails with a descriptive message when no sender with that id exists.
+   *
+   * @param senderId id to match against the senders returned by {@code cache.getGatewaySenders()}
+   */
+  public static void verifySenderResumedState(String senderId) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    AbstractGatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = (AbstractGatewaySender)s;
+        break;
+      }
+    }
+    // Report a missing sender as an assertion failure rather than a NullPointerException.
+    assertNotNull("No gateway sender found with id " + senderId, sender);
+    assertFalse(sender.isPaused());
+    assertTrue(sender.isRunning());
+  }
+
+  /**
+   * Asserts that the gateway sender with the given id in this member's cache is neither
+   * running nor paused. Fails with a descriptive message when no sender with that id exists.
+   *
+   * @param senderId id to match against the senders returned by {@code cache.getGatewaySenders()}
+   */
+  public static void verifySenderStoppedState(String senderId) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    AbstractGatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = (AbstractGatewaySender)s;
+        break;
+      }
+    }
+    // Report a missing sender as an assertion failure rather than a NullPointerException.
+    assertNotNull("No gateway sender found with id " + senderId, sender);
+    assertFalse(sender.isRunning());
+    assertFalse(sender.isPaused());
+  }
+
+  /**
+   * Asserts that the gateway sender with the given id is neither paused nor running, then
+   * calls {@code pause()}, {@code resume()} and {@code stop()} on it. Fails with a descriptive
+   * message when no sender with that id exists.
+   *
+   * @param senderId id to match against the senders returned by {@code cache.getGatewaySenders()}
+   */
+  public static void verifyGatewaySenderOperations(String senderId) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+    // Report a missing sender as an assertion failure rather than a NullPointerException.
+    assertNotNull("No gateway sender found with id " + senderId, sender);
+    assertFalse(sender.isPaused());
+    assertFalse(((AbstractGatewaySender)sender).isRunning());
+    // The calls below carry no assertions of their own; any exception they throw fails the caller.
+    sender.pause();
+    sender.resume();
+    sender.stop();
+  }
+}