You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/04/19 05:13:33 UTC
[geode] 01/01: GEODE-4624: add a stats to trace total event removed
by PQRM
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch feature/GEODE-4624
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 2805dfa31c0e17b02117a2d9f19104244ab691e0
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Wed Apr 18 22:12:40 2018 -0700
GEODE-4624: add a stats to trace total event removed by PQRM
---
.../asyncqueue/internal/AsyncEventQueueStats.java | 3 ++
.../internal/cache/AbstractBucketRegionQueue.java | 4 +++
.../internal/cache/wan/AbstractGatewaySender.java | 1 +
.../internal/cache/wan/GatewaySenderStats.java | 37 ++++++++++++++++++++--
.../wan/parallel/ParallelQueueRemovalMessage.java | 1 +
.../SerialAsyncEventQueueImplJUnitTest.java | 3 ++
.../bean/stats/AsyncEventQueueStatsJUnitTest.java | 5 +++
7 files changed, 52 insertions(+), 2 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index 8d68cee..c4d7baa 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -44,6 +44,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false),
f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of the secondary event queue.",
"operations", false),
+ f.createIntGauge(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE, "Total number of events processed by queue removal message.",
+ "operations", false),
f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events queue.",
"operations", false),
f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
@@ -111,6 +113,7 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
+ eventProcessedByQueueRemovalMessageId = type.nameToId(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE);
eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index 2406b18..891df46 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -236,6 +236,10 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
this.gatewaySenderStats.incSecondaryQueueSize(size);
}
+ public void incEventProcessedByQueueRemovalMessage(int size) {
+ this.gatewaySenderStats.incEventProcessedByQueueRemovalMessage(size);
+ }
+
public void incQueueSize() {
this.gatewaySenderStats.incQueueSize();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 59547b2..1afee67 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -1099,6 +1099,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
statistics.setQueueSize(0);
statistics.setSecondaryQueueSize(0);
+ statistics.setEventProcessedByQueueRemovalMessage(0);
statistics.setTempQueueSize(0);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index 15ff18e..a6ba185 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -48,6 +48,9 @@ public class GatewaySenderStats {
protected static final String EVENT_QUEUE_SIZE = "eventQueueSize";
/** Name of the event secondary queue size statistic */
protected static final String EVENT_SECONDARY_QUEUE_SIZE = "eventSecondaryQueueSize";
+ /** Total number of events processed by queue removal message statistic */
+ protected static final String EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE =
+ "eventProcessedByQueueRemovalMessage";
/** Name of the event temporary queue size statistic */
protected static final String TMP_EVENT_QUEUE_SIZE = "tempQueueSize";
/** Name of the events distributed statistic */
@@ -108,6 +111,8 @@ public class GatewaySenderStats {
protected static int eventQueueSizeId;
/** Id of the event in secondary queue size statistic */
protected static int eventSecondaryQueueSizeId;
+ /** Id of the event processed by queue removal message statistic */
+ protected static int eventProcessedByQueueRemovalMessageId;
/** Id of the temp event queue size statistic */
protected static int eventTmpQueueSizeId;
/** Id of the events distributed statistic */
@@ -174,6 +179,8 @@ public class GatewaySenderStats {
f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false),
f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of secondary event queue.",
"operations", false),
+ f.createIntGauge(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE,
+ "Total number of events processed by queue removal message.", "operations", false),
f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events.", "operations",
false),
f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
@@ -245,6 +252,7 @@ public class GatewaySenderStats {
eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
+ eventProcessedByQueueRemovalMessageId = type.nameToId(EVENT_PROCESSED_BY_QUEUE_REMOVAL_MESSAGE);
eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
@@ -374,6 +382,15 @@ public class GatewaySenderStats {
}
/**
+ * Returns the current value of the "eventProcessedByQueueRemovalMessage" stat.
+ *
+ * @return the current value of the "eventProcessedByQueueRemovalMessage" stat
+ */
+ public int getEventProcessedByQueueRemovalMessage() {
+ return this.stats.getInt(eventProcessedByQueueRemovalMessageId);
+ }
+
+ /**
* Returns the current value of the "tempQueueSize" stat.
*
* @return the current value of the "tempQueueSize" stat.
@@ -487,6 +504,15 @@ public class GatewaySenderStats {
}
/**
+ * Sets the "eventProcessedByQueueRemovalMessage" stat.
+ *
+ * @param size The total number of the events processed by queue removal message
+ */
+ public void setEventProcessedByQueueRemovalMessage(int size) {
+ this.stats.setInt(eventProcessedByQueueRemovalMessageId, size);
+ }
+
+ /**
* Sets the "tempQueueSize" stat.
*
* @param size The size of the temp queue
@@ -508,7 +534,6 @@ public class GatewaySenderStats {
*/
public void incSecondaryQueueSize() {
this.stats.incInt(eventSecondaryQueueSizeId, 1);
- assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
}
/**
@@ -534,7 +559,15 @@ public class GatewaySenderStats {
*/
public void incSecondaryQueueSize(int delta) {
this.stats.incInt(eventSecondaryQueueSizeId, delta);
- assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+ }
+
+ /**
+ * Increments the "eventProcessedByQueueRemovalMessage" stat by given delta.
+ *
+ * @param delta an integer by which events are processed by queue removal message
+ */
+ public void incEventProcessedByQueueRemovalMessage(int delta) {
+ this.stats.incInt(eventProcessedByQueueRemovalMessageId, delta);
}
/**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index df89e36..ca3d785 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -185,6 +185,7 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
brq.destroyKey(key);
if (!brq.getBucketAdvisor().isPrimary()) {
prQ.getParallelGatewaySender().getStatistics().decSecondaryQueueSize();
+ prQ.getParallelGatewaySender().getStatistics().incEventProcessedByQueueRemovalMessage(1);
}
if (isDebugEnabled) {
logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", key, prQ.getName(),
diff --git a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
index 4c5caa2..a1976c7 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
@@ -52,16 +52,19 @@ public class SerialAsyncEventQueueImplJUnitTest {
queue.getStatistics().incQueueSize(5);
queue.getStatistics().incSecondaryQueueSize(6);
queue.getStatistics().incTempQueueSize(10);
+ queue.getStatistics().incEventProcessedByQueueRemovalMessage(3);
assertEquals(5, queue.getStatistics().getEventQueueSize());
assertEquals(6, queue.getStatistics().getEventSecondaryQueueSize());
assertEquals(10, queue.getStatistics().getTempEventQueueSize());
+ assertEquals(3, queue.getStatistics().getEventProcessedByQueueRemovalMessage());
queue.stop();
assertEquals(0, queue.getStatistics().getEventQueueSize());
assertEquals(0, queue.getStatistics().getEventSecondaryQueueSize());
assertEquals(0, queue.getStatistics().getTempEventQueueSize());
+ assertEquals(0, queue.getStatistics().getEventProcessedByQueueRemovalMessage());
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
index 7c485be..1113d51 100644
--- a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
@@ -60,6 +60,7 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase {
StatisticDescriptor[] sds = asyncEventQueueStats.type.getStatistics();
int notQueueEvents = 0;
int notQueueToPrimary = 0;
+ int eventProcessedByQueueRemovalMessage = 0;
for (StatisticDescriptor s : sds) {
if (s.getName().equals("notQueuedEvent")) {
notQueueEvents++;
@@ -67,9 +68,13 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase {
if (s.getName().equals("notQueuedEventAtYetRunningPrimarySender")) {
notQueueToPrimary++;
}
+ if (s.getName().equals("eventProcessedByQueueRemovalMessage")) {
+ eventProcessedByQueueRemovalMessage++;
+ }
}
assertEquals(1, notQueueEvents);
assertEquals(1, notQueueToPrimary);
+ assertEquals(1, eventProcessedByQueueRemovalMessage);
}
}
--
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.