You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/04/11 02:18:23 UTC
[geode] branch develop updated: GEODE-4942: when sender is starting, and not running yet, put event at primary should be saved for QueueRemovalMessage (#1771)
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new ab243c8 GEODE-4942: when sender is starting, and not running yet, put event at primary should be saved for QueueRemovalMessage (#1771)
ab243c8 is described below
commit ab243c890f47f67a55cff344f3f41946512eee24
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Tue Apr 10 19:18:18 2018 -0700
GEODE-4942: when sender is starting, and not running yet, put event at primary should be saved for QueueRemovalMessage (#1771)
---
.../asyncqueue/internal/AsyncEventQueueStats.java | 6 +-
.../geode/internal/cache/EntryEventImpl.java | 3 +
.../internal/cache/PartitionedRegionDataStore.java | 6 +-
.../internal/cache/wan/AbstractGatewaySender.java | 15 ++++-
.../wan/AbstractGatewaySenderEventProcessor.java | 50 +++++++++++++-
.../internal/cache/wan/GatewaySenderStats.java | 16 +++++
.../ConcurrentParallelGatewaySenderQueue.java | 9 +++
.../wan/parallel/ParallelGatewaySenderQueue.java | 20 +++++-
.../bean/stats/AsyncEventQueueStatsJUnitTest.java | 18 +++++
.../geode/internal/cache/wan/WANTestBase.java | 78 +++++++++++++++++++---
.../ParallelGatewaySenderOperationsDUnitTest.java | 38 +++++++++--
.../SerialGatewaySenderOperationsDUnitTest.java | 49 +++++++++++++-
12 files changed, 281 insertions(+), 27 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index dee2c92..2f3029a 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -26,7 +26,7 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
public static final String typeName = "AsyncEventQueueStatistics";
/** The <code>StatisticsType</code> of the statistics */
- private static final StatisticsType type;
+ public static final StatisticsType type;
static {
@@ -87,6 +87,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
f.createIntGauge(CONFLATION_INDEXES_MAP_SIZE,
"Current number of entries in the conflation indexes map.", "events"),
f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added to queue.", "events"),
+ f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER,
+ "Number of events not added to primary queue due to sender yet running.", "events"),
f.createIntCounter(EVENTS_FILTERED,
"Number of events filtered through GatewayEventFilter.", "events"),
f.createIntCounter(LOAD_BALANCES_COMPLETED, "Number of load balances completed",
@@ -122,6 +124,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE);
conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE);
notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS);
+ notQueuedEventsAtYetRunningPrimarySenderId =
+ type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER);
eventsFilteredId = type.nameToId(EVENTS_FILTERED);
eventsConflatedFromBatchesId = type.nameToId(EVENTS_CONFLATED_FROM_BATCHES);
loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index c91d236..664d054 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -2158,6 +2158,9 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
if (this.getInhibitDistribution()) {
buf.append(";inhibitDistribution");
}
+ if (this.tailKey != -1) {
+ buf.append(";tailKey=" + tailKey);
+ }
buf.append("]");
return buf.toString();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index ef8eb99..d468ef4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -2450,10 +2450,10 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
return sizeOfLocalPrimaries;
}
- public int getSizeOfLocalBuckets(boolean includeSecondary) {
+ public int getSizeOfLocalBuckets() {
int sizeOfLocal = 0;
- Set<BucketRegion> primaryBuckets = getAllLocalBucketRegions();
- for (BucketRegion br : primaryBuckets) {
+ Set<BucketRegion> allLocalBuckets = getAllLocalBucketRegions();
+ for (BucketRegion br : allLocalBuckets) {
sizeOfLocal += br.size();
}
return sizeOfLocal;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index a134e1e..034d810 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -849,7 +849,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
// If this gateway is not running, return
if (!isRunning()) {
if (isDebugEnabled) {
- logger.debug("Returning back without putting into the gateway sender queue");
+ logger.debug("Returning back without putting into the gateway sender queue:" + event);
+ }
+ if (this.eventProcessor != null) {
+ this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
}
return;
}
@@ -962,7 +965,10 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
// The sender may have stopped, after we have checked the status in the beginning.
if (!isRunning()) {
if (isDebugEnabled) {
- logger.debug("Returning back without putting into the gateway sender queue");
+ logger.debug("Returning back without putting into the gateway sender queue:" + event);
+ }
+ if (this.eventProcessor != null) {
+ this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
}
return;
}
@@ -1251,6 +1257,11 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
return localProcessor == null ? 0 : localProcessor.eventQueueSize();
}
+ public int getEventSecondaryQueueSize() {
+ AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor;
+ return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize();
+ }
+
public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) {
this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 9309e43..eea7480 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -33,6 +33,7 @@ import org.apache.geode.GemFireException;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
@@ -49,6 +50,7 @@ import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
@@ -261,15 +263,57 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
}
// This should be local size instead of pr size
- if (this.queue instanceof ParallelGatewaySenderQueue) {
- return ((ParallelGatewaySenderQueue) queue).localSize();
- }
if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
return ((ConcurrentParallelGatewaySenderQueue) queue).localSize();
}
return this.queue.size();
}
+ public int eventSecondaryQueueSize() {
+ if (queue == null) {
+ return 0;
+ }
+
+ // if parallel, get both primary and secondary queues' size, then substract primary queue's size
+ if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
+ int size = ((ConcurrentParallelGatewaySenderQueue) queue).localSize(true)
+ - ((ConcurrentParallelGatewaySenderQueue) queue).localSize(false);
+ return size;
+ }
+ return this.queue.size();
+ }
+
+ public void registerEventDroppedInPrimaryQueue(EntryEventImpl event) {
+ if (queue == null) {
+ return;
+ }
+ if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
+ ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue) queue;
+ PartitionedRegion prQ = cpgsq.getRegion(event.getRegion().getFullPath());
+ if (prQ == null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("shadow partitioned region " + event.getRegion().getFullPath()
+ + " is not created yet.");
+ }
+ return;
+ }
+ int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) event);
+ long shadowKey = event.getTailKey();
+
+ ParallelGatewaySenderQueue pgsq =
+ (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId);
+ boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
+ if (isPrimary) {
+ pgsq.addRemovedEvent(prQ, bucketId, shadowKey);
+ this.sender.getStatistics().incEventsNotQueuedAtYetRunningPrimarySender();
+ if (logger.isDebugEnabled()) {
+ logger.debug("register dropped event for primary queue. BucketId is " + bucketId
+ + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath());
+ }
+ }
+ }
+ }
+
/**
* @return the sender
*/
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index c7fd370..2b93082 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -84,6 +84,8 @@ public class GatewaySenderStats {
protected static final String EVENTS_FILTERED = "eventsFiltered";
protected static final String NOT_QUEUED_EVENTS = "notQueuedEvent";
+ protected static final String NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER =
+ "notQueuedEventAtYetRunningPrimarySender";
protected static final String LOAD_BALANCES_COMPLETED = "loadBalancesCompleted";
protected static final String LOAD_BALANCES_IN_PROGRESS = "loadBalancesInProgress";
@@ -135,6 +137,8 @@ public class GatewaySenderStats {
protected static int eventsFilteredId;
/** Id of not queued events */
protected static int notQueuedEventsId;
+ /** Id of not queued events due to the primary sender is yet running */
+ protected static int notQueuedEventsAtYetRunningPrimarySenderId;
/** Id of events conflated in batch */
protected static int eventsConflatedFromBatchesId;
/** Id of load balances completed */
@@ -213,6 +217,8 @@ public class GatewaySenderStats {
f.createIntGauge(CONFLATION_INDEXES_MAP_SIZE,
"Current number of entries in the conflation indexes map.", "events"),
f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added to queue.", "events"),
+ f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER,
+ "Number of events not added to primary queue due to sender yet running.", "events"),
f.createIntCounter(EVENTS_FILTERED,
"Number of events filtered through GatewayEventFilter.", "events"),
f.createIntCounter(LOAD_BALANCES_COMPLETED, "Number of load balances completed",
@@ -249,6 +255,8 @@ public class GatewaySenderStats {
unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE);
conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE);
notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS);
+ notQueuedEventsAtYetRunningPrimarySenderId =
+ type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER);
eventsFilteredId = type.nameToId(EVENTS_FILTERED);
eventsConflatedFromBatchesId = type.nameToId(EVENTS_CONFLATED_FROM_BATCHES);
loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
@@ -599,6 +607,14 @@ public class GatewaySenderStats {
return this.stats.getInt(notQueuedEventsId);
}
+ public void incEventsNotQueuedAtYetRunningPrimarySender() {
+ this.stats.incInt(notQueuedEventsAtYetRunningPrimarySenderId, 1);
+ }
+
+ public int getEventsNotQueuedAtYetRunningPrimarySender() {
+ return this.stats.getInt(notQueuedEventsAtYetRunningPrimarySenderId);
+ }
+
public void incEventsFiltered() {
this.stats.incInt(eventsFilteredId, 1);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
index 4fc940c..e556910 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
@@ -121,6 +121,11 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
return this.processors[0].getQueue().size();
}
+ public String displayContent() {
+ ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) (processors[0].getQueue());
+ return pgsq.displayContent();
+ }
+
public int localSize() {
return localSize(false);
}
@@ -190,6 +195,10 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
return processors[index];
}
+ public RegionQueue getQueueByBucket(int bucketId) {
+ return getPGSProcessor(bucketId).getQueue();
+ }
+
public BlockingQueue<GatewaySenderEventImpl> getBucketTmpQueue(int bucketId) {
return getPGSProcessor(bucketId).getBucketTmpQueue(bucketId);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 3aa8534..89880fc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1112,7 +1112,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
// This method may need synchronization in case it is used by
// ConcurrentParallelGatewaySender
- protected void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) {
+ public void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) {
StoppableReentrantLock lock = buckToDispatchLock;
if (lock != null) {
lock.lock();
@@ -1405,12 +1405,28 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
return localSize(false);
}
+ public String displayContent() {
+ int size = 0;
+ StringBuffer sb = new StringBuffer();
+ for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+ if (prQ != null && prQ.getDataStore() != null) {
+ Set<BucketRegion> allLocalBuckets = prQ.getDataStore().getAllLocalBucketRegions();
+ for (BucketRegion br : allLocalBuckets) {
+ if (br.size() > 0) {
+ sb.append("bucketId=" + br.getId() + ":" + br.keySet() + ";");
+ }
+ }
+ }
+ }
+ return sb.toString();
+ }
+
public int localSize(boolean includeSecondary) {
int size = 0;
for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
if (prQ != null && prQ.getDataStore() != null) {
if (includeSecondary) {
- size += prQ.getDataStore().getSizeOfLocalBuckets(true);
+ size += prQ.getDataStore().getSizeOfLocalBuckets();
} else {
size += prQ.getDataStore().getSizeOfLocalPrimaryBuckets();
}
diff --git a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
index ea246bc..c4d0b7c 100644
--- a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
@@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.geode.StatisticDescriptor;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueStats;
import org.apache.geode.management.internal.beans.AsyncEventQueueMBeanBridge;
import org.apache.geode.test.junit.categories.IntegrationTest;
@@ -60,4 +61,21 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase {
return bridge.getEventQueueSize();
}
+ @Test
+ public void testStatDescriptors() {
+ StatisticDescriptor[] sds = asyncEventQueueStats.type.getStatistics();
+ int notQueueEvents = 0;
+ int notQueueToPrimary = 0;
+ for (StatisticDescriptor s : sds) {
+ if (s.getName().equals("notQueuedEvent")) {
+ notQueueEvents++;
+ }
+ if (s.getName().equals("notQueuedEventAtYetRunningPrimarySender")) {
+ notQueueToPrimary++;
+ }
+ }
+ assertEquals(1, notQueueEvents);
+ assertEquals(1, notQueueToPrimary);
+ }
+
}
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 226595b..3799083 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -933,6 +933,8 @@ public class WANTestBase extends DistributedTestCase {
}
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "localhost[" + locPort + "]");
+ String logLevel = System.getProperty(LOG_LEVEL, "info");
+ props.setProperty(LOG_LEVEL, logLevel);
InternalDistributedSystem ds = test.getSystem(props);
cache = CacheFactory.create(ds);
}
@@ -1155,6 +1157,21 @@ public class WANTestBase extends DistributedTestCase {
return stats;
}
+ public static List<Integer> getSenderStatsForDroppedEvents(String senderId) {
+ AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
+ GatewaySenderStats statistics = sender.getStatistics();
+ ArrayList<Integer> stats = new ArrayList<Integer>();
+ int eventNotQueued = statistics.getEventsNotQueuedAtYetRunningPrimarySender();
+ if (eventNotQueued > 0) {
+ logger.info(
+ "Found " + eventNotQueued + " not queued events due to primary sender is yet running");
+ }
+ stats.add(eventNotQueued);
+ stats.add(statistics.getEventsNotQueued());
+ stats.add(statistics.getEventsNotQueuedConflated());
+ return stats;
+ }
+
public static void checkQueueStats(String senderId, final int queueSize, final int eventsReceived,
final int eventsQueued, final int eventsDistributed) {
GatewaySenderStats statistics = getGatewaySenderStats(senderId);
@@ -2746,11 +2763,21 @@ public class WANTestBase extends DistributedTestCase {
public static void validateQueueSizeStat(String id, final int queueSize) {
final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id);
- Awaitility.await().atMost(30, TimeUnit.SECONDS)
+ Awaitility.await().atMost(60, TimeUnit.SECONDS)
.until(() -> assertEquals(queueSize, sender.getEventQueueSize()));
assertEquals(queueSize, sender.getEventQueueSize());
}
+ public static void validateSecondaryQueueSizeStat(String id, final int queueSize) {
+ final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id);
+ Awaitility.await().atMost(120, TimeUnit.SECONDS)
+ .until(() -> assertEquals(
+ "Expected unprocessedEventMap is drained but actual is "
+ + sender.getStatistics().getUnprocessedEventMapSize(),
+ queueSize, sender.getStatistics().getUnprocessedEventMapSize()));
+ assertEquals(queueSize, sender.getStatistics().getUnprocessedEventMapSize());
+ }
+
/**
* This method is specifically written for pause and stop operations. This method validates that
* the region size remains same for at least minimum number of verification attempts and also it
@@ -3053,6 +3080,31 @@ public class WANTestBase extends DistributedTestCase {
});
}
+ public static Integer getSecondaryQueueContentSize(final String senderId) {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = s;
+ break;
+ }
+ }
+ AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
+ int size = abstractSender.getEventSecondaryQueueSize();
+ return size;
+ }
+
+ public static String displayQueueContent(final RegionQueue queue) {
+ if (queue instanceof ParallelGatewaySenderQueue) {
+ ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) queue;
+ return pgsq.displayContent();
+ } else if (queue instanceof ConcurrentParallelGatewaySenderQueue) {
+ ConcurrentParallelGatewaySenderQueue pgsq = (ConcurrentParallelGatewaySenderQueue) queue;
+ return pgsq.displayContent();
+ }
+ return null;
+ }
+
public static Integer getQueueContentSize(final String senderId) {
return getQueueContentSize(senderId, false);
}
@@ -3135,14 +3187,22 @@ public class WANTestBase extends DistributedTestCase {
((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
Set<BucketRegion> buckets = ((PartitionedRegion) regionQueue.getRegion()).getDataStore()
.getAllLocalPrimaryBucketRegions();
- for (final BucketRegion bucket : buckets) {
- Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
- assertEquals("Expected bucket entries for bucket: " + bucket.getId()
- + " is: 0 but actual entries: " + bucket.keySet().size() + " This bucket isPrimary: "
- + bucket.getBucketAdvisor().isPrimary() + " KEYSET: " + bucket.keySet(), 0,
- bucket.keySet().size());
- });
- } // for loop ends
+ final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
+ RegionQueue queue = abstractSender.getEventProcessor().queue;
+ Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
+ assertEquals("Expected events in all primary queues are drained but actual is "
+ + abstractSender.getEventQueueSize() + ". Queue content is: "
+ + displayQueueContent(queue), 0, abstractSender.getEventQueueSize());
+ });
+ assertEquals("Expected events in all primary queues after drain is 0", 0,
+ abstractSender.getEventQueueSize());
+ Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> {
+ assertEquals("Expected events in all secondary queues are drained but actual is "
+ + abstractSender.getEventSecondaryQueueSize() + ". Queue content is: "
+ + displayQueueContent(queue), 0, abstractSender.getEventSecondaryQueueSize());
+ });
+ assertEquals("Except events in all secondary queues after drain is 0", 0,
+ abstractSender.getEventSecondaryQueueSize());
} finally {
exp.remove();
exp1.remove();
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index eaef4f9..f5b98b7 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -20,6 +20,8 @@ import static org.apache.geode.internal.cache.tier.sockets.Message.MAX_MESSAGE_S
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.assertj.core.api.Assertions.assertThat;
+import java.util.ArrayList;
+
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -407,18 +409,42 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
vm2.invoke(() -> validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200));
// SECOND RUN: start async puts on region
- AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getTestMethodName() + "_PR", 5000));
-
- // when puts are happening by another thread, start the senders
- startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
-
- async.join();
+ ArrayList<Integer> vm4List = null;
+ ArrayList<Integer> vm5List = null;
+ ArrayList<Integer> vm6List = null;
+ ArrayList<Integer> vm7List = null;
+ boolean foundDroppedAtYetStartedPrimarySender = false;
+ int count = 0;
+
+ do {
+ stopSenders();
+ AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getTestMethodName() + "_PR", 5000));
+
+ // when puts are happening by another thread, start the senders
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+ async.join();
+ vm4List =
+ (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
+ vm5List =
+ (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
+ vm6List =
+ (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
+ vm7List =
+ (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
+ if (vm4List.get(0) + vm5List.get(0) + vm6List.get(0) + vm7List.get(0) > 0) {
+ foundDroppedAtYetStartedPrimarySender = true;
+ }
+ count++;
+ } while (foundDroppedAtYetStartedPrimarySender == false && count < 5);
+ assertThat(foundDroppedAtYetStartedPrimarySender);
// verify all the buckets on all the sender nodes are drained
validateParallelSenderQueueAllBucketsDrained();
// verify that the queue size ultimately becomes zero. That means all the events propagate to
// remote site.
+
vm4.invoke(() -> validateQueueContents("ln", 0));
}
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
index ee43b83..8df5650 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
@@ -269,6 +269,53 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
}
@Test
+ public void testRestartSerialGatewaySendersWhilePutting() throws Throwable {
+ Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createSenderCaches(lnPort);
+
+ createSenderVM4();
+ createSenderVM5();
+
+ createReceiverRegions();
+
+ createSenderRegions();
+
+ vm7.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 20));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm7.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 20));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 20));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 20));
+
+ vm4.invoke(() -> WANTestBase.stopSender("ln"));
+ vm5.invoke(() -> WANTestBase.stopSender("ln"));
+
+ vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState("ln"));
+ vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState("ln"));
+
+ vm4.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
+ vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
+
+ // do a lot of puts while senders are restarting
+ AsyncInvocation async = vm7.invokeAsync(() -> doPuts(getTestMethodName() + "_RR", 5000));
+
+ startSenderInVMsAsync("ln", vm4, vm5);
+ async.join();
+
+ vm4.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
+ vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
+ vm4.invoke(() -> WANTestBase.validateSecondaryQueueSizeStat("ln", 0));
+ vm5.invoke(() -> WANTestBase.validateSecondaryQueueSizeStat("ln", 0));
+ }
+
+ @Test
public void testStopOneSerialGatewaySenderBothPrimary() throws Throwable {
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
@@ -298,7 +345,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 200));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 200));
- // Do some puts while restarting a sender
+ // Do some puts from both vm4 and vm5 while restarting a sender
AsyncInvocation asyncPuts =
vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 300));
--
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.