You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/04/24 16:03:29 UTC
[geode] branch develop updated: GEODE-4624: add a stats to trace
total event removed by PQRM (#1850)
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new a53b935 GEODE-4624: add a stats to trace total event removed by PQRM (#1850)
a53b935 is described below
commit a53b9350bbf72a5b3be27983c7b07d73daad6884
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Tue Apr 24 09:03:24 2018 -0700
GEODE-4624: add a stats to trace total event removed by PQRM (#1850)
---
.../asyncqueue/internal/AsyncEventQueueStats.java | 8 ++-
.../internal/cache/AbstractBucketRegionQueue.java | 4 ++
.../geode/internal/cache/BucketRegionQueue.java | 2 +-
.../internal/cache/wan/AbstractGatewaySender.java | 5 +-
.../wan/AbstractGatewaySenderEventProcessor.java | 2 +-
.../internal/cache/wan/GatewaySenderStats.java | 79 +++++++++++++++-------
.../wan/parallel/ParallelQueueRemovalMessage.java | 10 +++
.../SerialAsyncEventQueueImplJUnitTest.java | 7 +-
.../cache/wan/AsyncEventQueueTestBase.java | 6 +-
.../ParallelQueueRemovalMessageJUnitTest.java | 10 +--
.../bean/stats/AsyncEventQueueStatsJUnitTest.java | 5 ++
.../geode/internal/cache/wan/WANTestBase.java | 13 ++--
.../parallel/ParallelWANConflationDUnitTest.java | 51 ++++++++++----
13 files changed, 142 insertions(+), 60 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index 8d68cee..963b828 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -42,7 +42,10 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing events.",
"nanoseconds"),
f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false),
- f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of the secondary event queue.",
+ f.createIntGauge(SECONDARY_EVENT_QUEUE_SIZE, "Size of the secondary event queue.",
+ "operations", false),
+ f.createIntGauge(EVENTS_PROCESSED_BY_PQRM,
+ "Total number of events processed by Parallel Queue Removal Message(PQRM).",
"operations", false),
f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events queue.",
"operations", false),
@@ -110,7 +113,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED);
eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
- eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
+ secondaryEventQueueSizeId = type.nameToId(SECONDARY_EVENT_QUEUE_SIZE);
+ eventsProcessedByPQRMId = type.nameToId(EVENTS_PROCESSED_BY_PQRM);
eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index 2406b18..be0eb88 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -236,6 +236,10 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
this.gatewaySenderStats.incSecondaryQueueSize(size);
}
+ public void incEventsProcessedByPQRM(int size) {
+ this.gatewaySenderStats.incEventsProcessedByPQRM(size);
+ }
+
public void incQueueSize() {
this.gatewaySenderStats.incQueueSize();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index afc1544..26309a3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -292,7 +292,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
Long previousTailKey = (Long) latestIndexesForRegion.put(keyToConflate, tailKey);
if (previousTailKey != null) {
if (logger.isDebugEnabled()) {
- logger.debug("{}: Conflating {} at queue index={} and previousTailKey: ", this, object,
+ logger.debug("{}: Conflating {} at queue index={} and previousTailKey={} ", this, object,
tailKey, previousTailKey);
}
AbstractGatewaySenderEventProcessor ep =
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 59547b2..1027582 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -1099,6 +1099,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
statistics.setQueueSize(0);
statistics.setSecondaryQueueSize(0);
+ statistics.setEventsProcessedByPQRM(0);
statistics.setTempQueueSize(0);
}
@@ -1252,9 +1253,9 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
return localProcessor == null ? 0 : localProcessor.eventQueueSize();
}
- public int getEventSecondaryQueueSize() {
+ public int getSecondaryEventQueueSize() {
AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor;
- return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize();
+ return localProcessor == null ? 0 : localProcessor.secondaryEventQueueSize();
}
public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index baaabd0..cdf6125 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -265,7 +265,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
return this.queue.size();
}
- public int eventSecondaryQueueSize() {
+ public int secondaryEventQueueSize() {
if (queue == null) {
return 0;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index 15ff18e..a9b4c21 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -46,8 +46,10 @@ public class GatewaySenderStats {
protected static final String EVENT_QUEUE_TIME = "eventQueueTime";
/** Name of the event queue size statistic */
protected static final String EVENT_QUEUE_SIZE = "eventQueueSize";
- /** Name of the event secondary queue size statistic */
- protected static final String EVENT_SECONDARY_QUEUE_SIZE = "eventSecondaryQueueSize";
+ /** Name of the secondary event queue size statistic */
+ protected static final String SECONDARY_EVENT_QUEUE_SIZE = "secondaryEventQueueSize";
+ /** Name of the events processed by Parallel Queue Removal Message(PQRM) statistic */
+ protected static final String EVENTS_PROCESSED_BY_PQRM = "eventsProcessedByPQRM";
/** Name of the event temporary queue size statistic */
protected static final String TMP_EVENT_QUEUE_SIZE = "tempQueueSize";
/** Name of the events distributed statistic */
@@ -107,7 +109,9 @@ public class GatewaySenderStats {
/** Id of the event queue size statistic */
protected static int eventQueueSizeId;
/** Id of the secondary event queue size statistic */
- protected static int eventSecondaryQueueSizeId;
+ protected static int secondaryEventQueueSizeId;
+ /** Id of the events processed by Parallel Queue Removal Message(PQRM) statistic */
+ protected static int eventsProcessedByPQRMId;
/** Id of the temp event queue size statistic */
protected static int eventTmpQueueSizeId;
/** Id of the events distributed statistic */
@@ -172,7 +176,10 @@ public class GatewaySenderStats {
f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing events.",
"nanoseconds"),
f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false),
- f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of secondary event queue.",
+ f.createIntGauge(SECONDARY_EVENT_QUEUE_SIZE, "Size of secondary event queue.",
+ "operations", false),
+ f.createIntGauge(EVENTS_PROCESSED_BY_PQRM,
+ "Total number of events processed by Parallel Queue Removal Message(PQRM).",
"operations", false),
f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events.", "operations",
false),
@@ -244,7 +251,8 @@ public class GatewaySenderStats {
eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED);
eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
- eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
+ secondaryEventQueueSizeId = type.nameToId(SECONDARY_EVENT_QUEUE_SIZE);
+ eventsProcessedByPQRMId = type.nameToId(EVENTS_PROCESSED_BY_PQRM);
eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
@@ -365,12 +373,21 @@ public class GatewaySenderStats {
}
/**
- * Returns the current value of the "eventSecondaryQueueSize" stat.
+ * Returns the current value of the "secondaryEventQueueSize" stat.
*
- * @return the current value of the "eventSecondaryQueueSize" stat
+ * @return the current value of the "secondaryEventQueueSize" stat
*/
- public int getEventSecondaryQueueSize() {
- return this.stats.getInt(eventSecondaryQueueSizeId);
+ public int getSecondaryEventQueueSize() {
+ return this.stats.getInt(secondaryEventQueueSizeId);
+ }
+
+ /**
+ * Returns the current value of the "eventsProcessedByPQRM" stat.
+ *
+ * @return the current value of the "eventsProcessedByPQRM" stat
+ */
+ public int getEventsProcessedByPQRM() {
+ return this.stats.getInt(eventsProcessedByPQRMId);
}
/**
@@ -478,12 +495,21 @@ public class GatewaySenderStats {
}
/**
- * Sets the "eventSecondaryQueueSize" stat.
+ * Sets the "secondaryEventQueueSize" stat.
*
* @param size The size of the secondary queue
*/
public void setSecondaryQueueSize(int size) {
- this.stats.setInt(eventSecondaryQueueSizeId, size);
+ this.stats.setInt(secondaryEventQueueSizeId, size);
+ }
+
+ /**
+ * Sets the "eventsProcessedByPQRM" stat.
+ *
+ * @param size The total number of the events processed by queue removal message
+ */
+ public void setEventsProcessedByPQRM(int size) {
+ this.stats.setInt(eventsProcessedByPQRMId, size);
}
/**
@@ -504,11 +530,10 @@ public class GatewaySenderStats {
}
/**
- * Increments the "eventSecondaryQueueSize" stat by 1.
+ * Increments the "secondaryEventQueueSize" stat by 1.
*/
public void incSecondaryQueueSize() {
- this.stats.incInt(eventSecondaryQueueSizeId, 1);
- assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+ this.stats.incInt(secondaryEventQueueSizeId, 1);
}
/**
@@ -528,13 +553,21 @@ public class GatewaySenderStats {
}
/**
- * Increments the "eventSecondaryQueueSize" stat by given delta.
+ * Increments the "secondaryEventQueueSize" stat by given delta.
*
- * @param delta an integer by which secondary queue size to be increased
+ * @param delta an integer by which secondary event queue size to be increased
*/
public void incSecondaryQueueSize(int delta) {
- this.stats.incInt(eventSecondaryQueueSizeId, delta);
- assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+ this.stats.incInt(secondaryEventQueueSizeId, delta);
+ }
+
+ /**
+ * Increments the "eventsProcessedByPQRM" stat by given delta.
+ *
+ * @param delta the number of additional events processed by queue removal message
+ */
+ public void incEventsProcessedByPQRM(int delta) {
+ this.stats.incInt(eventsProcessedByPQRMId, delta);
}
/**
@@ -554,11 +587,10 @@ public class GatewaySenderStats {
}
/**
- * Decrements the "eventSecondaryQueueSize" stat by 1.
+ * Decrements the "secondaryEventQueueSize" stat by 1.
*/
public void decSecondaryQueueSize() {
- this.stats.incInt(eventSecondaryQueueSizeId, -1);
- assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+ this.stats.incInt(secondaryEventQueueSizeId, -1);
}
/**
@@ -578,13 +610,12 @@ public class GatewaySenderStats {
}
/**
- * Decrements the "eventSecondaryQueueSize" stat by given delta.
+ * Decrements the "secondaryEventQueueSize" stat by given delta.
*
* @param delta an integer by which secondary queue size to be decreased
*/
public void decSecondaryQueueSize(int delta) {
- this.stats.incInt(eventSecondaryQueueSizeId, -delta);
- assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+ this.stats.incInt(secondaryEventQueueSizeId, -delta);
}
/**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index df89e36..6d47266 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -70,6 +70,15 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
}
@Override
+ public String toString() {
+ String cname = getShortClassName();
+ final StringBuilder sb = new StringBuilder(cname);
+ sb.append("regionToDispatchedKeysMap=" + regionToDispatchedKeysMap);
+ sb.append(" sender=").append(getSender());
+ return sb.toString();
+ }
+
+ @Override
protected void process(ClusterDistributionManager dm) {
final boolean isDebugEnabled = logger.isDebugEnabled();
final InternalCache cache = dm.getCache();
@@ -185,6 +194,7 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
brq.destroyKey(key);
if (!brq.getBucketAdvisor().isPrimary()) {
prQ.getParallelGatewaySender().getStatistics().decSecondaryQueueSize();
+ prQ.getParallelGatewaySender().getStatistics().incEventsProcessedByPQRM(1);
}
if (isDebugEnabled) {
logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", key, prQ.getName(),
diff --git a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
index 4c5caa2..666a0e8 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
@@ -52,16 +52,19 @@ public class SerialAsyncEventQueueImplJUnitTest {
queue.getStatistics().incQueueSize(5);
queue.getStatistics().incSecondaryQueueSize(6);
queue.getStatistics().incTempQueueSize(10);
+ queue.getStatistics().incEventsProcessedByPQRM(3);
assertEquals(5, queue.getStatistics().getEventQueueSize());
- assertEquals(6, queue.getStatistics().getEventSecondaryQueueSize());
+ assertEquals(6, queue.getStatistics().getSecondaryEventQueueSize());
assertEquals(10, queue.getStatistics().getTempEventQueueSize());
+ assertEquals(3, queue.getStatistics().getEventsProcessedByPQRM());
queue.stop();
assertEquals(0, queue.getStatistics().getEventQueueSize());
- assertEquals(0, queue.getStatistics().getEventSecondaryQueueSize());
+ assertEquals(0, queue.getStatistics().getSecondaryEventQueueSize());
assertEquals(0, queue.getStatistics().getTempEventQueueSize());
+ assertEquals(0, queue.getStatistics().getEventsProcessedByPQRM());
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
index 2074e9e..2ff8886 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -738,12 +738,12 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
assertEquals(
"Expected events in the secondary queue is " + secondaryQueueSize + ", but actual is "
- + statistics.getEventSecondaryQueueSize(),
- secondaryQueueSize, statistics.getEventSecondaryQueueSize());
+ + statistics.getSecondaryEventQueueSize(),
+ secondaryQueueSize, statistics.getSecondaryEventQueueSize());
});
} else {
// for serial queue, secondaryEventQueueSize is not used
- assertEquals(0, statistics.getEventSecondaryQueueSize());
+ assertEquals(0, statistics.getSecondaryEventQueueSize());
}
assertEquals(queueSize, statistics.getEventQueueSize());
assertEquals(eventsReceived, statistics.getEventsReceived());
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
index d1ea59f..578c030 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
@@ -197,7 +197,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
// Validate BucketRegionQueue after processing ParallelQueueRemovalMessage
assertEquals(1, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size());
// failed BatchRemovalMessage will not modify stats
- assertEquals(1, stats.getEventSecondaryQueueSize());
+ assertEquals(1, stats.getSecondaryEventQueueSize());
}
@Test
@@ -209,7 +209,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
// Add an event to the BucketRegionQueue and verify BucketRegionQueue state
this.bucketRegionQueueHelper.addEvent(KEY);
assertEquals(1, this.bucketRegionQueue.size());
- assertEquals(1, stats.getEventSecondaryQueueSize());
+ assertEquals(1, stats.getSecondaryEventQueueSize());
// Create and process a ParallelQueueRemovalMessage (causes the value of the entry to be set to
// DESTROYED)
@@ -219,7 +219,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
// Clean up destroyed tokens and validate BucketRegionQueue
this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
assertEquals(0, this.bucketRegionQueue.size());
- assertEquals(0, stats.getEventSecondaryQueueSize());
+ assertEquals(0, stats.getSecondaryEventQueueSize());
}
@Test
@@ -257,7 +257,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
// Add an event to the BucketRegionQueue and verify BucketRegionQueue state
GatewaySenderEventImpl event = this.bucketRegionQueueHelper.addEvent(KEY);
assertEquals(1, this.bucketRegionQueue.size());
- assertEquals(1, stats.getEventSecondaryQueueSize());
+ assertEquals(1, stats.getSecondaryEventQueueSize());
// Add a mock GatewaySenderEventImpl to the temp queue
BlockingQueue<GatewaySenderEventImpl> tempQueue = createTempQueueAndAddEvent(processor, event);
@@ -270,7 +270,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
// Validate temp queue is empty after processing ParallelQueueRemovalMessage
assertEquals(0, tempQueue.size());
- assertEquals(0, stats.getEventSecondaryQueueSize());
+ assertEquals(0, stats.getSecondaryEventQueueSize());
// Clean up destroyed tokens
this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
diff --git a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
index 7c485be..c475d4e 100644
--- a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
@@ -60,6 +60,7 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase {
StatisticDescriptor[] sds = asyncEventQueueStats.type.getStatistics();
int notQueueEvents = 0;
int notQueueToPrimary = 0;
+ int eventsProcessedByPQRM = 0;
for (StatisticDescriptor s : sds) {
if (s.getName().equals("notQueuedEvent")) {
notQueueEvents++;
@@ -67,9 +68,13 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase {
if (s.getName().equals("notQueuedEventAtYetRunningPrimarySender")) {
notQueueToPrimary++;
}
+ if (s.getName().equals("eventsProcessedByPQRM")) {
+ eventsProcessedByPQRM++;
+ }
}
assertEquals(1, notQueueEvents);
assertEquals(1, notQueueToPrimary);
+ assertEquals(1, eventsProcessedByPQRM);
}
}
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index f989405..f429919 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -1170,7 +1170,7 @@ public class WANTestBase extends DistributedTestCase {
public static int getSecondaryQueueSizeInStats(String senderId) {
AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
GatewaySenderStats statistics = sender.getStatistics();
- return statistics.getEventSecondaryQueueSize();
+ return statistics.getSecondaryEventQueueSize();
}
public static List<Integer> getSenderStats(String senderId, final int expectedQueueSize) {
@@ -1201,7 +1201,8 @@ public class WANTestBase extends DistributedTestCase {
stats.add(statistics.getEventsNotQueuedConflated());
stats.add(statistics.getEventsConflatedFromBatches());
stats.add(statistics.getConflationIndexesMapSize());
- stats.add(statistics.getEventSecondaryQueueSize());
+ stats.add(statistics.getSecondaryEventQueueSize());
+ stats.add(statistics.getEventsProcessedByPQRM());
return stats;
}
@@ -3154,7 +3155,7 @@ public class WANTestBase extends DistributedTestCase {
}
}
AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
- int size = abstractSender.getEventSecondaryQueueSize();
+ int size = abstractSender.getSecondaryEventQueueSize();
return size;
}
@@ -3256,11 +3257,11 @@ public class WANTestBase extends DistributedTestCase {
abstractSender.getEventQueueSize());
Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> {
assertEquals("Expected events in all secondary queues are drained but actual is "
- + abstractSender.getEventSecondaryQueueSize() + ". Queue content is: "
- + displayQueueContent(queue), 0, abstractSender.getEventSecondaryQueueSize());
+ + abstractSender.getSecondaryEventQueueSize() + ". Queue content is: "
+ + displayQueueContent(queue), 0, abstractSender.getSecondaryEventQueueSize());
});
assertEquals("Except events in all secondary queues after drain is 0", 0,
- abstractSender.getEventSecondaryQueueSize());
+ abstractSender.getSecondaryEventQueueSize());
} finally {
exp.remove();
exp1.remove();
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
index 41cd89a..1542b0d 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
@@ -75,7 +75,6 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
keyValues.putAll(updateKeyValues);
validateReceiverRegionSize(keyValues);
-
}
/**
@@ -138,11 +137,12 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
assertTrue("No events conflated in batch",
(v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0);
- verifyEventSecondaryQueuesDrained("ln");
+ verifySecondaryEventQueuesDrained("ln");
vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10));
+ validateEventsProcessedByPQRM(100, 1);
}
- private void verifyEventSecondaryQueuesDrained(final String senderId) {
+ private void verifySecondaryEventQueuesDrained(final String senderId) {
Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> {
int vm4SecondarySize = vm4.invoke(() -> getSecondaryQueueSizeInStats("ln"));
int vm5SecondarySize = vm5.invoke(() -> getSecondaryQueueSizeInStats("ln"));
@@ -185,14 +185,18 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + updateKeyValues.size())); // creates
// aren't
// conflated
- validateEventSecondaryQueueSize(keyValues.size() + updateKeyValues.size(), redundancy);
+ validateSecondaryEventQueueSize(keyValues.size() + updateKeyValues.size(), redundancy);
vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), updateKeyValues));
int expectedEventNumAfterConflation = keyValues.size() + updateKeyValues.size();
+
+ // A ParallelQueueRemovalMessage is sent to secondary queues for each event conflated at primary
+ int totalEventsProcessedByPQRM = expectedEventNumAfterConflation + updateKeyValues.size();
+
vm4.invoke(() -> checkQueueSize("ln", expectedEventNumAfterConflation));
- validateEventSecondaryQueueSize(expectedEventNumAfterConflation, redundancy);
+ validateSecondaryEventQueueSize(expectedEventNumAfterConflation, redundancy);
vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0));
@@ -203,21 +207,40 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
// after dispatch, both primary and secondary queues are empty
vm4.invoke(() -> checkQueueSize("ln", 0));
- verifyEventSecondaryQueuesDrained("ln");
- validateEventSecondaryQueueSize(0, redundancy);
+ verifySecondaryEventQueuesDrained("ln");
+ validateSecondaryEventQueueSize(0, redundancy);
+ validateEventsProcessedByPQRM(totalEventsProcessedByPQRM, redundancy);
}
- private void validateEventSecondaryQueueSize(int expectedNum, int redundancy) {
- ArrayList<Integer> v4List =
+ private void validateSecondaryEventQueueSize(int expectedNum, int redundancy) {
+ ArrayList<Integer> vm4List =
(ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
- ArrayList<Integer> v5List =
+ ArrayList<Integer> vm5List =
(ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
- ArrayList<Integer> v6List =
+ ArrayList<Integer> vm6List =
(ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
- ArrayList<Integer> v7List =
+ ArrayList<Integer> vm7List =
(ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
- assertTrue("Event in secondary queue should be 100",
- (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == expectedNum
+ assertTrue(
+ "Event in secondary queue should be " + (expectedNum * redundancy) + ", but is "
+ + (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10)),
+ (vm4List.get(10) + vm5List.get(10) + vm6List.get(10) + vm7List.get(10)) == expectedNum
+ * redundancy);
+ }
+
+ private void validateEventsProcessedByPQRM(int expectedNum, int redundancy) {
+ ArrayList<Integer> vm4List =
+ (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ ArrayList<Integer> vm5List =
+ (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ ArrayList<Integer> vm6List =
+ (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ ArrayList<Integer> vm7List =
+ (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ assertTrue(
+ "Event processed by queue removal message should be " + (expectedNum * redundancy)
+ + ", but is " + (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11)),
+ (vm4List.get(11) + vm5List.get(11) + vm6List.get(11) + vm7List.get(11)) == expectedNum
* redundancy);
}
--
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.