You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2016/12/05 17:03:18 UTC

[2/2] incubator-geode git commit: GEODE-2123: Test batch removal threads are not shared across queues.

GEODE-2123: Test batch removal threads are not shared across queues.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/0c62068c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/0c62068c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/0c62068c

Branch: refs/heads/develop
Commit: 0c62068cfb8ec4db5c1c15550c88fd22be237885
Parents: 8b90324
Author: Jason Huynh <hu...@gmail.com>
Authored: Wed Nov 23 09:31:14 2016 -0800
Committer: Jason Huynh <hu...@gmail.com>
Committed: Mon Dec 5 09:03:09 2016 -0800

----------------------------------------------------------------------
 .../cache/PartitionedRegionDataStore.java       |  9 ++++
 .../ConcurrentParallelGatewaySenderQueue.java   |  6 ++-
 .../parallel/ParallelGatewaySenderQueue.java    | 15 ++++--
 .../geode/internal/cache/wan/WANTestBase.java   | 20 +++++++-
 ...allelGatewaySenderOperation_2_DUnitTest.java | 48 ++++++++++++++++++++
 ...lelWANPropagationConcurrentOpsDUnitTest.java |  7 +--
 6 files changed, 93 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c62068c/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index 2e9fdbc..5ddf0cb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -2400,6 +2400,15 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
     return sizeOfLocalPrimaries;
   }
 
+  public int getSizeOfLocalBuckets(boolean includeSecondary) {
+    int sizeOfLocal = 0;
+    Set<BucketRegion> primaryBuckets = getAllLocalBucketRegions();
+    for (BucketRegion br : primaryBuckets) {
+      sizeOfLocal += br.size();
+    }
+    return sizeOfLocal;
+  }
+
 
   /**
    * Interface for visiting buckets

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c62068c/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
index 603fd6c..c925166 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
@@ -123,7 +123,11 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
   }
 
   public int localSize() {
-    return ((ParallelGatewaySenderQueue) (processors[0].getQueue())).localSize();
+    return localSize(false);
+  }
+
+  public int localSize(boolean includeSecondary) {
+    return ((ParallelGatewaySenderQueue) (processors[0].getQueue())).localSize(includeSecondary);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c62068c/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 553847d..69a887b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -964,7 +964,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
   /**
    * TODO: Optimization needed. We are creating 1 array list for each peek!!
-   * 
+   *
    * @return BucketRegionQueue
    */
   private final BucketRegionQueue getRandomBucketRegionQueue() {
@@ -1033,7 +1033,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
        * Collections.shuffle(thisProcessorBuckets); for (Integer bucketId : thisProcessorBuckets) {
        * BucketRegionQueue br = (BucketRegionQueue)prQ.getDataStore()
        * .getBucketRegionQueueByBucketId(bucketId);
-       * 
+       *
        * if (br != null && br.isReadyForPeek()) { return br.getId(); } }
        */
     }
@@ -1455,12 +1455,19 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     return (BucketRegionQueue) prQ.getDataStore().getLocalBucketById(bucketId);
   }
 
-
   public int localSize() {
+    return localSize(false);
+  }
+
+  public int localSize(boolean includeSecondary) {
     int size = 0;
     for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
       if (prQ != null && prQ.getDataStore() != null) {
-        size += prQ.getDataStore().getSizeOfLocalPrimaryBuckets();
+        if (includeSecondary) {
+          size += prQ.getDataStore().getSizeOfLocalBuckets(true);
+        } else {
+          size += prQ.getDataStore().getSizeOfLocalPrimaryBuckets();
+        }
       }
       if (logger.isDebugEnabled()) {
         logger.debug("The name of the queue region is {} and the size is {}", prQ.getFullPath(),

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c62068c/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index fd1d4c9..9b9ef2b 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -3113,6 +3113,10 @@ public class WANTestBase extends JUnit4DistributedTestCase {
   }
 
   public static Integer getQueueContentSize(final String senderId) {
+    return getQueueContentSize(senderId, false);
+  }
+
+  public static Integer getQueueContentSize(final String senderId, boolean includeSecondary) {
     Set<GatewaySender> senders = cache.getGatewaySenders();
     GatewaySender sender = null;
     for (GatewaySender s : senders) {
@@ -3123,6 +3127,9 @@ public class WANTestBase extends JUnit4DistributedTestCase {
     }
 
     if (!sender.isParallel()) {
+      if (includeSecondary) {
+        fail("Not implemented yet");
+      }
       final Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
       int size = 0;
       for (RegionQueue q : queues) {
@@ -3132,8 +3139,19 @@ public class WANTestBase extends JUnit4DistributedTestCase {
     } else if (sender.isParallel()) {
       RegionQueue regionQueue = null;
       regionQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
-      return regionQueue.getRegion().size();
+      if (regionQueue instanceof ConcurrentParallelGatewaySenderQueue) {
+        return ((ConcurrentParallelGatewaySenderQueue) regionQueue).localSize(includeSecondary);
+      } else if (regionQueue instanceof ParallelGatewaySenderQueue) {
+        return ((ParallelGatewaySenderQueue) regionQueue).localSize(includeSecondary);
+      } else {
+        if (includeSecondary) {
+          fail("Not Implemented yet");
+        }
+        regionQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
+        return regionQueue.getRegion().size();
+      }
     }
+    fail("Not yet implemented?");
     return 0;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c62068c/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
index 931a49c..3ed867a 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
@@ -59,6 +59,54 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
     IgnoredException.addIgnoredException("Unexpected IOException");
   }
 
+  @Test
+  public void shuttingOneSenderInAVMShouldNotAffectOthersBatchRemovalThread() {
+    Integer lnport = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnport));
+
+    createCacheInVMs(lnport, vm2, vm3);
+    vm2.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
+    vm2.invoke(() -> WANTestBase.createSender("ln2", 2, true, 100, 10, false, true, null, true));
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln,ln2", 1,
+        100, false));
+
+    createCacheInVMs(nyPort, vm4, vm5);
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
+        false));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", null, 1, 100,
+        false));
+    vm4.invoke(() -> WANTestBase.createReceiver());
+
+
+    vm2.invoke(() -> WANTestBase.startSender("ln"));
+    vm2.invoke(() -> WANTestBase.startSender("ln2"));
+
+    vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
+    vm3.invoke(() -> WANTestBase.createSender("ln2", 2, true, 100, 10, false, true, null, true));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln,ln2", 1,
+        100, false));
+
+    vm3.invoke(() -> WANTestBase.startSender("ln"));
+    vm3.invoke(() -> WANTestBase.startSender("ln2"));
+
+    vm2.invokeAsync(() -> {
+      WANTestBase.doPuts(getTestMethodName() + "_PR", 10000);
+    });
+    vm4.invoke(() -> Awaitility.await().atMost(20, TimeUnit.SECONDS).until(
+        () -> assertEquals(true, WANTestBase.getRegionSize(getTestMethodName() + "_PR") > 100)));
+    vm2.invoke(() -> WANTestBase.stopSender("ln"));
+
+    vm2.invoke(() -> Awaitility.await().atMost(20, TimeUnit.SECONDS)
+        .until(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000)));
+    vm4.invoke(() -> Awaitility.await().atMost(20, TimeUnit.SECONDS)
+        .until(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000)));
+
+    vm3.invoke(() -> {
+      Awaitility.await().atMost(20, TimeUnit.SECONDS)
+          .until(() -> WANTestBase.getQueueContentSize("ln", true));
+    });
+  }
+
   // to test that when userPR is locally destroyed, shadow Pr is also locally
   // destroyed and on recreation userPr , shadow Pr is also recreated.
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c62068c/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
index 0ce6782..530dc43 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
@@ -80,8 +80,6 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
     vm4.invoke(() -> WANTestBase.pauseSender("ln"));
     vm5.invoke(() -> WANTestBase.pauseSender("ln"));
 
-    Wait.pause(5000);
-
     AsyncInvocation async1 =
         vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 700));
     AsyncInvocation async2 =
@@ -96,7 +94,7 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
     async3.join();
     async4.join();
 
-    int queueSize = (Integer) vm4.invoke(() -> WANTestBase.getQueueContentSize("ln"));
+    int queueSize = (Integer) vm4.invoke(() -> WANTestBase.getQueueContentSize("ln", true));
     assertEquals("Actual queue size is not matching with the expected", 3500, queueSize);
 
     // resume the senders now
@@ -195,14 +193,11 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
     vm6.invoke(() -> WANTestBase.addQueueListener("ln", true));
     vm7.invoke(() -> WANTestBase.addQueueListener("ln", true));
 
-    Wait.pause(2000);
     vm4.invoke(() -> WANTestBase.pauseSender("ln"));
     vm5.invoke(() -> WANTestBase.pauseSender("ln"));
     vm6.invoke(() -> WANTestBase.pauseSender("ln"));
     vm7.invoke(() -> WANTestBase.pauseSender("ln"));
 
-    Wait.pause(2000);
-
     vm6.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 4));
     vm4.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4));
     vm5.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4));