You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/02/01 21:55:20 UTC
[03/50] [abbrv] incubator-geode git commit: GEODE-864: Waiting for
conflation thread in conflation tests
GEODE-864: Waiting for conflation thread in conflation tests
For parallel queues, there is actually a separate conflation thread
that removes entries from the queue on conflation. The tests need to
wait for this thread to finish its work.
As part of this change, I refactored the ParallelWANConflationDUnitTest
to remove a bunch of duplicate code, replaced the VM.invoke(String)
calls with lambdas, and scaled the test down to create fewer buckets and
do fewer puts so that it runs in 1/4 of the time.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a7249514
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a7249514
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a7249514
Branch: refs/heads/feature/GEODE-773-2
Commit: a7249514eb2f09e5ca4920e7dc7aedfce4c2fe12
Parents: 27d965d
Author: Dan Smith <up...@apache.org>
Authored: Tue Jan 26 11:26:21 2016 -0800
Committer: Dan Smith <up...@apache.org>
Committed: Tue Jan 26 16:50:11 2016 -0800
----------------------------------------------------------------------
.../gemfire/internal/cache/wan/WANTestBase.java | 149 ++-
...arallelGatewaySenderOperationsDUnitTest.java | 8 +-
.../ParallelWANConflationDUnitTest.java | 1017 ++++++------------
.../wan/serial/SerialWANStatsDUnitTest.java | 2 +-
4 files changed, 424 insertions(+), 752 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a7249514/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
index 230c8d8..5539eb1 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.gemstone.gemfire.cache.AttributesFactory;
@@ -117,6 +118,7 @@ import com.gemstone.gemfire.test.dunit.DistributedTestCase;
import com.gemstone.gemfire.test.dunit.Host;
import com.gemstone.gemfire.test.dunit.VM;
import com.gemstone.gemfire.util.test.TestUtil;
+import com.jayway.awaitility.Awaitility;
import junit.framework.Assert;
@@ -2160,6 +2162,8 @@ public class WANTestBase extends DistributedTestCase{
}
}
sender.pause();
+ ((AbstractGatewaySender) sender).getEventProcessor().waitForDispatcherToPause();
+
}
finally {
exp.remove();
@@ -2167,27 +2171,6 @@ public class WANTestBase extends DistributedTestCase{
}
}
- public static void pauseSenderAndWaitForDispatcherToPause(String senderId) {
- final ExpectedException exln = addExpectedException("Could not connect");
- ExpectedException exp = addExpectedException(ForceReattemptException.class
- .getName());
- try {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
- sender.pause();
- ((AbstractGatewaySender)sender).getEventProcessor().waitForDispatcherToPause();
- } finally {
- exp.remove();
- exln.remove();
- }
- }
-
public static void resumeSender(String senderId) {
final ExpectedException exln = addExpectedException("Could not connect");
ExpectedException exp = addExpectedException(ForceReattemptException.class
@@ -3189,35 +3172,23 @@ public class WANTestBase extends DistributedTestCase{
public static Map putCustomerPartitionedRegion(int numPuts) {
- assertNotNull(cache);
- assertNotNull(customerRegion);
- Map custKeyValues = new HashMap();
- for (int i = 1; i <= numPuts; i++) {
- CustId custid = new CustId(i);
- Customer customer = new Customer("name" + i, "Address" + i);
- try {
- customerRegion.put(custid, customer);
- custKeyValues.put(custid, customer);
- assertTrue(customerRegion.containsKey(custid));
- assertEquals(customer,customerRegion.get(custid));
- }
- catch (Exception e) {
- fail(
- "putCustomerPartitionedRegion : failed while doing put operation in CustomerPartitionedRegion ",
- e);
- }
- getLogWriter().info("Customer :- { " + custid + " : " + customer + " }");
- }
- return custKeyValues;
+ String valueSuffix = "";
+ return putCustomerPartitionedRegion(numPuts, valueSuffix);
}
public static Map updateCustomerPartitionedRegion(int numPuts) {
+ String valueSuffix = "_update";
+ return putCustomerPartitionedRegion(numPuts, valueSuffix);
+ }
+
+ protected static Map putCustomerPartitionedRegion(int numPuts,
+ String valueSuffix) {
assertNotNull(cache);
assertNotNull(customerRegion);
- Map custKeyValues = new HashMap();
+ Map custKeyValues = new HashMap();
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
- Customer customer = new Customer("name" + i, "Address" + i + "_update");
+ Customer customer = new Customer("name" + i, "Address" + i + valueSuffix);
try {
customerRegion.put(custid, customer);
assertTrue(customerRegion.containsKey(custid));
@@ -3240,24 +3211,22 @@ public class WANTestBase extends DistributedTestCase{
Map orderKeyValues = new HashMap();
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
- for (int j = 1; j <= 1; j++) {
- int oid = (i * 1) + j;
- OrderId orderId = new OrderId(oid, custid);
- Order order = new Order("OREDR" + oid);
- try {
- orderRegion.put(orderId, order);
- orderKeyValues.put(orderId, order);
- assertTrue(orderRegion.containsKey(orderId));
- assertEquals(order,orderRegion.get(orderId));
+ int oid = i + 1;
+ OrderId orderId = new OrderId(oid, custid);
+ Order order = new Order("OREDR" + oid);
+ try {
+ orderRegion.put(orderId, order);
+ orderKeyValues.put(orderId, order);
+ assertTrue(orderRegion.containsKey(orderId));
+ assertEquals(order,orderRegion.get(orderId));
- }
- catch (Exception e) {
- fail(
- "putOrderPartitionedRegion : failed while doing put operation in OrderPartitionedRegion ",
- e);
- }
- getLogWriter().info("Order :- { " + orderId + " : " + order + " }");
}
+ catch (Exception e) {
+ fail(
+ "putOrderPartitionedRegion : failed while doing put operation in OrderPartitionedRegion ",
+ e);
+ }
+ getLogWriter().info("Order :- { " + orderId + " : " + order + " }");
}
return orderKeyValues;
}
@@ -3412,36 +3381,7 @@ public class WANTestBase extends DistributedTestCase{
}
return shipmentKeyValue;
}
-
- public static void doPutsPDXSerializable(String regionName, int numPuts) {
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- for (int i = 0; i < numPuts; i++) {
- r.put("Key_" + i, new SimpleClass(i, (byte)i));
- }
- }
- public static void doPutsPDXSerializable2(String regionName, int numPuts) {
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- for (int i = 0; i < numPuts; i++) {
- r.put("Key_" + i, new SimpleClass1(false, (short) i, "" + i, i,"" +i ,""+ i,i, i));
- }
- }
-
-
- public static void doTxPuts(String regionName, int numPuts) {
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- CacheTransactionManager mgr = cache.getCacheTransactionManager();
-
- mgr.begin();
- r.put(0, 0);
- r.put(100, 100);
- r.put(200, 200);
- mgr.commit();
- }
-
public static Map updateShipmentPartitionedRegion(int numPuts) {
assertNotNull(cache);
assertNotNull(shipmentRegion);
@@ -3495,7 +3435,36 @@ public class WANTestBase extends DistributedTestCase{
}
return shipmentKeyValue;
}
+
+ public static void doPutsPDXSerializable(String regionName, int numPuts) {
+ Region r = cache.getRegion(Region.SEPARATOR + regionName);
+ assertNotNull(r);
+ for (int i = 0; i < numPuts; i++) {
+ r.put("Key_" + i, new SimpleClass(i, (byte)i));
+ }
+ }
+
+ public static void doPutsPDXSerializable2(String regionName, int numPuts) {
+ Region r = cache.getRegion(Region.SEPARATOR + regionName);
+ assertNotNull(r);
+ for (int i = 0; i < numPuts; i++) {
+ r.put("Key_" + i, new SimpleClass1(false, (short) i, "" + i, i,"" +i ,""+ i,i, i));
+ }
+ }
+
+ public static void doTxPuts(String regionName, int numPuts) {
+ Region r = cache.getRegion(Region.SEPARATOR + regionName);
+ assertNotNull(r);
+ CacheTransactionManager mgr = cache.getCacheTransactionManager();
+
+ mgr.begin();
+ r.put(0, 0);
+ r.put(100, 100);
+ r.put(200, 200);
+ mgr.commit();
+ }
+
public static void doNextPuts(String regionName, int start, int numPuts) {
//waitForSitesToUpdate();
ExpectedException exp = addExpectedException(CacheClosedException.class
@@ -3512,6 +3481,10 @@ public class WANTestBase extends DistributedTestCase{
}
public static void checkQueueSize(String senderId, int numQueueEntries) {
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> testQueueSize(senderId, numQueueEntries));
+ }
+
+ public static void testQueueSize(String senderId, int numQueueEntries) {
GatewaySender sender = null;
for (GatewaySender s : cache.getGatewaySenders()) {
if (s.getId().equals(senderId)) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a7249514/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index 19f6c4b..3d0eb5b 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -210,10 +210,10 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 100 });
//now, pause all of the senders
- vm4.invoke(WANTestBase.class, "pauseSenderAndWaitForDispatcherToPause", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "pauseSenderAndWaitForDispatcherToPause", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "pauseSenderAndWaitForDispatcherToPause", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "pauseSenderAndWaitForDispatcherToPause", new Object[] { "ln" });
+ vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
//SECOND RUN: keep one thread doing puts to the region
vm4.invokeAsync(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 1000 });
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a7249514/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
index 4acaaf4..763b9c4 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
@@ -40,92 +40,31 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
}
public void testParallelPropagationConflationDisabled() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- pause(3000);
-
- vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
-
- pause(2000);
-
- final Map keyValues = new HashMap();
- final Map updateKeyValues = new HashMap();
- for(int i=0; i< 1000; i++) {
- keyValues.put(i, i);
- }
-
-
- vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
+ initialSetUp();
- vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
- for(int i=0;i<500;i++) {
- updateKeyValues.put(i, i+"_updated");
- }
+ createSendersNoConflation();
+
+ createSenderPRs();
+
+ startPausedSenders();
+
+ createReceiverPrs();
+
+ final Map keyValues = putKeyValues();
+
+ vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() ));
- vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+ final Map updateKeyValues = updateKeyValues();
- vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", (keyValues.size() + updateKeyValues.size()) });
+ vm4.invoke(() ->checkQueueSize( "ln", (keyValues.size() + updateKeyValues.size()) ));
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName, 0 });
+ vm2.invoke(() ->validateRegionSize(
+ testName, 0 ));
- vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+ resumeSenders();
keyValues.putAll(updateKeyValues);
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName, keyValues.size() });
- vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName, keyValues.size() });
-
- vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- testName, keyValues });
- vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- testName, keyValues });
+ validateReceiverRegionSize(keyValues);
}
@@ -137,661 +76,421 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
* @throws Exception
*/
public void testParallelPropagationBatchConflation() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 50, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 50, false, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 50, false, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 50, false, false, null, true });
+ initialSetUp();
+
+ vm4.invoke(() ->createSender( "ln", 2,
+ true, 100, 50, false, false, null, true ));
+ vm5.invoke(() ->createSender( "ln", 2,
+ true, 100, 50, false, false, null, true ));
+ vm6.invoke(() ->createSender( "ln", 2,
+ true, 100, 50, false, false, null, true ));
+ vm7.invoke(() ->createSender( "ln", 2,
+ true, 100, 50, false, false, null, true ));
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
+ createSenderPRs();
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ startSenders();
- vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+ pauseSenders();
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
+ createReceiverPrs();
final Map keyValues = new HashMap();
- for (int i = 1; i <= 100; i++) {
+ for (int i = 1; i <= 10; i++) {
for (int j = 1; j <= 10; j++) {
keyValues.put(j, i) ;
}
- vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] {
- testName, keyValues });
+ vm4.invoke(() ->putGivenKeyValue(
+ testName, keyValues ));
}
- vm4.invoke(WANTestBase.class, "enableConflation", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "enableConflation", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "enableConflation", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "enableConflation", new Object[] { "ln" });
+ vm4.invoke(() ->enableConflation( "ln" ));
+ vm5.invoke(() ->enableConflation( "ln" ));
+ vm6.invoke(() ->enableConflation( "ln" ));
+ vm7.invoke(() ->enableConflation( "ln" ));
- vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+ resumeSenders();
- pause(2000);
- ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(
- WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
- ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(
- WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
- ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(
- WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
- ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(
- WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
+ ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() ->
+ WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() ->
+ WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() ->
+ WANTestBase.getSenderStats( "ln", 0 ));
+ ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() ->
+ WANTestBase.getSenderStats( "ln", 0 ));
- getLogWriter().info("KBKBKB: batch conflated events : vm4 : " + v4List.get(8));
- getLogWriter().info("KBKBKB: batch conflated events : vm5 : " + v5List.get(8));
- getLogWriter().info("KBKBKB: batch conflated events : vm6 : " + v6List.get(8));
- getLogWriter().info("KBKBKB: batch conflated events : vm7 : " + v7List.get(8));
- getLogWriter().info("KBKBKB: batch conflated events : " + (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)));
assertTrue("No events conflated in batch", (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0);
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName, 10 });
+ vm2.invoke(() ->validateRegionSize(
+ testName, 10 ));
}
public void testParallelPropagationConflation() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- pause(3000);
-
- vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
-
- pause(2000);
-
- final Map keyValues = new HashMap();
- final Map updateKeyValues = new HashMap();
- for(int i=0; i< 1000; i++) {
- keyValues.put(i, i);
- }
-
-
- vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
+ doTestParallelPropagationConflation(0);
+ }
+
+ public void testParallelPropagationConflationRedundancy2() throws Exception {
+ doTestParallelPropagationConflation(2);
+ }
+
+ public void doTestParallelPropagationConflation(int redundancy) throws Exception {
+ initialSetUp();
- vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
- for(int i=0;i<500;i++) {
- updateKeyValues.put(i, i+"_updated");
- }
-
- vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+ createSendersWithConflation();
- vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() }); // creates aren't conflated
-
- vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+ createSenderPRs(redundancy);
- vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() }); // creates aren't conflated
+ startPausedSenders();
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName, 0 });
+ createReceiverPrs();
- vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+ final Map keyValues = putKeyValues();
- keyValues.putAll(updateKeyValues);
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName, keyValues.size() });
- vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName, keyValues.size() });
-
- vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- testName, keyValues });
- vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- testName, keyValues });
- }
-
-
- /**
- * Reproduce the bug #47213.
- * The test is same as above test, with the only difference that
- * redundancy is set to 1.
- * @throws Exception
- */
- public void testParallelPropagationConflation_Bug47213() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 2, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 2, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 2, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 2, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- pause(3000);
-
- vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
-
- pause(2000);//give some time for all the senders to pause
-
- final Map keyValues = new HashMap();
- final Map updateKeyValues = new HashMap();
- for(int i=0; i< 1000; i++) {
- keyValues.put(i, i);
- }
-
- vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
+ vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() ));
+ final Map updateKeyValues = updateKeyValues();
- vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
- for(int i=0;i<500;i++) {
- updateKeyValues.put(i, i+"_updated");
- }
+ vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() )); // creates aren't conflated
- vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+ vm4.invoke(() ->putGivenKeyValue( testName, updateKeyValues ));
- vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() }); // creates aren't conflated
+ vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() )); // creates aren't conflated
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName, 0 });
+ vm2.invoke(() ->validateRegionSize(
+ testName, 0 ));
- vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+ resumeSenders();
keyValues.putAll(updateKeyValues);
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName, keyValues.size() });
- vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName, keyValues.size() });
-
- vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- testName, keyValues });
- vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- testName, keyValues });
+ validateReceiverRegionSize(keyValues);
}
public void testParallelPropagationConflationOfRandomKeys() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- pause(3000);
-
- vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
-
- pause(2000);
+ initialSetUp();
- final Map keyValues = new HashMap();
- final Map updateKeyValues = new HashMap();
- for(int i=0; i< 1000; i++) {
- keyValues.put(i, i);
- }
- vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
+ createSendersWithConflation();
+
+ createSenderPRs();
+
+ startPausedSenders();
+
+ createReceiverPrs();
+
+ final Map keyValues = putKeyValues();
- vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
+ vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() ));
- while(updateKeyValues.size()!=500) {
+ final Map updateKeyValues = new HashMap();
+ while(updateKeyValues.size()!=10) {
int key = (new Random()).nextInt(keyValues.size());
updateKeyValues.put(key, key+"_updated");
}
- vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+ vm4.invoke(() ->putGivenKeyValue( testName, updateKeyValues ));
- vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() });
+ vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() ));
- vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+ vm4.invoke(() ->putGivenKeyValue( testName, updateKeyValues ));
- vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() });
+ vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() ));
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName, 0 });
+ vm2.invoke(() ->validateRegionSize(
+ testName, 0 ));
- vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+ resumeSenders();
keyValues.putAll(updateKeyValues);
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName, keyValues.size() });
- vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName, keyValues.size() });
- vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- testName, keyValues });
- vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- testName, keyValues });
+ validateReceiverRegionSize(keyValues);
}
public void testParallelPropagationColocatedRegionConflation()
throws Exception {
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
-
- vm4.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- pause(3000);
-
- vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
- vm2.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
-
- pause(2000);
-
- Map custKeyValues = (Map)vm4.invoke(WANTestBase.class, "putCustomerPartitionedRegion",
- new Object[] { 1000 });
- Map orderKeyValues = (Map)vm4.invoke(WANTestBase.class, "putOrderPartitionedRegion",
- new Object[] { 1000 });
- Map shipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "putShipmentPartitionedRegion",
- new Object[] { 1000 });
-
- vm4.invoke(
- WANTestBase.class,
- "checkQueueSize",
- new Object[] {
+ initialSetUp();
+
+ createSendersWithConflation();
+
+ createOrderShipmentOnSenders();
+
+ startPausedSenders();
+
+ createOrderShipmentOnReceivers();
+
+ Map custKeyValues = (Map)vm4.invoke(() ->putCustomerPartitionedRegion( 20 ));
+ Map orderKeyValues = (Map)vm4.invoke(() ->putOrderPartitionedRegion( 20 ));
+ Map shipmentKeyValues = (Map)vm4.invoke(() ->putShipmentPartitionedRegion( 20 ));
+
+ vm4.invoke(() ->
+ WANTestBase.checkQueueSize(
"ln",
(custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
- .size()) });
-
- Map updatedCustKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateCustomerPartitionedRegion",
- new Object[] { 500 });
- Map updatedOrderKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateOrderPartitionedRegion",
- new Object[] { 500 });
- Map updatedShipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateShipmentPartitionedRegion",
- new Object[] { 500 });
-
- vm4.invoke(
- WANTestBase.class,
- "checkQueueSize",
- new Object[] {
+ .size()) ));
+
+ Map updatedCustKeyValues = (Map)vm4.invoke(() ->updateCustomerPartitionedRegion( 10 ));
+ Map updatedOrderKeyValues = (Map)vm4.invoke(() ->updateOrderPartitionedRegion( 10 ));
+ Map updatedShipmentKeyValues = (Map)vm4.invoke(() ->updateShipmentPartitionedRegion( 10 ));
+ int sum = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
+ .size())
+ + updatedCustKeyValues.size()
+ + updatedOrderKeyValues.size()
+ + updatedShipmentKeyValues.size();
+ vm4.invoke(() ->
+ WANTestBase.checkQueueSize(
"ln",
- (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
- .size())
- + updatedCustKeyValues.size()
- + updatedOrderKeyValues.size()
- + updatedShipmentKeyValues.size() });
-
- updatedCustKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateCustomerPartitionedRegion",
- new Object[] { 500 });
- updatedOrderKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateOrderPartitionedRegion",
- new Object[] { 500 });
- updatedShipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateShipmentPartitionedRegion",
- new Object[] { 500 });
-
- vm4.invoke(
- WANTestBase.class,
- "checkQueueSize",
- new Object[] {
+ sum));
+
+
+ updatedCustKeyValues = (Map)vm4.invoke(() ->updateCustomerPartitionedRegion( 10 ));
+ updatedOrderKeyValues = (Map)vm4.invoke(() ->updateOrderPartitionedRegion( 10 ));
+ updatedShipmentKeyValues = (Map)vm4.invoke(() ->updateShipmentPartitionedRegion( 10 ));
+ int sum2 = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
+ .size())
+ + updatedCustKeyValues.size()
+ + updatedOrderKeyValues.size()
+ + updatedShipmentKeyValues.size();
+ vm4.invoke(() ->
+ WANTestBase.checkQueueSize(
"ln",
- (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
- .size())
- + updatedCustKeyValues.size()
- + updatedOrderKeyValues.size()
- + updatedShipmentKeyValues.size() });
-
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.customerRegionName, 0 });
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.orderRegionName, 0 });
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.shipmentRegionName, 0 });
-
- vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+ sum2));
+
+ vm2.invoke(() ->validateRegionSize(
+ WANTestBase.customerRegionName, 0 ));
+ vm2.invoke(() ->validateRegionSize(
+ WANTestBase.orderRegionName, 0 ));
+ vm2.invoke(() ->validateRegionSize(
+ WANTestBase.shipmentRegionName, 0 ));
+
+ resumeSenders();
custKeyValues.putAll(updatedCustKeyValues);
orderKeyValues.putAll(updatedOrderKeyValues);
shipmentKeyValues.putAll(updatedShipmentKeyValues);
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.customerRegionName, custKeyValues.size() });
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.orderRegionName, orderKeyValues.size() });
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.shipmentRegionName, shipmentKeyValues.size() });
-
- vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- WANTestBase.customerRegionName, custKeyValues });
- vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- WANTestBase.orderRegionName, orderKeyValues });
- vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- WANTestBase.shipmentRegionName, shipmentKeyValues });
-
- vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.customerRegionName, custKeyValues.size() });
- vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.orderRegionName, orderKeyValues.size() });
- vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.shipmentRegionName, shipmentKeyValues.size() });
-
- vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- WANTestBase.customerRegionName, custKeyValues });
- vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- WANTestBase.orderRegionName, orderKeyValues });
- vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- WANTestBase.shipmentRegionName, shipmentKeyValues });
+ validateColocatedRegionContents(custKeyValues, orderKeyValues,
+ shipmentKeyValues);
}
+ // Same as the previous test, except that the order and shipment regions are
+ // populated and updated through the *UsingCustId helper methods.
public void testParallelPropagationColoatedRegionConflationSameKey()
throws Exception {
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
-
- vm4.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- pause(3000);
-
- vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
- vm2.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
-
- pause(2000);
-
- Map custKeyValues = (Map)vm4.invoke(WANTestBase.class, "putCustomerPartitionedRegion",
- new Object[] { 1000 });
- Map orderKeyValues = (Map)vm4.invoke(WANTestBase.class, "putOrderPartitionedRegionUsingCustId",
- new Object[] { 1000 });
- Map shipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "putShipmentPartitionedRegionUsingCustId",
- new Object[] { 1000 });
-
- vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
- .size()) });
-
- Map updatedCustKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateCustomerPartitionedRegion",
- new Object[] { 500 });
- Map updatedOrderKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateOrderPartitionedRegionUsingCustId",
- new Object[] { 500 });
- Map updatedShipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateShipmentPartitionedRegionUsingCustId",
- new Object[] { 500 });
-
- vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
- .size()) + updatedCustKeyValues.size() + updatedOrderKeyValues.size() + updatedShipmentKeyValues.size() });
-
- updatedCustKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateCustomerPartitionedRegion",
- new Object[] { 500 });
- updatedOrderKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateOrderPartitionedRegionUsingCustId",
- new Object[] { 500 });
- updatedShipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateShipmentPartitionedRegionUsingCustId",
- new Object[] { 500 });
-
- vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
- .size()) + updatedCustKeyValues.size() + updatedOrderKeyValues.size() + updatedShipmentKeyValues.size() });
-
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.customerRegionName, 0 });
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.orderRegionName, 0 });
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.shipmentRegionName, 0 });
-
- vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+ initialSetUp();
+
+ createSendersWithConflation();
+
+ createOrderShipmentOnSenders();
+
+ startPausedSenders();
+
+ createOrderShipmentOnReceivers();
+
+ Map custKeyValues = (Map)vm4.invoke(() ->putCustomerPartitionedRegion( 20 ));
+ Map orderKeyValues = (Map)vm4.invoke(() ->putOrderPartitionedRegionUsingCustId( 20 ));
+ Map shipmentKeyValues = (Map)vm4.invoke(() ->putShipmentPartitionedRegionUsingCustId( 20 ));
+
+ vm4.invoke(() ->checkQueueSize( "ln", (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
+ .size()) ));
+
+ Map updatedCustKeyValues = (Map)vm4.invoke(() ->updateCustomerPartitionedRegion( 10 ));
+ Map updatedOrderKeyValues = (Map)vm4.invoke(() ->updateOrderPartitionedRegionUsingCustId( 10 ));
+ Map updatedShipmentKeyValues = (Map)vm4.invoke(() ->updateShipmentPartitionedRegionUsingCustId( 10 ));
+ int sum = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
+ .size()) + updatedCustKeyValues.size() + updatedOrderKeyValues.size() + updatedShipmentKeyValues.size() ;
+
+ vm4.invoke(() ->checkQueueSize( "ln", sum));
+
+ updatedCustKeyValues = (Map)vm4.invoke(() ->updateCustomerPartitionedRegion( 10 ));
+ updatedOrderKeyValues = (Map)vm4.invoke(() ->updateOrderPartitionedRegionUsingCustId( 10 ));
+ updatedShipmentKeyValues = (Map)vm4.invoke(() ->updateShipmentPartitionedRegionUsingCustId( 10 ));
+
+ int sum2 = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
+ .size()) + updatedCustKeyValues.size() + updatedOrderKeyValues.size() + updatedShipmentKeyValues.size();
+ vm4.invoke(() ->checkQueueSize( "ln", sum2));
+
+ vm2.invoke(() ->validateRegionSize(
+ WANTestBase.customerRegionName, 0 ));
+ vm2.invoke(() ->validateRegionSize(
+ WANTestBase.orderRegionName, 0 ));
+ vm2.invoke(() ->validateRegionSize(
+ WANTestBase.shipmentRegionName, 0 ));
+
+ resumeSenders();
custKeyValues.putAll(updatedCustKeyValues);
orderKeyValues.putAll(updatedOrderKeyValues);
shipmentKeyValues.putAll(updatedShipmentKeyValues);
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.customerRegionName, custKeyValues.size() });
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.orderRegionName, orderKeyValues.size() });
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.shipmentRegionName, shipmentKeyValues.size() });
-
- vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- WANTestBase.customerRegionName, custKeyValues });
- vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- WANTestBase.orderRegionName, orderKeyValues });
- vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- WANTestBase.shipmentRegionName, shipmentKeyValues });
-
- vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.customerRegionName, custKeyValues.size() });
- vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.orderRegionName, orderKeyValues.size() });
- vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- WANTestBase.shipmentRegionName, shipmentKeyValues.size() });
-
- vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- WANTestBase.customerRegionName, custKeyValues });
- vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- WANTestBase.orderRegionName, orderKeyValues });
- vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
- WANTestBase.shipmentRegionName, shipmentKeyValues });
+ validateColocatedRegionContents(custKeyValues, orderKeyValues,
+ shipmentKeyValues);
+ }
+
+ protected void validateColocatedRegionContents(Map custKeyValues,
+ Map orderKeyValues, Map shipmentKeyValues) {
+ vm2.invoke(() ->validateRegionSize(
+ WANTestBase.customerRegionName, custKeyValues.size() ));
+ vm2.invoke(() ->validateRegionSize(
+ WANTestBase.orderRegionName, orderKeyValues.size() ));
+ vm2.invoke(() ->validateRegionSize(
+ WANTestBase.shipmentRegionName, shipmentKeyValues.size() ));
+
+ vm2.invoke(() ->validateRegionContents(
+ WANTestBase.customerRegionName, custKeyValues ));
+ vm2.invoke(() ->validateRegionContents(
+ WANTestBase.orderRegionName, orderKeyValues ));
+ vm2.invoke(() ->validateRegionContents(
+ WANTestBase.shipmentRegionName, shipmentKeyValues ));
+
+ vm3.invoke(() ->validateRegionSize(
+ WANTestBase.customerRegionName, custKeyValues.size() ));
+ vm3.invoke(() ->validateRegionSize(
+ WANTestBase.orderRegionName, orderKeyValues.size() ));
+ vm3.invoke(() ->validateRegionSize(
+ WANTestBase.shipmentRegionName, shipmentKeyValues.size() ));
+
+ vm3.invoke(() ->validateRegionContents(
+ WANTestBase.customerRegionName, custKeyValues ));
+ vm3.invoke(() ->validateRegionContents(
+ WANTestBase.orderRegionName, orderKeyValues ));
+ vm3.invoke(() ->validateRegionContents(
+ WANTestBase.shipmentRegionName, shipmentKeyValues ));
+ }
+
+ protected void createOrderShipmentOnReceivers() {
+ vm2.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
+ testName, null, 1, 8, isOffHeap() ));
+ vm3.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
+ testName, null, 1, 8, isOffHeap() ));
+ }
+
+ protected void createOrderShipmentOnSenders() {
+ vm4.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
+ testName, "ln", 0, 8, isOffHeap() ));
+ vm5.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
+ testName, "ln", 0, 8, isOffHeap() ));
+ vm6.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
+ testName, "ln", 0, 8, isOffHeap() ));
+ vm7.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
+ testName, "ln", 0, 8, isOffHeap() ));
+ }
+
+ protected Map updateKeyValues() {
+ final Map updateKeyValues = new HashMap();
+ for(int i=0;i<10;i++) {
+ updateKeyValues.put(i, i+"_updated");
+ }
+
+ vm4.invoke(() ->putGivenKeyValue( testName, updateKeyValues ));
+ return updateKeyValues;
+ }
+
+ protected Map putKeyValues() {
+ final Map keyValues = new HashMap();
+ for(int i=0; i< 20; i++) {
+ keyValues.put(i, i);
+ }
+
+
+ vm4.invoke(() ->putGivenKeyValue( testName, keyValues ));
+ return keyValues;
+ }
+
+ protected void validateReceiverRegionSize(final Map keyValues) {
+ vm2.invoke(() ->validateRegionSize(
+ testName, keyValues.size() ));
+ vm3.invoke(() ->validateRegionSize(
+ testName, keyValues.size() ));
+
+ vm2.invoke(() ->validateRegionContents(
+ testName, keyValues ));
+ vm3.invoke(() ->validateRegionContents(
+ testName, keyValues ));
+ }
+
+ protected void resumeSenders() {
+ vm4.invoke(() ->resumeSender( "ln" ));
+ vm5.invoke(() ->resumeSender( "ln" ));
+ vm6.invoke(() ->resumeSender( "ln" ));
+ vm7.invoke(() ->resumeSender( "ln" ));
+ }
+
+ protected void createReceiverPrs() {
+ vm2.invoke(() ->createPartitionedRegion(
+ testName, null, 1, 8, isOffHeap() ));
+ vm3.invoke(() ->createPartitionedRegion(
+ testName, null, 1, 8, isOffHeap() ));
+ }
+
+ protected void startPausedSenders() {
+ startSenders();
+
+ pauseSenders();
+ }
+
+ protected void pauseSenders() {
+ vm4.invoke(() ->pauseSender( "ln" ));
+ vm5.invoke(() ->pauseSender( "ln" ));
+ vm6.invoke(() ->pauseSender( "ln" ));
+ vm7.invoke(() ->pauseSender( "ln" ));
+ }
+
+ protected void startSenders() {
+ vm4.invoke(() ->startSender( "ln" ));
+ vm5.invoke(() ->startSender( "ln" ));
+ vm6.invoke(() ->startSender( "ln" ));
+ vm7.invoke(() ->startSender( "ln" ));
+ }
+
+ protected void createSenderPRs() {
+ createSenderPRs(0);
+ }
+
+ protected void createSenderPRs(int redundancy) {
+ vm4.invoke(() ->createPartitionedRegion(
+ testName, "ln", redundancy, 8, isOffHeap() ));
+ vm5.invoke(() ->createPartitionedRegion(
+ testName, "ln", redundancy, 8, isOffHeap() ));
+ vm6.invoke(() ->createPartitionedRegion(
+ testName, "ln", redundancy, 8, isOffHeap() ));
+ vm7.invoke(() ->createPartitionedRegion(
+ testName, "ln", redundancy, 8, isOffHeap() ));
+ }
+
+ protected void initialSetUp() {
+ Integer lnPort = (Integer)vm0.invoke(() ->createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() ->createFirstRemoteLocator( 2, lnPort ));
+
+ vm2.invoke(() ->createReceiver( nyPort ));
+ vm3.invoke(() ->createReceiver( nyPort ));
+
+ vm4.invoke(() ->createCache(lnPort ));
+ vm5.invoke(() ->createCache(lnPort ));
+ vm6.invoke(() ->createCache(lnPort ));
+ vm7.invoke(() ->createCache(lnPort ));
+ }
+
+ protected void createSendersNoConflation() {
+ vm4.invoke(() ->createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() ->createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() ->createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() ->createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ }
+
+ protected void createSendersWithConflation() {
+ vm4.invoke(() ->createSender( "ln", 2,
+ true, 100, 2, true, false, null, true ));
+ vm5.invoke(() ->createSender( "ln", 2,
+ true, 100, 2, true, false, null, true ));
+ vm6.invoke(() ->createSender( "ln", 2,
+ true, 100, 2, true, false, null, true ));
+ vm7.invoke(() ->createSender( "ln", 2,
+ true, 100, 2, true, false, null, true ));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a7249514/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
index 979439b..a142f82 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
@@ -537,7 +537,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm4.invoke(WANTestBase.class, "pauseSenderAndWaitForDispatcherToPause", new Object[] { "ln" });
+ vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
testName, null,1, 100, isOffHeap() });