You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/04/24 00:38:36 UTC

[geode] branch feature/GEODE-4624 updated (f0225ad -> bec29d6)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a change to branch feature/GEODE-4624
in repository https://gitbox.apache.org/repos/asf/geode.git.


    omit f0225ad  GEODE-4624: add a stats to trace total event removed by PQRM
     new bec29d6  GEODE-4624: add a stats to trace total event removed by PQRM

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (f0225ad)
            \
             N -- N -- N   refs/heads/feature/GEODE-4624 (bec29d6)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../geode/cache/asyncqueue/internal/AsyncEventQueueStats.java       | 3 ++-
 .../org/apache/geode/internal/cache/wan/GatewaySenderStats.java     | 3 ++-
 .../internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java | 6 ++----
 3 files changed, 6 insertions(+), 6 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.

[geode] 01/01: GEODE-4624: add a stats to trace total event removed by PQRM

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-4624
in repository https://gitbox.apache.org/repos/asf/geode.git

commit bec29d63c7dbef2bb950d3a08a04d0ee9a7123ce
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Wed Apr 18 22:12:40 2018 -0700

    GEODE-4624: add a stats to trace total event removed by PQRM
---
 .../asyncqueue/internal/AsyncEventQueueStats.java  |  8 ++-
 .../internal/cache/AbstractBucketRegionQueue.java  |  4 ++
 .../geode/internal/cache/BucketRegionQueue.java    |  2 +-
 .../internal/cache/wan/AbstractGatewaySender.java  |  5 +-
 .../wan/AbstractGatewaySenderEventProcessor.java   |  2 +-
 .../internal/cache/wan/GatewaySenderStats.java     | 79 +++++++++++++++-------
 .../wan/parallel/ParallelQueueRemovalMessage.java  | 10 +++
 .../SerialAsyncEventQueueImplJUnitTest.java        |  7 +-
 .../cache/wan/AsyncEventQueueTestBase.java         |  6 +-
 .../ParallelQueueRemovalMessageJUnitTest.java      | 10 +--
 .../bean/stats/AsyncEventQueueStatsJUnitTest.java  |  5 ++
 .../geode/internal/cache/wan/WANTestBase.java      | 13 ++--
 .../parallel/ParallelWANConflationDUnitTest.java   | 51 ++++++++++----
 13 files changed, 142 insertions(+), 60 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index 8d68cee..963b828 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -42,7 +42,10 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
             f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing events.",
                 "nanoseconds"),
             f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false),
-            f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of the secondary event queue.",
+            f.createIntGauge(SECONDARY_EVENT_QUEUE_SIZE, "Size of the secondary event queue.",
+                "operations", false),
+            f.createIntGauge(EVENTS_PROCESSED_BY_PQRM,
+                "Total number of events processed by Parallel Queue Removal Message(PQRM).",
                 "operations", false),
             f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events queue.",
                 "operations", false),
@@ -110,7 +113,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
     eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED);
     eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
     eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
-    eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
+    secondaryEventQueueSizeId = type.nameToId(SECONDARY_EVENT_QUEUE_SIZE);
+    eventsProcessedByPQRMId = type.nameToId(EVENTS_PROCESSED_BY_PQRM);
     eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
     eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
     eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index 2406b18..be0eb88 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -236,6 +236,10 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
     this.gatewaySenderStats.incSecondaryQueueSize(size);
   }
 
+  public void incEventsProcessedByPQRM(int size) {
+    this.gatewaySenderStats.incEventsProcessedByPQRM(size);
+  }
+
   public void incQueueSize() {
     this.gatewaySenderStats.incQueueSize();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index afc1544..26309a3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -292,7 +292,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
       Long previousTailKey = (Long) latestIndexesForRegion.put(keyToConflate, tailKey);
       if (previousTailKey != null) {
         if (logger.isDebugEnabled()) {
-          logger.debug("{}: Conflating {} at queue index={} and previousTailKey: ", this, object,
+          logger.debug("{}: Conflating {} at queue index={} and previousTailKey={} ", this, object,
               tailKey, previousTailKey);
         }
         AbstractGatewaySenderEventProcessor ep =
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 59547b2..1027582 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -1099,6 +1099,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
 
     statistics.setQueueSize(0);
     statistics.setSecondaryQueueSize(0);
+    statistics.setEventsProcessedByPQRM(0);
     statistics.setTempQueueSize(0);
   }
 
@@ -1252,9 +1253,9 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
     return localProcessor == null ? 0 : localProcessor.eventQueueSize();
   }
 
-  public int getEventSecondaryQueueSize() {
+  public int getSecondaryEventQueueSize() {
     AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor;
-    return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize();
+    return localProcessor == null ? 0 : localProcessor.secondaryEventQueueSize();
   }
 
   public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index baaabd0..cdf6125 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -265,7 +265,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
     return this.queue.size();
   }
 
-  public int eventSecondaryQueueSize() {
+  public int secondaryEventQueueSize() {
     if (queue == null) {
       return 0;
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index 15ff18e..a9b4c21 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -46,8 +46,10 @@ public class GatewaySenderStats {
   protected static final String EVENT_QUEUE_TIME = "eventQueueTime";
   /** Name of the event queue size statistic */
   protected static final String EVENT_QUEUE_SIZE = "eventQueueSize";
-  /** Name of the event secondary queue size statistic */
-  protected static final String EVENT_SECONDARY_QUEUE_SIZE = "eventSecondaryQueueSize";
+  /** Name of the secondary event queue size statistic */
+  protected static final String SECONDARY_EVENT_QUEUE_SIZE = "secondaryEventQueueSize";
+  /** Total number of events processed by queue removal message statistic */
+  protected static final String EVENTS_PROCESSED_BY_PQRM = "eventsProcessedByPQRM";
   /** Name of the event temporary queue size statistic */
   protected static final String TMP_EVENT_QUEUE_SIZE = "tempQueueSize";
   /** Name of the events distributed statistic */
@@ -107,7 +109,9 @@ public class GatewaySenderStats {
   /** Id of the event queue size statistic */
   protected static int eventQueueSizeId;
   /** Id of the event in secondary queue size statistic */
-  protected static int eventSecondaryQueueSizeId;
+  protected static int secondaryEventQueueSizeId;
+  /** Id of the events processed by Parallel Queue Removal Message(PQRM) statistic */
+  protected static int eventsProcessedByPQRMId;
   /** Id of the temp event queue size statistic */
   protected static int eventTmpQueueSizeId;
   /** Id of the events distributed statistic */
@@ -172,7 +176,10 @@ public class GatewaySenderStats {
             f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing events.",
                 "nanoseconds"),
             f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false),
-            f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of secondary event queue.",
+            f.createIntGauge(SECONDARY_EVENT_QUEUE_SIZE, "Size of secondary event queue.",
+                "operations", false),
+            f.createIntGauge(EVENTS_PROCESSED_BY_PQRM,
+                "Total number of events processed by Parallel Queue Removal Message(PQRM).",
                 "operations", false),
             f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events.", "operations",
                 false),
@@ -244,7 +251,8 @@ public class GatewaySenderStats {
     eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED);
     eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
     eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
-    eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
+    secondaryEventQueueSizeId = type.nameToId(SECONDARY_EVENT_QUEUE_SIZE);
+    eventsProcessedByPQRMId = type.nameToId(EVENTS_PROCESSED_BY_PQRM);
     eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
     eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
     eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
@@ -365,12 +373,21 @@ public class GatewaySenderStats {
   }
 
   /**
-   * Returns the current value of the "eventSecondaryQueueSize" stat.
+   * Returns the current value of the "secondaryEventQueueSize" stat.
    *
-   * @return the current value of the "eventSecondaryQueueSize" stat
+   * @return the current value of the "secondaryEventQueueSize" stat
    */
-  public int getEventSecondaryQueueSize() {
-    return this.stats.getInt(eventSecondaryQueueSizeId);
+  public int getSecondaryEventQueueSize() {
+    return this.stats.getInt(secondaryEventQueueSizeId);
+  }
+
+  /**
+   * Returns the current value of the "eventsProcessedByPQRM" stat.
+   *
+   * @return the current value of the "eventsProcessedByPQRM" stat
+   */
+  public int getEventsProcessedByPQRM() {
+    return this.stats.getInt(eventsProcessedByPQRMId);
   }
 
   /**
@@ -478,12 +495,21 @@ public class GatewaySenderStats {
   }
 
   /**
-   * Sets the "eventSecondaryQueueSize" stat.
+   * Sets the "secondaryEventQueueSize" stat.
    *
    * @param size The size of the secondary queue
    */
   public void setSecondaryQueueSize(int size) {
-    this.stats.setInt(eventSecondaryQueueSizeId, size);
+    this.stats.setInt(secondaryEventQueueSizeId, size);
+  }
+
+  /**
+   * Sets the "eventsProcessedByPQRM" stat.
+   *
+   * @param size The total number of the events processed by queue removal message
+   */
+  public void setEventsProcessedByPQRM(int size) {
+    this.stats.setInt(eventsProcessedByPQRMId, size);
   }
 
   /**
@@ -504,11 +530,10 @@ public class GatewaySenderStats {
   }
 
   /**
-   * Increments the "eventSecondaryQueueSize" stat by 1.
+   * Increments the "secondaryEventQueueSize" stat by 1.
    */
   public void incSecondaryQueueSize() {
-    this.stats.incInt(eventSecondaryQueueSizeId, 1);
-    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+    this.stats.incInt(secondaryEventQueueSizeId, 1);
   }
 
   /**
@@ -528,13 +553,21 @@ public class GatewaySenderStats {
   }
 
   /**
-   * Increments the "eventSecondaryQueueSize" stat by given delta.
+   * Increments the "secondaryEventQueueSize" stat by given delta.
    *
-   * @param delta an integer by which secondary queue size to be increased
+   * @param delta an integer by which secondary event queue size to be increased
    */
   public void incSecondaryQueueSize(int delta) {
-    this.stats.incInt(eventSecondaryQueueSizeId, delta);
-    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+    this.stats.incInt(secondaryEventQueueSizeId, delta);
+  }
+
+  /**
+   * Increments the "eventsProcessedByPQRM" stat by given delta.
+   *
+   * @param delta an integer by which events are processed by queue removal message
+   */
+  public void incEventsProcessedByPQRM(int delta) {
+    this.stats.incInt(eventsProcessedByPQRMId, delta);
   }
 
   /**
@@ -554,11 +587,10 @@ public class GatewaySenderStats {
   }
 
   /**
-   * Decrements the "eventSecondaryQueueSize" stat by 1.
+   * Decrements the "secondaryEventQueueSize" stat by 1.
    */
   public void decSecondaryQueueSize() {
-    this.stats.incInt(eventSecondaryQueueSizeId, -1);
-    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+    this.stats.incInt(secondaryEventQueueSizeId, -1);
   }
 
   /**
@@ -578,13 +610,12 @@ public class GatewaySenderStats {
   }
 
   /**
-   * Decrements the "eventSecondaryQueueSize" stat by given delta.
+   * Decrements the "secondaryEventQueueSize" stat by given delta.
    *
    * @param delta an integer by which secondary queue size to be increased
    */
   public void decSecondaryQueueSize(int delta) {
-    this.stats.incInt(eventSecondaryQueueSizeId, -delta);
-    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+    this.stats.incInt(secondaryEventQueueSizeId, -delta);
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index df89e36..6d47266 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -70,6 +70,15 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
   }
 
   @Override
+  public String toString() {
+    String cname = getShortClassName();
+    final StringBuilder sb = new StringBuilder(cname);
+    sb.append("regionToDispatchedKeysMap=" + regionToDispatchedKeysMap);
+    sb.append(" sender=").append(getSender());
+    return sb.toString();
+  }
+
+  @Override
   protected void process(ClusterDistributionManager dm) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     final InternalCache cache = dm.getCache();
@@ -185,6 +194,7 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
       brq.destroyKey(key);
       if (!brq.getBucketAdvisor().isPrimary()) {
         prQ.getParallelGatewaySender().getStatistics().decSecondaryQueueSize();
+        prQ.getParallelGatewaySender().getStatistics().incEventsProcessedByPQRM(1);
       }
       if (isDebugEnabled) {
         logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", key, prQ.getName(),
diff --git a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
index 4c5caa2..666a0e8 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
@@ -52,16 +52,19 @@ public class SerialAsyncEventQueueImplJUnitTest {
     queue.getStatistics().incQueueSize(5);
     queue.getStatistics().incSecondaryQueueSize(6);
     queue.getStatistics().incTempQueueSize(10);
+    queue.getStatistics().incEventsProcessedByPQRM(3);
 
     assertEquals(5, queue.getStatistics().getEventQueueSize());
-    assertEquals(6, queue.getStatistics().getEventSecondaryQueueSize());
+    assertEquals(6, queue.getStatistics().getSecondaryEventQueueSize());
     assertEquals(10, queue.getStatistics().getTempEventQueueSize());
+    assertEquals(3, queue.getStatistics().getEventsProcessedByPQRM());
 
     queue.stop();
 
     assertEquals(0, queue.getStatistics().getEventQueueSize());
-    assertEquals(0, queue.getStatistics().getEventSecondaryQueueSize());
+    assertEquals(0, queue.getStatistics().getSecondaryEventQueueSize());
     assertEquals(0, queue.getStatistics().getTempEventQueueSize());
+    assertEquals(0, queue.getStatistics().getEventsProcessedByPQRM());
   }
 
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
index 2074e9e..2ff8886 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -738,12 +738,12 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
       Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
         assertEquals(
             "Expected events in the secondary queue is " + secondaryQueueSize + ", but actual is "
-                + statistics.getEventSecondaryQueueSize(),
-            secondaryQueueSize, statistics.getEventSecondaryQueueSize());
+                + statistics.getSecondaryEventQueueSize(),
+            secondaryQueueSize, statistics.getSecondaryEventQueueSize());
       });
     } else {
       // for serial queue, evenvSecondaryQueueSize is not used
-      assertEquals(0, statistics.getEventSecondaryQueueSize());
+      assertEquals(0, statistics.getSecondaryEventQueueSize());
     }
     assertEquals(queueSize, statistics.getEventQueueSize());
     assertEquals(eventsReceived, statistics.getEventsReceived());
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
index d1ea59f..578c030 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
@@ -197,7 +197,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
     // Validate BucketRegionQueue after processing ParallelQueueRemovalMessage
     assertEquals(1, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size());
     // failed BatchRemovalMessage will not modify stats
-    assertEquals(1, stats.getEventSecondaryQueueSize());
+    assertEquals(1, stats.getSecondaryEventQueueSize());
   }
 
   @Test
@@ -209,7 +209,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
     // Add an event to the BucketRegionQueue and verify BucketRegionQueue state
     this.bucketRegionQueueHelper.addEvent(KEY);
     assertEquals(1, this.bucketRegionQueue.size());
-    assertEquals(1, stats.getEventSecondaryQueueSize());
+    assertEquals(1, stats.getSecondaryEventQueueSize());
 
     // Create and process a ParallelQueueRemovalMessage (causes the value of the entry to be set to
     // DESTROYED)
@@ -219,7 +219,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
     // Clean up destroyed tokens and validate BucketRegionQueue
     this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
     assertEquals(0, this.bucketRegionQueue.size());
-    assertEquals(0, stats.getEventSecondaryQueueSize());
+    assertEquals(0, stats.getSecondaryEventQueueSize());
   }
 
   @Test
@@ -257,7 +257,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
     // Add an event to the BucketRegionQueue and verify BucketRegionQueue state
     GatewaySenderEventImpl event = this.bucketRegionQueueHelper.addEvent(KEY);
     assertEquals(1, this.bucketRegionQueue.size());
-    assertEquals(1, stats.getEventSecondaryQueueSize());
+    assertEquals(1, stats.getSecondaryEventQueueSize());
 
     // Add a mock GatewaySenderEventImpl to the temp queue
     BlockingQueue<GatewaySenderEventImpl> tempQueue = createTempQueueAndAddEvent(processor, event);
@@ -270,7 +270,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
 
     // Validate temp queue is empty after processing ParallelQueueRemovalMessage
     assertEquals(0, tempQueue.size());
-    assertEquals(0, stats.getEventSecondaryQueueSize());
+    assertEquals(0, stats.getSecondaryEventQueueSize());
 
     // Clean up destroyed tokens
     this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
diff --git a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
index 7c485be..c475d4e 100644
--- a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
@@ -60,6 +60,7 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase {
     StatisticDescriptor[] sds = asyncEventQueueStats.type.getStatistics();
     int notQueueEvents = 0;
     int notQueueToPrimary = 0;
+    int eventsProcessedByPQRM = 0;
     for (StatisticDescriptor s : sds) {
       if (s.getName().equals("notQueuedEvent")) {
         notQueueEvents++;
@@ -67,9 +68,13 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase {
       if (s.getName().equals("notQueuedEventAtYetRunningPrimarySender")) {
         notQueueToPrimary++;
       }
+      if (s.getName().equals("eventsProcessedByPQRM")) {
+        eventsProcessedByPQRM++;
+      }
     }
     assertEquals(1, notQueueEvents);
     assertEquals(1, notQueueToPrimary);
+    assertEquals(1, eventsProcessedByPQRM);
   }
 
 }
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index f989405..f429919 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -1170,7 +1170,7 @@ public class WANTestBase extends DistributedTestCase {
   public static int getSecondaryQueueSizeInStats(String senderId) {
     AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
     GatewaySenderStats statistics = sender.getStatistics();
-    return statistics.getEventSecondaryQueueSize();
+    return statistics.getSecondaryEventQueueSize();
   }
 
   public static List<Integer> getSenderStats(String senderId, final int expectedQueueSize) {
@@ -1201,7 +1201,8 @@ public class WANTestBase extends DistributedTestCase {
     stats.add(statistics.getEventsNotQueuedConflated());
     stats.add(statistics.getEventsConflatedFromBatches());
     stats.add(statistics.getConflationIndexesMapSize());
-    stats.add(statistics.getEventSecondaryQueueSize());
+    stats.add(statistics.getSecondaryEventQueueSize());
+    stats.add(statistics.getEventsProcessedByPQRM());
     return stats;
   }
 
@@ -3154,7 +3155,7 @@ public class WANTestBase extends DistributedTestCase {
       }
     }
     AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
-    int size = abstractSender.getEventSecondaryQueueSize();
+    int size = abstractSender.getSecondaryEventQueueSize();
     return size;
   }
 
@@ -3256,11 +3257,11 @@ public class WANTestBase extends DistributedTestCase {
           abstractSender.getEventQueueSize());
       Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> {
         assertEquals("Expected events in all secondary queues are drained but actual is "
-            + abstractSender.getEventSecondaryQueueSize() + ". Queue content is: "
-            + displayQueueContent(queue), 0, abstractSender.getEventSecondaryQueueSize());
+            + abstractSender.getSecondaryEventQueueSize() + ". Queue content is: "
+            + displayQueueContent(queue), 0, abstractSender.getSecondaryEventQueueSize());
       });
       assertEquals("Except events in all secondary queues after drain is 0", 0,
-          abstractSender.getEventSecondaryQueueSize());
+          abstractSender.getSecondaryEventQueueSize());
     } finally {
       exp.remove();
       exp1.remove();
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
index 41cd89a..1542b0d 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
@@ -75,7 +75,6 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
 
     keyValues.putAll(updateKeyValues);
     validateReceiverRegionSize(keyValues);
-
   }
 
   /**
@@ -138,11 +137,12 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
     assertTrue("No events conflated in batch",
         (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0);
 
-    verifyEventSecondaryQueuesDrained("ln");
+    verifySecondaryEventQueuesDrained("ln");
     vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10));
+    validateEventsProcessedByPQRM(100, 1);
   }
 
-  private void verifyEventSecondaryQueuesDrained(final String senderId) {
+  private void verifySecondaryEventQueuesDrained(final String senderId) {
     Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> {
       int vm4SecondarySize = vm4.invoke(() -> getSecondaryQueueSizeInStats("ln"));
       int vm5SecondarySize = vm5.invoke(() -> getSecondaryQueueSizeInStats("ln"));
@@ -185,14 +185,18 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
     vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + updateKeyValues.size())); // creates
                                                                                        // aren't
                                                                                        // conflated
-    validateEventSecondaryQueueSize(keyValues.size() + updateKeyValues.size(), redundancy);
+    validateSecondaryEventQueueSize(keyValues.size() + updateKeyValues.size(), redundancy);
 
     vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), updateKeyValues));
 
     int expectedEventNumAfterConflation = keyValues.size() + updateKeyValues.size();
+
+    // ParallelQueueRemovalMessage will send for each event conflated at primary to secondary queues
+    int totalEventsProcessedByPQRM = expectedEventNumAfterConflation + updateKeyValues.size();
+
     vm4.invoke(() -> checkQueueSize("ln", expectedEventNumAfterConflation));
 
-    validateEventSecondaryQueueSize(expectedEventNumAfterConflation, redundancy);
+    validateSecondaryEventQueueSize(expectedEventNumAfterConflation, redundancy);
 
     vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0));
 
@@ -203,21 +207,40 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
 
     // after dispatch, both primary and secondary queues are empty
     vm4.invoke(() -> checkQueueSize("ln", 0));
-    verifyEventSecondaryQueuesDrained("ln");
-    validateEventSecondaryQueueSize(0, redundancy);
+    verifySecondaryEventQueuesDrained("ln");
+    validateSecondaryEventQueueSize(0, redundancy);
+    validateEventsProcessedByPQRM(totalEventsProcessedByPQRM, redundancy);
   }
 
-  private void validateEventSecondaryQueueSize(int expectedNum, int redundancy) {
-    ArrayList<Integer> v4List =
+  private void validateSecondaryEventQueueSize(int expectedNum, int redundancy) {
+    ArrayList<Integer> vm4List =
         (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
-    ArrayList<Integer> v5List =
+    ArrayList<Integer> vm5List =
         (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
-    ArrayList<Integer> v6List =
+    ArrayList<Integer> vm6List =
         (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
-    ArrayList<Integer> v7List =
+    ArrayList<Integer> vm7List =
         (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
-    assertTrue("Event in secondary queue should be 100",
-        (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == expectedNum
+    assertTrue(
+        "Event in secondary queue should be " + (expectedNum * redundancy) + ", but is "
+            + (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10)),
+        (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10)) == expectedNum
+            * redundancy);
+  }
+
+  private void validateEventsProcessedByPQRM(int expectedNum, int redundancy) {
+    ArrayList<Integer> vm4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    ArrayList<Integer> vm5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    ArrayList<Integer> vm6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    ArrayList<Integer> vm7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    assertTrue(
+        "Event processed by queue removal message should be " + (expectedNum * redundancy)
+            + ", but is " + (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11)),
+        (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11)) == expectedNum
             * redundancy);
   }
 

-- 
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.