You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/03/05 07:50:30 UTC
[geode] 01/01: GEODE-4647: add a stats eventSecondaryQueueSizeId to
track events in secondary gateway sender queue
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch feature/GEODE-4647
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 140c83e006ba8b604a2c2df0f6e7502260e03ab9
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Wed Feb 21 21:31:07 2018 -0800
GEODE-4647: add a stats eventSecondaryQueueSizeId to track events in secondary
gateway sender queue
---
.../asyncqueue/internal/AsyncEventQueueStats.java | 3 +
.../internal/cache/AbstractBucketRegionQueue.java | 8 +
.../geode/internal/cache/AbstractRegionMap.java | 4 +
.../apache/geode/internal/cache/BucketAdvisor.java | 4 +
.../geode/internal/cache/BucketRegionQueue.java | 2 +
.../internal/cache/wan/AbstractGatewaySender.java | 6 +
.../wan/AbstractGatewaySenderEventProcessor.java | 17 +++
.../internal/cache/wan/GatewaySenderStats.java | 57 +++++++
.../wan/parallel/ParallelQueueRemovalMessage.java | 3 +
.../SerialAsyncEventQueueImplJUnitTest.java | 3 +
.../cache/wan/AsyncEventQueueTestBase.java | 16 +-
.../asyncqueue/AsyncEventQueueStatsDUnitTest.java | 45 ++++--
.../ParallelQueueRemovalMessageJUnitTest.java | 20 +++
.../bean/stats/AsyncEventQueueStatsJUnitTest.java | 2 -
.../geode/internal/cache/wan/WANTestBase.java | 97 +++++++++++-
.../parallel/ParallelWANConflationDUnitTest.java | 56 +++++--
.../wan/parallel/ParallelWANStatsDUnitTest.java | 167 +++++++++++++++++++++
.../serial/SerialGatewaySenderQueueDUnitTest.java | 13 +-
.../wan/serial/SerialWANConflationDUnitTest.java | 73 ++++++++-
.../wan/serial/SerialWANPropagationDUnitTest.java | 1 +
20 files changed, 564 insertions(+), 33 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index dee2c92..faa5db4 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -42,6 +42,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing events.",
"nanoseconds"),
f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false),
+ f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of the event secondary queue.",
+ "operations", false),
f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events queue.",
"operations", false),
f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
@@ -106,6 +108,7 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED);
eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
+ eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index d6822da..267ec36 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -230,6 +230,10 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
this.gatewaySenderStats.decQueueSize(size);
}
+ public void decSecondaryQueueSize(int size) {
+ this.gatewaySenderStats.decSecondaryQueueSize(size);
+ }
+
public void decQueueSize() {
this.gatewaySenderStats.decQueueSize();
}
@@ -238,6 +242,10 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
this.gatewaySenderStats.incQueueSize(size);
}
+ public void incSecondaryQueueSize(int size) {
+ this.gatewaySenderStats.incSecondaryQueueSize(size);
+ }
+
public void incQueueSize() {
this.gatewaySenderStats.incQueueSize();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 68d7134..6c47b71 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -1042,6 +1042,10 @@ public abstract class AbstractRegionMap
} finally {
if (done && result) {
initialImagePutEntry(newRe);
+ if (owner instanceof BucketRegionQueue) {
+ BucketRegionQueue brq = (BucketRegionQueue) owner;
+ brq.addToEventQueue(key, done, event);
+ }
}
if (!done) {
removeEntry(key, newRe, false);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index fc14773..8fd0e87 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -282,8 +282,11 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
if (this.getBucket() instanceof BucketRegionQueue) {
BucketRegionQueue brq = (BucketRegionQueue) this.getBucket();
brq.decQueueSize(brq.size());
+ brq.incSecondaryQueueSize(brq.size());
}
sendProfileUpdate();
+ } else {
+ BucketRegionQueue brq = (BucketRegionQueue) this.getBucket();
}
}
} else {
@@ -1192,6 +1195,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
if (br instanceof BucketRegionQueue) { // Shouldn't it be AbstractBucketRegionQueue
BucketRegionQueue brq = (BucketRegionQueue) br;
brq.incQueueSize(brq.size());
+ brq.decSecondaryQueueSize(brq.size());
}
if (br != null && br instanceof BucketRegion) {
((BucketRegion) br).afterAcquiringPrimaryState();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 4e5451e..0baa204 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -449,6 +449,8 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
}
if (this.getBucketAdvisor().isPrimary()) {
incQueueSize(1);
+ } else {
+ incSecondaryQueueSize(1);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index a134e1e..268bbb2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -1095,6 +1095,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
}
statistics.setQueueSize(0);
+ statistics.setSecondaryQueueSize(0);
statistics.setTempQueueSize(0);
}
@@ -1251,6 +1252,11 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
return localProcessor == null ? 0 : localProcessor.eventQueueSize();
}
+ public int getEventSecondaryQueueSize() {
+ AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor;
+ return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize();
+ }
+
public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) {
this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 9309e43..1a12abf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -270,6 +270,23 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
return this.queue.size();
}
+ public int eventSecondaryQueueSize() {
+ if (queue == null) {
+ return 0;
+ }
+
+ // if parallel, get both primary and secondary queues' size, then subtract primary queue's size
+ if (this.queue instanceof ParallelGatewaySenderQueue) {
+ return ((ParallelGatewaySenderQueue) queue).localSize(true)
+ - ((ParallelGatewaySenderQueue) queue).localSize(false);
+ }
+ if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
+ return ((ConcurrentParallelGatewaySenderQueue) queue).localSize(true)
+ - ((ConcurrentParallelGatewaySenderQueue) queue).localSize(false);
+ }
+ return this.queue.size();
+ }
+
/**
* @return the sender
*/
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index c7fd370..32129be 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -46,6 +46,8 @@ public class GatewaySenderStats {
protected static final String EVENT_QUEUE_TIME = "eventQueueTime";
/** Name of the event queue size statistic */
protected static final String EVENT_QUEUE_SIZE = "eventQueueSize";
+ /** Name of the event secondary queue size statistic */
+ protected static final String EVENT_SECONDARY_QUEUE_SIZE = "eventSecondaryQueueSize";
/** Name of the event temporary queue size statistic */
protected static final String TMP_EVENT_QUEUE_SIZE = "tempQueueSize";
/** Name of the events distributed statistic */
@@ -102,6 +104,8 @@ public class GatewaySenderStats {
protected static int eventQueueTimeId;
/** Id of the event queue size statistic */
protected static int eventQueueSizeId;
+ /** Id of the event secondary queue size statistic */
+ protected static int eventSecondaryQueueSizeId;
/** Id of the temp event queue size statistic */
protected static int eventTmpQueueSizeId;
/** Id of the events distributed statistic */
@@ -164,6 +168,8 @@ public class GatewaySenderStats {
f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing events.",
"nanoseconds"),
f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false),
+ f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of the event in secondary queue.",
+ "operations", false),
f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events.", "operations",
false),
f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
@@ -232,6 +238,7 @@ public class GatewaySenderStats {
eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED);
eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
+ eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
@@ -350,6 +357,15 @@ public class GatewaySenderStats {
}
/**
+ * Returns the current value of the "eventSecondaryQueueSize" stat.
+ *
+ * @return the current value of the "eventSecondaryQueueSize" stat
+ */
+ public int getEventSecondaryQueueSize() {
+ return this.stats.getInt(eventSecondaryQueueSizeId);
+ }
+
+ /**
* Returns the current value of the "tempQueueSize" stat.
*
* @return the current value of the "tempQueueSize" stat.
@@ -454,6 +470,15 @@ public class GatewaySenderStats {
}
/**
+ * Sets the "eventSecondaryQueueSize" stat.
+ *
+ * @param size The size of the secondary queue
+ */
+ public void setSecondaryQueueSize(int size) {
+ this.stats.setInt(eventSecondaryQueueSizeId, size);
+ }
+
+ /**
* Sets the "tempQueueSize" stat.
*
* @param size The size of the temp queue
@@ -471,6 +496,13 @@ public class GatewaySenderStats {
}
/**
+ * Increments the "eventSecondaryQueueSize" stat by 1.
+ */
+ public void incSecondaryQueueSize() {
+ this.stats.incInt(eventSecondaryQueueSizeId, 1);
+ }
+
+ /**
* Increments the "tempQueueSize" stat by 1.
*/
public void incTempQueueSize() {
@@ -487,6 +519,15 @@ public class GatewaySenderStats {
}
/**
+ * Increments the "eventSecondaryQueueSize" stat by given delta.
+ *
+ * @param delta an integer by which secondary queue size to be increased
+ */
+ public void incSecondaryQueueSize(int delta) {
+ this.stats.incInt(eventSecondaryQueueSizeId, delta);
+ }
+
+ /**
* Increments the "tempQueueSize" stat by given delta.
*
* @param delta an integer by which temp queue size to be increased
@@ -503,6 +544,13 @@ public class GatewaySenderStats {
}
/**
+ * Decrements the "eventSecondaryQueueSize" stat by 1.
+ */
+ public void decSecondaryQueueSize() {
+ this.stats.incInt(eventSecondaryQueueSizeId, -1);
+ }
+
+ /**
* Decrements the "tempQueueSize" stat by 1.
*/
public void decTempQueueSize() {
@@ -519,6 +567,15 @@ public class GatewaySenderStats {
}
/**
+ * Decrements the "eventSecondaryQueueSize" stat by given delta.
+ *
+ * @param delta an integer by which secondary queue size to be decreased
+ */
+ public void decSecondaryQueueSize(int delta) {
+ this.stats.incInt(eventSecondaryQueueSizeId, -delta);
+ }
+
+ /**
* Decrements the "tempQueueSize" stat by given delta.
*
* @param delta an integer by which temp queue size to be increased
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index 39fedbf..df89e36 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -183,6 +183,9 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
brq.destroyKey(key);
+ if (!brq.getBucketAdvisor().isPrimary()) {
+ prQ.getParallelGatewaySender().getStatistics().decSecondaryQueueSize();
+ }
if (isDebugEnabled) {
logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", key, prQ.getName(),
brq.getId());
diff --git a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
index cdef8bd..aea1fb4 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
@@ -50,14 +50,17 @@ public class SerialAsyncEventQueueImplJUnitTest {
attrs.id = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id";
SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache, attrs);
queue.getStatistics().incQueueSize(5);
+ queue.getStatistics().incSecondaryQueueSize(6);
queue.getStatistics().incTempQueueSize(10);
assertEquals(5, queue.getStatistics().getEventQueueSize());
+ assertEquals(6, queue.getStatistics().getEventSecondaryQueueSize());
assertEquals(10, queue.getStatistics().getTempEventQueueSize());
queue.stop();
assertEquals(0, queue.getStatistics().getEventQueueSize());
+ assertEquals(0, queue.getStatistics().getEventSecondaryQueueSize());
assertEquals(0, queue.getStatistics().getTempEventQueueSize());
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
index de03f2b..0e81e49 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -44,8 +44,10 @@ import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.awaitility.Awaitility;
import org.junit.experimental.categories.Category;
import org.apache.geode.DataSerializable;
@@ -737,17 +739,29 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
}
public static void checkAsyncEventQueueStats(String queueId, final int queueSize,
- final int eventsReceived, final int eventsQueued, final int eventsDistributed) {
+ int secondaryQueueSize, final int eventsReceived, final int eventsQueued,
+ final int eventsDistributed) {
Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues();
AsyncEventQueue queue = null;
+ boolean isParallel = false;
for (AsyncEventQueue q : asyncQueues) {
+ isParallel = q.isParallel();
if (q.getId().equals(queueId)) {
queue = q;
break;
}
}
final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl) queue).getStatistics();
+ Awaitility.await().atMost(30000, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS)
+ .until(() -> assertEquals("Expected queue size: " + queueSize + " but actual queue size: "
+ + statistics.getEventQueueSize(), queueSize, statistics.getEventQueueSize()));
assertEquals(queueSize, statistics.getEventQueueSize());
+ if (isParallel) {
+ assertEquals(secondaryQueueSize, statistics.getEventSecondaryQueueSize());
+ } else {
+ // for serial queue, eventSecondaryQueueSize is not used
+ assertEquals(0, statistics.getEventSecondaryQueueSize());
+ }
assertEquals(eventsReceived, statistics.getEventsReceived());
assertEquals(eventsQueued, statistics.getEventsQueued());
assert (statistics.getEventsDistributed() >= eventsDistributed);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
index b131b46..17288e0 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
@@ -75,9 +75,10 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase {
// sender
Wait.pause(2000);// give some time for system to become stable
- vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1000, 1000, 1000));
+ vm1.invoke(
+ () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1000, 1000, 1000));
vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln", 10));
- vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1000, 0, 0));
+ vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1000, 0, 0));
vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln", 0));
}
@@ -120,19 +121,43 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase {
vm4.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue(
getTestMethodName() + "_RR", "ln1,ln2", isOffHeap()));
+ vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1"));
+ vm2.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1"));
+ vm3.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1"));
+ vm4.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1"));
+ vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2"));
+ vm2.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2"));
+ vm3.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2"));
+ vm4.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2"));
+
vm1.invoke(() -> AsyncEventQueueTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+ vm1.invoke(
+ () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 1000, 0, 1000, 1000, 0));
+ vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 1000, 0, 1000, 0, 0));
+
+ vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1"));
+ vm2.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1"));
+ vm3.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1"));
+ vm4.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1"));
+ vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2"));
+ vm2.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2"));
+ vm3.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2"));
+ vm4.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2"));
+
vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln1", 1000));
vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln2", 1000));
Wait.pause(2000);// give some time for system to become stable
- vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 1000, 1000, 1000));
+ vm1.invoke(
+ () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 0, 1000, 1000, 1000));
vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln1", 10));
- vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 1000, 1000, 1000));
+ vm1.invoke(
+ () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 0, 1000, 1000, 1000));
vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln2", 10));
- vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 1000, 0, 0));
+ vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 0, 1000, 0, 0));
vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln1", 0));
- vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 1000, 0, 0));
+ vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 0, 1000, 0, 0));
vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln2", 0));
}
@@ -230,11 +255,12 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase {
vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 1500));
Wait.pause(2000);// give some time for system to become stable
- vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1500, 1500, 1500));
+ vm1.invoke(
+ () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1500, 1500, 1500));
vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueUnprocessedStats("ln", 0));
- vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1500, 0, 0));
+ vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1500, 0, 0));
vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueUnprocessedStats("ln", 1500));
}
@@ -302,7 +328,8 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase {
vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 1000));
Wait.pause(2000);// give some time for system to become stable
- vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 2000, 2000, 1000));
+ vm1.invoke(
+ () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 2000, 2000, 1000));
vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueConflatedStats("ln", 500));
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
index 5e0f704..44afc0b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
@@ -65,6 +65,8 @@ import org.apache.geode.internal.cache.eviction.EvictionController;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
+import org.apache.geode.internal.statistics.DummyStatisticsFactory;
import org.apache.geode.test.fake.Fakes;
import org.apache.geode.test.junit.categories.UnitTest;
@@ -177,12 +179,21 @@ public class ParallelQueueRemovalMessageJUnitTest {
new BucketRegionQueueHelper(this.cache, this.queueRegion, this.bucketRegionQueue);
}
+ private GatewaySenderStats setSecondaryQueueSize(int size) {
+ GatewaySenderStats stats = new GatewaySenderStats(new DummyStatisticsFactory(), "ln");
+ stats.setSecondaryQueueSize(1);
+ when(this.queueRegion.getParallelGatewaySender().getStatistics()).thenReturn(stats);
+ assertEquals(1, stats.getEventSecondaryQueueSize());
+ return stats;
+ }
+
@Test
public void validateFailedBatchRemovalMessageKeysInUninitializedBucketRegionQueue()
throws Exception {
// Validate initial BucketRegionQueue state
assertFalse(this.bucketRegionQueue.isInitialized());
assertEquals(0, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size());
+ GatewaySenderStats stats = setSecondaryQueueSize(1);
// Create and process a ParallelQueueRemovalMessage (causes the failedBatchRemovalMessageKeys to
// add a key)
@@ -190,6 +201,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
// Validate BucketRegionQueue after processing ParallelQueueRemovalMessage
assertEquals(1, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size());
+ assertEquals(1, stats.getEventSecondaryQueueSize());
}
@Test
@@ -201,6 +213,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
// Add an event to the BucketRegionQueue and verify BucketRegionQueue state
this.bucketRegionQueueHelper.addEvent(KEY);
assertEquals(1, this.bucketRegionQueue.size());
+ GatewaySenderStats stats = setSecondaryQueueSize(1);
// Create and process a ParallelQueueRemovalMessage (causes the value of the entry to be set to
// DESTROYED)
@@ -210,6 +223,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
// Clean up destroyed tokens and validate BucketRegionQueue
this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
assertEquals(0, this.bucketRegionQueue.size());
+ assertEquals(0, stats.getEventSecondaryQueueSize());
}
@Test
@@ -225,6 +239,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
BlockingQueue<GatewaySenderEventImpl> tempQueue =
createTempQueueAndAddEvent(processor, mock(GatewaySenderEventImpl.class));
assertEquals(1, tempQueue.size());
+ GatewaySenderStats stats = setSecondaryQueueSize(1);
// Create and process a ParallelQueueRemovalMessage (causes the failedBatchRemovalMessageKeys to
// add a key)
@@ -232,6 +247,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
// Validate temp queue is empty after processing ParallelQueueRemovalMessage
assertEquals(0, tempQueue.size());
+ assertEquals(1, stats.getEventSecondaryQueueSize());
}
@Test
@@ -251,6 +267,9 @@ public class ParallelQueueRemovalMessageJUnitTest {
// Add a mock GatewaySenderEventImpl to the temp queue
BlockingQueue<GatewaySenderEventImpl> tempQueue = createTempQueueAndAddEvent(processor, event);
assertEquals(1, tempQueue.size());
+ GatewaySenderStats stats = mock(GatewaySenderStats.class);
+ stats.setSecondaryQueueSize(1);
+ when(this.queueRegion.getParallelGatewaySender().getStatistics()).thenReturn(stats);
// Create and process a ParallelQueueRemovalMessage (causes the value of the entry to be set to
// DESTROYED)
@@ -259,6 +278,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
// Validate temp queue is empty after processing ParallelQueueRemovalMessage
assertEquals(0, tempQueue.size());
+ assertEquals(0, stats.getEventSecondaryQueueSize());
// Clean up destroyed tokens
this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
diff --git a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
index ea246bc..e7d38b4 100644
--- a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
@@ -52,8 +52,6 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase {
sample();
assertEquals(0, getEventQueueSize());
-
-
}
private int getEventQueueSize() {
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index c29d66e..a792815 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -120,9 +120,11 @@ import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.admin.remote.DistributionLocatorId;
+import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.CacheConfig;
import org.apache.geode.internal.cache.CacheServerImpl;
@@ -139,6 +141,8 @@ import org.apache.geode.internal.cache.execute.data.Order;
import org.apache.geode.internal.cache.execute.data.OrderId;
import org.apache.geode.internal.cache.execute.data.Shipment;
import org.apache.geode.internal.cache.execute.data.ShipmentId;
+import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage;
+import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse;
import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
@@ -1175,12 +1179,57 @@ public class WANTestBase extends JUnit4DistributedTestCase {
return connectionInfo;
}
+ public static void moveAllPrimaryBuckets(String senderId, final DistributedMember destination,
+ final String regionName) {
+
+ AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
+ final RegionQueue regionQueue;
+ regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0];
+ if (sender.isParallel()) {
+ ConcurrentParallelGatewaySenderQueue parallelGatewaySenderQueue =
+ (ConcurrentParallelGatewaySenderQueue) regionQueue;
+ PartitionedRegion prQ =
+ parallelGatewaySenderQueue.getRegions().toArray(new PartitionedRegion[1])[0];
+
+ Set<Integer> primaryBucketIds = prQ.getDataStore().getAllLocalPrimaryBucketIds();
+ for (int bid : primaryBucketIds) {
+ movePrimary(destination, regionName, bid);
+ }
+
+ // double check after moved all primary buckets
+ primaryBucketIds = prQ.getDataStore().getAllLocalPrimaryBucketIds();
+ assertTrue(primaryBucketIds.isEmpty());
+ }
+ }
+
+ public static void movePrimary(final DistributedMember destination, final String regionName,
+ final int bucketId) {
+ PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
+
+ BecomePrimaryBucketResponse response = BecomePrimaryBucketMessage
+ .send((InternalDistributedMember) destination, region, bucketId, true);
+ assertNotNull(response);
+ assertTrue(response.waitForResponse());
+ }
+
+ public static int getSecondaryQueueSizeInStats(String senderId) {
+ AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
+ GatewaySenderStats statistics = sender.getStatistics();
+ return statistics.getEventSecondaryQueueSize();
+ }
+
public static List<Integer> getSenderStats(String senderId, final int expectedQueueSize) {
AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
GatewaySenderStats statistics = sender.getStatistics();
if (expectedQueueSize != -1) {
final RegionQueue regionQueue;
regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0];
+ if (sender.isParallel()) {
+ ConcurrentParallelGatewaySenderQueue parallelGatewaySenderQueue =
+ (ConcurrentParallelGatewaySenderQueue) regionQueue;
+ PartitionedRegion pr =
+ parallelGatewaySenderQueue.getRegions().toArray(new PartitionedRegion[1])[0];
+ }
Awaitility.await().atMost(120, TimeUnit.SECONDS)
.until(() -> assertEquals("Expected queue entries: " + expectedQueueSize
+ " but actual entries: " + regionQueue.size(), expectedQueueSize,
@@ -1197,9 +1246,28 @@ public class WANTestBase extends JUnit4DistributedTestCase {
stats.add(statistics.getEventsNotQueuedConflated());
stats.add(statistics.getEventsConflatedFromBatches());
stats.add(statistics.getConflationIndexesMapSize());
+ stats.add(statistics.getEventSecondaryQueueSize());
return stats;
}
+ protected static int getTotalBucketQueueSize(PartitionedRegion prQ, boolean isPrimary) {
+ // Sums bucket.size() over the local buckets of prQ whose primary state equals isPrimary:
+ // true totals the primary buckets, false the non-primary ones. A null prQ yields 0.
+ if (prQ == null) {
+ return 0;
+ }
+
+ int size = 0;
+ for (Map.Entry<Integer, BucketRegion> bucketEntry : prQ.getDataStore().getAllLocalBuckets()) {
+ BucketRegion bucket = bucketEntry.getValue();
+ // the advisor is consulted once per bucket, exactly as the two-armed condition did
+ if (bucket.getBucketAdvisor().isPrimary() == isPrimary) {
+ size += bucket.size();
+ }
+ }
+ return size;
+ }
+
public static void checkQueueStats(String senderId, final int queueSize, final int eventsReceived,
final int eventsQueued, final int eventsDistributed) {
GatewaySenderStats statistics = getGatewaySenderStats(senderId);
@@ -3102,6 +3170,19 @@ public class WANTestBase extends JUnit4DistributedTestCase {
return getQueueContentSize(senderId, false);
}
+ public static Integer getSecondaryQueueContentSize(final String senderId) {
+ // Looks up the sender registered under senderId and returns its secondary queue size; an
+ // unknown id fails the assertion below instead of ending in a bare NullPointerException.
+ GatewaySender sender = null;
+ for (GatewaySender candidate : cache.getGatewaySenders()) {
+ if (candidate.getId().equals(senderId)) {
+ sender = candidate;
+ break;
+ }
+ }
+ assertNotNull("No gateway sender found with id " + senderId, sender);
+ return ((AbstractGatewaySender) sender).getEventSecondaryQueueSize();
+ }
+
public static Integer getQueueContentSize(final String senderId, boolean includeSecondary) {
Set<GatewaySender> senders = cache.getGatewaySenders();
GatewaySender sender = null;
@@ -3113,9 +3194,7 @@ public class WANTestBase extends JUnit4DistributedTestCase {
}
if (!sender.isParallel()) {
- if (includeSecondary) {
- fail("Not implemented yet");
- }
+ // if sender is serial, the queues will be all primary or all secondary at one member
final Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
int size = 0;
for (RegionQueue q : queues) {
@@ -3131,10 +3210,10 @@ public class WANTestBase extends JUnit4DistributedTestCase {
return ((ParallelGatewaySenderQueue) regionQueue).localSize(includeSecondary);
} else {
if (includeSecondary) {
- fail("Not Implemented yet");
+ fail("Not implemented yet");
+ regionQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
+ return regionQueue.getRegion().size();
}
- regionQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
- return regionQueue.getRegion().size();
}
}
fail("Not yet implemented?");
@@ -3180,6 +3259,8 @@ public class WANTestBase extends JUnit4DistributedTestCase {
((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
Set<BucketRegion> buckets = ((PartitionedRegion) regionQueue.getRegion()).getDataStore()
.getAllLocalPrimaryBucketRegions();
+ final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
+
for (final BucketRegion bucket : buckets) {
Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
assertEquals("Expected bucket entries for bucket: " + bucket.getId()
@@ -3188,6 +3269,10 @@ public class WANTestBase extends JUnit4DistributedTestCase {
bucket.keySet().size());
});
} // for loop ends
+ assertEquals("Except events in all primary queues after drain is 0", 0,
+ abstractSender.getEventQueueSize());
+ assertEquals("Except events in all secondary queues after drain is 0", 0,
+ abstractSender.getEventSecondaryQueueSize());
} finally {
exp.remove();
exp1.remove();
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
index a792133..6cdfb74 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
@@ -66,6 +66,11 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
vm4.invoke(() -> checkQueueSize("ln", (keyValues.size() + updateKeyValues.size())));
+ vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), updateKeyValues));
+
+ // Since no conflation, all updates are in queue
+ vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + 2 * updateKeyValues.size()));
+
vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0));
resumeSenders();
@@ -91,7 +96,7 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
vm6.invoke(() -> createSender("ln", 2, true, 100, 50, false, false, null, true));
vm7.invoke(() -> createSender("ln", 2, true, 100, 50, false, false, null, true));
- createSenderPRs();
+ createSenderPRs(1);
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
@@ -108,24 +113,35 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), keyValues));
}
+ // sender did not turn on conflation, so queue size will be 100 (otherwise it will be 20)
+ vm4.invoke(() -> checkQueueSize("ln", 100));
vm4.invoke(() -> enableConflation("ln"));
vm5.invoke(() -> enableConflation("ln"));
vm6.invoke(() -> enableConflation("ln"));
vm7.invoke(() -> enableConflation("ln"));
- resumeSenders();
-
ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 100));
ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 100));
ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 100));
ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 100));
+ assertTrue("Event in secondary queue should be 100",
+ (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 100);
+
+ resumeSenders();
+
+ v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
assertTrue("No events conflated in batch",
(v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0);
+ assertEquals("Event in secondary queue should be 0 after dispatched", 0,
+ (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)));
vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10));
@@ -160,12 +176,14 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + updateKeyValues.size())); // creates
// aren't
// conflated
+ validateEventSecondaryQueueSize(keyValues.size() + updateKeyValues.size(), redundancy);
vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), updateKeyValues));
- vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + updateKeyValues.size())); // creates
- // aren't
- // conflated
+ int expectedEventNumAfterConflation = keyValues.size() + updateKeyValues.size();
+ vm4.invoke(() -> checkQueueSize("ln", expectedEventNumAfterConflation));
+
+ validateEventSecondaryQueueSize(expectedEventNumAfterConflation, redundancy);
vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0));
@@ -173,6 +191,24 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
keyValues.putAll(updateKeyValues);
validateReceiverRegionSize(keyValues);
+
+ // after dispatch, both primary and secondary queues are empty
+ vm4.invoke(() -> checkQueueSize("ln", 0));
+ validateEventSecondaryQueueSize(0, redundancy);
+ }
+
+ private void validateEventSecondaryQueueSize(int expectedNum, int redundancy) {
+ // Sums entry 10 of getSenderStats (the secondary queue size stat) over vm4..vm7 and expects
+ // expectedNum * redundancy.
+ ArrayList<Integer> v4List =
+ (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+ ArrayList<Integer> v5List =
+ (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+ ArrayList<Integer> v6List =
+ (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+ ArrayList<Integer> v7List =
+ (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+ int expectedSecondaryTotal = expectedNum * redundancy;
+ // assertEquals reports the actual total on failure; the message no longer hard-codes 100
+ assertEquals("Event in secondary queue should be " + expectedSecondaryTotal,
+ expectedSecondaryTotal,
+ v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10));
+ }
@Test
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index 3b6e860..61e75f6 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Region;
+import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.wan.WANTestBase;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.VM;
@@ -52,6 +53,172 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
}
@Test
+ public void testQueueSizeInSecondaryBucketRegionQueuesWithMemberRestart() throws Exception {
+ // Checks that the secondary queue size stat (entry 10 of getSenderStats), summed over the
+ // sender members, stays at NUM_PUTS while vm7 is closed and re-created, and drops to 0 once
+ // the resumed senders have dispatched everything.
+ Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
+
+ createSendersWithConflation(lnPort);
+
+ createSenderPRs(1);
+
+ startPausedSenders();
+
+ createReceiverPR(vm2, 1);
+ putKeyValues();
+
+ ArrayList<Integer> v4List =
+ (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ ArrayList<Integer> v5List =
+ (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ ArrayList<Integer> v6List =
+ (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ ArrayList<Integer> v7List =
+ (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+
+ // queue size
+ assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0));
+ // eventsReceived
+ assertEquals(NUM_PUTS * 2, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1));
+ // events queued
+ assertEquals(NUM_PUTS * 2, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2));
+ // events distributed
+ assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3));
+ // secondary queue size
+ assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10));
+
+ // stop vm7 to trigger rebalance and move some primary buckets
+ System.out.println("Current secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10)
+ + ":" + v6List.get(10) + ":" + v7List.get(10));
+ vm7.invoke(() -> WANTestBase.closeCache());
+ Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> {
+ int v4secondarySize = vm4.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
+ int v5secondarySize = vm5.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
+ int v6secondarySize = vm6.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
+ // secondary queue size
+ assertEquals(NUM_PUTS, v4secondarySize + v5secondarySize + v6secondarySize);
+ });
+ // print freshly read values: v4List..v6List still hold the sizes from before vm7 was closed
+ int v4SizeAfterClose = vm4.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
+ int v5SizeAfterClose = vm5.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
+ int v6SizeAfterClose = vm6.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
+ System.out.println("New secondary queue sizes:" + v4SizeAfterClose + ":" + v5SizeAfterClose
+ + ":" + v6SizeAfterClose);
+
+ // bring vm7 back with its cache, sender and region, and pause its sender like the others
+ vm7.invoke(() -> WANTestBase.createCache(lnPort));
+ vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(testName, "ln", 1, 10, isOffHeap()));
+ startSenderInVMs("ln", vm7);
+ vm7.invoke(() -> pauseSender("ln"));
+
+ v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ // secondary queue size
+ assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10));
+ System.out.println("After restart vm7, secondary queue sizes:" + v4List.get(10) + ":"
+ + v5List.get(10) + ":" + v6List.get(10) + ":" + v7List.get(10));
+
+ vm4.invoke(() -> WANTestBase.resumeSender("ln"));
+ vm5.invoke(() -> WANTestBase.resumeSender("ln"));
+ vm6.invoke(() -> WANTestBase.resumeSender("ln"));
+ vm7.invoke(() -> WANTestBase.resumeSender("ln"));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
+ vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, NUM_PUTS, NUM_PUTS));
+
+ vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
+
+ v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ // events distributed
+ assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3));
+ // secondary queue size
+ assertEquals(0, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10));
+ }
+
+ // TODO: add a test without redundancy for primary switch
+ @Test
+ public void testQueueSizeInSecondaryWithPrimarySwitch() throws Exception {
+ // Puts NUM_PUTS events through paused parallel senders on vm4..vm7, checks the summed sender
+ // stats (entry 10 is the secondary queue size), then resumes the senders and checks that the
+ // secondary queue size total is 0 after dispatch.
+ Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
+
+ createSendersWithConflation(lnPort);
+
+ createSenderPRs(1);
+
+ startPausedSenders();
+
+ createReceiverPR(vm2, 1);
+
+ putKeyValues();
+
+ ArrayList<Integer> v4List =
+ (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ ArrayList<Integer> v5List =
+ (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ ArrayList<Integer> v6List =
+ (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ ArrayList<Integer> v7List =
+ (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+
+ assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue
+ // size
+ assertEquals(NUM_PUTS * 2, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // eventsReceived
+ assertEquals(NUM_PUTS * 2, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); // events
+ // queued
+ assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events
+ // distributed
+ assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary
+ // queue
+ // size
+ // NOTE(review): the primary-move step below is commented out, so despite the method name no
+ // primary switch is exercised by this test; the reason is not visible in this change.
+ // int vm7secondarySizeBeforeMovePrimary = v7List.get(10);
+ // System.out.println("Current secondary queue
+ // sizes:"+v4List.get(10)+":"+v5List.get(10)+":"+v6List.get(10)+":"+v7List.get(10));
+ // System.out.println("Now move a primary bucket");
+ // final DistributedMember vm6member = vm6.invoke(() -> WANTestBase.getMember());
+ // vm7.invoke(() -> WANTestBase.moveAllPrimaryBuckets("ln", vm6member, testName));
+ //
+ // v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ // v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ // v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ // v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ // assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); //
+ // secondary
+ // // queue
+ // // size
+ // assertTrue(v7List.get(10) > vm7secondarySizeBeforeMovePrimary);
+
+ // resume all four senders so the queued events are dispatched to the receiver on vm2
+ vm4.invoke(() -> WANTestBase.resumeSender("ln"));
+ vm5.invoke(() -> WANTestBase.resumeSender("ln"));
+ vm6.invoke(() -> WANTestBase.resumeSender("ln"));
+ vm7.invoke(() -> WANTestBase.resumeSender("ln"));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
+ vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, NUM_PUTS, NUM_PUTS));
+
+ vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
+
+ v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events
+ // distributed
+ assertEquals(0, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary
+ // queue
+ // size
+ }
+
+ @Test
public void testPartitionedRegionParallelPropagation_BeforeDispatch() throws Exception {
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
index 504a2f8..71b8c6d 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
@@ -18,6 +18,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.*;
import static org.junit.Assert.*;
import java.io.File;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
@@ -46,6 +47,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
import org.apache.geode.internal.cache.wan.WANTestBase;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.VM;
@@ -102,7 +104,13 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase {
vm4.invoke(() -> WANTestBase.pauseSender("ln"));
vm6.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
- Wait.pause(5000);
+ ArrayList<Integer> v4List =
+ (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 1000));
+ ArrayList<Integer> v5List =
+ (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 1000));
+ // secondary queue size stats in serial queue should be 0
+ assertEquals(0, v4List.get(10) + v5List.get(10));
+
HashMap primarySenderUpdates = (HashMap) vm4.invoke(() -> WANTestBase.checkQueue());
HashMap secondarySenderUpdates = (HashMap) vm5.invoke(() -> WANTestBase.checkQueue());
assertEquals(primarySenderUpdates, secondarySenderUpdates);
@@ -137,6 +145,9 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase {
// removing all the keys.
secondarySenderUpdates = (HashMap) vm5.invoke(() -> WANTestBase.checkQueue());
assertEquals(secondarySenderUpdates.get("Destroy"), receiverUpdates.get("Create"));
+
+ vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
}
protected void checkPrimarySenderUpdatesOnVM5(HashMap primarySenderUpdates) {
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
index 1e78539..4d9b5c9 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
@@ -42,10 +42,10 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
- vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap()));
- vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap()));
- vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap()));
- vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap()));
+ vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+ vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+ vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+ vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
vm4.invoke(() -> createSender("ln", 2, false, 100, 50, false, false, null, true));
vm5.invoke(() -> createSender("ln", 2, false, 100, 50, false, false, null, true));
@@ -90,6 +90,71 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
assertTrue("No events conflated in batch",
(v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0);
+ }
+
+ @Test
+ public void testSerialPropagationPartitionRegionConflationDuringEnqueue() throws Exception {
+ Integer lnPort = (Integer) vm0.invoke(() -> createFirstLocatorWithDSId(1));
+ Integer nyPort = (Integer) vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ vm2.invoke(() -> createPartitionedRegion(getTestMethodName(), null, 1, 8, isOffHeap()));
+ vm3.invoke(() -> createPartitionedRegion(getTestMethodName(), null, 1, 8, isOffHeap()));
+
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+ vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+ vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+ vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+
+ vm4.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true));
+ vm5.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true));
+ vm6.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true));
+ vm7.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> pauseSender("ln"));
+ vm5.invoke(() -> pauseSender("ln"));
+ vm6.invoke(() -> pauseSender("ln"));
+ vm7.invoke(() -> pauseSender("ln"));
+
+
+ final Map keyValues = new HashMap();
+
+ for (int i = 1; i <= 10; i++) {
+ for (int j = 1; j <= 10; j++) {
+ keyValues.put(j, i);
+ }
+ vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), keyValues));
+ }
+
+ ArrayList<Integer> v4List =
+ (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 20));
+ assertTrue("After conflation during enqueue, there should be only 20 events",
+ v4List.get(0) == 20);
+
+ vm4.invoke(() -> resumeSender("ln"));
+ vm5.invoke(() -> resumeSender("ln"));
+ vm6.invoke(() -> resumeSender("ln"));
+ vm7.invoke(() -> resumeSender("ln"));
+
+ v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ ArrayList<Integer> v5List =
+ (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ ArrayList<Integer> v6List =
+ (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ ArrayList<Integer> v7List =
+ (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ assertTrue("No events in secondary queue stats since it's serial sender",
+ (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 0);
+ assertTrue("Total queued events should be 100",
+ (v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)) == 100);
vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10));
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java
index db861e4..3819151 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java
@@ -162,6 +162,7 @@ public class SerialWANPropagationDUnitTest extends WANTestBase {
IgnoredException.addIgnoredException(BatchException70.class.getName());
IgnoredException.addIgnoredException(ServerOperationException.class.getName());
IgnoredException.addIgnoredException(IOException.class.getName());
+ IgnoredException.addIgnoredException(java.net.SocketException.class.getName());
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 10000));
--
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.