You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/04/24 16:03:29 UTC

[geode] branch develop updated: GEODE-4624: add a stats to trace total event removed by PQRM (#1850)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new a53b935  GEODE-4624: add a stats to trace total event removed by PQRM (#1850)
a53b935 is described below

commit a53b9350bbf72a5b3be27983c7b07d73daad6884
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Tue Apr 24 09:03:24 2018 -0700

    GEODE-4624: add a stats to trace total event removed by PQRM (#1850)
---
 .../asyncqueue/internal/AsyncEventQueueStats.java  |  8 ++-
 .../internal/cache/AbstractBucketRegionQueue.java  |  4 ++
 .../geode/internal/cache/BucketRegionQueue.java    |  2 +-
 .../internal/cache/wan/AbstractGatewaySender.java  |  5 +-
 .../wan/AbstractGatewaySenderEventProcessor.java   |  2 +-
 .../internal/cache/wan/GatewaySenderStats.java     | 79 +++++++++++++++-------
 .../wan/parallel/ParallelQueueRemovalMessage.java  | 10 +++
 .../SerialAsyncEventQueueImplJUnitTest.java        |  7 +-
 .../cache/wan/AsyncEventQueueTestBase.java         |  6 +-
 .../ParallelQueueRemovalMessageJUnitTest.java      | 10 +--
 .../bean/stats/AsyncEventQueueStatsJUnitTest.java  |  5 ++
 .../geode/internal/cache/wan/WANTestBase.java      | 13 ++--
 .../parallel/ParallelWANConflationDUnitTest.java   | 51 ++++++++++----
 13 files changed, 142 insertions(+), 60 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index 8d68cee..963b828 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -42,7 +42,10 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
             f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing events.",
                 "nanoseconds"),
             f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false),
-            f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of the secondary event queue.",
+            f.createIntGauge(SECONDARY_EVENT_QUEUE_SIZE, "Size of the secondary event queue.",
+                "operations", false),
+            f.createIntGauge(EVENTS_PROCESSED_BY_PQRM,
+                "Total number of events processed by Parallel Queue Removal Message(PQRM).",
                 "operations", false),
             f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events queue.",
                 "operations", false),
@@ -110,7 +113,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
     eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED);
     eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
     eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
-    eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
+    secondaryEventQueueSizeId = type.nameToId(SECONDARY_EVENT_QUEUE_SIZE);
+    eventsProcessedByPQRMId = type.nameToId(EVENTS_PROCESSED_BY_PQRM);
     eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
     eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
     eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index 2406b18..be0eb88 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -236,6 +236,10 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
     this.gatewaySenderStats.incSecondaryQueueSize(size);
   }
 
+  public void incEventsProcessedByPQRM(int size) {
+    this.gatewaySenderStats.incEventsProcessedByPQRM(size);
+  }
+
   public void incQueueSize() {
     this.gatewaySenderStats.incQueueSize();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index afc1544..26309a3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -292,7 +292,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
       Long previousTailKey = (Long) latestIndexesForRegion.put(keyToConflate, tailKey);
       if (previousTailKey != null) {
         if (logger.isDebugEnabled()) {
-          logger.debug("{}: Conflating {} at queue index={} and previousTailKey: ", this, object,
+          logger.debug("{}: Conflating {} at queue index={} and previousTailKey={} ", this, object,
               tailKey, previousTailKey);
         }
         AbstractGatewaySenderEventProcessor ep =
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 59547b2..1027582 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -1099,6 +1099,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
 
     statistics.setQueueSize(0);
     statistics.setSecondaryQueueSize(0);
+    statistics.setEventsProcessedByPQRM(0);
     statistics.setTempQueueSize(0);
   }
 
@@ -1252,9 +1253,9 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
     return localProcessor == null ? 0 : localProcessor.eventQueueSize();
   }
 
-  public int getEventSecondaryQueueSize() {
+  public int getSecondaryEventQueueSize() {
     AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor;
-    return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize();
+    return localProcessor == null ? 0 : localProcessor.secondaryEventQueueSize();
   }
 
   public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index baaabd0..cdf6125 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -265,7 +265,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
     return this.queue.size();
   }
 
-  public int eventSecondaryQueueSize() {
+  public int secondaryEventQueueSize() {
     if (queue == null) {
       return 0;
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index 15ff18e..a9b4c21 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -46,8 +46,10 @@ public class GatewaySenderStats {
   protected static final String EVENT_QUEUE_TIME = "eventQueueTime";
   /** Name of the event queue size statistic */
   protected static final String EVENT_QUEUE_SIZE = "eventQueueSize";
-  /** Name of the event secondary queue size statistic */
-  protected static final String EVENT_SECONDARY_QUEUE_SIZE = "eventSecondaryQueueSize";
+  /** Name of the secondary event queue size statistic */
+  protected static final String SECONDARY_EVENT_QUEUE_SIZE = "secondaryEventQueueSize";
+  /** Name of the events processed by Parallel Queue Removal Message (PQRM) statistic */
+  protected static final String EVENTS_PROCESSED_BY_PQRM = "eventsProcessedByPQRM";
   /** Name of the event temporary queue size statistic */
   protected static final String TMP_EVENT_QUEUE_SIZE = "tempQueueSize";
   /** Name of the events distributed statistic */
@@ -107,7 +109,9 @@ public class GatewaySenderStats {
   /** Id of the event queue size statistic */
   protected static int eventQueueSizeId;
   /** Id of the event in secondary queue size statistic */
-  protected static int eventSecondaryQueueSizeId;
+  protected static int secondaryEventQueueSizeId;
+  /** Id of the events processed by Parallel Queue Removal Message(PQRM) statistic */
+  protected static int eventsProcessedByPQRMId;
   /** Id of the temp event queue size statistic */
   protected static int eventTmpQueueSizeId;
   /** Id of the events distributed statistic */
@@ -172,7 +176,10 @@ public class GatewaySenderStats {
             f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing events.",
                 "nanoseconds"),
             f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false),
-            f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of secondary event queue.",
+            f.createIntGauge(SECONDARY_EVENT_QUEUE_SIZE, "Size of secondary event queue.",
+                "operations", false),
+            f.createIntGauge(EVENTS_PROCESSED_BY_PQRM,
+                "Total number of events processed by Parallel Queue Removal Message(PQRM).",
                 "operations", false),
             f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events.", "operations",
                 false),
@@ -244,7 +251,8 @@ public class GatewaySenderStats {
     eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED);
     eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
     eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
-    eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
+    secondaryEventQueueSizeId = type.nameToId(SECONDARY_EVENT_QUEUE_SIZE);
+    eventsProcessedByPQRMId = type.nameToId(EVENTS_PROCESSED_BY_PQRM);
     eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
     eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
     eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
@@ -365,12 +373,21 @@ public class GatewaySenderStats {
   }
 
   /**
-   * Returns the current value of the "eventSecondaryQueueSize" stat.
+   * Returns the current value of the "secondaryEventQueueSize" stat.
    *
-   * @return the current value of the "eventSecondaryQueueSize" stat
+   * @return the current value of the "secondaryEventQueueSize" stat
    */
-  public int getEventSecondaryQueueSize() {
-    return this.stats.getInt(eventSecondaryQueueSizeId);
+  public int getSecondaryEventQueueSize() {
+    return this.stats.getInt(secondaryEventQueueSizeId);
+  }
+
+  /**
+   * Returns the current value of the "eventsProcessedByPQRM" stat.
+   *
+   * @return the current value of the "eventsProcessedByPQRM" stat
+   */
+  public int getEventsProcessedByPQRM() {
+    return this.stats.getInt(eventsProcessedByPQRMId);
   }
 
   /**
@@ -478,12 +495,21 @@ public class GatewaySenderStats {
   }
 
   /**
-   * Sets the "eventSecondaryQueueSize" stat.
+   * Sets the "secondaryEventQueueSize" stat.
    *
    * @param size The size of the secondary queue
    */
   public void setSecondaryQueueSize(int size) {
-    this.stats.setInt(eventSecondaryQueueSizeId, size);
+    this.stats.setInt(secondaryEventQueueSizeId, size);
+  }
+
+  /**
+   * Sets the "eventsProcessedByPQRM" stat.
+   *
+   * @param size The total number of the events processed by queue removal message
+   */
+  public void setEventsProcessedByPQRM(int size) {
+    this.stats.setInt(eventsProcessedByPQRMId, size);
   }
 
   /**
@@ -504,11 +530,10 @@ public class GatewaySenderStats {
   }
 
   /**
-   * Increments the "eventSecondaryQueueSize" stat by 1.
+   * Increments the "secondaryEventQueueSize" stat by 1.
    */
   public void incSecondaryQueueSize() {
-    this.stats.incInt(eventSecondaryQueueSizeId, 1);
-    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+    this.stats.incInt(secondaryEventQueueSizeId, 1);
   }
 
   /**
@@ -528,13 +553,21 @@ public class GatewaySenderStats {
   }
 
   /**
-   * Increments the "eventSecondaryQueueSize" stat by given delta.
+   * Increments the "secondaryEventQueueSize" stat by given delta.
    *
-   * @param delta an integer by which secondary queue size to be increased
+   * @param delta an integer by which secondary event queue size to be increased
    */
   public void incSecondaryQueueSize(int delta) {
-    this.stats.incInt(eventSecondaryQueueSizeId, delta);
-    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+    this.stats.incInt(secondaryEventQueueSizeId, delta);
+  }
+
+  /**
+   * Increments the "eventsProcessedByPQRM" stat by given delta.
+   *
+   * @param delta the number of newly processed events to add to the stat
+   */
+  public void incEventsProcessedByPQRM(int delta) {
+    this.stats.incInt(eventsProcessedByPQRMId, delta);
   }
 
   /**
@@ -554,11 +587,10 @@ public class GatewaySenderStats {
   }
 
   /**
-   * Decrements the "eventSecondaryQueueSize" stat by 1.
+   * Decrements the "secondaryEventQueueSize" stat by 1.
    */
   public void decSecondaryQueueSize() {
-    this.stats.incInt(eventSecondaryQueueSizeId, -1);
-    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+    this.stats.incInt(secondaryEventQueueSizeId, -1);
   }
 
   /**
@@ -578,13 +610,12 @@ public class GatewaySenderStats {
   }
 
   /**
-   * Decrements the "eventSecondaryQueueSize" stat by given delta.
+   * Decrements the "secondaryEventQueueSize" stat by given delta.
    *
    * @param delta an integer by which secondary queue size to be increased
    */
   public void decSecondaryQueueSize(int delta) {
-    this.stats.incInt(eventSecondaryQueueSizeId, -delta);
-    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+    this.stats.incInt(secondaryEventQueueSizeId, -delta);
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index df89e36..6d47266 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -70,6 +70,15 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
   }
 
   @Override
+  public String toString() {
+    String cname = getShortClassName();
+    final StringBuilder sb = new StringBuilder(cname);
+    sb.append("regionToDispatchedKeysMap=" + regionToDispatchedKeysMap);
+    sb.append(" sender=").append(getSender());
+    return sb.toString();
+  }
+
+  @Override
   protected void process(ClusterDistributionManager dm) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     final InternalCache cache = dm.getCache();
@@ -185,6 +194,7 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
       brq.destroyKey(key);
       if (!brq.getBucketAdvisor().isPrimary()) {
         prQ.getParallelGatewaySender().getStatistics().decSecondaryQueueSize();
+        prQ.getParallelGatewaySender().getStatistics().incEventsProcessedByPQRM(1);
       }
       if (isDebugEnabled) {
         logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", key, prQ.getName(),
diff --git a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
index 4c5caa2..666a0e8 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
@@ -52,16 +52,19 @@ public class SerialAsyncEventQueueImplJUnitTest {
     queue.getStatistics().incQueueSize(5);
     queue.getStatistics().incSecondaryQueueSize(6);
     queue.getStatistics().incTempQueueSize(10);
+    queue.getStatistics().incEventsProcessedByPQRM(3);
 
     assertEquals(5, queue.getStatistics().getEventQueueSize());
-    assertEquals(6, queue.getStatistics().getEventSecondaryQueueSize());
+    assertEquals(6, queue.getStatistics().getSecondaryEventQueueSize());
     assertEquals(10, queue.getStatistics().getTempEventQueueSize());
+    assertEquals(3, queue.getStatistics().getEventsProcessedByPQRM());
 
     queue.stop();
 
     assertEquals(0, queue.getStatistics().getEventQueueSize());
-    assertEquals(0, queue.getStatistics().getEventSecondaryQueueSize());
+    assertEquals(0, queue.getStatistics().getSecondaryEventQueueSize());
     assertEquals(0, queue.getStatistics().getTempEventQueueSize());
+    assertEquals(0, queue.getStatistics().getEventsProcessedByPQRM());
   }
 
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
index 2074e9e..2ff8886 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -738,12 +738,12 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
       Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
         assertEquals(
             "Expected events in the secondary queue is " + secondaryQueueSize + ", but actual is "
-                + statistics.getEventSecondaryQueueSize(),
-            secondaryQueueSize, statistics.getEventSecondaryQueueSize());
+                + statistics.getSecondaryEventQueueSize(),
+            secondaryQueueSize, statistics.getSecondaryEventQueueSize());
       });
     } else {
       // for serial queue, evenvSecondaryQueueSize is not used
-      assertEquals(0, statistics.getEventSecondaryQueueSize());
+      assertEquals(0, statistics.getSecondaryEventQueueSize());
     }
     assertEquals(queueSize, statistics.getEventQueueSize());
     assertEquals(eventsReceived, statistics.getEventsReceived());
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
index d1ea59f..578c030 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
@@ -197,7 +197,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
     // Validate BucketRegionQueue after processing ParallelQueueRemovalMessage
     assertEquals(1, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size());
     // failed BatchRemovalMessage will not modify stats
-    assertEquals(1, stats.getEventSecondaryQueueSize());
+    assertEquals(1, stats.getSecondaryEventQueueSize());
   }
 
   @Test
@@ -209,7 +209,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
     // Add an event to the BucketRegionQueue and verify BucketRegionQueue state
     this.bucketRegionQueueHelper.addEvent(KEY);
     assertEquals(1, this.bucketRegionQueue.size());
-    assertEquals(1, stats.getEventSecondaryQueueSize());
+    assertEquals(1, stats.getSecondaryEventQueueSize());
 
     // Create and process a ParallelQueueRemovalMessage (causes the value of the entry to be set to
     // DESTROYED)
@@ -219,7 +219,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
     // Clean up destroyed tokens and validate BucketRegionQueue
     this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
     assertEquals(0, this.bucketRegionQueue.size());
-    assertEquals(0, stats.getEventSecondaryQueueSize());
+    assertEquals(0, stats.getSecondaryEventQueueSize());
   }
 
   @Test
@@ -257,7 +257,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
     // Add an event to the BucketRegionQueue and verify BucketRegionQueue state
     GatewaySenderEventImpl event = this.bucketRegionQueueHelper.addEvent(KEY);
     assertEquals(1, this.bucketRegionQueue.size());
-    assertEquals(1, stats.getEventSecondaryQueueSize());
+    assertEquals(1, stats.getSecondaryEventQueueSize());
 
     // Add a mock GatewaySenderEventImpl to the temp queue
     BlockingQueue<GatewaySenderEventImpl> tempQueue = createTempQueueAndAddEvent(processor, event);
@@ -270,7 +270,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
 
     // Validate temp queue is empty after processing ParallelQueueRemovalMessage
     assertEquals(0, tempQueue.size());
-    assertEquals(0, stats.getEventSecondaryQueueSize());
+    assertEquals(0, stats.getSecondaryEventQueueSize());
 
     // Clean up destroyed tokens
     this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
diff --git a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
index 7c485be..c475d4e 100644
--- a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
@@ -60,6 +60,7 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase {
     StatisticDescriptor[] sds = asyncEventQueueStats.type.getStatistics();
     int notQueueEvents = 0;
     int notQueueToPrimary = 0;
+    int eventsProcessedByPQRM = 0;
     for (StatisticDescriptor s : sds) {
       if (s.getName().equals("notQueuedEvent")) {
         notQueueEvents++;
@@ -67,9 +68,13 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase {
       if (s.getName().equals("notQueuedEventAtYetRunningPrimarySender")) {
         notQueueToPrimary++;
       }
+      if (s.getName().equals("eventsProcessedByPQRM")) {
+        eventsProcessedByPQRM++;
+      }
     }
     assertEquals(1, notQueueEvents);
     assertEquals(1, notQueueToPrimary);
+    assertEquals(1, eventsProcessedByPQRM);
   }
 
 }
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index f989405..f429919 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -1170,7 +1170,7 @@ public class WANTestBase extends DistributedTestCase {
   public static int getSecondaryQueueSizeInStats(String senderId) {
     AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
     GatewaySenderStats statistics = sender.getStatistics();
-    return statistics.getEventSecondaryQueueSize();
+    return statistics.getSecondaryEventQueueSize();
   }
 
   public static List<Integer> getSenderStats(String senderId, final int expectedQueueSize) {
@@ -1201,7 +1201,8 @@ public class WANTestBase extends DistributedTestCase {
     stats.add(statistics.getEventsNotQueuedConflated());
     stats.add(statistics.getEventsConflatedFromBatches());
     stats.add(statistics.getConflationIndexesMapSize());
-    stats.add(statistics.getEventSecondaryQueueSize());
+    stats.add(statistics.getSecondaryEventQueueSize());
+    stats.add(statistics.getEventsProcessedByPQRM());
     return stats;
   }
 
@@ -3154,7 +3155,7 @@ public class WANTestBase extends DistributedTestCase {
       }
     }
     AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
-    int size = abstractSender.getEventSecondaryQueueSize();
+    int size = abstractSender.getSecondaryEventQueueSize();
     return size;
   }
 
@@ -3256,11 +3257,11 @@ public class WANTestBase extends DistributedTestCase {
           abstractSender.getEventQueueSize());
       Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> {
         assertEquals("Expected events in all secondary queues are drained but actual is "
-            + abstractSender.getEventSecondaryQueueSize() + ". Queue content is: "
-            + displayQueueContent(queue), 0, abstractSender.getEventSecondaryQueueSize());
+            + abstractSender.getSecondaryEventQueueSize() + ". Queue content is: "
+            + displayQueueContent(queue), 0, abstractSender.getSecondaryEventQueueSize());
       });
       assertEquals("Except events in all secondary queues after drain is 0", 0,
-          abstractSender.getEventSecondaryQueueSize());
+          abstractSender.getSecondaryEventQueueSize());
     } finally {
       exp.remove();
       exp1.remove();
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
index 41cd89a..1542b0d 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
@@ -75,7 +75,6 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
 
     keyValues.putAll(updateKeyValues);
     validateReceiverRegionSize(keyValues);
-
   }
 
   /**
@@ -138,11 +137,12 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
     assertTrue("No events conflated in batch",
         (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0);
 
-    verifyEventSecondaryQueuesDrained("ln");
+    verifySecondaryEventQueuesDrained("ln");
     vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10));
+    validateEventsProcessedByPQRM(100, 1);
   }
 
-  private void verifyEventSecondaryQueuesDrained(final String senderId) {
+  private void verifySecondaryEventQueuesDrained(final String senderId) {
     Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> {
       int vm4SecondarySize = vm4.invoke(() -> getSecondaryQueueSizeInStats("ln"));
       int vm5SecondarySize = vm5.invoke(() -> getSecondaryQueueSizeInStats("ln"));
@@ -185,14 +185,18 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
     vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + updateKeyValues.size())); // creates
                                                                                        // aren't
                                                                                        // conflated
-    validateEventSecondaryQueueSize(keyValues.size() + updateKeyValues.size(), redundancy);
+    validateSecondaryEventQueueSize(keyValues.size() + updateKeyValues.size(), redundancy);
 
     vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), updateKeyValues));
 
     int expectedEventNumAfterConflation = keyValues.size() + updateKeyValues.size();
+
+    // A ParallelQueueRemovalMessage is sent to secondaries for each event conflated at the primary
+    int totalEventsProcessedByPQRM = expectedEventNumAfterConflation + updateKeyValues.size();
+
     vm4.invoke(() -> checkQueueSize("ln", expectedEventNumAfterConflation));
 
-    validateEventSecondaryQueueSize(expectedEventNumAfterConflation, redundancy);
+    validateSecondaryEventQueueSize(expectedEventNumAfterConflation, redundancy);
 
     vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0));
 
@@ -203,21 +207,40 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
 
     // after dispatch, both primary and secondary queues are empty
     vm4.invoke(() -> checkQueueSize("ln", 0));
-    verifyEventSecondaryQueuesDrained("ln");
-    validateEventSecondaryQueueSize(0, redundancy);
+    verifySecondaryEventQueuesDrained("ln");
+    validateSecondaryEventQueueSize(0, redundancy);
+    validateEventsProcessedByPQRM(totalEventsProcessedByPQRM, redundancy);
   }
 
-  private void validateEventSecondaryQueueSize(int expectedNum, int redundancy) {
-    ArrayList<Integer> v4List =
+  private void validateSecondaryEventQueueSize(int expectedNum, int redundancy) {
+    ArrayList<Integer> vm4List =
         (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
-    ArrayList<Integer> v5List =
+    ArrayList<Integer> vm5List =
         (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
-    ArrayList<Integer> v6List =
+    ArrayList<Integer> vm6List =
         (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
-    ArrayList<Integer> v7List =
+    ArrayList<Integer> vm7List =
         (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
-    assertTrue("Event in secondary queue should be 100",
-        (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == expectedNum
+    assertTrue(
+        "Event in secondary queue should be " + (expectedNum * redundancy) + ", but is "
+            + (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10)),
+        (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10)) == expectedNum
+            * redundancy);
+  }
+
+  private void validateEventsProcessedByPQRM(int expectedNum, int redundancy) {
+    ArrayList<Integer> vm4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    ArrayList<Integer> vm5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    ArrayList<Integer> vm6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    ArrayList<Integer> vm7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    assertTrue(
+        "Event processed by queue removal message should be " + (expectedNum * redundancy)
+            + ", but is " + (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11)),
+        (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11)) == expectedNum
             * redundancy);
   }
 

-- 
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.