You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2016/12/05 17:03:18 UTC
[2/2] incubator-geode git commit: GEODE-2123: Test batch removal
threads are not shared across queues.
GEODE-2123: Test batch removal threads are not shared across queues.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/0c62068c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/0c62068c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/0c62068c
Branch: refs/heads/develop
Commit: 0c62068cfb8ec4db5c1c15550c88fd22be237885
Parents: 8b90324
Author: Jason Huynh <hu...@gmail.com>
Authored: Wed Nov 23 09:31:14 2016 -0800
Committer: Jason Huynh <hu...@gmail.com>
Committed: Mon Dec 5 09:03:09 2016 -0800
----------------------------------------------------------------------
.../cache/PartitionedRegionDataStore.java | 9 ++++
.../ConcurrentParallelGatewaySenderQueue.java | 6 ++-
.../parallel/ParallelGatewaySenderQueue.java | 15 ++++--
.../geode/internal/cache/wan/WANTestBase.java | 20 +++++++-
...allelGatewaySenderOperation_2_DUnitTest.java | 48 ++++++++++++++++++++
...lelWANPropagationConcurrentOpsDUnitTest.java | 7 +--
6 files changed, 93 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c62068c/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index 2e9fdbc..5ddf0cb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -2400,6 +2400,15 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
return sizeOfLocalPrimaries;
}
+ public int getSizeOfLocalBuckets(boolean includeSecondary) {
+ int sizeOfLocal = 0;
+ Set<BucketRegion> primaryBuckets = getAllLocalBucketRegions();
+ for (BucketRegion br : primaryBuckets) {
+ sizeOfLocal += br.size();
+ }
+ return sizeOfLocal;
+ }
+
/**
* Interface for visiting buckets
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c62068c/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
index 603fd6c..c925166 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
@@ -123,7 +123,11 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
}
public int localSize() {
- return ((ParallelGatewaySenderQueue) (processors[0].getQueue())).localSize();
+ return localSize(false);
+ }
+
+ public int localSize(boolean includeSecondary) {
+ return ((ParallelGatewaySenderQueue) (processors[0].getQueue())).localSize(includeSecondary);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c62068c/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 553847d..69a887b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -964,7 +964,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
/**
* TODO: Optimization needed. We are creating 1 array list for each peek!!
- *
+ *
* @return BucketRegionQueue
*/
private final BucketRegionQueue getRandomBucketRegionQueue() {
@@ -1033,7 +1033,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
* Collections.shuffle(thisProcessorBuckets); for (Integer bucketId : thisProcessorBuckets) {
* BucketRegionQueue br = (BucketRegionQueue)prQ.getDataStore()
* .getBucketRegionQueueByBucketId(bucketId);
- *
+ *
* if (br != null && br.isReadyForPeek()) { return br.getId(); } }
*/
}
@@ -1455,12 +1455,19 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
return (BucketRegionQueue) prQ.getDataStore().getLocalBucketById(bucketId);
}
-
public int localSize() {
+ return localSize(false);
+ }
+
+ public int localSize(boolean includeSecondary) {
int size = 0;
for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
if (prQ != null && prQ.getDataStore() != null) {
- size += prQ.getDataStore().getSizeOfLocalPrimaryBuckets();
+ if (includeSecondary) {
+ size += prQ.getDataStore().getSizeOfLocalBuckets(true);
+ } else {
+ size += prQ.getDataStore().getSizeOfLocalPrimaryBuckets();
+ }
}
if (logger.isDebugEnabled()) {
logger.debug("The name of the queue region is {} and the size is {}", prQ.getFullPath(),
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c62068c/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index fd1d4c9..9b9ef2b 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -3113,6 +3113,10 @@ public class WANTestBase extends JUnit4DistributedTestCase {
}
public static Integer getQueueContentSize(final String senderId) {
+ return getQueueContentSize(senderId, false);
+ }
+
+ public static Integer getQueueContentSize(final String senderId, boolean includeSecondary) {
Set<GatewaySender> senders = cache.getGatewaySenders();
GatewaySender sender = null;
for (GatewaySender s : senders) {
@@ -3123,6 +3127,9 @@ public class WANTestBase extends JUnit4DistributedTestCase {
}
if (!sender.isParallel()) {
+ if (includeSecondary) {
+ fail("Not implemented yet");
+ }
final Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
int size = 0;
for (RegionQueue q : queues) {
@@ -3132,8 +3139,19 @@ public class WANTestBase extends JUnit4DistributedTestCase {
} else if (sender.isParallel()) {
RegionQueue regionQueue = null;
regionQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
- return regionQueue.getRegion().size();
+ if (regionQueue instanceof ConcurrentParallelGatewaySenderQueue) {
+ return ((ConcurrentParallelGatewaySenderQueue) regionQueue).localSize(includeSecondary);
+ } else if (regionQueue instanceof ParallelGatewaySenderQueue) {
+ return ((ParallelGatewaySenderQueue) regionQueue).localSize(includeSecondary);
+ } else {
+ if (includeSecondary) {
+ fail("Not Implemented yet");
+ }
+ regionQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
+ return regionQueue.getRegion().size();
+ }
}
+ fail("Not yet implemented?");
return 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c62068c/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
index 931a49c..3ed867a 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
@@ -59,6 +59,54 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
IgnoredException.addIgnoredException("Unexpected IOException");
}
+ @Test
+ public void shuttingOneSenderInAVMShouldNotAffectOthersBatchRemovalThread() {
+ Integer lnport = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnport));
+
+ createCacheInVMs(lnport, vm2, vm3);
+ vm2.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
+ vm2.invoke(() -> WANTestBase.createSender("ln2", 2, true, 100, 10, false, true, null, true));
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln,ln2", 1,
+ 100, false));
+
+ createCacheInVMs(nyPort, vm4, vm5);
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
+ false));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
+ false));
+ vm4.invoke(() -> WANTestBase.createReceiver());
+
+
+ vm2.invoke(() -> WANTestBase.startSender("ln"));
+ vm2.invoke(() -> WANTestBase.startSender("ln2"));
+
+ vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
+ vm3.invoke(() -> WANTestBase.createSender("ln2", 2, true, 100, 10, false, true, null, true));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln,ln2", 1,
+ 100, false));
+
+ vm3.invoke(() -> WANTestBase.startSender("ln"));
+ vm3.invoke(() -> WANTestBase.startSender("ln2"));
+
+ vm2.invokeAsync(() -> {
+ WANTestBase.doPuts(getTestMethodName() + "_PR", 10000);
+ });
+ vm4.invoke(() -> Awaitility.await().atMost(20, TimeUnit.SECONDS).until(
+ () -> assertEquals(true, WANTestBase.getRegionSize(getTestMethodName() + "_PR") > 100)));
+ vm2.invoke(() -> WANTestBase.stopSender("ln"));
+
+ vm2.invoke(() -> Awaitility.await().atMost(20, TimeUnit.SECONDS)
+ .until(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000)));
+ vm4.invoke(() -> Awaitility.await().atMost(20, TimeUnit.SECONDS)
+ .until(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000)));
+
+ vm3.invoke(() -> {
+ Awaitility.await().atMost(20, TimeUnit.SECONDS)
+ .until(() -> WANTestBase.getQueueContentSize("ln", true));
+ });
+ }
+
// to test that when userPR is locally destroyed, shadow Pr is also locally
// destroyed and on recreation userPr , shadow Pr is also recreated.
@Test
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c62068c/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
index 0ce6782..530dc43 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
@@ -80,8 +80,6 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
vm4.invoke(() -> WANTestBase.pauseSender("ln"));
vm5.invoke(() -> WANTestBase.pauseSender("ln"));
- Wait.pause(5000);
-
AsyncInvocation async1 =
vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 700));
AsyncInvocation async2 =
@@ -96,7 +94,7 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
async3.join();
async4.join();
- int queueSize = (Integer) vm4.invoke(() -> WANTestBase.getQueueContentSize("ln"));
+ int queueSize = (Integer) vm4.invoke(() -> WANTestBase.getQueueContentSize("ln", true));
assertEquals("Actual queue size is not matching with the expected", 3500, queueSize);
// resume the senders now
@@ -195,14 +193,11 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
vm6.invoke(() -> WANTestBase.addQueueListener("ln", true));
vm7.invoke(() -> WANTestBase.addQueueListener("ln", true));
- Wait.pause(2000);
vm4.invoke(() -> WANTestBase.pauseSender("ln"));
vm5.invoke(() -> WANTestBase.pauseSender("ln"));
vm6.invoke(() -> WANTestBase.pauseSender("ln"));
vm7.invoke(() -> WANTestBase.pauseSender("ln"));
- Wait.pause(2000);
-
vm6.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 4));
vm4.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4));
vm5.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4));