You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jj...@apache.org on 2020/04/17 08:33:06 UTC
[geode] branch develop updated: GEODE-7940: Fix Tracking of ShadowBuckets Destroyed (#4934)
This is an automated email from the ASF dual-hosted git repository.
jjramos pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new bfbb398 GEODE-7940: Fix Tracking of ShadowBuckets Destroyed (#4934)
bfbb398 is described below
commit bfbb398891c5d96fa3a5975365b29d71bd849ad6
Author: Juan José Ramos <ju...@users.noreply.github.com>
AuthorDate: Fri Apr 17 09:32:22 2020 +0100
GEODE-7940: Fix Tracking of ShadowBuckets Destroyed (#4934)
The BucketAdvisor can now keep track of more than just one shadow
bucket to avoid incorrectly marking all of them as destroyed.
- Added unit and distributed tests.
---
.../apache/geode/internal/cache/BucketAdvisor.java | 20 +++-
.../internal/cache/PartitionedRegionDataStore.java | 10 +-
.../wan/parallel/ParallelGatewaySenderQueue.java | 99 +++++++++---------
.../geode/internal/cache/BucketAdvisorTest.java | 80 +++++++++++++-
.../ParallelGatewaySenderOperationsDUnitTest.java | 116 +++++++++++++++++++++
5 files changed, 261 insertions(+), 64 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index 822c20e..e4045c3 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -33,6 +33,8 @@ import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -164,7 +166,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
private PartitionedRegion pRegion;
- private volatile boolean shadowBucketDestroyed;
+ final ConcurrentMap<String, Boolean> destroyedShadowBuckets = new ConcurrentHashMap<>();
/**
* Constructs a new BucketAdvisor for the Bucket owned by RegionAdvisor.
@@ -2742,11 +2744,19 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
}
}
- void setShadowBucketDestroyed(boolean destroyed) {
- shadowBucketDestroyed = destroyed;
+ void markAllShadowBucketsAsNonDestroyed() {
+ destroyedShadowBuckets.clear();
}
- public boolean getShadowBucketDestroyed() {
- return shadowBucketDestroyed;
+ void markAllShadowBucketsAsDestroyed() {
+ destroyedShadowBuckets.forEach((k, v) -> destroyedShadowBuckets.put(k, true));
+ }
+
+ void markShadowBucketAsDestroyed(String shadowBucketPath) {
+ destroyedShadowBuckets.put(shadowBucketPath, true);
+ }
+
+ public boolean isShadowBucketDestroyed(String shadowBucketPath) {
+ return destroyedShadowBuckets.getOrDefault(shadowBucketPath, false);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index 1506de2..23a7487 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -451,11 +451,11 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
try {
buk.initializePrimaryElector(creationRequestor);
if (getPartitionedRegion().getColocatedWith() == null) {
- buk.getBucketAdvisor().setShadowBucketDestroyed(false);
+ buk.getBucketAdvisor().markAllShadowBucketsAsNonDestroyed();
}
if (getPartitionedRegion().isShadowPR()) {
getPartitionedRegion().getColocatedWithRegion().getRegionAdvisor()
- .getBucketAdvisor(possiblyFreeBucketId).setShadowBucketDestroyed(false);
+ .getBucketAdvisor(possiblyFreeBucketId).markAllShadowBucketsAsNonDestroyed();
}
bukReg = createBucketRegion(possiblyFreeBucketId);
// Mark the bucket as hosting and distribute to peers
@@ -472,7 +472,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
bukReg.invokePartitionListenerAfterBucketCreated();
} else {
if (buk.getPartitionedRegion().getColocatedWith() == null) {
- buk.getBucketAdvisor().setShadowBucketDestroyed(true);
+ buk.getBucketAdvisor().markAllShadowBucketsAsDestroyed();
// clear tempQueue for all the shadowPR buckets
clearAllTempQueueForShadowPR(buk.getBucketId());
}
@@ -1423,7 +1423,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
&& buk.getPartitionedRegion().isShadowPR()) {
if (buk.getPartitionedRegion().getColocatedWithRegion() != null) {
buk.getPartitionedRegion().getColocatedWithRegion().getRegionAdvisor()
- .getBucketAdvisor(bucketId).setShadowBucketDestroyed(true);
+ .getBucketAdvisor(bucketId).markShadowBucketAsDestroyed(buk.getFullPath());
}
}
} catch (RegionDestroyedException ignore) {
@@ -1593,7 +1593,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
if (bucketRegion.getPartitionedRegion().isShadowPR()) {
if (bucketRegion.getPartitionedRegion().getColocatedWithRegion() != null) {
bucketRegion.getPartitionedRegion().getColocatedWithRegion().getRegionAdvisor()
- .getBucketAdvisor(bucketId).setShadowBucketDestroyed(true);
+ .getBucketAdvisor(bucketId).markAllShadowBucketsAsDestroyed();
}
}
bucketAdvisor.getProxyBucketRegion().removeBucket();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index a62eae6..97d64a7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -94,21 +94,18 @@ import org.apache.geode.management.internal.beans.AsyncEventQueueMBean;
import org.apache.geode.management.internal.beans.GatewaySenderMBean;
public class ParallelGatewaySenderQueue implements RegionQueue {
-
protected static final Logger logger = LogService.getLogger();
-
- protected final Map<String, PartitionedRegion> userRegionNameToshadowPRMap =
- new ConcurrentHashMap<String, PartitionedRegion>();
+ protected final Map<String, PartitionedRegion> userRegionNameToShadowPRMap =
+ new ConcurrentHashMap<>();
+ private static final String SHADOW_BUCKET_PATH_PREFIX =
+ Region.SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME + Region.SEPARATOR;
// <PartitionedRegion, Map<Integer, List<Object>>>
private final Map regionToDispatchedKeysMap = new ConcurrentHashMap();
protected final StoppableReentrantLock buckToDispatchLock;
-
private final StoppableCondition regionToDispatchedKeysMapEmpty;
-
protected final StoppableReentrantLock queueEmptyLock;
-
private volatile boolean isQueueEmpty = true;
/**
@@ -197,7 +194,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
try {
String regionPath =
ColocationHelper.getLeaderRegion((PartitionedRegion) event.getRegion()).getFullPath();
- prQ = userRegionNameToshadowPRMap.get(regionPath);
+ prQ = userRegionNameToShadowPRMap.get(regionPath);
destroyEventFromQueue(prQ, bucketId, previousTailKeyTobeRemoved);
} catch (EntryNotFoundException e) {
if (logger.isDebugEnabled()) {
@@ -313,7 +310,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
try {
String regionName = userRegion.getFullPath();
- if (this.userRegionNameToshadowPRMap.containsKey(regionName))
+ if (this.userRegionNameToShadowPRMap.containsKey(regionName))
return;
InternalCache cache = sender.getCache();
@@ -408,7 +405,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
}
} finally {
if (prQ != null) {
- this.userRegionNameToshadowPRMap.put(userRegion.getFullPath(), prQ);
+ this.userRegionNameToShadowPRMap.put(userRegion.getFullPath(), prQ);
}
this.sender.getLifeCycleLock().writeLock().unlock();
}
@@ -436,13 +433,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
// to leader PR)
// though, internally, colocate the GatewaySender's shadowPR with the leader PR in
// colocation chain
- if (!this.userRegionNameToshadowPRMap.containsKey(leaderRegionName)) {
+ if (!this.userRegionNameToShadowPRMap.containsKey(leaderRegionName)) {
addShadowPartitionedRegionForUserPR(ColocationHelper.getLeaderRegion(userPR));
}
return;
}
- if (this.userRegionNameToshadowPRMap.containsKey(regionName))
+ if (this.userRegionNameToShadowPRMap.containsKey(regionName))
return;
if (userPR.getDataPolicy().withPersistence() && !sender.isPersistenceEnabled()) {
@@ -539,7 +536,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
} finally {
if (prQ != null) {
- this.userRegionNameToshadowPRMap.put(userPR.getFullPath(), prQ);
+ this.userRegionNameToShadowPRMap.put(userPR.getFullPath(), prQ);
}
/*
* Here, enqueueTempEvents need to be invoked when a sender is already running and userPR is
@@ -663,9 +660,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
if (isDebugEnabled) {
logger.debug("Put is for the region {}", region);
}
- if (!this.userRegionNameToshadowPRMap.containsKey(regionPath)) {
+ if (!this.userRegionNameToShadowPRMap.containsKey(regionPath)) {
if (isDebugEnabled) {
- logger.debug("The userRegionNameToshadowPRMap is {}", userRegionNameToshadowPRMap);
+ logger.debug("The userRegionNameToshadowPRMap is {}", userRegionNameToShadowPRMap);
}
logger.warn(
"GatewaySender: Not queuing the event {}, as the region for which this event originated is not yet configured in the GatewaySender",
@@ -674,7 +671,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
return false;
}
- PartitionedRegion prQ = this.userRegionNameToshadowPRMap.get(regionPath);
+ PartitionedRegion prQ = this.userRegionNameToShadowPRMap.get(regionPath);
int bucketId = value.getBucketId();
Object key = null;
if (!isDREvent) {
@@ -703,16 +700,14 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
try {
if (brq == null) {
+ // Full path of the bucket
+ final String bucketFullPath = SHADOW_BUCKET_PATH_PREFIX + prQ.getBucketName(bucketId);
+
// Set the threadInitLevel to BEFORE_INITIAL_IMAGE.
final InitializationLevel oldLevel =
LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE);
- try {
- // Full path of the bucket:
-
- final String bucketFullPath =
- Region.SEPARATOR + PartitionedRegionHelper.PR_ROOT_REGION_NAME + Region.SEPARATOR
- + prQ.getBucketName(bucketId);
+ try {
brq = (AbstractBucketRegionQueue) prQ.getCache().getInternalRegionByPath(bucketFullPath);
if (isDebugEnabled) {
logger.debug(
@@ -739,8 +734,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
// chain is getting destroyed one by one starting from child region
// i.e this bucket due to moveBucket operation
// In that case we don't want to store this event.
- if (((PartitionedRegion) prQ.getColocatedWithRegion()).getRegionAdvisor()
- .getBucketAdvisor(bucketId).getShadowBucketDestroyed()) {
+ if (prQ.getColocatedWithRegion().getRegionAdvisor().getBucketAdvisor(bucketId)
+ .isShadowBucketDestroyed(bucketFullPath)) {
if (isDebugEnabled) {
logger.debug(
"ParallelGatewaySenderOrderedQueue not putting key {} : Value : {} as shadowPR bucket is destroyed.",
@@ -790,17 +785,16 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
LocalRegion.setThreadInitLevelRequirement(oldLevel);
}
} else {
- boolean thisbucketDestroyed = false;
+ boolean thisBucketDestroyed = brq.isDestroyed();
if (!isDREvent) {
- thisbucketDestroyed =
- ((PartitionedRegion) prQ.getColocatedWithRegion()).getRegionAdvisor()
- .getBucketAdvisor(bucketId).getShadowBucketDestroyed() || brq.isDestroyed();
- } else {
- thisbucketDestroyed = brq.isDestroyed();
+ // Full path of the bucket
+ final String bucketFullPath = SHADOW_BUCKET_PATH_PREFIX + prQ.getBucketName(bucketId);
+ thisBucketDestroyed |= prQ.getColocatedWithRegion().getRegionAdvisor()
+ .getBucketAdvisor(bucketId).isShadowBucketDestroyed(bucketFullPath);
}
- if (!thisbucketDestroyed) {
+ if (!thisBucketDestroyed) {
putIntoBucketRegionQueue(brq, key, value);
putDone = true;
} else {
@@ -815,6 +809,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
} finally {
notifyEventProcessorIfRequired();
}
+
return putDone;
}
@@ -874,19 +869,19 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
*/
@Override
public Region getRegion() {
- return this.userRegionNameToshadowPRMap.size() == 1
- ? (Region) this.userRegionNameToshadowPRMap.values().toArray()[0] : null;
+ return this.userRegionNameToShadowPRMap.size() == 1
+ ? (Region) this.userRegionNameToShadowPRMap.values().toArray()[0] : null;
}
public PartitionedRegion getRegion(String fullpath) {
- return this.userRegionNameToshadowPRMap.get(fullpath);
+ return this.userRegionNameToShadowPRMap.get(fullpath);
}
public PartitionedRegion removeShadowPR(String fullpath) {
try {
this.sender.getLifeCycleLock().writeLock().lock();
this.sender.setEnqueuedAllTempQueueEvents(false);
- return this.userRegionNameToshadowPRMap.remove(fullpath);
+ return this.userRegionNameToShadowPRMap.remove(fullpath);
} finally {
sender.getLifeCycleLock().writeLock().unlock();
}
@@ -900,15 +895,15 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
* Returns the set of shadowPR backign this queue.
*/
public Set<PartitionedRegion> getRegions() {
- return new HashSet(this.userRegionNameToshadowPRMap.values());
+ return new HashSet(this.userRegionNameToShadowPRMap.values());
}
// TODO: Find optimal way to get Random shadow pr as this will be called in each put and peek.
protected PartitionedRegion getRandomShadowPR() {
PartitionedRegion prQ = null;
- if (this.userRegionNameToshadowPRMap.values().size() > 0) {
- int randomIndex = new Random().nextInt(this.userRegionNameToshadowPRMap.size());
- prQ = (PartitionedRegion) this.userRegionNameToshadowPRMap.values().toArray()[randomIndex];
+ if (this.userRegionNameToShadowPRMap.values().size() > 0) {
+ int randomIndex = new Random().nextInt(this.userRegionNameToShadowPRMap.size());
+ prQ = (PartitionedRegion) this.userRegionNameToShadowPRMap.values().toArray()[randomIndex];
}
return prQ;
}
@@ -930,7 +925,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
protected boolean areLocalBucketQueueRegionsPresent() {
boolean bucketsAvailable = false;
- for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+ for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
if (prQ.getDataStore().getAllLocalBucketRegions().size() > 0)
return true;
}
@@ -994,11 +989,11 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
Object key = null;
if (event.getRegion() != null) {
if (isDREvent(sender.getCache(), event)) {
- prQ = this.userRegionNameToshadowPRMap.get(event.getRegion().getFullPath());
+ prQ = this.userRegionNameToShadowPRMap.get(event.getRegion().getFullPath());
bucketId = event.getEventId().getBucketID();
key = event.getEventId();
} else {
- prQ = this.userRegionNameToshadowPRMap.get(ColocationHelper
+ prQ = this.userRegionNameToShadowPRMap.get(ColocationHelper
.getLeaderRegion((PartitionedRegion) event.getRegion()).getFullPath());
bucketId = event.getBucketId();
key = event.getShadowKey();
@@ -1010,11 +1005,11 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
if (region != null && !region.isDestroyed()) {
// TODO: We have to get colocated parent region for this region
if (region instanceof DistributedRegion) {
- prQ = this.userRegionNameToshadowPRMap.get(region.getFullPath());
+ prQ = this.userRegionNameToShadowPRMap.get(region.getFullPath());
event.getBucketId();
key = event.getEventId();
} else {
- prQ = this.userRegionNameToshadowPRMap
+ prQ = this.userRegionNameToShadowPRMap
.get(ColocationHelper.getLeaderRegion((PartitionedRegion) region).getFullPath());
event.getBucketId();
key = event.getShadowKey();
@@ -1429,7 +1424,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
public String displayContent() {
int size = 0;
StringBuffer sb = new StringBuffer();
- for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+ for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
if (prQ != null && prQ.getDataStore() != null) {
Set<BucketRegion> allLocalBuckets = prQ.getDataStore().getAllLocalBucketRegions();
for (BucketRegion br : allLocalBuckets) {
@@ -1448,7 +1443,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
public int localSize(boolean includeSecondary) {
int size = 0;
- for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+ for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
if (prQ != null && prQ.getDataStore() != null) {
if (includeSecondary) {
size += prQ.getDataStore().getSizeOfLocalBuckets();
@@ -1466,7 +1461,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
public int localSizeForProcessor() {
int size = 0;
- for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+ for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
if (((PartitionedRegion) prQ.getRegion()).getDataStore() != null) {
Set<BucketRegion> primaryBuckets =
((PartitionedRegion) prQ.getRegion()).getDataStore().getAllLocalPrimaryBucketRegions();
@@ -1487,7 +1482,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
@Override
public int size() {
int size = 0;
- for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+ for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
if (logger.isDebugEnabled()) {
logger.debug("The name of the queue region is {} and the size is {}. keyset size is {}",
prQ.getName(), prQ.size(), prQ.keys().size());
@@ -1500,7 +1495,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
@Override
public void addCacheListener(CacheListener listener) {
- for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+ for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
AttributesMutator mutator = prQ.getAttributesMutator();
mutator.addCacheListener(listener);
}
@@ -1526,7 +1521,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
public long getNumEntriesOverflowOnDiskTestOnly() {
long numEntriesOnDisk = 0;
- for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+ for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
DiskRegionStats diskStats = prQ.getDiskRegionStats();
if (diskStats == null) {
if (logger.isDebugEnabled()) {
@@ -1548,7 +1543,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
public long getNumEntriesInVMTestOnly() {
long numEntriesInVM = 0;
- for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
+ for (PartitionedRegion prQ : this.userRegionNameToShadowPRMap.values()) {
DiskRegionStats diskStats = prQ.getDiskRegionStats();
if (diskStats == null) {
if (logger.isDebugEnabled()) {
@@ -1835,7 +1830,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
public long estimateMemoryFootprint(SingleObjectSizer sizer) {
return sizer.sizeof(this) + sizer.sizeof(regionToDispatchedKeysMap)
- + sizer.sizeof(userRegionNameToshadowPRMap) + sizer.sizeof(bucketToTempQueueMap)
+ + sizer.sizeof(userRegionNameToShadowPRMap) + sizer.sizeof(bucketToTempQueueMap)
+ sizer.sizeof(peekedEvents) + sizer.sizeof(conflationExecutor);
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
index 2eac9ac..d49430b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
@@ -14,8 +14,10 @@
*/
package org.apache.geode.internal.cache;
+import static com.google.common.collect.ImmutableMap.of;
import static org.apache.geode.internal.cache.CacheServerImpl.CACHE_SERVER_BIND_ADDRESS_NOT_AVAILABLE_EXCEPTION_MESSAGE;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doCallRealMethod;
@@ -28,6 +30,7 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.junit.Test;
import org.mockito.Mockito;
@@ -59,6 +62,7 @@ public class BucketAdvisorTest {
ProxyBucketRegion mockBucket = mock(ProxyBucketRegion.class);
RegionAdvisor mockRegionAdvisor = mock(RegionAdvisor.class);
PartitionedRegion mockPartitionedRegion = mock(PartitionedRegion.class);
+ @SuppressWarnings("rawtypes")
PartitionAttributes mockPartitionAttributes = mock(PartitionAttributes.class);
DistributionManager mockDistributionManager = mock(DistributionManager.class);
List<CacheServer> cacheServers = new ArrayList<>();
@@ -80,12 +84,13 @@ public class BucketAdvisorTest {
assertThat(bucketAdvisor.getBucketServerLocations(0).size()).isEqualTo(0);
}
- @Test(expected = IllegalStateException.class)
+ @Test
public void whenServerThrowsIllegalStateExceptionWithoutBindAddressMsgThenExceptionMustBeThrown() {
InternalCache mockCache = mock(InternalCache.class);
ProxyBucketRegion mockBucket = mock(ProxyBucketRegion.class);
RegionAdvisor mockRegionAdvisor = mock(RegionAdvisor.class);
PartitionedRegion mockPartitionedRegion = mock(PartitionedRegion.class);
+ @SuppressWarnings("rawtypes")
PartitionAttributes mockPartitionAttributes = mock(PartitionAttributes.class);
DistributionManager mockDistributionManager = mock(DistributionManager.class);
List<CacheServer> cacheServers = new ArrayList<>();
@@ -103,7 +108,8 @@ public class BucketAdvisorTest {
when(mockCacheServer.getExternalAddress()).thenThrow(new IllegalStateException());
BucketAdvisor bucketAdvisor = BucketAdvisor.createBucketAdvisor(mockBucket, mockRegionAdvisor);
- bucketAdvisor.getBucketServerLocations(0).size();
+ assertThatThrownBy(() -> bucketAdvisor.getBucketServerLocations(0))
+ .isInstanceOf(IllegalStateException.class);
}
@Test
@@ -150,4 +156,74 @@ public class BucketAdvisorTest {
advisorSpy.volunteerForPrimary();
verify(volunteeringDelegate).volunteerForPrimary();
}
+
+ BucketAdvisor mockBucketAdvisorWithShadowBucketsDestroyedMap(Map<String, Boolean> shadowBuckets) {
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ when(distributionManager.getId()).thenReturn(new InternalDistributedMember("localhost", 321));
+
+ Bucket bucket = mock(Bucket.class);
+ when(bucket.isHosting()).thenReturn(true);
+ when(bucket.isPrimary()).thenReturn(false);
+ when(bucket.getDistributionManager()).thenReturn(distributionManager);
+
+ PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+ when(partitionedRegion.getRedundantCopies()).thenReturn(0);
+ when(partitionedRegion.getPartitionAttributes()).thenReturn(new PartitionAttributesImpl());
+ RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+ when(regionAdvisor.getPartitionedRegion()).thenReturn(partitionedRegion);
+
+ BucketAdvisor bucketAdvisor = BucketAdvisor.createBucketAdvisor(bucket, regionAdvisor);
+ bucketAdvisor.destroyedShadowBuckets.putAll(shadowBuckets);
+
+ return bucketAdvisor;
+ }
+
+ @Test
+ public void markAllShadowBucketsAsNonDestroyedShouldClearTheShadowBucketsDestroyedMap() {
+ Map<String, Boolean> buckets = of("/b1", false, "/b2", true);
+ BucketAdvisor bucketAdvisor = mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
+
+ assertThat(bucketAdvisor.destroyedShadowBuckets).isNotEmpty();
+ bucketAdvisor.markAllShadowBucketsAsNonDestroyed();
+ assertThat(bucketAdvisor.destroyedShadowBuckets).isEmpty();
+ }
+
+ @Test
+ public void markAllShadowBucketsAsDestroyedShouldSetTheFlagAsTrueForEveryKnownShadowBucket() {
+ Map<String, Boolean> buckets = of("/b1", false, "/b2", false, "/b3", false);
+ BucketAdvisor bucketAdvisor = mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
+
+ bucketAdvisor.destroyedShadowBuckets.forEach((k, v) -> assertThat(v).isFalse());
+ bucketAdvisor.markAllShadowBucketsAsDestroyed();
+ bucketAdvisor.destroyedShadowBuckets.forEach((k, v) -> assertThat(v).isTrue());
+ }
+
+ @Test
+ public void markShadowBucketAsDestroyedShouldSetTheFlagAsTrueOnlyForTheSpecificBucket() {
+ Map<String, Boolean> buckets = of("/b1", false);
+ BucketAdvisor bucketAdvisor = mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
+
+ // Known Shadow Bucket
+ assertThat(bucketAdvisor.destroyedShadowBuckets.get("/b1")).isFalse();
+ bucketAdvisor.markShadowBucketAsDestroyed("/b1");
+ assertThat(bucketAdvisor.destroyedShadowBuckets.get("/b1")).isTrue();
+
+ // Unknown Shadow Bucket
+ assertThat(bucketAdvisor.destroyedShadowBuckets.get("/b5")).isNull();
+ bucketAdvisor.markShadowBucketAsDestroyed("/b5");
+ assertThat(bucketAdvisor.destroyedShadowBuckets.get("/b5")).isTrue();
+ }
+
+ @Test
+ public void isShadowBucketDestroyedShouldReturnCorrectly() {
+ Map<String, Boolean> buckets = of("/b1", true, "/b2", false);
+ BucketAdvisor bucketAdvisor = mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
+
+ // Known Shadow Buckets
+ assertThat(bucketAdvisor.isShadowBucketDestroyed("/b1")).isTrue();
+ assertThat(bucketAdvisor.isShadowBucketDestroyed("/b2")).isFalse();
+
+ // Unknown Shadow Bucket
+ assertThat(bucketAdvisor.isShadowBucketDestroyed("/b5")).isFalse();
+ }
}
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index 1d0ad7b..ff5f0f5 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -15,7 +15,9 @@
package org.apache.geode.internal.cache.wan.parallel;
import static org.apache.geode.distributed.internal.DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME;
+import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPortsForDUnitSite;
import static org.apache.geode.internal.cache.tier.sockets.Message.MAX_MESSAGE_SIZE_PROPERTY;
+import static org.apache.geode.internal.util.ArrayUtils.asList;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.assertj.core.api.Assertions.assertThat;
@@ -25,14 +27,21 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
import org.junit.Before;
import org.junit.Rule;
@@ -63,6 +72,7 @@ import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
/**
* DUnit test for operations on ParallelGatewaySender
@@ -72,6 +82,9 @@ import org.apache.geode.test.junit.categories.WanTest;
public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
@Rule
+ public SerializableTestName testName = new SerializableTestName();
+
+ @Rule
public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
@Rule
@@ -643,6 +656,109 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
}
@Test
+ public void destroyParallelGatewaySenderShouldNotStopDispatchingFromOtherSendersAttachedToTheRegion() {
+ String site2SenderId = "site2-sender";
+ String site3SenderId = "site3-sender";
+ String regionName = testName.getMethodName();
+ int[] ports = getRandomAvailableTCPPortsForDUnitSite(3);
+ int site1Port = ports[0];
+ int site2Port = ports[1];
+ int site3Port = ports[2];
+ Set<String> site1RemoteLocators =
+ Stream.of("localhost[" + site2Port + "]", "localhost[" + site3Port + "]")
+ .collect(Collectors.toSet());
+ Set<String> site2RemoteLocators =
+ Stream.of("localhost[" + site1Port + "]", "localhost[" + site3Port + "]")
+ .collect(Collectors.toSet());
+ Set<String> site3RemoteLocators =
+ Stream.of("localhost[" + site1Port + "]", "localhost[" + site2Port + "]")
+ .collect(Collectors.toSet());
+
+ // Start 3 sites.
+ vm0.invoke(() -> createLocator(1, site1Port,
+ Collections.singleton("localhost[" + site1Port + "]"), site1RemoteLocators));
+ vm1.invoke(() -> createLocator(2, site2Port,
+ Collections.singleton("localhost[" + site2Port + "]"), site2RemoteLocators));
+ vm2.invoke(() -> createLocator(3, site3Port,
+ Collections.singleton("localhost[" + site3Port + "]"), site3RemoteLocators));
+
+ // Create the cache on the 3 sites.
+ createCacheInVMs(site1Port, vm3);
+ createCacheInVMs(site2Port, vm4);
+ createCacheInVMs(site3Port, vm5);
+
+ // Create receiver and region on sites 2 and 3.
+ asList(vm4, vm5).forEach(vm -> vm.invoke(() -> {
+ createReceiver();
+ createPartitionedRegion(regionName, null, 1, 113, isOffHeap());
+ }));
+
+ // Create senders and partitioned region on site 1.
+ vm3.invoke(() -> {
+ createSender(site2SenderId, 2, true, 100, 20, false, false, null, false);
+ createSender(site3SenderId, 3, true, 100, 20, false, false, null, false);
+ waitForSenderRunningState(site2SenderId);
+ waitForSenderRunningState(site3SenderId);
+
+ createPartitionedRegion(regionName, String.join(",", site2SenderId, site3SenderId), 1, 113,
+ isOffHeap());
+ });
+
+ // #################################################################################### //
+
+ final int FIRST_BATCH = 100;
+ final int SECOND_BATCH = 200;
+ final Map<String, String> firstBatch = new HashMap<>();
+ IntStream.range(0, FIRST_BATCH).forEach(i -> firstBatch.put("Key" + i, "Value" + i));
+ final Map<String, String> secondBatch = new HashMap<>();
+ IntStream.range(FIRST_BATCH, SECOND_BATCH)
+ .forEach(i -> secondBatch.put("Key" + i, "Value" + i));
+
+ // Insert first batch and wait until the queues are empty.
+ vm3.invoke(() -> {
+ cache.getRegion(regionName).putAll(firstBatch);
+ checkQueueSize(site2SenderId, 0);
+ checkQueueSize(site3SenderId, 0);
+ });
+
+ // Wait until sites 2 and 3 have received all updates.
+ asList(vm4, vm5).forEach(vm -> vm.invoke(() -> {
+ Region<String, String> region = cache.getRegion(regionName);
+ await().untilAsserted(() -> assertThat(region.size()).isEqualTo(FIRST_BATCH));
+ firstBatch.forEach((key, value) -> assertThat(region.get(key)).isEqualTo(value));
+ }));
+
+ // Stop sender to site3, remove it from the region and destroy it.
+ vm3.invoke(() -> {
+ stopSender(site3SenderId);
+ removeSenderFromTheRegion(site3SenderId, regionName);
+ destroySender(site3SenderId);
+ verifySenderDestroyed(site3SenderId, true);
+ });
+
+ // Insert second batch and wait until the queue is empty.
+ vm3.invoke(() -> {
+ cache.getRegion(regionName).putAll(secondBatch);
+ checkQueueSize(site2SenderId, 0);
+ });
+
+ // Site 3 should only have the first batch.
+ vm5.invoke(() -> {
+ Region<String, String> region = cache.getRegion(regionName);
+ await().untilAsserted(() -> assertThat(region.size()).isEqualTo(FIRST_BATCH));
+ firstBatch.forEach((key, value) -> assertThat(region.get(key)).isEqualTo(value));
+ });
+
+ // Site 2 should have both batches.
+ vm4.invoke(() -> {
+ Region<String, String> region = cache.getRegion(regionName);
+ await().untilAsserted(() -> assertThat(region.size()).isEqualTo(SECOND_BATCH));
+ firstBatch.forEach((key, value) -> assertThat(region.get(key)).isEqualTo(value));
+ secondBatch.forEach((key, value) -> assertThat(region.get(key)).isEqualTo(value));
+ });
+ }
+
+ @Test
public void testParallelGatewaySenderMessageTooLargeException() {
vm4.invoke(() -> System.setProperty(MAX_MESSAGE_SIZE_PROPERTY, String.valueOf(1024 * 1024)));