You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:44:16 UTC
[28/61] [abbrv] incubator-geode git commit: GEODE-37 change package
name from com.gemstone.gemfire (for
./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to
./geode-wan/src/test/java/org/apache/geode)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
deleted file mode 100644
index b754254..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ /dev/null
@@ -1,499 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.parallel;
-
-import org.junit.Ignore;
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-import static com.gemstone.gemfire.test.dunit.Wait.*;
-import static com.gemstone.gemfire.test.dunit.IgnoredException.*;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.test.dunit.AsyncInvocation;
-import com.gemstone.gemfire.test.dunit.VM;
-import com.gemstone.gemfire.test.junit.categories.FlakyTest;
-
-@Category(DistributedTest.class)
-public class ParallelWANStatsDUnitTest extends WANTestBase{
-
- private static final int NUM_PUTS = 100;
- private static final long serialVersionUID = 1L;
-
- private String testName;
-
- public ParallelWANStatsDUnitTest() {
- super();
- }
-
- @Override
- protected final void postSetUpWANTestBase() throws Exception {
- this.testName = getTestMethodName();
- }
-
- @Test
- public void testPartitionedRegionParallelPropagation_BeforeDispatch() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createSendersWithConflation(lnPort);
-
- createSenderPRs(0);
-
- startPausedSenders();
-
- createReceiverPR(vm2, 1);
- createReceiverPR(vm3, 1);
-
- putKeyValues();
-
- ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", NUM_PUTS ));
- ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", NUM_PUTS ));
- ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", NUM_PUTS ));
- ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", NUM_PUTS ));
-
- assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
- assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
- assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
- assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
- assertEquals(0, v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)); //batches distributed
- assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
-
- }
-
- @Test
- public void testPartitionedRegionParallelPropagation_AfterDispatch_NoRedundancy() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- createReceiverInVMs(vm2);
-
- createSenders(lnPort);
-
- createReceiverPR(vm2, 0);
-
- createSenderPRs(0);
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.doPuts( testName,
- NUM_PUTS ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- testName, NUM_PUTS ));
-
-
- ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
- ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
- ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
- ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
-
- assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
- assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
- assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
- assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
- assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed
- assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
-
- vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS ));
- }
-
- @Test
- public void testPartitionedRegionParallelPropagation_AfterDispatch_Redundancy_3() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- createReceiverInVMs(vm2);
-
- createSenders(lnPort);
-
- createReceiverPR(vm2, 0);
-
- createSenderPRs(3);
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.doPuts( testName,
- NUM_PUTS ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- testName, NUM_PUTS ));
-
- ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
- ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
- ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
- ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
-
- assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
- assertEquals(400, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
- assertEquals(400, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
- assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
- assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed
- assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
-
- vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS ));
- }
-
- @Test
- public void testWANStatsTwoWanSites_Bug44331() throws Exception {
- Integer lnPort = createFirstLocatorWithDSId(1);
- Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- createCacheInVMs(tkPort, vm3);
- createReceiverInVMs(vm2);
- createReceiverInVMs(vm3);
-
- vm4.invoke(() -> WANTestBase.createCache(lnPort ));
-
- vm4.invoke(() -> WANTestBase.createSender( "ln1",
- 2, true, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createSender( "ln2",
- 3, true, 100, 10, false, false, null, true ));
-
- createReceiverPR(vm2, 0);
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, null, 0, 10, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, "ln1,ln2", 0, 10, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.startSender( "ln1" ));
-
- vm4.invoke(() -> WANTestBase.startSender( "ln2" ));
-
- vm4.invoke(() -> WANTestBase.doPuts( testName,
- NUM_PUTS ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- testName, NUM_PUTS ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- testName, NUM_PUTS ));
-
- ArrayList<Integer> v4Sender1List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln1", 0 ));
- ArrayList<Integer> v4Sender2List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln2", 0 ));
-
- assertEquals(0, v4Sender1List.get(0).intValue()); //queue size
- assertEquals(NUM_PUTS, v4Sender1List.get(1).intValue()); //eventsReceived
- assertEquals(NUM_PUTS, v4Sender1List.get(2).intValue()); //events queued
- assertEquals(NUM_PUTS, v4Sender1List.get(3).intValue()); //events distributed
- assertTrue(v4Sender1List.get(4).intValue()>=10); //batches distributed
- assertEquals(0, v4Sender1List.get(5).intValue()); //batches redistributed
-
- assertEquals(0, v4Sender2List.get(0).intValue()); //queue size
- assertEquals(NUM_PUTS, v4Sender2List.get(1).intValue()); //eventsReceived
- assertEquals(NUM_PUTS, v4Sender2List.get(2).intValue()); //events queued
- assertEquals(NUM_PUTS, v4Sender2List.get(3).intValue()); //events distributed
- assertTrue(v4Sender2List.get(4).intValue()>=10); //batches distributed
- assertEquals(0, v4Sender2List.get(5).intValue()); //batches redistributed
-
- vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS ));
- vm3.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS ));
- }
-
- @Test
- public void testParallelPropagationHA() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- createReceiverInVMs(vm2);
-
- createSenders(lnPort);
-
- createReceiverPR(vm2, 0);
-
- createSenderPRs(3);
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( testName, 1000 ));
- pause(200);
- AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
- inv1.join();
- inv2.join();
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- testName, 1000 ));
-
- ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0));
- ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0));
- ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0));
-
- assertEquals(0, v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
- int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1);
- //We may see a single retried event on all members due to the kill
- assertTrue("Received " + receivedEvents, 3000 <= receivedEvents && 3003 >= receivedEvents); //eventsReceived
- int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2);
- assertTrue("Queued " + queuedEvents, 3000 <= queuedEvents && 3003 >= queuedEvents); //eventsQueued
- //assertTrue(10000 <= v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed : its quite possible that vm4 has distributed some of the events
- //assertTrue(v5List.get(4) + v6List.get(4) + v7List.get(4) > 1000); //batches distributed : its quite possible that vm4 has distributed some of the batches.
- assertEquals(0, v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
-
- vm2.invoke(() -> WANTestBase.checkGatewayReceiverStatsHA(NUM_PUTS, 1000, 1000 ));
- }
-
- /**
- * 1 region and sender configured on local site and 1 region and a
- * receiver configured on remote site. Puts to the local region are in progress.
- * Remote region is destroyed in the middle.
- *
- * @throws Exception
- */
- @Test
- public void testParallelPropagationWithRemoteRegionDestroy() throws Exception {
- addIgnoredException("RegionDestroyedException");
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- createReceiverPR(vm2, 0);
- createReceiverInVMs(vm2);
-
- createSenders(lnPort);
-
- vm2.invoke(() -> WANTestBase.addCacheListenerAndDestroyRegion(
- testName));
-
- createSenderPRs(0);
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- //start puts in RR_1 in another thread
- vm4.invoke(() -> WANTestBase.doPuts( testName, 2000 ));
-
- //verify that all is well in local site. All the events should be present in local region
- vm4.invoke(() -> WANTestBase.validateRegionSize(
- testName, 2000 ));
-
- ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", -1));
- ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", -1));
- ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", -1));
- ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", -1));
-
-
- assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 1); //batches distributed : its quite possible that vm4 has distributed some of the batches.
- assertTrue(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5) >= 1); //batches redistributed
- }
-
- @Test
- public void testParallelPropagationWithFilter() throws Exception {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort ));
-
- createCacheInVMs(nyPort, vm2);
-
- createReceiverPR(vm2, 1);
-
- createReceiverInVMs(vm2);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- createSenderPRs(0);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false,
- new MyGatewayEventFilter(), true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false,
- new MyGatewayEventFilter(), true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false,
- new MyGatewayEventFilter(), true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false,
- new MyGatewayEventFilter(), true ));
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
-
-
- vm4.invoke(() -> WANTestBase.doPuts( testName, 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- testName, 800 ));
-
- ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
- ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
- ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
- ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
-
- assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
- assertEquals(1000, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
- assertEquals(900, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
- assertEquals(800, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
- assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 80); //batches distributed
- assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
- assertEquals(200, v4List.get(6) + v5List.get(6) + v6List.get(6) + v7List.get(6)); //events filtered
-
- vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(80, 800, 800));
- }
-
- @Test
- public void testParallelPropagationConflation() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- createReceiverInVMs(vm2);
-
- createSendersWithConflation(lnPort);
-
- createSenderPRs(0);
-
- startPausedSenders();
-
- createReceiverPR(vm2, 1);
-
- Map keyValues = putKeyValues();
- final Map updateKeyValues = new HashMap();
- for(int i=0;i<50;i++) {
- updateKeyValues.put(i, i+"_updated");
- }
-
- vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, updateKeyValues ));
-
- vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() /*creates aren't conflated*/ ));
-
- // Do the puts again. Since these are updates, the previous updates will be conflated.
- vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, updateKeyValues ));
-
- vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() /*creates aren't conflated*/ ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- testName, 0 ));
-
- vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 0, 0));
-
- vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
- vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
- vm6.invoke(() -> WANTestBase.resumeSender( "ln" ));
- vm7.invoke(() -> WANTestBase.resumeSender( "ln" ));
-
- keyValues.putAll(updateKeyValues);
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- testName, keyValues.size() ));
-
- vm2.invoke(() -> WANTestBase.validateRegionContents(
- testName, keyValues ));
-
- vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 150, NUM_PUTS));
-
- vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", 0 ));
-
- ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
- ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
- ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
- ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0 ));
-
- assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
- assertEquals(200, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
- assertEquals(200, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
- assertEquals(150, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
- assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed
- assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
- assertEquals(50, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)); //events conflated
-
- }
-
- protected Map putKeyValues() {
- final Map keyValues = new HashMap();
- for(int i=0; i< NUM_PUTS; i++) {
- keyValues.put(i, i);
- }
-
- vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, keyValues ));
-
- vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() ));
-
- return keyValues;
- }
-
- protected void createReceiverPR(VM vm, int redundancy) {
- vm.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, null, redundancy, 10, isOffHeap() ));
- }
-
- protected void createSenderPRs(int redundancy) {
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, "ln", redundancy, 10, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, "ln", redundancy, 10, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, "ln", redundancy, 10, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, "ln", redundancy, 10, isOffHeap() ));
- }
-
- protected void startPausedSenders() {
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm4.invoke(() ->pauseSender( "ln" ));
- vm5.invoke(() ->pauseSender( "ln" ));
- vm6.invoke(() ->pauseSender( "ln" ));
- vm7.invoke(() ->pauseSender( "ln" ));
- }
-
- protected void createSendersWithConflation(Integer lnPort) {
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, true, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, true, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, true, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, true, false, null, true ));
- }
-
- protected void createSenders(Integer lnPort) {
- vm4.invoke(() -> WANTestBase.createCache(lnPort ));
- vm5.invoke(() -> WANTestBase.createCache(lnPort ));
- vm6.invoke(() -> WANTestBase.createCache(lnPort ));
- vm7.invoke(() -> WANTestBase.createCache(lnPort ));
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java
deleted file mode 100644
index b38d35b..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.serial;
-
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.CacheTransactionManager;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.execute.*;
-import com.gemstone.gemfire.distributed.DistributedSystem;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.test.dunit.AsyncInvocation;
-import com.gemstone.gemfire.test.dunit.Wait;
-
-//The tests here are to validate changes introduced because a distributed deadlock
-//was found that caused issues for a production customer.
-//
-//There are 4 tests which use sender gateways with primaries on different
-//JVM's. Two tests use replicated and two use partition regions and the
-//the tests vary the conserve-sockets.
-//
-//currently the 4th test using PR, conserve-sockets=true hangs/fails and is commented
-//out to prevent issues
-@Category(DistributedTest.class)
-public class SerialGatewaySenderDistributedDeadlockDUnitTest extends WANTestBase {
-
- public SerialGatewaySenderDistributedDeadlockDUnitTest() {
- super();
- }
-
- //Uses replicated regions and conserve-sockets=false
- @Test
- public void testPrimarySendersOnDifferentVMsReplicated() throws Exception {
-
- Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1));
-
- Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
-
- createCachesWith(Boolean.FALSE, nyPort, lnPort);
-
- createSerialSenders();
-
- createReplicatedRegions(nyPort);
-
- //get one primary sender on vm4 and another primary on vm5
- //the startup order matters here
- startSerialSenders();
-
- //exercise region and gateway operations with different messaging
- exerciseWANOperations();
- AsyncInvocation invVM4transaction = vm4.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
- AsyncInvocation invVM5transaction = vm5.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
- AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
- AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
-
- exerciseFunctions();
-
- try {
- invVM4transaction.join();
- invVM5transaction.join();
- invVM4.join();
- invVM5.join();
-
- } catch (InterruptedException e) {
- e.printStackTrace();
- fail();
- }
- }
-
- //Uses partitioned regions and conserve-sockets=false
- @Test
- public void testPrimarySendersOnDifferentVMsPR() throws Exception {
- Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1));
-
- Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
-
- createCachesWith(Boolean.FALSE, nyPort, lnPort);
-
- createSerialSenders();
-
- createPartitionedRegions(nyPort);
-
- startSerialSenders();
-
- exerciseWANOperations();
- AsyncInvocation invVM4transaction
- = vm4.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000));
- AsyncInvocation invVM5transaction
- = vm5.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000));
-
- AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
- AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
-
- exerciseFunctions();
-
- try {
- invVM4transaction.join();
- invVM5transaction.join();
- invVM4.join();
- invVM5.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- fail();
- }
- }
-
- //Uses replicated regions and conserve-sockets=true
- @Test
- public void testPrimarySendersOnDifferentVMsReplicatedSocketPolicy() throws Exception {
-
- Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1));
-
- Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
-
- createCachesWith(Boolean.TRUE, nyPort, lnPort);
-
- createSerialSenders();
-
- createReplicatedRegions(nyPort);
-
- //get one primary sender on vm4 and another primary on vm5
- //the startup order matters here
- startSerialSenders();
-
- //exercise region and gateway operations with messaging
- exerciseWANOperations();
- AsyncInvocation invVM4transaction = vm4.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
- AsyncInvocation invVM5transaction = vm5.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
-
- AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
- AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
-
- exerciseFunctions();
-
- try {
- invVM4transaction.join();
- invVM5transaction.join();
- invVM4.join();
- invVM5.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- fail();
- }
- }
-
- //Uses partitioned regions and conserve-sockets=true
- //this always causes a distributed deadlock
- @Test
- public void testPrimarySendersOnDifferentVMsPRSocketPolicy() throws Exception {
- Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1));
-
- Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
-
- createCachesWith(Boolean.TRUE, nyPort, lnPort);
-
- createSerialSenders();
-
- createPartitionedRegions(nyPort);
-
- startSerialSenders();
-
- exerciseWANOperations();
- AsyncInvocation invVM4transaction
- = vm4.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000));
- AsyncInvocation invVM5transaction
- = vm5.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000));
-
- AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
- AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
-
- exerciseFunctions();
-
- try {
- invVM4transaction.join();
- invVM5transaction.join();
- invVM4.join();
- invVM5.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- fail();
- }
- }
-
- //**************************************************************************
- //Utility methods used by tests
- //**************************************************************************
- private void createReplicatedRegions(Integer nyPort) throws Exception {
- //create receiver
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, false));
- vm2.invoke(() -> WANTestBase.createReceiver());
-
- //create senders
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln1,ln2", false));
-
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln1,ln2", false));
- }
-
- private void createCachesWith(Boolean socketPolicy, Integer nyPort, Integer lnPort) {
- vm2.invoke(() -> WANTestBase.createCacheConserveSockets(socketPolicy, nyPort));
-
- vm4.invoke(() -> WANTestBase.createCacheConserveSockets(socketPolicy, lnPort));
-
- vm5.invoke(() -> WANTestBase.createCacheConserveSockets(socketPolicy, lnPort));
- }
-
- private void exerciseFunctions() throws Exception {
- //do function calls that use a shared connection
- for (int x = 0; x < 1000; x++) {
- //setting it to Boolean.TRUE it should pass the test
- vm4.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.TRUE));
- vm5.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.TRUE));
- }
- for (int x = 0; x < 1000; x++) {
- //setting the Boolean.FALSE below will cause a deadlock in some GFE versions
- //setting it to Boolean.TRUE as above it should pass the test
- //this is similar to the customer found distributed deadlock
- vm4.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.FALSE));
- vm5.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.FALSE));
- }
- }
-
- private void createPartitionedRegions(Integer nyPort) throws Exception {
- //create remote receiver
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_RR",
- "", 0, 113, false));
-
- vm2.invoke(() -> WANTestBase.createReceiver());
-
- //create sender vms
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_RR", "ln1,ln2", 1, 113, false));
-
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_RR", "ln1,ln2", 1, 113, false));
- }
-
- private void exerciseWANOperations() throws Exception {
- //note - some of these should be made async to truly exercise the
- //messaging between the WAN gateways and members
-
- //exercise region and gateway operations
- vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
- vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
- Wait.pause(2000); //wait for events to propagate
- vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
- vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
- vm5.invoke(() -> WANTestBase.doDestroys(getTestMethodName() + "_RR", 100));
- Wait.pause(2000);//wait for events to propagate
- vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
- vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
- vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
- vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
- Wait.pause(2000); //wait for events to propagate
- vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
- vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
- vm4.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doInvalidates(getTestMethodName() + "_RR", 100, 100));
- vm4.invoke(() -> WANTestBase.doPutAll(getTestMethodName() + "_RR", 100, 10));
- vm5.invoke(() -> WANTestBase.doPutAll(getTestMethodName() + "_RR", 100, 10));
- Wait.pause(2000);//wait for events to propagate
- vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000));
- vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000));
- vm4.invoke(() -> WANTestBase.doDestroys(getTestMethodName() + "_RR", 1000));
- Wait.pause(2000);//wait for events to propagate
- vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
- vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
- vm4.invoke(() -> WANTestBase.doPutsPDXSerializable(getTestMethodName() + "_RR", 100));
- Wait.pause(2000);
- vm5.invoke(() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_RR", 100));
- vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_RR", 100));
- }
-
- private void startSerialSenders() throws Exception {
- //get one primary sender on vm4 and another primary on vm5
- //the startup order matters here so that primaries are
- //on different JVM's
- vm4.invoke(() -> WANTestBase.startSender("ln1"));
-
- vm5.invoke(() -> WANTestBase.startSender("ln2"));
-
- //start secondaries
- vm5.invoke(() -> WANTestBase.startSender("ln1"));
-
- vm4.invoke(() -> WANTestBase.startSender("ln2"));
- }
-
- private void createSerialSenders() throws Exception {
-
- vm4.invoke(() -> WANTestBase.createSender("ln1", 2,
- false, 100, 10, false, false, null, true));
-
- vm5.invoke(() -> WANTestBase.createSender("ln1", 2,
- false, 100, 10, false, false, null, true));
-
- vm4.invoke(() -> WANTestBase.createSender("ln2", 2,
- false, 100, 10, false, false, null, true));
-
- vm5.invoke(() -> WANTestBase.createSender("ln2", 2,
- false, 100, 10, false, false, null, true));
- }
-
- public static void doFunctionPuts(String name, int num, Boolean useThreadOwnedSocket) throws Exception {
- Region region = CacheFactory.getAnyInstance().getRegion(name);
- FunctionService.registerFunction(new TestFunction());
- Execution exe = FunctionService.onRegion(region);
- for (int x = 0; x < num; x++) {
- exe.withArgs(useThreadOwnedSocket).execute("com.gemstone.gemfire.internal.cache.wan.serial.TestFunction");
- }
- }
-
- public static void doTxPutsPR(String regionName, int numPuts, int size) throws Exception {
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- CacheTransactionManager mgr = cache.getCacheTransactionManager();
- for (int x = 0; x < numPuts; x++) {
- int temp = (int) (Math.floor(Math.random() * size));
- try {
- mgr.begin();
- r.put(temp, temp);
- mgr.commit();
- } catch (com.gemstone.gemfire.cache.TransactionDataNotColocatedException txe) {
- //ignore colocation issues or primary bucket issues
- } catch (com.gemstone.gemfire.cache.CommitConflictException cce) {
- //ignore - conflicts are ok and expected
- }
- }
- }
-
- public static void doInvalidates(String regionName, int numInvalidates, int size) throws Exception {
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- for (int x = 0; x < numInvalidates; x++) {
- int temp = (int) (Math.floor(Math.random() * size));
- try {
- if (r.containsValueForKey(temp)) {
- r.invalidate(temp);
- }
- } catch (com.gemstone.gemfire.cache.EntryNotFoundException entryNotFoundException) {
- //ignore as an entry may not exist
- }
- }
- }
-
-}
-
-class TestFunction implements Function {
-
- @Override
- public boolean hasResult() {
- return false;
- }
-
- @Override
- public void execute(FunctionContext fc) {
- boolean option = (Boolean) fc.getArguments();
- if (option) {
- DistributedSystem.setThreadsSocketPolicy(false);
- }
- RegionFunctionContext context = (RegionFunctionContext) fc;
- Region local = context.getDataSet();
- local.put(randKeyValue(10), randKeyValue(10000));
- }
-
- @Override
- public String getId() {
- return this.getClass().getName();
- }
-
- @Override
- public boolean optimizeForWrite() {
- return false;
- }
-
- @Override
- public boolean isHA() {
- return false;
- }
-
- private int randKeyValue(int size) {
- double temp = Math.floor(Math.random() * size);
- return (int) temp;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
deleted file mode 100644
index 8c255c1..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.serial;
-
-import com.jayway.awaitility.Awaitility;
-import org.junit.Ignore;
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.internal.AvailablePortHelper;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.MyGatewaySenderEventListener;
-import com.gemstone.gemfire.internal.cache.wan.MyGatewaySenderEventListener2;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.test.dunit.Wait;
-import com.gemstone.gemfire.test.dunit.WaitCriterion;
-import com.gemstone.gemfire.test.junit.categories.FlakyTest;
-
-@Category(DistributedTest.class)
-public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
-
- private static final long serialVersionUID = 1L;
-
- public SerialGatewaySenderEventListenerDUnitTest() {
- super();
- }
-
- /**
- * Test validates whether the listener attached receives all the events.
- * this test hangs after the Darrel's checkin 36685. Need to work with Darrel.Commenting it out so that test suit will not hang
- */
- @Ignore
- @Test
- public void testGatewaySenderEventListenerInvocationWithoutLocator() {
- int mPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- vm4.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
- vm5.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
- vm6.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
- vm7.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
-
- vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
- false, 100, 10, false, false, null, false, true));
- vm5.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
- false, 100, 10, false, false, null, false, true));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
- final Map keyValues = new HashMap();
- for(int i=0; i< 1000; i++) {
- keyValues.put(i, i);
- }
-
- vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
- keyValues ));
-
- vm4.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", keyValues.size() ));
-
- vm5.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", keyValues.size() ));
-
- vm4.invoke(() -> WANTestBase.printEventListenerMap());
- vm5.invoke(() -> WANTestBase.printEventListenerMap());
-
- fail("tried to invoke missing method");
-// vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", keyValues ));
- }
-
- /**
- * Test validates whether the listener attached receives all the events.
- */
- @Test
- public void testGatewaySenderEventListenerInvocation() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
- false, 100, 10, false, false, null, false, true));
- vm5.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
- false, 100, 10, false, false, null, false, true));
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
- final Map keyValues = new HashMap();
- for(int i=0; i< 1000; i++) {
- keyValues.put(i, i);
- }
-
- vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
- keyValues ));
-
- vm4.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", keyValues.size() ));
-
- vm5.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", keyValues.size() ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 0 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 0 ));
-
- vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", keyValues ));
- }
-
- /**
- * Test validates whether the listener attached receives all the events.
- * When there are 2 listeners attached to the GatewaySender.
- */
- @Test
- public void testGatewaySender2EventListenerInvocation() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
-
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
- false, 100, 10, false, false, null, true, true));
- vm5.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
- false, 100, 10, false, false, null, true, true));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
- final Map keyValues = new HashMap();
- for(int i=0; i< 1000; i++) {
- keyValues.put(i, i);
- }
-
-
- vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
- keyValues ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 0 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 0 ));
-
- // TODO: move validateReceivedEventsMapSizeListener2 to a shared util class
- vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener2("ln", keyValues ));
- }
-
- /**
- * Test validates whether the PoolImpl is created. Ideally when a listener is attached
- * pool should not be created.
- */
- @Test
- public void testGatewaySenderEventListenerPoolImpl() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4);
-
- vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
- false, 100, 10, false, false, null, false, false ));
-
- vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateNoPoolCreation("ln" ));
- }
-
- // Test start/stop/resume on listener invocation
- //this test hangs after the Darrel's checkin 36685. Need to work with Darrel.Commenting it out so that test suit will not hang
- @Ignore
- @Test
- public void testGatewaySenderEventListener_GatewayOperations() {
-
- int mPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- vm4.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
- vm5.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
- vm6.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
- vm7.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort ));
-
- vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
- false, 100, 10, false, false, null, false, true));
-
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
- final Map initialKeyValues = new HashMap();
- for(int i=0; i< 1000; i++) {
- initialKeyValues.put(i, i);
- }
-
- vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
- initialKeyValues ));
-
- fail("tried to invoke missing method");
-// vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", initialKeyValues ));
-
- vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
-
- final Map keyValues = new HashMap();
- for(int i=1000; i< 2000; i++) {
- keyValues.put(i, i);
- }
-
- vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
- keyValues ));
-
- fail("tried to invoke missing method");
-// vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", initialKeyValues ));
-
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-
- final Map finalKeyValues = new HashMap();
- for(int i=2000; i< 3000; i++) {
- finalKeyValues.put(i, i);
- }
-
- vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR",
- finalKeyValues ));
-
- finalKeyValues.putAll(initialKeyValues);
- fail("tried to invoke missing method");
-// vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", finalKeyValues ));
-
- }
-
- public static void validateNoPoolCreation(final String siteId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- for(GatewaySender sender: senders) {
- if (sender.getId().equals(siteId)) {
- AbstractGatewaySender sImpl = (AbstractGatewaySender)sender;
- assertNull(sImpl.getProxy());
- }
- }
- }
-
- public static void validateReceivedEventsMapSizeListener1(final String senderId, final Map map) {
-
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for(GatewaySender s : senders){
- if(s.getId().equals(senderId)){
- sender = s;
- break;
- }
- }
-
- final List<AsyncEventListener> listeners = ((AbstractGatewaySender)sender).getAsyncEventListeners();
- if(listeners.size() == 1) {
- final AsyncEventListener l = listeners.get(0);
-
- WaitCriterion wc = new WaitCriterion() {
- Map listenerMap;
- public boolean done() {
- listenerMap = ((MyGatewaySenderEventListener)l)
- .getEventsMap();
- boolean sizeCorrect = map.size() == listenerMap.size();
- boolean keySetCorrect = listenerMap.keySet().containsAll(map.keySet());
- boolean valuesCorrect = listenerMap.values().containsAll(map.values());
- return sizeCorrect && keySetCorrect && valuesCorrect;
- }
-
- public String description() {
- return "Waiting for all sites to get updated, the sizes are " + listenerMap.size() + " and " + map.size();
- }
- };
- Wait.waitForCriterion(wc, 60000, 500, true);
- }
- }
-
- public static void validateReceivedEventsMapSizeListener2(final String senderId, final Map map) {
-
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for(GatewaySender s : senders){
- if(s.getId().equals(senderId)){
- sender = s;
- break;
- }
- }
-
- final List<AsyncEventListener> listeners = ((AbstractGatewaySender)sender).getAsyncEventListeners();
- if(listeners.size() == 2) {
- final AsyncEventListener l1 = listeners.get(0);
- final AsyncEventListener l2 = listeners.get(1);
- Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS).until(()->{
- Map listenerMap1 = ((MyGatewaySenderEventListener)l1)
- .getEventsMap();
-
- Map listenerMap2 = ((MyGatewaySenderEventListener2)l2)
- .getEventsMap();
- int listener1MapSize = listenerMap1.size();
- int listener2MapSize = listenerMap1.size();
- int expectedMapSize = map.size();
- boolean sizeCorrect = expectedMapSize == listener1MapSize;
- boolean keySetCorrect = listenerMap1.keySet().containsAll(map.keySet());
- boolean valuesCorrect = listenerMap1.values().containsAll(map.values());
-
- boolean sizeCorrect2 = expectedMapSize== listener2MapSize;
- boolean keySetCorrect2 = listenerMap2.keySet().containsAll(map.keySet());
- boolean valuesCorrect2 = listenerMap2.values().containsAll(map.values());
-
- assertEquals("Failed while waiting for all sites to get updated with the correct events. \nThe " +
- "size of listener 1's map = "+ listener1MapSize + "\n The size of listener 2's map = " +
- ""+ listener2MapSize + "\n The expected map size =" + expectedMapSize ,
- true, sizeCorrect && keySetCorrect && valuesCorrect && sizeCorrect2 && keySetCorrect2 && valuesCorrect2);
- });
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
deleted file mode 100644
index 200e5b6..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
+++ /dev/null
@@ -1,665 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.serial;
-
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import com.gemstone.gemfire.cache.RegionDestroyedException;
-import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionRequest;
-import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionResponse;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.distributed.Locator;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.distributed.internal.InternalLocator;
-import com.gemstone.gemfire.distributed.internal.ServerLocator;
-import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.test.dunit.AsyncInvocation;
-import com.gemstone.gemfire.test.dunit.IgnoredException;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.RMIException;
-import com.gemstone.gemfire.test.dunit.SerializableRunnable;
-import com.gemstone.gemfire.test.dunit.VM;
-
-/**
- *
- */
-@Category(DistributedTest.class)
-public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
-
- private static final long serialVersionUID = 1L;
-
- public SerialGatewaySenderOperationsDUnitTest() {
- super();
- }
-
- @Override
- protected final void postSetUpWANTestBase() throws Exception {
- IgnoredException.addIgnoredException("Broken pipe");
- IgnoredException.addIgnoredException("Connection reset");
- IgnoredException.addIgnoredException("Unexpected IOException");
- IgnoredException.addIgnoredException("Connection refused");
- IgnoredException.addIgnoredException("could not get remote locator information");
-
- //Stopping the gateway closed the region,
- //which causes this exception to get logged
- IgnoredException.addIgnoredException(RegionDestroyedException.class.getSimpleName());
- }
-
- @Test
- public void testSerialGatewaySenderOperationsWithoutStarting() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createSenderCaches(lnPort);
-
- createSenderVM4();
- createSenderVM5();
-
- createReceiverRegions();
-
- createSenderRegions();
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 10 ));
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 100 ));
-
- vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifyGatewaySenderOperations( "ln" ));
- vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifyGatewaySenderOperations( "ln" ));
-
- }
-
- protected void createSenderRegions() {
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- }
-
- protected void createReceiverRegions() {
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- }
-
- protected void createSenderCaches(Integer lnPort) {
- vm4.invoke(() -> WANTestBase.createCache( lnPort ));
- vm5.invoke(() -> WANTestBase.createCache( lnPort ));
- vm6.invoke(() -> WANTestBase.createCache( lnPort ));
- vm7.invoke(() -> WANTestBase.createCache( lnPort ));
- }
-
- protected void createSenderVM5() {
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, true, null, true ));
- }
-
- protected void createSenderVM4() {
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, true, null, true ));
- }
-
-
- @Test
- public void testStartPauseResumeSerialGatewaySender() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createSenderCaches(lnPort);
-
- createSenderVM4();
- createSenderVM5();
-
- createReceiverRegions();
-
- createSenderRegions();
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 10 ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 100 ));
-
- vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
- vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-
- vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderPausedState( "ln" ));
- vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderPausedState( "ln" ));
-
- AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10 ));
-
- vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
- vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
-
- vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" ));
- vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" ));
-
- try {
- inv1.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- fail("Interrupted the async invocation.");
- }
-
- LogWriterUtils.getLogWriter().info("Completed puts in the region");
-
- validateQueueContents(vm4, "ln", 0);
- validateQueueContents(vm5, "ln", 0);
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 100 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 100 ));
-
- }
-
- @Test
- public void testStopSerialGatewaySender() throws Throwable {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createSenderCaches(lnPort);
-
- createSenderVM4();
- createSenderVM5();
-
- createReceiverRegions();
-
- createSenderRegions();
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 20 ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 20 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 20 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 20 ));
-
- vm2.invoke(() -> WANTestBase.stopReceivers());
- vm3.invoke(() -> WANTestBase.stopReceivers());
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 20 ));
-
- vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 ));
- vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 ));
-
- vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
- vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
-
- vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
- vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
-
- vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
- vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
- /**
- * Should have no effect on GatewaySenderState
- */
- vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
- vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
-
- vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
- vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
-
- AsyncInvocation vm4async = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" ));
- AsyncInvocation vm5async = vm5.invokeAsync(() -> WANTestBase.startSender( "ln" ));
- int START_WAIT_TIME = 30000;
- vm4async.getResult(START_WAIT_TIME);
- vm5async.getResult(START_WAIT_TIME);
-
- vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 ));
- vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 ));
-
- vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 110 ));
-
- vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 130 ));
- vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 130 ));
-
- vm2.invoke(() -> WANTestBase.startReceivers());
- vm3.invoke(() -> WANTestBase.startReceivers());
-
- vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" ));
- vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 110 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 110 ));
-
- vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
- vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
- }
-
- @Test
- public void testStopOneSerialGatewaySenderBothPrimary() throws Throwable {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createSenderCaches(lnPort);
-
- createSenderVM4();
- createSenderVM5();
-
- createReceiverRegions();
-
- createSenderRegions();
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 100 ));
-
- vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
-
- vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 200 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 200 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 200 ));
-
- //Do some puts while restarting a sender
- AsyncInvocation asyncPuts = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 300 ));
-
- Thread.sleep(10);
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-
- asyncPuts.getResult();
- LogWriterUtils.getLogWriter().info("Completed puts in the region");
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 300 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 300 ));
-
- vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
-
-
- }
-
- @Test
- public void testStopOneSerialGatewaySender_PrimarySecondary() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createSenderCaches(lnPort);
-
- createSenderVM4();
- createSenderVM5();
-
- createReceiverRegions();
-
- createSenderRegions();
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 10 ));
-
- vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
-
- vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 100 ));
-
- LogWriterUtils.getLogWriter().info("Completed puts in the region");
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 100 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 100 ));
- }
-
- @Test
- public void testStopOneSender_StartAnotherSender() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver());
-
- createCacheInVMs(lnPort, vm4);
- createSenderVM4();
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 10 ));
- vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
- vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
-
- vm5.invoke(() -> WANTestBase.createCache( lnPort ));
- createSenderVM5();
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-
- vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 100 ));
- LogWriterUtils.getLogWriter().info("Completed puts in the region");
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 100 ));
- }
-
- @Test
- public void test_Bug44153_StopOneSender_StartAnotherSender_CheckQueueSize() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- vm4.invoke(() -> WANTestBase.createCache( lnPort ));
- createSenderVM4();
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 10 ));
- validateQueueContents(vm4, "ln", 10);
- vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
-
- vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
-
- vm5.invoke(() -> WANTestBase.createCache( lnPort ));
- createSenderVM5();
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-
- vm5.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_RR", 10, 110 ));
-
- validateQueueContents(vm5, "ln", 100);
- vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
- vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" ));
-
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
- validateQueueContents(vm4, "ln", 10);
- vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
-
- vm5.invoke(() -> WANTestBase.startSender( "ln" ));
- createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver());
-
- LogWriterUtils.getLogWriter().info("Completed puts in the region");
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 100 ));
- vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
-
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 110 ));
- vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
- }
-
- private void validateQueueContents(VM vm, String site, int size) {
- vm.invoke(() -> WANTestBase.validateQueueContents( site,
- size ));
- }
-
- /**
- * Destroy SerialGatewaySender on all the nodes.
- */
- @Test
- public void testDestroySerialGatewaySenderOnAllNodes() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createSenderCaches(lnPort);
-
- createSenderVM4();
- createSenderVM5();
-
- createReceiverRegions();
-
- createSenderRegions();
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 10 ));
-
- //before destroying, stop the sender
- vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
- vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
-
- vm4.invoke(() -> WANTestBase.removeSenderFromTheRegion( "ln", getTestMethodName() + "_RR" ));
- vm5.invoke(() -> WANTestBase.removeSenderFromTheRegion( "ln", getTestMethodName() + "_RR" ));
-
- vm4.invoke(() -> WANTestBase.destroySender( "ln" ));
- vm5.invoke(() -> WANTestBase.destroySender( "ln" ));
-
- vm4.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", false ));
- vm5.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", false ));
- }
-
- /**
- * Destroy SerialGatewaySender on a single node.
- */
- @Test
- public void testDestroySerialGatewaySenderOnSingleNode() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createSenderCaches(lnPort);
-
- createSenderVM4();
- createSenderVM5();
-
- createReceiverRegions();
-
- createSenderRegions();
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 10 ));
-
- //before destroying, stop the sender
- vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
-
- vm4.invoke(() -> WANTestBase.removeSenderFromTheRegion( "ln", getTestMethodName() + "_RR" ));
-
- vm4.invoke(() -> WANTestBase.destroySender( "ln" ));
-
- vm4.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", false ));
- vm5.invoke(() -> WANTestBase.verifySenderRunningState( "ln" ));
- }
-
- /**
- * Since the sender is attached to a region and in use, it can not be destroyed.
- * Hence, exception is thrown by the sender API.
- */
- @Test
- public void testDestroySerialGatewaySenderExceptionScenario() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createSenderCaches(lnPort);
-
- createSenderVM4();
- createSenderVM5();
-
- createReceiverRegions();
-
- createSenderRegions();
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 10 ));
-
- try {
- vm4.invoke(() -> WANTestBase.destroySender( "ln" ));
- } catch (RMIException e) {
- assertTrue("Cause of the exception should be GatewaySenderException", e.getCause() instanceof GatewaySenderException);
- }
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 10 ));
- }
-
-
- @Test
- public void testGatewaySenderNotRegisteredAsCacheServer() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, true, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, true, null, true ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- SerializableRunnable check = new SerializableRunnable("assert no cache servers") {
- public void run() {
- InternalLocator inl = (InternalLocator)Locator.getLocator();
- ServerLocator server = inl.getServerLocatorAdvisee();
- LogWriterUtils.getLogWriter().info("Server load map is " + server.getLoadMap());
- assertTrue("expected an empty map but found " + server.getLoadMap(),
- server.getLoadMap().isEmpty());
- QueueConnectionRequest request = new QueueConnectionRequest(
- ClientProxyMembershipID.getNewProxyMembership(InternalDistributedSystem.getConnectedInstance()),
- 1, new HashSet<>(), "", false);
- QueueConnectionResponse response = (QueueConnectionResponse)server.processRequest(request);
- assertTrue("expected no servers but found " + response.getServers(),
- response.getServers().isEmpty());
- }
- };
- vm0.invoke(check);
- vm1.invoke(check);
-
- }
-
-
-
- public static void verifySenderPausedState(String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- AbstractGatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = (AbstractGatewaySender)s;
- break;
- }
- }
- assertTrue(sender.isPaused());
- }
-
- public static void verifySenderResumedState(String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- AbstractGatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = (AbstractGatewaySender)s;
- break;
- }
- }
- assertFalse(sender.isPaused());
- assertTrue(sender.isRunning());
- }
-
- public static void verifySenderStoppedState(String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- AbstractGatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = (AbstractGatewaySender)s;
- break;
- }
- }
- assertFalse(sender.isRunning());
- assertFalse(sender.isPaused());
- }
-
- public static void verifyGatewaySenderOperations(String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
- assertFalse(sender.isPaused());
- assertFalse(((AbstractGatewaySender)sender).isRunning());
- sender.pause();
- sender.resume();
- sender.stop();
- }
-}