You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:43:56 UTC
[08/61] [abbrv] incubator-geode git commit: GEODE-37 change package
name from com.gemstone.gemfire (for
./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to
./geode-wan/src/test/java/org/apache/geode)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
new file mode 100644
index 0000000..b754254
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import static com.gemstone.gemfire.test.dunit.Wait.*;
+import static com.gemstone.gemfire.test.dunit.IgnoredException.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.junit.categories.FlakyTest;
+
+@Category(DistributedTest.class)
+public class ParallelWANStatsDUnitTest extends WANTestBase{
+
+ private static final int NUM_PUTS = 100;
+ private static final long serialVersionUID = 1L;
+
+ private String testName;
+
+ /**
+  * Creates the test case. The {@code testName} field is assigned later, in
+  * {@link #postSetUpWANTestBase()}.
+  */
+ public ParallelWANStatsDUnitTest() {
+   // The no-argument superclass constructor is invoked implicitly.
+ }
+
+ /**
+  * Stores the name of the running test method in {@code testName}; the tests in this class
+  * pass that value as the region name to the WANTestBase helpers.
+  */
+ @Override
+ protected final void postSetUpWANTestBase() throws Exception {
+   testName = getTestMethodName();
+ }
+
+ @Test
+ public void testPartitionedRegionParallelPropagation_BeforeDispatch() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createSendersWithConflation(lnPort);
+
+ createSenderPRs(0);
+
+ startPausedSenders();
+
+ createReceiverPR(vm2, 1);
+ createReceiverPR(vm3, 1);
+
+ putKeyValues();
+
+ ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", NUM_PUTS ));
+ ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", NUM_PUTS ));
+ ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", NUM_PUTS ));
+ ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", NUM_PUTS ));
+
+ assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
+ assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+ assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+ assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
+ assertEquals(0, v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)); //batches distributed
+ assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
+
+ }
+
+ @Test
+ public void testPartitionedRegionParallelPropagation_AfterDispatch_NoRedundancy() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
+
+ createSenders(lnPort);
+
+ createReceiverPR(vm2, 0);
+
+ createSenderPRs(0);
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.doPuts( testName,
+ NUM_PUTS ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ testName, NUM_PUTS ));
+
+
+ ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+
+ assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
+ assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+ assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+ assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
+ assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed
+ assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
+
+ vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS ));
+ }
+
+ @Test
+ public void testPartitionedRegionParallelPropagation_AfterDispatch_Redundancy_3() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
+
+ createSenders(lnPort);
+
+ createReceiverPR(vm2, 0);
+
+ createSenderPRs(3);
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.doPuts( testName,
+ NUM_PUTS ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ testName, NUM_PUTS ));
+
+ ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+
+ assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
+ assertEquals(400, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+ assertEquals(400, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+ assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
+ assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed
+ assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
+
+ vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS ));
+ }
+
+ /**
+  * One sending member (vm4) hosts two senders, "ln1" towards site 2 (receiver on vm2) and
+  * "ln2" towards site 3 (receiver on vm3). After NUM_PUTS puts the statistics of each
+  * sender and of each receiver are checked separately.
+  */
+ @Test
+ public void testWANStatsTwoWanSites_Bug44331() throws Exception {
+   Integer lnPort = createFirstLocatorWithDSId(1);
+   Integer nyPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+   Integer tkPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3, lnPort));
+
+   createCacheInVMs(nyPort, vm2);
+   createCacheInVMs(tkPort, vm3);
+   createReceiverInVMs(vm2);
+   createReceiverInVMs(vm3);
+
+   vm4.invoke(() -> WANTestBase.createCache(lnPort));
+   vm4.invoke(() -> WANTestBase.createSender("ln1", 2, true, 100, 10, false, false, null, true));
+   vm4.invoke(() -> WANTestBase.createSender("ln2", 3, true, 100, 10, false, false, null, true));
+
+   createReceiverPR(vm2, 0);
+   vm3.invoke(() -> WANTestBase.createPartitionedRegion(testName, null, 0, 10, isOffHeap()));
+   vm4.invoke(() -> WANTestBase.createPartitionedRegion(testName, "ln1,ln2", 0, 10, isOffHeap()));
+
+   vm4.invoke(() -> WANTestBase.startSender("ln1"));
+   vm4.invoke(() -> WANTestBase.startSender("ln2"));
+
+   vm4.invoke(() -> WANTestBase.doPuts(testName, NUM_PUTS));
+
+   vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
+   vm3.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
+
+   // Statistics of "ln1" first, then "ln2"; both are read before anything is asserted.
+   ArrayList<ArrayList<Integer>> perSenderStats = new ArrayList<>();
+   perSenderStats.add((ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln1", 0)));
+   perSenderStats.add((ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln2", 0)));
+
+   for (ArrayList<Integer> stats : perSenderStats) {
+     assertEquals(0, stats.get(0).intValue()); // queue size
+     assertEquals(NUM_PUTS, stats.get(1).intValue()); // eventsReceived
+     assertEquals(NUM_PUTS, stats.get(2).intValue()); // events queued
+     assertEquals(NUM_PUTS, stats.get(3).intValue()); // events distributed
+     assertTrue(stats.get(4).intValue() >= 10); // batches distributed
+     assertEquals(0, stats.get(5).intValue()); // batches redistributed
+   }
+
+   vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
+   vm3.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
+ }
+
+ /**
+  * Starts puts from vm5, kills the sender on vm4 shortly afterwards and then checks the
+  * statistics of the remaining sender members (vm5, vm6, vm7) and of the receiver on vm2.
+  *
+  * <p>The workload of this test is 1000 puts and is independent of NUM_PUTS, so the
+  * expected values are named locally instead of reusing that constant.
+  */
+ @Test
+ public void testParallelPropagationHA() throws Exception {
+   // Number of puts performed by this test.
+   final int numPuts = 1000;
+   // Minimum number of batches the receiver must report.
+   // NOTE(review): presumably numPuts divided by the sender batch size of 10 - confirm.
+   final int minReceiverBatches = 100;
+   // vm5, vm6 and vm7 remain after vm4 is killed.
+   final int survivingMembers = 3;
+   // NOTE(review): presumably each surviving member sees every put because the sender
+   // regions use redundancy 3 - confirm.
+   final int minEvents = survivingMembers * numPuts;
+   // We may see a single retried event on each surviving member due to the kill.
+   final int maxEvents = minEvents + survivingMembers;
+
+   Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+   Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+   createCacheInVMs(nyPort, vm2);
+   createReceiverInVMs(vm2);
+
+   createSenders(lnPort);
+
+   createReceiverPR(vm2, 0);
+
+   createSenderPRs(3);
+
+   startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+   AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts(testName, numPuts));
+   pause(200);
+   AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
+   inv1.join();
+   inv2.join();
+
+   vm2.invoke(() -> WANTestBase.validateRegionSize(testName, numPuts));
+
+   ArrayList<Integer> v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+   ArrayList<Integer> v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+   ArrayList<Integer> v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+   assertEquals(0, v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue size
+
+   int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1);
+   assertTrue("Received " + receivedEvents,
+       minEvents <= receivedEvents && receivedEvents <= maxEvents); // eventsReceived
+
+   int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2);
+   assertTrue("Queued " + queuedEvents,
+       minEvents <= queuedEvents && queuedEvents <= maxEvents); // eventsQueued
+
+   // Events distributed (index 3) and batches distributed (index 4) are deliberately not
+   // asserted: it is quite possible that vm4 distributed some events/batches before it
+   // was killed.
+   assertEquals(0, v5List.get(5) + v6List.get(5) + v7List.get(5)); // batches redistributed
+
+   vm2.invoke(() -> WANTestBase.checkGatewayReceiverStatsHA(minReceiverBatches, numPuts, numPuts));
+ }
+
+ /**
+ * One region and one sender are configured on the local site (vm4..vm7) and one region
+ * and a receiver on the remote site (vm2). A cache listener that destroys the remote
+ * region is installed on vm2 before the puts start, so the remote region is expected to
+ * disappear while events are being propagated.
+ *
+ * The test then checks that the local region still holds all entries and that the
+ * senders report at least one distributed and at least one redistributed batch.
+ *
+ * @throws Exception if any of the remote invocations fails
+ */
+ @Test
+ public void testParallelPropagationWithRemoteRegionDestroy() throws Exception {
+ addIgnoredException("RegionDestroyedException");
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2);
+ createReceiverPR(vm2, 0);
+ createReceiverInVMs(vm2);
+
+ createSenders(lnPort);
+
+ vm2.invoke(() -> WANTestBase.addCacheListenerAndDestroyRegion(
+ testName));
+
+ createSenderPRs(0);
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ //do the puts from vm4 into the region named after this test; invoke() is the synchronous variant, so this call returns once the puts are done
+ vm4.invoke(() -> WANTestBase.doPuts( testName, 2000 ));
+
+ //verify that all is well in the local site: all the events should be present in the local region
+ vm4.invoke(() -> WANTestBase.validateRegionSize(
+ testName, 2000 ));
+
+ ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", -1));
+ ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", -1));
+ ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", -1));
+ ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", -1));
+
+
+ assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 1); //batches distributed: at least one batch must have been sent
+ assertTrue(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5) >= 1); //batches redistributed: at least one batch must have been resent (presumably because of the destroyed remote region)
+ }
+
+ @Test
+ public void testParallelPropagationWithFilter() throws Exception {
+
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort ));
+
+ createCacheInVMs(nyPort, vm2);
+
+ createReceiverPR(vm2, 1);
+
+ createReceiverInVMs(vm2);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ createSenderPRs(0);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false,
+ new MyGatewayEventFilter(), true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false,
+ new MyGatewayEventFilter(), true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false,
+ new MyGatewayEventFilter(), true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false,
+ new MyGatewayEventFilter(), true ));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+
+
+ vm4.invoke(() -> WANTestBase.doPuts( testName, 1000 ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ testName, 800 ));
+
+ ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+
+ assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
+ assertEquals(1000, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+ assertEquals(900, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+ assertEquals(800, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
+ assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 80); //batches distributed
+ assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
+ assertEquals(200, v4List.get(6) + v5List.get(6) + v6List.get(6) + v7List.get(6)); //events filtered
+
+ vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(80, 800, 800));
+ }
+
+ @Test
+ public void testParallelPropagationConflation() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
+
+ createSendersWithConflation(lnPort);
+
+ createSenderPRs(0);
+
+ startPausedSenders();
+
+ createReceiverPR(vm2, 1);
+
+ Map keyValues = putKeyValues();
+ final Map updateKeyValues = new HashMap();
+ for(int i=0;i<50;i++) {
+ updateKeyValues.put(i, i+"_updated");
+ }
+
+ vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, updateKeyValues ));
+
+ vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() /*creates aren't conflated*/ ));
+
+ // Do the puts again. Since these are updates, the previous updates will be conflated.
+ vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, updateKeyValues ));
+
+ vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() /*creates aren't conflated*/ ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ testName, 0 ));
+
+ vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 0, 0));
+
+ vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
+ vm6.invoke(() -> WANTestBase.resumeSender( "ln" ));
+ vm7.invoke(() -> WANTestBase.resumeSender( "ln" ));
+
+ keyValues.putAll(updateKeyValues);
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ testName, keyValues.size() ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionContents(
+ testName, keyValues ));
+
+ vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 150, NUM_PUTS));
+
+ vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", 0 ));
+
+ ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
+
+ assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
+ assertEquals(200, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+ assertEquals(200, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+ assertEquals(150, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
+ assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed
+ assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
+ assertEquals(50, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)); //events conflated
+
+ }
+
+ /**
+  * Builds a map with the keys 0..NUM_PUTS-1, each mapped to itself, puts it into the test
+  * region from vm4 and checks that the "ln" queue size on vm4 equals the map size.
+  *
+  * @return the map that was put
+  */
+ protected Map putKeyValues() {
+   final Map entries = new HashMap();
+   int key = 0;
+   while (key < NUM_PUTS) {
+     entries.put(key, key);
+     key++;
+   }
+
+   vm4.invoke(() -> WANTestBase.putGivenKeyValue(testName, entries));
+   vm4.invoke(() -> WANTestBase.checkQueueSize("ln", entries.size()));
+
+   return entries;
+ }
+
+ /**
+  * Creates the partitioned test region on the given VM without any sender ids.
+  *
+  * @param vm the VM in which the region is created
+  * @param redundancy value passed through as the third argument of
+  *        {@code WANTestBase.createPartitionedRegion}
+  */
+ protected void createReceiverPR(VM vm, int redundancy) {
+   vm.invoke(
+       () -> WANTestBase.createPartitionedRegion(testName, null, redundancy, 10, isOffHeap()));
+ }
+
+ /**
+  * Creates the partitioned test region, attached to sender "ln", on vm4, vm5, vm6 and vm7
+  * (in that order).
+  *
+  * @param redundancy value passed through as the third argument of
+  *        {@code WANTestBase.createPartitionedRegion}
+  */
+ protected void createSenderPRs(int redundancy) {
+   for (VM member : new VM[] {vm4, vm5, vm6, vm7}) {
+     member.invoke(
+         () -> WANTestBase.createPartitionedRegion(testName, "ln", redundancy, 10, isOffHeap()));
+   }
+ }
+
+ /**
+  * Starts sender "ln" on vm4..vm7 and then pauses it on each of those members, in order.
+  */
+ protected void startPausedSenders() {
+   startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+   for (VM member : new VM[] {vm4, vm5, vm6, vm7}) {
+     member.invoke(() -> pauseSender("ln"));
+   }
+ }
+
+ /**
+  * Creates caches on vm4..vm7 for the given locator port and then creates sender "ln" on
+  * each of them. Compared to {@link #createSenders(Integer)} the sixth argument of
+  * {@code WANTestBase.createSender} is {@code true} here.
+  * NOTE(review): presumably that argument is the conflation flag - confirm.
+  *
+  * @param lnPort locator port passed to {@code createCacheInVMs}
+  */
+ protected void createSendersWithConflation(Integer lnPort) {
+   createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+   for (VM member : new VM[] {vm4, vm5, vm6, vm7}) {
+     member.invoke(
+         () -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true));
+   }
+ }
+
+ /**
+  * Creates a cache on each of vm4..vm7 for the given locator port and, once all caches
+  * exist, creates sender "ln" on each of those members.
+  *
+  * @param lnPort locator port passed to {@code WANTestBase.createCache}
+  */
+ protected void createSenders(Integer lnPort) {
+   // All caches first ...
+   for (VM member : new VM[] {vm4, vm5, vm6, vm7}) {
+     member.invoke(() -> WANTestBase.createCache(lnPort));
+   }
+
+   // ... then the senders, in the same member order.
+   for (VM member : new VM[] {vm4, vm5, vm6, vm7}) {
+     member.invoke(
+         () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java
new file mode 100644
index 0000000..b38d35b
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.CacheTransactionManager;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.execute.*;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.Wait;
+
+//The tests here validate changes introduced because a distributed deadlock
+//was found that caused issues for a production customer.
+//
+//There are 4 tests which use sender gateways with primaries on different
+//JVMs. Two tests use replicated regions and two use partitioned regions, and
+//the tests vary the conserve-sockets setting.
+//
+//The 4th test (PR, conserve-sockets=true) was reported to hang/fail and to have been
+//commented out to prevent issues; note that it is present and annotated with @Test below.
+@Category(DistributedTest.class)
+public class SerialGatewaySenderDistributedDeadlockDUnitTest extends WANTestBase {
+
+ /** Creates the test case; no state is initialised here. */
+ public SerialGatewaySenderDistributedDeadlockDUnitTest() {
+   // The no-argument superclass constructor is invoked implicitly.
+ }
+
+ /**
+  * Uses replicated regions and conserve-sockets=false.
+  *
+  * <p>Runs transactional puts and plain puts asynchronously on vm4 and vm5 while function
+  * executions are driven from this VM, then waits for all asynchronous work to finish.
+  *
+  * @throws Exception if a remote invocation fails or the waiting thread is interrupted
+  */
+ @Test
+ public void testPrimarySendersOnDifferentVMsReplicated() throws Exception {
+   Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1));
+   Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+   createCachesWith(Boolean.FALSE, nyPort, lnPort);
+
+   createSerialSenders();
+
+   createReplicatedRegions(nyPort);
+
+   //get one primary sender on vm4 and another primary on vm5
+   //the startup order matters here
+   startSerialSenders();
+
+   //exercise region and gateway operations with different messaging
+   exerciseWANOperations();
+   AsyncInvocation invVM4transaction = vm4.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
+   AsyncInvocation invVM5transaction = vm5.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
+   AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+   AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+
+   exerciseFunctions();
+
+   // An InterruptedException is allowed to propagate (the method declares
+   // "throws Exception") so the test report contains the real cause instead of a
+   // message-less fail() plus a stack trace printed to stderr.
+   // NOTE(review): join() only waits; confirm whether failures of the asynchronous
+   // invocations need to be checked explicitly.
+   invVM4transaction.join();
+   invVM5transaction.join();
+   invVM4.join();
+   invVM5.join();
+ }
+
+ /**
+  * Uses partitioned regions and conserve-sockets=false.
+  *
+  * <p>Runs transactional puts ({@code doTxPutsPR}) and plain puts asynchronously on vm4
+  * and vm5 while function executions are driven from this VM, then waits for all
+  * asynchronous work to finish.
+  *
+  * @throws Exception if a remote invocation fails or the waiting thread is interrupted
+  */
+ @Test
+ public void testPrimarySendersOnDifferentVMsPR() throws Exception {
+   Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1));
+   Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+   createCachesWith(Boolean.FALSE, nyPort, lnPort);
+
+   createSerialSenders();
+
+   createPartitionedRegions(nyPort);
+
+   startSerialSenders();
+
+   exerciseWANOperations();
+   AsyncInvocation invVM4transaction
+       = vm4.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000));
+   AsyncInvocation invVM5transaction
+       = vm5.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000));
+
+   AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+   AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+
+   exerciseFunctions();
+
+   // An InterruptedException is allowed to propagate (the method declares
+   // "throws Exception") so the test report contains the real cause instead of a
+   // message-less fail() plus a stack trace printed to stderr.
+   // NOTE(review): join() only waits; confirm whether failures of the asynchronous
+   // invocations need to be checked explicitly.
+   invVM4transaction.join();
+   invVM5transaction.join();
+   invVM4.join();
+   invVM5.join();
+ }
+
+ /**
+  * Uses replicated regions and conserve-sockets=true.
+  *
+  * <p>Runs transactional puts and plain puts asynchronously on vm4 and vm5 while function
+  * executions are driven from this VM, then waits for all asynchronous work to finish.
+  *
+  * @throws Exception if a remote invocation fails or the waiting thread is interrupted
+  */
+ @Test
+ public void testPrimarySendersOnDifferentVMsReplicatedSocketPolicy() throws Exception {
+   Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1));
+   Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+   createCachesWith(Boolean.TRUE, nyPort, lnPort);
+
+   createSerialSenders();
+
+   createReplicatedRegions(nyPort);
+
+   //get one primary sender on vm4 and another primary on vm5
+   //the startup order matters here
+   startSerialSenders();
+
+   //exercise region and gateway operations with messaging
+   exerciseWANOperations();
+   AsyncInvocation invVM4transaction = vm4.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
+   AsyncInvocation invVM5transaction = vm5.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
+
+   AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+   AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+
+   exerciseFunctions();
+
+   // An InterruptedException is allowed to propagate (the method declares
+   // "throws Exception") so the test report contains the real cause instead of a
+   // message-less fail() plus a stack trace printed to stderr.
+   // NOTE(review): join() only waits; confirm whether failures of the asynchronous
+   // invocations need to be checked explicitly.
+   invVM4transaction.join();
+   invVM5transaction.join();
+   invVM4.join();
+   invVM5.join();
+ }
+
+ /**
+  * Uses partitioned regions and conserve-sockets=true.
+  *
+  * <p>The original note on this test says that this combination always causes a
+  * distributed deadlock. NOTE(review): the test is nevertheless enabled with
+  * {@code @Test}; confirm whether that note is still accurate.
+  *
+  * @throws Exception if a remote invocation fails or the waiting thread is interrupted
+  */
+ @Test
+ public void testPrimarySendersOnDifferentVMsPRSocketPolicy() throws Exception {
+   Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1));
+   Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+   createCachesWith(Boolean.TRUE, nyPort, lnPort);
+
+   createSerialSenders();
+
+   createPartitionedRegions(nyPort);
+
+   startSerialSenders();
+
+   exerciseWANOperations();
+   AsyncInvocation invVM4transaction
+       = vm4.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000));
+   AsyncInvocation invVM5transaction
+       = vm5.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000));
+
+   AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+   AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+
+   exerciseFunctions();
+
+   // An InterruptedException is allowed to propagate (the method declares
+   // "throws Exception") so the test report contains the real cause instead of a
+   // message-less fail() plus a stack trace printed to stderr.
+   // NOTE(review): join() only waits; confirm whether failures of the asynchronous
+   // invocations need to be checked explicitly.
+   invVM4transaction.join();
+   invVM5transaction.join();
+   invVM4.join();
+   invVM5.join();
+ }
+
+ //**************************************************************************
+ //Utility methods used by tests
+ //**************************************************************************
+ /**
+  * Creates the replicated test region (test method name + "_RR") and a receiver on vm2,
+  * then creates the same region attached to senders "ln1,ln2" on vm4 and vm5.
+  *
+  * @param nyPort not used by this method
+  */
+ private void createReplicatedRegions(Integer nyPort) throws Exception {
+   // receiving side: region first, then the receiver
+   vm2.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, false));
+   vm2.invoke(() -> WANTestBase.createReceiver());
+
+   // sending side
+   vm4.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln1,ln2", false));
+   vm5.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln1,ln2", false));
+ }
+
+ /**
+  * Creates the caches for the test via {@code WANTestBase.createCacheConserveSockets}:
+  * vm2 with the ny locator port, vm4 and vm5 with the ln locator port.
+  *
+  * @param socketPolicy value passed as the first argument of
+  *        {@code createCacheConserveSockets}
+  * @param nyPort locator port used for vm2
+  * @param lnPort locator port used for vm4 and vm5
+  */
+ private void createCachesWith(Boolean socketPolicy, Integer nyPort, Integer lnPort) {
+   // remote site
+   vm2.invoke(() -> WANTestBase.createCacheConserveSockets(socketPolicy, nyPort));
+   // local site
+   vm4.invoke(() -> WANTestBase.createCacheConserveSockets(socketPolicy, lnPort));
+   vm5.invoke(() -> WANTestBase.createCacheConserveSockets(socketPolicy, lnPort));
+ }
+
+ /**
+  * Runs {@code doFunctionPuts} alternately on vm4 and vm5: first 1000 rounds with
+  * {@code Boolean.TRUE} as the last argument, then 1000 rounds with {@code Boolean.FALSE}.
+  */
+ private void exerciseFunctions() throws Exception {
+   //do function calls that use a shared connection
+   int round = 0;
+   while (round < 1000) {
+     //with Boolean.TRUE the test should pass
+     vm4.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.TRUE));
+     vm5.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.TRUE));
+     round++;
+   }
+
+   round = 0;
+   while (round < 1000) {
+     //Boolean.FALSE below will cause a deadlock in some GFE versions, whereas
+     //Boolean.TRUE as above should pass the test;
+     //this is similar to the distributed deadlock found by the customer
+     vm4.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.FALSE));
+     vm5.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.FALSE));
+     round++;
+   }
+ }
+
+ /**
+ * Creates the region named {@code getTestMethodName() + "_RR"} through
+ * {@code WANTestBase.createPartitionedRegion} on vm2, vm4 and vm5, and creates a receiver on vm2.
+ *
+ * @param nyPort not referenced by this method
+ */
+ private void createPartitionedRegions(Integer nyPort) throws Exception {
+ //create remote receiver (empty second argument, third argument 0)
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_RR",
+ "", 0, 113, false));
+
+ vm2.invoke(() -> WANTestBase.createReceiver());
+
+ //create sender vms (second argument "ln1,ln2", third argument 1)
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_RR", "ln1,ln2", 1, 113, false));
+
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_RR", "ln1,ln2", 1, 113, false));
+ }
+
+ /**
+ * Runs a fixed sequence of puts, destroys, invalidates, putAlls and PDX puts on vm4/vm5 and,
+ * after each step and a two-second pause, validates the region size on a sender VM and on vm2.
+ */
+ private void exerciseWANOperations() throws Exception {
+ //note - some of these should be made async to truly exercise the
+ //messaging between the WAN gateways and members
+
+ //exercise region and gateway operations
+ //step 1: puts from both senders, then expect size 100
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
+ vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
+ Wait.pause(2000); //wait for events to propagate
+ vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
+ //step 2: destroys from vm5, then expect size 0
+ vm5.invoke(() -> WANTestBase.doDestroys(getTestMethodName() + "_RR", 100));
+ Wait.pause(2000);//wait for events to propagate
+ vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
+ //step 3: puts again from both senders, then expect size 100
+ vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
+ vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
+ Wait.pause(2000); //wait for events to propagate
+ vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
+ //step 4: random invalidates on vm4 followed by putAll from both senders, then expect size 1000
+ vm4.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doInvalidates(getTestMethodName() + "_RR", 100, 100));
+ vm4.invoke(() -> WANTestBase.doPutAll(getTestMethodName() + "_RR", 100, 10));
+ vm5.invoke(() -> WANTestBase.doPutAll(getTestMethodName() + "_RR", 100, 10));
+ Wait.pause(2000);//wait for events to propagate
+ vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000));
+ //step 5: destroys from vm4, then expect size 0
+ vm4.invoke(() -> WANTestBase.doDestroys(getTestMethodName() + "_RR", 1000));
+ Wait.pause(2000);//wait for events to propagate
+ vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
+ //step 6: PDX-serializable puts from vm4, then expect size 100 via the PDX validator
+ vm4.invoke(() -> WANTestBase.doPutsPDXSerializable(getTestMethodName() + "_RR", 100));
+ Wait.pause(2000);
+ vm5.invoke(() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_RR", 100));
+ vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_RR", 100));
+ }
+
+ /**
+ * Starts senders "ln1" and "ln2" on vm4 and vm5 in a fixed order:
+ * "ln1" on vm4, "ln2" on vm5, then "ln1" on vm5 and "ln2" on vm4.
+ */
+ private void startSerialSenders() throws Exception {
+ //get one primary sender on vm4 and another primary on vm5
+ //the startup order matters here so that primaries are
+ //on different JVM's
+ vm4.invoke(() -> WANTestBase.startSender("ln1"));
+
+ vm5.invoke(() -> WANTestBase.startSender("ln2"));
+
+ //start secondaries
+ vm5.invoke(() -> WANTestBase.startSender("ln1"));
+
+ vm4.invoke(() -> WANTestBase.startSender("ln2"));
+ }
+
+ /**
+ * Creates senders "ln1" and "ln2" on vm4 and vm5 with identical settings.
+ * For each sender id the sender is created on vm4 first and then on vm5.
+ */
+ private void createSerialSenders() throws Exception {
+   for (String senderId : new String[] {"ln1", "ln2"}) {
+     vm4.invoke(() -> WANTestBase.createSender(senderId, 2,
+         false, 100, 10, false, false, null, true));
+
+     vm5.invoke(() -> WANTestBase.createSender(senderId, 2,
+         false, 100, 10, false, false, null, true));
+   }
+ }
+
+ /**
+ * Registers a {@link TestFunction} and executes it {@code num} times on the named region.
+ *
+ * @param name name of the region, looked up on {@code CacheFactory.getAnyInstance()}
+ * @param num number of function executions
+ * @param useThreadOwnedSocket argument handed to every execution; {@link TestFunction} reads it
+ *        as a boolean option
+ * @throws Exception if the lookup or an execution fails
+ */
+ public static void doFunctionPuts(String name, int num, Boolean useThreadOwnedSocket) throws Exception {
+   Region region = CacheFactory.getAnyInstance().getRegion(name);
+   TestFunction function = new TestFunction();
+   FunctionService.registerFunction(function);
+   // Execute by the id the function was registered under instead of a hard-coded
+   // fully-qualified class name, so the lookup keeps working if the package is renamed.
+   String functionId = function.getId();
+   Execution exe = FunctionService.onRegion(region);
+   for (int x = 0; x < num; x++) {
+     exe.withArgs(useThreadOwnedSocket).execute(functionId);
+   }
+ }
+
+ /**
+ * Performs {@code numPuts} single-entry transactions on the named region, each putting a
+ * random key (and equal value) drawn from {@code [0, size)}.
+ * Colocation failures and commit conflicts are expected and ignored.
+ *
+ * @param regionName region name without the leading separator
+ * @param numPuts number of transactions to attempt
+ * @param size exclusive upper bound of the random keys
+ * @throws Exception if a transaction fails for any other reason
+ */
+ public static void doTxPutsPR(String regionName, int numPuts, int size) throws Exception {
+   Region r = cache.getRegion(Region.SEPARATOR + regionName);
+   CacheTransactionManager mgr = cache.getCacheTransactionManager();
+   for (int x = 0; x < numPuts; x++) {
+     int temp = (int) (Math.floor(Math.random() * size));
+     try {
+       mgr.begin();
+       r.put(temp, temp);
+       mgr.commit();
+     } catch (com.gemstone.gemfire.cache.TransactionDataNotColocatedException txe) {
+       //ignore colocation issues or primary bucket issues
+     } catch (com.gemstone.gemfire.cache.CommitConflictException cce) {
+       //ignore - conflicts are ok and expected
+     } finally {
+       // A failure inside put() leaves the transaction open on this thread; without a
+       // rollback the next begin() would fail because a transaction is already active.
+       if (mgr.exists()) {
+         mgr.rollback();
+       }
+     }
+   }
+ }
+
+ /**
+ * Makes {@code numInvalidates} attempts to invalidate a random key drawn from
+ * {@code [0, size)} in the named region. A key is invalidated only when the region
+ * currently holds a value for it.
+ */
+ public static void doInvalidates(String regionName, int numInvalidates, int size) throws Exception {
+   Region target = cache.getRegion(Region.SEPARATOR + regionName);
+   for (int remaining = numInvalidates; remaining > 0; remaining--) {
+     int key = (int) (Math.floor(Math.random() * size));
+     try {
+       if (!target.containsValueForKey(key)) {
+         continue;
+       }
+       target.invalidate(key);
+     } catch (com.gemstone.gemfire.cache.EntryNotFoundException entryNotFoundException) {
+       //ignore as an entry may not exist
+     }
+   }
+ }
+
+}
+
+/**
+ * Function that puts one random entry into the region it is executed on.
+ * Its argument is a {@code Boolean}; when it is true the function first calls
+ * {@code DistributedSystem.setThreadsSocketPolicy(false)}.
+ */
+class TestFunction implements Function {
+
+ @Override
+ public String getId() {
+   // the id is the fully-qualified class name
+   return this.getClass().getName();
+ }
+
+ @Override
+ public boolean hasResult() {
+   return false;
+ }
+
+ @Override
+ public boolean optimizeForWrite() {
+   return false;
+ }
+
+ @Override
+ public boolean isHA() {
+   return false;
+ }
+
+ @Override
+ public void execute(FunctionContext fc) {
+   boolean changeSocketPolicy = (Boolean) fc.getArguments();
+   if (changeSocketPolicy) {
+     DistributedSystem.setThreadsSocketPolicy(false);
+   }
+   Region dataSet = ((RegionFunctionContext) fc).getDataSet();
+   // key is drawn from [0, 10), value from [0, 10000); key is drawn first
+   int key = randKeyValue(10);
+   int value = randKeyValue(10000);
+   dataSet.put(key, value);
+ }
+
+ /** Returns a random int in {@code [0, size)} for a positive {@code size}. */
+ private int randKeyValue(int size) {
+   return (int) Math.floor(Math.random() * size);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
new file mode 100644
index 0000000..8c255c1
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import com.jayway.awaitility.Awaitility;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.MyGatewaySenderEventListener;
+import com.gemstone.gemfire.internal.cache.wan.MyGatewaySenderEventListener2;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.test.junit.categories.FlakyTest;
+
+@Category(DistributedTest.class)
+public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Creates the test case; only delegates to the superclass constructor. */
+ public SerialGatewaySenderEventListenerDUnitTest() {
+ super();
+ }
+
+ /**
+ * Test validates whether the attached listener receives all the events when the caches
+ * are created without a locator.
+ * Ignored: this test hangs after Darrel's check-in 36685 and still needs to be worked through
+ * with Darrel; it is disabled so that the test suite will not hang.
+ */
+ @Ignore
+ @Test
+ public void testGatewaySenderEventListenerInvocationWithoutLocator() {
+ int mPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ vm4.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
+ vm5.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
+ vm6.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
+ vm7.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
+
+ // sender "ln" with listener is created on vm4 and vm5 only
+ vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
+ false, 100, 10, false, false, null, false, true));
+ vm5.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
+ false, 100, 10, false, false, null, false, true));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ // 1000 entries with key == value
+ final Map keyValues = new HashMap();
+ for(int i=0; i< 1000; i++) {
+ keyValues.put(i, i);
+ }
+
+ vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
+ keyValues ));
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", keyValues.size() ));
+
+ vm5.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", keyValues.size() ));
+
+ vm4.invoke(() -> WANTestBase.printEventListenerMap());
+ vm5.invoke(() -> WANTestBase.printEventListenerMap());
+
+ // NOTE(review): validateReceivedEventsMapSizeListener1 is defined in this class, so the
+ // commented-out call below could replace the fail() once the hang is resolved.
+ fail("tried to invoke missing method");
+// vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", keyValues ));
+ }
+
+ /**
+ * Test validates whether the attached listener receives all the events.
+ * The sender "ln" is created with a listener on vm4 and vm5; the test also expects the
+ * regions on vm2 and vm3 to stay empty.
+ */
+ @Test
+ public void testGatewaySenderEventListenerInvocation() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
+ false, 100, 10, false, false, null, false, true));
+ vm5.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
+ false, 100, 10, false, false, null, false, true));
+
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ // 1000 entries with key == value
+ final Map keyValues = new HashMap();
+ for(int i=0; i< 1000; i++) {
+ keyValues.put(i, i);
+ }
+
+ vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
+ keyValues ));
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", keyValues.size() ));
+
+ vm5.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", keyValues.size() ));
+
+ // the regions on vm2 and vm3 are expected to remain empty
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 0 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 0 ));
+
+ // the listener on vm4 must have recorded every key and value that was put
+ vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", keyValues ));
+ }
+
+ /**
+ * Test validates whether the attached listeners receive all the events
+ * when there are 2 listeners attached to the GatewaySender.
+ */
+ @Test
+ public void testGatewaySender2EventListenerInvocation() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ // unlike the single-listener tests, the second-to-last argument is true here
+ vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
+ false, 100, 10, false, false, null, true, true));
+ vm5.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
+ false, 100, 10, false, false, null, true, true));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ // 1000 entries with key == value
+ final Map keyValues = new HashMap();
+ for(int i=0; i< 1000; i++) {
+ keyValues.put(i, i);
+ }
+
+
+ vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
+ keyValues ));
+
+ // the regions on vm2 and vm3 are expected to remain empty
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 0 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 0 ));
+
+ // TODO: move validateReceivedEventsMapSizeListener2 to a shared util class
+ vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener2("ln", keyValues ));
+ }
+
+ /**
+ * Test validates whether the PoolImpl is created. Ideally, when a listener is attached,
+ * the pool should not be created; the check is done by {@code validateNoPoolCreation} on vm4.
+ */
+ @Test
+ public void testGatewaySenderEventListenerPoolImpl() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4);
+
+ vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
+ false, 100, 10, false, false, null, false, false ));
+
+ // asserts that the sender's proxy is null
+ vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateNoPoolCreation("ln" ));
+ }
+
+ // Test start/stop/resume on listener invocation.
+ // Ignored: this test hangs after Darrel's check-in 36685 and still needs to be worked through
+ // with Darrel; it is disabled so that the test suite will not hang.
+ // NOTE(review): as written, the first fail() below throws, so none of the stop/start steps
+ // after it would run even if @Ignore were removed.
+ @Ignore
+ @Test
+ public void testGatewaySenderEventListener_GatewayOperations() {
+
+ int mPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ vm4.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
+ vm5.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
+ vm6.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
+ vm7.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
+
+ vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
+ false, 100, 10, false, false, null, false, true));
+
+ vm4.invoke(() -> WANTestBase.startSender( "ln" ));
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ // keys 0..999, put while the sender is started
+ final Map initialKeyValues = new HashMap();
+ for(int i=0; i< 1000; i++) {
+ initialKeyValues.put(i, i);
+ }
+
+ vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
+ initialKeyValues ));
+
+ fail("tried to invoke missing method");
+// vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", initialKeyValues ));
+
+ vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
+
+ // keys 1000..1999, put while the sender is stopped
+ final Map keyValues = new HashMap();
+ for(int i=1000; i< 2000; i++) {
+ keyValues.put(i, i);
+ }
+
+ vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
+ keyValues ));
+
+ fail("tried to invoke missing method");
+// vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", initialKeyValues ));
+
+ vm4.invoke(() -> WANTestBase.startSender( "ln" ));
+
+ // keys 2000..2999, put after the sender is started again
+ final Map finalKeyValues = new HashMap();
+ for(int i=2000; i< 3000; i++) {
+ finalKeyValues.put(i, i);
+ }
+
+ vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
+ finalKeyValues ));
+
+ // expected listener content: the initial keys plus the final keys
+ finalKeyValues.putAll(initialKeyValues);
+ fail("tried to invoke missing method");
+// vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", finalKeyValues ));
+
+ }
+
+ /**
+ * Asserts that every gateway sender of the cache whose id equals {@code siteId}
+ * has a null proxy.
+ */
+ public static void validateNoPoolCreation(final String siteId) {
+   for (GatewaySender candidate : cache.getGatewaySenders()) {
+     if (!candidate.getId().equals(siteId)) {
+       continue;
+     }
+     assertNull(((AbstractGatewaySender) candidate).getProxy());
+   }
+ }
+
+ /**
+ * Waits (up to 60000 with an interval of 500, as passed to {@code Wait.waitForCriterion})
+ * until the single listener of sender {@code senderId} has an events map with the same size
+ * as {@code map} that contains all of its keys and values.
+ * Nothing is checked when the sender does not have exactly one listener.
+ */
+ public static void validateReceivedEventsMapSizeListener1(final String senderId, final Map map) {
+   GatewaySender match = null;
+   for (GatewaySender candidate : cache.getGatewaySenders()) {
+     if (candidate.getId().equals(senderId)) {
+       match = candidate;
+       break;
+     }
+   }
+
+   final List<AsyncEventListener> attached = ((AbstractGatewaySender) match).getAsyncEventListeners();
+   if (attached.size() != 1) {
+     return;
+   }
+   final AsyncEventListener onlyListener = attached.get(0);
+
+   Wait.waitForCriterion(new WaitCriterion() {
+     // last snapshot taken by done(); reported by description()
+     Map received;
+
+     public boolean done() {
+       received = ((MyGatewaySenderEventListener) onlyListener).getEventsMap();
+       return map.size() == received.size()
+           && received.keySet().containsAll(map.keySet())
+           && received.values().containsAll(map.values());
+     }
+
+     public String description() {
+       return "Waiting for all sites to get updated, the sizes are " + received.size() + " and " + map.size();
+     }
+   }, 60000, 500, true);
+ }
+
+ /**
+ * Waits up to 60 seconds until both listeners of sender {@code senderId} have an events map
+ * with the same size as {@code map} that contains all of its keys and values.
+ * Nothing is checked when the sender does not have exactly two listeners.
+ *
+ * @param senderId id of the gateway sender to inspect
+ * @param map expected keys and values
+ */
+ public static void validateReceivedEventsMapSizeListener2(final String senderId, final Map map) {
+
+   Set<GatewaySender> senders = cache.getGatewaySenders();
+   GatewaySender sender = null;
+   for (GatewaySender s : senders) {
+     if (s.getId().equals(senderId)) {
+       sender = s;
+       break;
+     }
+   }
+   // fail with a readable message instead of a NullPointerException below
+   assertNotNull("No gateway sender found with id " + senderId, sender);
+
+   final List<AsyncEventListener> listeners = ((AbstractGatewaySender) sender).getAsyncEventListeners();
+   if (listeners.size() == 2) {
+     final AsyncEventListener l1 = listeners.get(0);
+     final AsyncEventListener l2 = listeners.get(1);
+     Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS).until(() -> {
+       Map listenerMap1 = ((MyGatewaySenderEventListener) l1)
+           .getEventsMap();
+
+       Map listenerMap2 = ((MyGatewaySenderEventListener2) l2)
+           .getEventsMap();
+       int listener1MapSize = listenerMap1.size();
+       // was listenerMap1.size(): the second listener's size was never actually checked
+       int listener2MapSize = listenerMap2.size();
+       int expectedMapSize = map.size();
+       boolean sizeCorrect = expectedMapSize == listener1MapSize;
+       boolean keySetCorrect = listenerMap1.keySet().containsAll(map.keySet());
+       boolean valuesCorrect = listenerMap1.values().containsAll(map.values());
+
+       boolean sizeCorrect2 = expectedMapSize == listener2MapSize;
+       boolean keySetCorrect2 = listenerMap2.keySet().containsAll(map.keySet());
+       boolean valuesCorrect2 = listenerMap2.values().containsAll(map.values());
+
+       assertTrue("Failed while waiting for all sites to get updated with the correct events. \nThe " +
+           "size of listener 1's map = " + listener1MapSize + "\n The size of listener 2's map = " +
+           "" + listener2MapSize + "\n The expected map size =" + expectedMapSize,
+           sizeCorrect && keySetCorrect && valuesCorrect && sizeCorrect2 && keySetCorrect2 && valuesCorrect2);
+     });
+   }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
new file mode 100644
index 0000000..200e5b6
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
@@ -0,0 +1,665 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionRequest;
+import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionResponse;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.distributed.Locator;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.InternalLocator;
+import com.gemstone.gemfire.distributed.internal.ServerLocator;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.RMIException;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+
+/**
+ *
+ */
+@Category(DistributedTest.class)
+public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Creates the test case; only delegates to the superclass constructor. */
+ public SerialGatewaySenderOperationsDUnitTest() {
+ super();
+ }
+
+ /**
+ * Registers the exception texts that these tests expect to see logged, so that they are ignored.
+ */
+ @Override
+ protected final void postSetUpWANTestBase() throws Exception {
+   final String[] expectedMessages = {
+       "Broken pipe",
+       "Connection reset",
+       "Unexpected IOException",
+       "Connection refused",
+       "could not get remote locator information",
+       //Stopping the gateway closed the region,
+       //which causes this exception to get logged
+       RegionDestroyedException.class.getSimpleName()
+   };
+   for (String expected : expectedMessages) {
+     IgnoredException.addIgnoredException(expected);
+   }
+ }
+
+ /**
+ * Creates senders on vm4 and vm5 without starting them, does puts on vm4 and then runs
+ * {@code verifyGatewaySenderOperations} for sender "ln" on both VMs.
+ */
+ @Test
+ public void testSerialGatewaySenderOperationsWithoutStarting() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createSenderCaches(lnPort);
+
+ createSenderVM4();
+ createSenderVM5();
+
+ createReceiverRegions();
+
+ createSenderRegions();
+
+ // note: startSenderInVMs is never called in this test
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 10 ));
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 100 ));
+
+ vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifyGatewaySenderOperations( "ln" ));
+ vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifyGatewaySenderOperations( "ln" ));
+
+ }
+
+ /**
+ * Creates the region {@code getTestMethodName() + "_RR"} with "ln" as second argument
+ * on vm4, vm5, vm6 and vm7, in that order.
+ */
+ protected void createSenderRegions() {
+   for (VM senderVm : new VM[] {vm4, vm5, vm6, vm7}) {
+     senderVm.invoke(() -> WANTestBase.createReplicatedRegion(
+         getTestMethodName() + "_RR", "ln", isOffHeap() ));
+   }
+ }
+
+ /**
+ * Creates the region {@code getTestMethodName() + "_RR"} with a null second argument
+ * on vm2 and then on vm3.
+ */
+ protected void createReceiverRegions() {
+   for (VM receiverVm : new VM[] {vm2, vm3}) {
+     receiverVm.invoke(() -> WANTestBase.createReplicatedRegion(
+         getTestMethodName() + "_RR", null, isOffHeap() ));
+   }
+ }
+
+ /**
+ * Creates a cache on vm4, vm5, vm6 and vm7, in that order.
+ *
+ * @param lnPort port handed to {@code WANTestBase.createCache} on every VM
+ */
+ protected void createSenderCaches(Integer lnPort) {
+   for (VM senderVm : new VM[] {vm4, vm5, vm6, vm7}) {
+     senderVm.invoke(() -> WANTestBase.createCache( lnPort ));
+   }
+ }
+
+ /** Creates sender "ln" on vm5 with the same arguments as {@code createSenderVM4}. */
+ protected void createSenderVM5() {
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ false, 100, 10, false, true, null, true ));
+ }
+
+ /** Creates sender "ln" on vm4 with the same arguments as {@code createSenderVM5}. */
+ protected void createSenderVM4() {
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ false, 100, 10, false, true, null, true ));
+ }
+
+
+ /**
+ * Starts sender "ln" on vm4 and vm5, pauses and resumes it while puts are running, and then
+ * expects empty sender queues and a region size of 100 on vm2 and vm3.
+ */
+ @Test
+ public void testStartPauseResumeSerialGatewaySender() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createSenderCaches(lnPort);
+
+ createSenderVM4();
+ createSenderVM5();
+
+ createReceiverRegions();
+
+ createSenderRegions();
+
+ // puts done before the senders are started
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 10 ));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 100 ));
+
+ vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+ vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderPausedState( "ln" ));
+ vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderPausedState( "ln" ));
+
+ // puts run asynchronously while the senders are being resumed
+ AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10 ));
+
+ vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
+
+ vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" ));
+ vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" ));
+
+ try {
+ inv1.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail("Interrupted the async invocation.");
+ }
+
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ // both sender queues are expected to be empty and the receivers to hold 100 entries
+ validateQueueContents(vm4, "ln", 0);
+ validateQueueContents(vm5, "ln", 0);
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 100 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 100 ));
+
+ }
+
+ /**
+ * Stops and restarts sender "ln" on vm4 and vm5 while the receivers on vm2 and vm3 are stopped,
+ * checking the queue-size statistic and the sender state after each step, and finally
+ * restarts the receivers and expects the queues to drain.
+ */
+ @Test
+ public void testStopSerialGatewaySender() throws Throwable {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createSenderCaches(lnPort);
+
+ createSenderVM4();
+ createSenderVM5();
+
+ createReceiverRegions();
+
+ createSenderRegions();
+
+ // puts done before the senders are started
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 20 ));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 20 ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 20 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 20 ));
+
+ // with the receivers stopped, the next puts are expected to stay in the sender queues
+ vm2.invoke(() -> WANTestBase.stopReceivers());
+ vm3.invoke(() -> WANTestBase.stopReceivers());
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 20 ));
+
+ vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 ));
+ vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 ));
+
+ vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
+
+ vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
+ vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
+
+ // after the stop the queue-size statistic is expected to read 0
+ vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
+ vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
+ /**
+ * Should have no effect on GatewaySenderState
+ */
+ vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
+
+ vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
+ vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
+
+ // restart both senders concurrently; each result is requested with START_WAIT_TIME
+ AsyncInvocation vm4async = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" ));
+ AsyncInvocation vm5async = vm5.invokeAsync(() -> WANTestBase.startSender( "ln" ));
+ int START_WAIT_TIME = 30000;
+ vm4async.getResult(START_WAIT_TIME);
+ vm5async.getResult(START_WAIT_TIME);
+
+ vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 ));
+ vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 ));
+
+ vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 110 ));
+
+ // 20 + 110 events are expected in the queues while the receivers are still down
+ vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 130 ));
+ vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 130 ));
+
+ vm2.invoke(() -> WANTestBase.startReceivers());
+ vm3.invoke(() -> WANTestBase.startReceivers());
+
+ vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" ));
+ vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 110 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 110 ));
+
+ // the queues are expected to be drained once the receivers are back
+ vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
+ vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
+ }
+
+ /**
+ * Stops sender "ln" on vm4 only, keeps putting into the region, then restarts that sender while
+ * another batch of puts runs asynchronously. After each batch the region size is validated on
+ * vm2 and vm3, and at the end vm4's queue-size stat for "ln" is validated as 0.
+ *
+ * NOTE(review): "BothPrimary" presumably describes how createSenderVM4/createSenderVM5 configure
+ * the senders; those helpers are not visible here - confirm.
+ */
+ @Test
+ public void testStopOneSerialGatewaySenderBothPrimary() throws Throwable {
+ // vm0 hosts the locator created with DS id 1; vm1's locator (id 2) is given vm0's port.
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ // Receiving side: caches and receivers on vm2 and vm3, connected through nyPort.
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ // Sending side: caches connected through lnPort, then one sender definition on vm4 and one on vm5.
+ createSenderCaches(lnPort);
+
+ createSenderVM4();
+ createSenderVM5();
+
+ createReceiverRegions();
+
+ createSenderRegions();
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 100 ));
+
+ // Only vm4's sender is stopped; no stop is issued on vm5.
+ vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
+
+ vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 200 ));
+
+ // The receivers are still expected to reach size 200 although vm4's sender is stopped -
+ // presumably delivered by the sender left running on vm5; confirm.
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 200 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 200 ));
+
+ // Run a further batch of puts asynchronously on vm4 while its sender is being restarted.
+ AsyncInvocation asyncPuts = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 300 ));
+
+ // Short pause before the restart, presumably so the async puts have begun; this is not guaranteed.
+ Thread.sleep(10);
+ vm4.invoke(() -> WANTestBase.startSender( "ln" ));
+
+ // Block until the asynchronous puts have finished.
+ asyncPuts.getResult();
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 300 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 300 ));
+
+ // Final check: vm4's queue-size stat for "ln" is validated against 0.
+ vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
+
+
+ }
+
+ /**
+ * Starts sender "ln" on vm4 and vm5, stops it on vm4 only, then continues putting from vm4 and
+ * validates that the region on vm2 and vm3 reaches size 100.
+ *
+ * NOTE(review): which of vm4/vm5 is primary depends on createSenderVM4/createSenderVM5 and the
+ * start order, none of which is visible here - confirm.
+ */
+ @Test
+ public void testStopOneSerialGatewaySender_PrimarySecondary() {
+ // vm0 hosts the locator created with DS id 1; vm1's locator (id 2) is given vm0's port.
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ // Receiving side: caches and receivers on vm2 and vm3.
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ // Sending side: caches, sender definitions on vm4 and vm5, then regions on both sides.
+ createSenderCaches(lnPort);
+
+ createSenderVM4();
+ createSenderVM5();
+
+ createReceiverRegions();
+
+ createSenderRegions();
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 10 ));
+
+ // Stop the sender on vm4 only; vm5's sender is left running.
+ vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
+
+ vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 100 ));
+
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ // Both receivers are expected to reach size 100 - presumably via vm5's sender; confirm.
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 100 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 100 ));
+ }
+
+ /**
+ * Runs sender "ln" on vm4, stops it, and only afterwards creates and starts a sender with the
+ * same id on vm5. Puts done from vm5 are then expected to bring the region on vm2 to size 100.
+ */
+ @Test
+ public void testStopOneSender_StartAnotherSender() {
+ // vm0 hosts the locator created with DS id 1; vm1's locator (id 2) is given vm0's port.
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ // Receiving side: a single member (vm2) with the replicated region (no sender id) and a receiver.
+ createCacheInVMs(nyPort, vm2);
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
+
+ // First sending member: vm4 with sender "ln" attached to the region.
+ createCacheInVMs(lnPort, vm4);
+ createSenderVM4();
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm4.invoke(() -> WANTestBase.startSender( "ln" ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 10 ));
+ vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
+ vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
+
+ // Second sending member: vm5 is set up only after vm4's sender has been stopped.
+ vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+ createSenderVM5();
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+
+ vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 100 ));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+ // The receiver's region is validated against size 100.
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 100 ));
+ }
+
+ /**
+ * Regression test named after bug 44153. Two members (vm4, vm5) each run sender "ln" in turn
+ * while no receiver exists yet; the queue contents are validated per member (10 on vm4, 100 on
+ * vm5) across stop/start cycles. The receiver on vm2 is created only near the end, after which
+ * the region size on vm2 is validated as 100 with vm5's sender running and 110 once vm4's
+ * sender has been started as well.
+ */
+ @Test
+ public void test_Bug44153_StopOneSender_StartAnotherSender_CheckQueueSize() {
+ // vm0 hosts the locator created with DS id 1; vm1's locator (id 2) is given vm0's port.
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ // vm4: cache, sender "ln", region attached to the sender, sender started.
+ vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+ createSenderVM4();
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm4.invoke(() -> WANTestBase.startSender( "ln" ));
+
+ // No receiver has been created at this point, so the 10 puts are expected to sit in vm4's queue.
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 10 ));
+ validateQueueContents(vm4, "ln", 10);
+ vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
+
+ vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
+
+ // vm5: same setup, created after vm4's sender has been stopped.
+ vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+ createSenderVM5();
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+
+ // doPutsFrom(..., 10, 110) presumably puts keys 10..109 (100 entries), matching the
+ // queue validation of 100 below - confirm against WANTestBase.
+ vm5.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_RR", 10, 110 ));
+
+ validateQueueContents(vm5, "ln", 100);
+ vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
+ vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
+
+ // Restarting vm4's sender: its queue is validated against the same 10 as before the stop.
+ vm4.invoke(() -> WANTestBase.startSender( "ln" ));
+ validateQueueContents(vm4, "ln", 10);
+ vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
+
+ // Start vm5's sender again and only now bring up the receiving side on vm2.
+ vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+ createCacheInVMs(nyPort, vm2);
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
+
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+ // With only vm5's sender running, the receiver is validated against size 100.
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 100 ));
+ vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
+
+ // After vm4's sender is started too, the receiver is validated against size 110.
+ vm4.invoke(() -> WANTestBase.startSender( "ln" ));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 110 ));
+ vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
+ }
+
+ /**
+  * Runs {@code WANTestBase.validateQueueContents(site, size)} inside the given VM.
+  *
+  * @param vm   VM in which the validation is invoked
+  * @param site first argument forwarded to WANTestBase.validateQueueContents
+  * @param size second argument forwarded to WANTestBase.validateQueueContents
+  */
+ private void validateQueueContents(VM vm, String site, int size) {
+   vm.invoke(() -> WANTestBase.validateQueueContents(site, size));
+ }
+
+ /**
+ * Destroy SerialGatewaySender on all the nodes.
+ *
+ * On both vm4 and vm5 the sender "ln" is stopped, removed from the region and destroyed, and
+ * the destruction is then verified on both members.
+ */
+ @Test
+ public void testDestroySerialGatewaySenderOnAllNodes() {
+ // vm0 hosts the locator created with DS id 1; vm1's locator (id 2) is given vm0's port.
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ // Receiving side: caches and receivers on vm2 and vm3.
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ // Sending side: caches, sender definitions on vm4 and vm5, then regions on both sides.
+ createSenderCaches(lnPort);
+
+ createSenderVM4();
+ createSenderVM5();
+
+ createReceiverRegions();
+
+ createSenderRegions();
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 10 ));
+
+ //before destroying, stop the sender
+ vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
+
+ // Detach the sender from the region on both members before destroying it.
+ vm4.invoke(() -> WANTestBase.removeSenderFromTheRegion( "ln", getTestMethodName() + "_RR" ));
+ vm5.invoke(() -> WANTestBase.removeSenderFromTheRegion( "ln", getTestMethodName() + "_RR" ));
+
+ vm4.invoke(() -> WANTestBase.destroySender( "ln" ));
+ vm5.invoke(() -> WANTestBase.destroySender( "ln" ));
+
+ // NOTE(review): the meaning of the boolean argument (false) is defined in WANTestBase - confirm.
+ vm4.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", false ));
+ vm5.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", false ));
+ }
+
+ /**
+ * Destroy SerialGatewaySender on a single node.
+ *
+ * Only vm4 stops, detaches and destroys its sender "ln"; vm5's sender is then verified to be
+ * in the running state.
+ */
+ @Test
+ public void testDestroySerialGatewaySenderOnSingleNode() {
+ // vm0 hosts the locator created with DS id 1; vm1's locator (id 2) is given vm0's port.
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ // Receiving side: caches and receivers on vm2 and vm3.
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ // Sending side: caches, sender definitions on vm4 and vm5, then regions on both sides.
+ createSenderCaches(lnPort);
+
+ createSenderVM4();
+ createSenderVM5();
+
+ createReceiverRegions();
+
+ createSenderRegions();
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 10 ));
+
+ //before destroying, stop the sender
+ vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
+
+ // Detach the sender from the region on vm4 before destroying it.
+ vm4.invoke(() -> WANTestBase.removeSenderFromTheRegion( "ln", getTestMethodName() + "_RR" ));
+
+ vm4.invoke(() -> WANTestBase.destroySender( "ln" ));
+
+ // vm4's sender must be gone while vm5's sender, which was never stopped, must still be running.
+ vm4.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", false ));
+ vm5.invoke(() -> WANTestBase.verifySenderRunningState( "ln" ));
+ }
+
+ /**
+ * Since the sender is attached to a region and in use, it can not be destroyed.
+ * Hence, exception is thrown by the sender API.
+ */
+ @Test
+ public void testDestroySerialGatewaySenderExceptionScenario() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createSenderCaches(lnPort);
+
+ createSenderVM4();
+ createSenderVM5();
+
+ createReceiverRegions();
+
+ createSenderRegions();
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 10 ));
+
+ try {
+ vm4.invoke(() -> WANTestBase.destroySender( "ln" ));
+ } catch (RMIException e) {
+ assertTrue("Cause of the exception should be GatewaySenderException", e.getCause() instanceof GatewaySenderException);
+ }
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 10 ));
+ }
+
+
+ /**
+  * Starts sender "ln" on vm4 and vm5 and then checks, inside both locator VMs (vm0 and vm1),
+  * that the locator's server load map is empty and that a QueueConnectionRequest returns no
+  * servers.
+  */
+ @Test
+ public void testGatewaySenderNotRegisteredAsCacheServer() {
+   // vm0 hosts the locator created with DS id 1; vm1's locator (id 2) is given vm0's port.
+   Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+   Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+   // Receiving side: caches and receivers on vm2 and vm3.
+   createCacheInVMs(nyPort, vm2, vm3);
+   createReceiverInVMs(vm2, vm3);
+
+   createCacheInVMs(lnPort, vm4, vm5);
+
+   // Identical sender definitions on vm4 and vm5.
+   vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+       false, 100, 10, false, true, null, true ));
+   vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+       false, 100, 10, false, true, null, true ));
+
+   startSenderInVMs("ln", vm4, vm5);
+
+   // Executed inside a locator VM: inspects that VM's locator for any registered servers.
+   SerializableRunnable check = new SerializableRunnable("assert no cache servers") {
+     @Override
+     public void run() {
+       InternalLocator inl = (InternalLocator)Locator.getLocator();
+       ServerLocator server = inl.getServerLocatorAdvisee();
+       LogWriterUtils.getLogWriter().info("Server load map is " + server.getLoadMap());
+       assertTrue("expected an empty map but found " + server.getLoadMap(),
+           server.getLoadMap().isEmpty());
+       // A queue connection request must not be offered any server either.
+       QueueConnectionRequest request = new QueueConnectionRequest(
+           ClientProxyMembershipID.getNewProxyMembership(InternalDistributedSystem.getConnectedInstance()),
+           1, new HashSet<>(), "", false);
+       QueueConnectionResponse response = (QueueConnectionResponse)server.processRequest(request);
+       assertTrue("expected no servers but found " + response.getServers(),
+           response.getServers().isEmpty());
+     }
+   };
+   vm0.invoke(check);
+   vm1.invoke(check);
+
+ }
+
+
+
+ /**
+  * Asserts that the gateway sender with the given id, looked up among
+  * {@code cache.getGatewaySenders()}, is paused.
+  *
+  * @param senderId id of the sender to check
+  */
+ public static void verifySenderPausedState(String senderId) {
+   AbstractGatewaySender sender = null;
+   for (GatewaySender s : cache.getGatewaySenders()) {
+     if (s.getId().equals(senderId)) {
+       sender = (AbstractGatewaySender)s;
+       break;
+     }
+   }
+   // Fail with a readable message instead of a NullPointerException when no sender matches.
+   assertTrue("No gateway sender found with id " + senderId, sender != null);
+   assertTrue(sender.isPaused());
+ }
+
+ /**
+  * Asserts that the gateway sender with the given id, looked up among
+  * {@code cache.getGatewaySenders()}, is running and not paused.
+  *
+  * @param senderId id of the sender to check
+  */
+ public static void verifySenderResumedState(String senderId) {
+   AbstractGatewaySender sender = null;
+   for (GatewaySender s : cache.getGatewaySenders()) {
+     if (s.getId().equals(senderId)) {
+       sender = (AbstractGatewaySender)s;
+       break;
+     }
+   }
+   // Fail with a readable message instead of a NullPointerException when no sender matches.
+   assertTrue("No gateway sender found with id " + senderId, sender != null);
+   assertFalse(sender.isPaused());
+   assertTrue(sender.isRunning());
+ }
+
+ /**
+  * Asserts that the gateway sender with the given id, looked up among
+  * {@code cache.getGatewaySenders()}, is neither running nor paused.
+  *
+  * @param senderId id of the sender to check
+  */
+ public static void verifySenderStoppedState(String senderId) {
+   AbstractGatewaySender sender = null;
+   for (GatewaySender s : cache.getGatewaySenders()) {
+     if (s.getId().equals(senderId)) {
+       sender = (AbstractGatewaySender)s;
+       break;
+     }
+   }
+   // Fail with a readable message instead of a NullPointerException when no sender matches.
+   assertTrue("No gateway sender found with id " + senderId, sender != null);
+   assertFalse(sender.isRunning());
+   assertFalse(sender.isPaused());
+ }
+
+ /**
+  * Asserts that the gateway sender with the given id, looked up among
+  * {@code cache.getGatewaySenders()}, is neither paused nor running, and then calls
+  * {@code pause()}, {@code resume()} and {@code stop()} on it.
+  *
+  * No state is asserted after those calls; presumably the point is that they complete without
+  * throwing on a sender that is not running - confirm with the callers.
+  *
+  * @param senderId id of the sender to exercise
+  */
+ public static void verifyGatewaySenderOperations(String senderId) {
+   GatewaySender sender = null;
+   for (GatewaySender s : cache.getGatewaySenders()) {
+     if (s.getId().equals(senderId)) {
+       sender = s;
+       break;
+     }
+   }
+   // Fail with a readable message instead of a NullPointerException when no sender matches.
+   assertTrue("No gateway sender found with id " + senderId, sender != null);
+   assertFalse(sender.isPaused());
+   assertFalse(((AbstractGatewaySender)sender).isRunning());
+   sender.pause();
+   sender.resume();
+   sender.stop();
+ }
+}