You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/03/16 18:07:33 UTC

[geode] branch develop updated: GEODE-4868: depose primary should reduce brq's size in deposePrimaryForColocatedChildren (#1625)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new d48607d  GEODE-4868: depose primary should reduce brq's size in deposePrimaryForColocatedChildren (#1625)
d48607d is described below

commit d48607d51f718a6c497d7e6b14a8dc9b87fe2e67
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Fri Mar 16 11:07:30 2018 -0700

    GEODE-4868: depose primary should reduce brq's size in deposePrimaryForColocatedChildren (#1625)
---
 .../org/apache/geode/internal/cache/BucketAdvisor.java  |  8 ++++----
 .../internal/cache/wan/AsyncEventQueueTestBase.java     |  3 +++
 .../wan/asyncqueue/AsyncEventListenerDUnitTest.java     | 17 +++++++++++++++++
 3 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index fc14773..074a60d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -279,10 +279,6 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
       } finally {
         this.activePrimaryMoveLock.unlock();
         if (needToSendProfileUpdate) {
-          if (this.getBucket() instanceof BucketRegionQueue) {
-            BucketRegionQueue brq = (BucketRegionQueue) this.getBucket();
-            brq.decQueueSize(brq.size());
-          }
           sendProfileUpdate();
         }
       }
@@ -316,6 +312,10 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
         if (b != null) {
           BucketAdvisor ba = b.getBucketAdvisor();
           deposedChildPrimaries = ba.deposePrimary() && deposedChildPrimaries;
+          if (b instanceof BucketRegionQueue) {
+            BucketRegionQueue brq = (BucketRegionQueue) b;
+            brq.decQueueSize(brq.size());
+          }
         }
       }
     }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
index 545d0ca..7a956c8 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -717,6 +717,9 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
       }
     }
     final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl) queue).getStatistics();
+    Awaitility.await().atMost(60, TimeUnit.SECONDS)
+        .until(() -> assertEquals("Expected queue entries: " + queueSize + " but actual entries: "
+            + statistics.getEventQueueSize(), queueSize, statistics.getEventQueueSize()));
     assertEquals(queueSize, statistics.getEventQueueSize());
     assertEquals(eventsReceived, statistics.getEventsReceived());
     assertEquals(eventsQueued, statistics.getEventsQueued());
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
index 465f35a..aa1db53 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -1519,6 +1519,11 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
         () -> AsyncEventQueueTestBase.getAllPrimaryBucketsOnTheNode(getTestMethodName() + "_PR"));
 
     LogWriterUtils.getLogWriter().info("Primary buckets on vm2: " + primaryBucketsvm2);
+
+    // before shutdown vm2, both vm1 and vm2 should have 40 events in primary queue
+    vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 40, 80, 80, 0));
+    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 40, 80, 80, 0));
+
     // ---------------------------- Kill vm2 --------------------------
     vm2.invoke(() -> AsyncEventQueueTestBase.killSender());
     // ----------------------------------------------------------------
@@ -1527,6 +1532,8 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
     vm3.invoke(createCacheRunnable(lnPort));
     vm3.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueueWithListener2("ln", true, 100, 5,
         false, null));
+    // vm3 will move some primary buckets from vm1, but vm1's primary queue size did not reduce
+    vm3.invoke(pauseAsyncEventQueueRunnable());
     vm3.invoke(() -> AsyncEventQueueTestBase.createPRWithRedundantCopyWithAsyncEventQueue(
         getTestMethodName() + "_PR", "ln", isOffHeap()));
 
@@ -1535,7 +1542,17 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
     String regionName = getTestMethodName() + "_PR";
     Set<Integer> primaryBucketsvm3 = (Set<Integer>) vm3
         .invoke(() -> AsyncEventQueueTestBase.getAllPrimaryBucketsOnTheNode(regionName));
+    LogWriterUtils.getLogWriter().info("Primary buckets on vm3: " + primaryBucketsvm3);
+    Set<Integer> primaryBucketsvm1 = (Set<Integer>) vm1.invoke(
+        () -> AsyncEventQueueTestBase.getAllPrimaryBucketsOnTheNode(getTestMethodName() + "_PR"));
+    LogWriterUtils.getLogWriter()
+        .info("After shutdown vm2, started vm3, Primary buckets on vm1: " + primaryBucketsvm1);
+
+    // vm1.invoke(()->AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 80, 80, 80, 0));
+    vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 40, 80, 80, 0));
+    vm3.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 40, 0, 0, 0));
 
+    vm3.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln"));
     vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln"));
 
     vm1.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty("ln"));

-- 
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.