You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/04/10 21:09:44 UTC
[geode] branch develop updated: Revert "GEODE-4942: when sender is starting, and not running yet, put event at primary should be saved for QueueRemovalMessage (#1740)"
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 26f976b Revert "GEODE-4942: when sender is starting, and not running yet, put event at primary should be saved for QueueRemovalMessage (#1740)"
26f976b is described below
commit 26f976b12637b64f34d0c58833e094fce999956e
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Tue Apr 10 14:08:49 2018 -0700
Revert "GEODE-4942: when sender is starting, and not running yet, put event at primary should be saved for QueueRemovalMessage (#1740)"
This reverts commit e42ebec32518c014ab6bd3ffe6d1ee3cab813762.
Reverted because an acceptance test failed after the merge. The merge needs to be double-checked.
---
.../asyncqueue/internal/AsyncEventQueueStats.java | 5 --
.../geode/internal/cache/EntryEventImpl.java | 3 -
.../internal/cache/PartitionedRegionDataStore.java | 6 +-
.../internal/cache/wan/AbstractGatewaySender.java | 15 +----
.../wan/AbstractGatewaySenderEventProcessor.java | 50 +-------------
.../internal/cache/wan/GatewaySenderStats.java | 16 -----
.../ConcurrentParallelGatewaySenderQueue.java | 9 ---
.../wan/parallel/ParallelGatewaySenderQueue.java | 20 +-----
.../geode/internal/cache/wan/WANTestBase.java | 78 +++-------------------
.../ParallelGatewaySenderOperationsDUnitTest.java | 38 ++---------
.../SerialGatewaySenderOperationsDUnitTest.java | 49 +-------------
11 files changed, 26 insertions(+), 263 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index b8259a3..dee2c92 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -47,9 +47,6 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
"Number of events received but not added to the event queue because the queue already contains an event with the event's key.",
"operations"),
- f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added to queue.", "events"),
- f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER,
- "Number of events not added to primary queue due to sender yet runing.", "events"),
f.createIntCounter(EVENTS_CONFLATED_FROM_BATCHES,
"Number of events conflated from batches.", "operations"),
f.createIntCounter(EVENTS_DISTRIBUTED,
@@ -125,8 +122,6 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE);
conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE);
notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS);
- notQueuedEventsAtYetRunningPrimarySenderId =
- type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER);
eventsFilteredId = type.nameToId(EVENTS_FILTERED);
eventsConflatedFromBatchesId = type.nameToId(EVENTS_CONFLATED_FROM_BATCHES);
loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index 664d054..c91d236 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -2158,9 +2158,6 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
if (this.getInhibitDistribution()) {
buf.append(";inhibitDistribution");
}
- if (this.tailKey != -1) {
- buf.append(";tailKey=" + tailKey);
- }
buf.append("]");
return buf.toString();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index d468ef4..ef8eb99 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -2450,10 +2450,10 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
return sizeOfLocalPrimaries;
}
- public int getSizeOfLocalBuckets() {
+ public int getSizeOfLocalBuckets(boolean includeSecondary) {
int sizeOfLocal = 0;
- Set<BucketRegion> allLocalBuckets = getAllLocalBucketRegions();
- for (BucketRegion br : allLocalBuckets) {
+ Set<BucketRegion> primaryBuckets = getAllLocalBucketRegions();
+ for (BucketRegion br : primaryBuckets) {
sizeOfLocal += br.size();
}
return sizeOfLocal;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 034d810..a134e1e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -849,10 +849,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
// If this gateway is not running, return
if (!isRunning()) {
if (isDebugEnabled) {
- logger.debug("Returning back without putting into the gateway sender queue:" + event);
- }
- if (this.eventProcessor != null) {
- this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
+ logger.debug("Returning back without putting into the gateway sender queue");
}
return;
}
@@ -965,10 +962,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
// The sender may have stopped, after we have checked the status in the beginning.
if (!isRunning()) {
if (isDebugEnabled) {
- logger.debug("Returning back without putting into the gateway sender queue:" + event);
- }
- if (this.eventProcessor != null) {
- this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
+ logger.debug("Returning back without putting into the gateway sender queue");
}
return;
}
@@ -1257,11 +1251,6 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
return localProcessor == null ? 0 : localProcessor.eventQueueSize();
}
- public int getEventSecondaryQueueSize() {
- AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor;
- return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize();
- }
-
public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) {
this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index eea7480..9309e43 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -33,7 +33,6 @@ import org.apache.geode.GemFireException;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
@@ -50,7 +49,6 @@ import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
@@ -263,57 +261,15 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
}
// This should be local size instead of pr size
- if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
- return ((ConcurrentParallelGatewaySenderQueue) queue).localSize();
- }
- return this.queue.size();
- }
-
- public int eventSecondaryQueueSize() {
- if (queue == null) {
- return 0;
+ if (this.queue instanceof ParallelGatewaySenderQueue) {
+ return ((ParallelGatewaySenderQueue) queue).localSize();
}
-
- // if parallel, get both primary and secondary queues' size, then substract primary queue's size
if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
- int size = ((ConcurrentParallelGatewaySenderQueue) queue).localSize(true)
- - ((ConcurrentParallelGatewaySenderQueue) queue).localSize(false);
- return size;
+ return ((ConcurrentParallelGatewaySenderQueue) queue).localSize();
}
return this.queue.size();
}
- public void registerEventDroppedInPrimaryQueue(EntryEventImpl event) {
- if (queue == null) {
- return;
- }
- if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
- ConcurrentParallelGatewaySenderQueue cpgsq = (ConcurrentParallelGatewaySenderQueue) queue;
- PartitionedRegion prQ = cpgsq.getRegion(event.getRegion().getFullPath());
- if (prQ == null) {
- if (logger.isDebugEnabled()) {
- logger.debug("shadow partitioned region " + event.getRegion().getFullPath()
- + " is not created yet.");
- }
- return;
- }
- int bucketId = PartitionedRegionHelper.getHashKey((EntryOperation) event);
- long shadowKey = event.getTailKey();
-
- ParallelGatewaySenderQueue pgsq =
- (ParallelGatewaySenderQueue) cpgsq.getQueueByBucket(bucketId);
- boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
- if (isPrimary) {
- pgsq.addRemovedEvent(prQ, bucketId, shadowKey);
- this.sender.getStatistics().incEventsNotQueuedAtYetRunningPrimarySender();
- if (logger.isDebugEnabled()) {
- logger.debug("register dropped event for primary queue. BucketId is " + bucketId
- + ", shadowKey is " + shadowKey + ", prQ is " + prQ.getFullPath());
- }
- }
- }
- }
-
/**
* @return the sender
*/
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index adaf928..c7fd370 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -84,8 +84,6 @@ public class GatewaySenderStats {
protected static final String EVENTS_FILTERED = "eventsFiltered";
protected static final String NOT_QUEUED_EVENTS = "notQueuedEvent";
- protected static final String NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER =
- "notQueuedEventAtYetRunningPrimarySender";
protected static final String LOAD_BALANCES_COMPLETED = "loadBalancesCompleted";
protected static final String LOAD_BALANCES_IN_PROGRESS = "loadBalancesInProgress";
@@ -137,8 +135,6 @@ public class GatewaySenderStats {
protected static int eventsFilteredId;
/** Id of not queued events */
protected static int notQueuedEventsId;
- /** Id of not queued events due to the primary sender is yet running */
- protected static int notQueuedEventsAtYetRunningPrimarySenderId;
/** Id of events conflated in batch */
protected static int eventsConflatedFromBatchesId;
/** Id of load balances completed */
@@ -217,8 +213,6 @@ public class GatewaySenderStats {
f.createIntGauge(CONFLATION_INDEXES_MAP_SIZE,
"Current number of entries in the conflation indexes map.", "events"),
f.createIntCounter(NOT_QUEUED_EVENTS, "Number of events not added to queue.", "events"),
- f.createIntCounter(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER,
- "Number of events not added to primary queue due to sender yet runing.", "events"),
f.createIntCounter(EVENTS_FILTERED,
"Number of events filtered through GatewayEventFilter.", "events"),
f.createIntCounter(LOAD_BALANCES_COMPLETED, "Number of load balances completed",
@@ -255,8 +249,6 @@ public class GatewaySenderStats {
unprocessedTokenMapSizeId = type.nameToId(UNPROCESSED_TOKEN_MAP_SIZE);
conflationIndexesMapSizeId = type.nameToId(CONFLATION_INDEXES_MAP_SIZE);
notQueuedEventsId = type.nameToId(NOT_QUEUED_EVENTS);
- notQueuedEventsAtYetRunningPrimarySenderId =
- type.nameToId(NOT_QUEUED_EVENTS_AT_YET_RUNNING_PRIMARY_SENDER);
eventsFilteredId = type.nameToId(EVENTS_FILTERED);
eventsConflatedFromBatchesId = type.nameToId(EVENTS_CONFLATED_FROM_BATCHES);
loadBalancesCompletedId = type.nameToId(LOAD_BALANCES_COMPLETED);
@@ -607,14 +599,6 @@ public class GatewaySenderStats {
return this.stats.getInt(notQueuedEventsId);
}
- public void incEventsNotQueuedAtYetRunningPrimarySender() {
- this.stats.incInt(notQueuedEventsAtYetRunningPrimarySenderId, 1);
- }
-
- public int getEventsNotQueuedAtYetRunningPrimarySender() {
- return this.stats.getInt(notQueuedEventsAtYetRunningPrimarySenderId);
- }
-
public void incEventsFiltered() {
this.stats.incInt(eventsFilteredId, 1);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
index e556910..4fc940c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
@@ -121,11 +121,6 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
return this.processors[0].getQueue().size();
}
- public String displayContent() {
- ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) (processors[0].getQueue());
- return pgsq.displayContent();
- }
-
public int localSize() {
return localSize(false);
}
@@ -195,10 +190,6 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
return processors[index];
}
- public RegionQueue getQueueByBucket(int bucketId) {
- return getPGSProcessor(bucketId).getQueue();
- }
-
public BlockingQueue<GatewaySenderEventImpl> getBucketTmpQueue(int bucketId) {
return getPGSProcessor(bucketId).getBucketTmpQueue(bucketId);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 89880fc..3aa8534 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1112,7 +1112,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
// This method may need synchronization in case it is used by
// ConcurrentParallelGatewaySender
- public void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) {
+ protected void addRemovedEvent(PartitionedRegion prQ, int bucketId, Object key) {
StoppableReentrantLock lock = buckToDispatchLock;
if (lock != null) {
lock.lock();
@@ -1405,28 +1405,12 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
return localSize(false);
}
- public String displayContent() {
- int size = 0;
- StringBuffer sb = new StringBuffer();
- for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
- if (prQ != null && prQ.getDataStore() != null) {
- Set<BucketRegion> allLocalBuckets = prQ.getDataStore().getAllLocalBucketRegions();
- for (BucketRegion br : allLocalBuckets) {
- if (br.size() > 0) {
- sb.append("bucketId=" + br.getId() + ":" + br.keySet() + ";");
- }
- }
- }
- }
- return sb.toString();
- }
-
public int localSize(boolean includeSecondary) {
int size = 0;
for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
if (prQ != null && prQ.getDataStore() != null) {
if (includeSecondary) {
- size += prQ.getDataStore().getSizeOfLocalBuckets();
+ size += prQ.getDataStore().getSizeOfLocalBuckets(true);
} else {
size += prQ.getDataStore().getSizeOfLocalPrimaryBuckets();
}
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 3799083..226595b 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -933,8 +933,6 @@ public class WANTestBase extends DistributedTestCase {
}
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "localhost[" + locPort + "]");
- String logLevel = System.getProperty(LOG_LEVEL, "info");
- props.setProperty(LOG_LEVEL, logLevel);
InternalDistributedSystem ds = test.getSystem(props);
cache = CacheFactory.create(ds);
}
@@ -1157,21 +1155,6 @@ public class WANTestBase extends DistributedTestCase {
return stats;
}
- public static List<Integer> getSenderStatsForDroppedEvents(String senderId) {
- AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
- GatewaySenderStats statistics = sender.getStatistics();
- ArrayList<Integer> stats = new ArrayList<Integer>();
- int eventNotQueued = statistics.getEventsNotQueuedAtYetRunningPrimarySender();
- if (eventNotQueued > 0) {
- logger.info(
- "Found " + eventNotQueued + " not queued events due to primary sender is yet running");
- }
- stats.add(eventNotQueued);
- stats.add(statistics.getEventsNotQueued());
- stats.add(statistics.getEventsNotQueuedConflated());
- return stats;
- }
-
public static void checkQueueStats(String senderId, final int queueSize, final int eventsReceived,
final int eventsQueued, final int eventsDistributed) {
GatewaySenderStats statistics = getGatewaySenderStats(senderId);
@@ -2763,21 +2746,11 @@ public class WANTestBase extends DistributedTestCase {
public static void validateQueueSizeStat(String id, final int queueSize) {
final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id);
- Awaitility.await().atMost(60, TimeUnit.SECONDS)
+ Awaitility.await().atMost(30, TimeUnit.SECONDS)
.until(() -> assertEquals(queueSize, sender.getEventQueueSize()));
assertEquals(queueSize, sender.getEventQueueSize());
}
- public static void validateSecondaryQueueSizeStat(String id, final int queueSize) {
- final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id);
- Awaitility.await().atMost(120, TimeUnit.SECONDS)
- .until(() -> assertEquals(
- "Expected unprocessedEventMap is drained but actual is "
- + sender.getStatistics().getUnprocessedEventMapSize(),
- queueSize, sender.getStatistics().getUnprocessedEventMapSize()));
- assertEquals(queueSize, sender.getStatistics().getUnprocessedEventMapSize());
- }
-
/**
* This method is specifically written for pause and stop operations. This method validates that
* the region size remains same for at least minimum number of verification attempts and also it
@@ -3080,31 +3053,6 @@ public class WANTestBase extends DistributedTestCase {
});
}
- public static Integer getSecondaryQueueContentSize(final String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
- AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
- int size = abstractSender.getEventSecondaryQueueSize();
- return size;
- }
-
- public static String displayQueueContent(final RegionQueue queue) {
- if (queue instanceof ParallelGatewaySenderQueue) {
- ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) queue;
- return pgsq.displayContent();
- } else if (queue instanceof ConcurrentParallelGatewaySenderQueue) {
- ConcurrentParallelGatewaySenderQueue pgsq = (ConcurrentParallelGatewaySenderQueue) queue;
- return pgsq.displayContent();
- }
- return null;
- }
-
public static Integer getQueueContentSize(final String senderId) {
return getQueueContentSize(senderId, false);
}
@@ -3187,22 +3135,14 @@ public class WANTestBase extends DistributedTestCase {
((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
Set<BucketRegion> buckets = ((PartitionedRegion) regionQueue.getRegion()).getDataStore()
.getAllLocalPrimaryBucketRegions();
- final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
- RegionQueue queue = abstractSender.getEventProcessor().queue;
- Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
- assertEquals("Expected events in all primary queues are drained but actual is "
- + abstractSender.getEventQueueSize() + ". Queue content is: "
- + displayQueueContent(queue), 0, abstractSender.getEventQueueSize());
- });
- assertEquals("Expected events in all primary queues after drain is 0", 0,
- abstractSender.getEventQueueSize());
- Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> {
- assertEquals("Expected events in all secondary queues are drained but actual is "
- + abstractSender.getEventSecondaryQueueSize() + ". Queue content is: "
- + displayQueueContent(queue), 0, abstractSender.getEventSecondaryQueueSize());
- });
- assertEquals("Except events in all secondary queues after drain is 0", 0,
- abstractSender.getEventSecondaryQueueSize());
+ for (final BucketRegion bucket : buckets) {
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+ assertEquals("Expected bucket entries for bucket: " + bucket.getId()
+ + " is: 0 but actual entries: " + bucket.keySet().size() + " This bucket isPrimary: "
+ + bucket.getBucketAdvisor().isPrimary() + " KEYSET: " + bucket.keySet(), 0,
+ bucket.keySet().size());
+ });
+ } // for loop ends
} finally {
exp.remove();
exp1.remove();
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index f5b98b7..eaef4f9 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -20,8 +20,6 @@ import static org.apache.geode.internal.cache.tier.sockets.Message.MAX_MESSAGE_S
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.assertj.core.api.Assertions.assertThat;
-import java.util.ArrayList;
-
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -409,42 +407,18 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
vm2.invoke(() -> validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200));
// SECOND RUN: start async puts on region
- ArrayList<Integer> vm4List = null;
- ArrayList<Integer> vm5List = null;
- ArrayList<Integer> vm6List = null;
- ArrayList<Integer> vm7List = null;
- boolean foundDroppedAtYetStartedPrimarySender = false;
- int count = 0;
-
- do {
- stopSenders();
- AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getTestMethodName() + "_PR", 5000));
-
- // when puts are happening by another thread, start the senders
- startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
-
- async.join();
- vm4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
- vm5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
- vm6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
- vm7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln"));
- if (vm4List.get(0) + vm5List.get(0) + vm6List.get(0) + vm7List.get(0) > 0) {
- foundDroppedAtYetStartedPrimarySender = true;
- }
- count++;
- } while (foundDroppedAtYetStartedPrimarySender == false && count < 5);
- assertThat(foundDroppedAtYetStartedPrimarySender);
+ AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getTestMethodName() + "_PR", 5000));
+
+ // when puts are happening by another thread, start the senders
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+ async.join();
// verify all the buckets on all the sender nodes are drained
validateParallelSenderQueueAllBucketsDrained();
// verify that the queue size ultimately becomes zero. That means all the events propagate to
// remote site.
-
vm4.invoke(() -> validateQueueContents("ln", 0));
}
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
index 8df5650..ee43b83 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
@@ -269,53 +269,6 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
}
@Test
- public void testRestartSerialGatewaySendersWhilePutting() throws Throwable {
- Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createSenderCaches(lnPort);
-
- createSenderVM4();
- createSenderVM5();
-
- createReceiverRegions();
-
- createSenderRegions();
-
- vm7.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 20));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm7.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 20));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 20));
- vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 20));
-
- vm4.invoke(() -> WANTestBase.stopSender("ln"));
- vm5.invoke(() -> WANTestBase.stopSender("ln"));
-
- vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState("ln"));
- vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState("ln"));
-
- vm4.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
- vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
-
- // do a lot of puts while senders are restarting
- AsyncInvocation async = vm7.invokeAsync(() -> doPuts(getTestMethodName() + "_RR", 5000));
-
- startSenderInVMsAsync("ln", vm4, vm5);
- async.join();
-
- vm4.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
- vm5.invoke(() -> WANTestBase.validateQueueSizeStat("ln", 0));
- vm4.invoke(() -> WANTestBase.validateSecondaryQueueSizeStat("ln", 0));
- vm5.invoke(() -> WANTestBase.validateSecondaryQueueSizeStat("ln", 0));
- }
-
- @Test
public void testStopOneSerialGatewaySenderBothPrimary() throws Throwable {
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
@@ -345,7 +298,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 200));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 200));
- // Do some puts from both vm4 and vm5 while restarting a sender
+ // Do some puts while restarting a sender
AsyncInvocation asyncPuts =
vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 300));
--
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.