You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/04/19 05:13:33 UTC

[geode] 01/01: GEODE-4624: add a stat to track total events removed by PQRM

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-4624
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 2805dfa31c0e17b02117a2d9f19104244ab691e0
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Wed Apr 18 22:12:40 2018 -0700

    GEODE-4624: add a stat to track total events removed by PQRM
---
 .../asyncqueue/internal/AsyncEventQueueStats.java  |  3 ++
 .../internal/cache/AbstractBucketRegionQueue.java  |  4 +++
 .../internal/cache/wan/AbstractGatewaySender.java  |  1 +
 .../internal/cache/wan/GatewaySenderStats.java     | 37 ++++++++++++++++++++--
 .../wan/parallel/ParallelQueueRemovalMessage.java  |  1 +
 .../SerialAsyncEventQueueImplJUnitTest.java        |  3 ++
 .../bean/stats/AsyncEventQueueStatsJUnitTest.java  |  5 +++
 7 files changed, 52 insertions(+), 2 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index 8d68cee..c4d7baa 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -44,6 +44,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
             f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false),
             f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of the secondary event queue.",
                 "operations", false),
+            f.createIntGauge(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE,
+                "Total number of events processed by queue removal message.", "operations", false),
             f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events queue.",
                 "operations", false),
             f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
@@ -111,6 +113,7 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
     eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
     eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
     eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
+    eventProcessedByQueueRemovalMessageId = type.nameToId(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE);
     eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
     eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
     eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index 2406b18..891df46 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -236,6 +236,10 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
     this.gatewaySenderStats.incSecondaryQueueSize(size);
   }
 
+  public void incEventProcessedByQueueRemovalMessage(int size) {
+    this.gatewaySenderStats.incEventProcessedByQueueRemovalMessage(size);
+  }
+
   public void incQueueSize() {
     this.gatewaySenderStats.incQueueSize();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 59547b2..1afee67 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -1099,6 +1099,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
 
     statistics.setQueueSize(0);
     statistics.setSecondaryQueueSize(0);
+    statistics.setEventProcessedByQueueRemovalMessage(0);
     statistics.setTempQueueSize(0);
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index 15ff18e..a6ba185 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -48,6 +48,9 @@ public class GatewaySenderStats {
   protected static final String EVENT_QUEUE_SIZE = "eventQueueSize";
   /** Name of the event secondary queue size statistic */
   protected static final String EVENT_SECONDARY_QUEUE_SIZE = "eventSecondaryQueueSize";
+  /** Name of the events processed by queue removal message statistic */
+  protected static final String EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE =
+      "eventProcessedByQueueRemovalMessage";
   /** Name of the event temporary queue size statistic */
   protected static final String TMP_EVENT_QUEUE_SIZE = "tempQueueSize";
   /** Name of the events distributed statistic */
@@ -108,6 +111,8 @@ public class GatewaySenderStats {
   protected static int eventQueueSizeId;
   /** Id of the event in secondary queue size statistic */
   protected static int eventSecondaryQueueSizeId;
+  /** Id of the events processed by queue removal message statistic */
+  protected static int eventProcessedByQueueRemovalMessageId;
   /** Id of the temp event queue size statistic */
   protected static int eventTmpQueueSizeId;
   /** Id of the events distributed statistic */
@@ -174,6 +179,8 @@ public class GatewaySenderStats {
             f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false),
             f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of secondary event queue.",
                 "operations", false),
+            f.createIntGauge(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE,
+                "Total number of events processed by queue removal message.", "operations", false),
             f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events.", "operations",
                 false),
             f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
@@ -245,6 +252,7 @@ public class GatewaySenderStats {
     eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
     eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
     eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
+    eventProcessedByQueueRemovalMessageId = type.nameToId(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE);
     eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
     eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
     eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
@@ -374,6 +382,15 @@ public class GatewaySenderStats {
   }
 
   /**
+   * Reads the "eventProcessedByQueueRemovalMessage" stat.
+   *
+   * @return the value currently held by the "eventProcessedByQueueRemovalMessage" stat
+   */
+  public int getEventProcessedByQueueRemovalMessage() {
+    return stats.getInt(eventProcessedByQueueRemovalMessageId);
+  }
+
+  /**
    * Returns the current value of the "tempQueueSize" stat.
    *
    * @return the current value of the "tempQueueSize" stat.
@@ -487,6 +504,15 @@ public class GatewaySenderStats {
   }
 
   /**
+   * Overwrites the "eventProcessedByQueueRemovalMessage" stat with the given value.
+   *
+   * @param size the new total number of events processed by queue removal message
+   */
+  public void setEventProcessedByQueueRemovalMessage(int size) {
+    stats.setInt(eventProcessedByQueueRemovalMessageId, size);
+  }
+
+  /**
    * Sets the "tempQueueSize" stat.
    *
    * @param size The size of the temp queue
@@ -508,7 +534,6 @@ public class GatewaySenderStats {
    */
   public void incSecondaryQueueSize() {
     this.stats.incInt(eventSecondaryQueueSizeId, 1);
-    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
   }
 
   /**
@@ -534,7 +559,15 @@ public class GatewaySenderStats {
    */
   public void incSecondaryQueueSize(int delta) {
     this.stats.incInt(eventSecondaryQueueSizeId, delta);
-    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+  }
+
+  /**
+   * Adds the given delta to the "eventProcessedByQueueRemovalMessage" stat.
+   *
+   * @param delta the number of events to add to the stat
+   */
+  public void incEventProcessedByQueueRemovalMessage(int delta) {
+    stats.incInt(eventProcessedByQueueRemovalMessageId, delta);
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index df89e36..ca3d785 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -185,6 +185,7 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
       brq.destroyKey(key);
       if (!brq.getBucketAdvisor().isPrimary()) {
         prQ.getParallelGatewaySender().getStatistics().decSecondaryQueueSize();
+        prQ.getParallelGatewaySender().getStatistics().incEventProcessedByQueueRemovalMessage(1);
       }
       if (isDebugEnabled) {
         logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", key, prQ.getName(),
diff --git a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
index 4c5caa2..a1976c7 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
@@ -52,16 +52,19 @@ public class SerialAsyncEventQueueImplJUnitTest {
     queue.getStatistics().incQueueSize(5);
     queue.getStatistics().incSecondaryQueueSize(6);
     queue.getStatistics().incTempQueueSize(10);
+    queue.getStatistics().incEventProcessedByQueueRemovalMessage(3);
 
     assertEquals(5, queue.getStatistics().getEventQueueSize());
     assertEquals(6, queue.getStatistics().getEventSecondaryQueueSize());
     assertEquals(10, queue.getStatistics().getTempEventQueueSize());
+    assertEquals(3, queue.getStatistics().getEventProcessedByQueueRemovalMessage());
 
     queue.stop();
 
     assertEquals(0, queue.getStatistics().getEventQueueSize());
     assertEquals(0, queue.getStatistics().getEventSecondaryQueueSize());
     assertEquals(0, queue.getStatistics().getTempEventQueueSize());
+    assertEquals(0, queue.getStatistics().getEventProcessedByQueueRemovalMessage());
   }
 
 }
diff --git a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
index 7c485be..1113d51 100644
--- a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
@@ -60,6 +60,7 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase {
     StatisticDescriptor[] sds = asyncEventQueueStats.type.getStatistics();
     int notQueueEvents = 0;
     int notQueueToPrimary = 0;
+    int eventProcessedByQueueRemovalMessage = 0;
     for (StatisticDescriptor s : sds) {
       if (s.getName().equals("notQueuedEvent")) {
         notQueueEvents++;
@@ -67,9 +68,13 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase {
       if (s.getName().equals("notQueuedEventAtYetRunningPrimarySender")) {
         notQueueToPrimary++;
       }
+      if (s.getName().equals("eventProcessedByQueueRemovalMessage")) {
+        eventProcessedByQueueRemovalMessage++;
+      }
     }
     assertEquals(1, notQueueEvents);
     assertEquals(1, notQueueToPrimary);
+    assertEquals(1, eventProcessedByQueueRemovalMessage);
   }
 
 }

-- 
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.