You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2018/08/30 23:17:44 UTC
[geode] branch develop updated: GEODE-5653: Synchronize and free GatewaySenderEvents in background th… (#2407)
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 1a62c5a GEODE-5653: Synchronize and free GatewaySenderEvents in background th… (#2407)
1a62c5a is described below
commit 1a62c5a29f7d5a339803ea1125051e6a5d7d2c99
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Thu Aug 30 16:17:39 2018 -0700
GEODE-5653: Synchronize and free GatewaySenderEvents in background th… (#2407)
* Synchronize and free GatewaySenderEvents in background thread
* Await off-heap release in test
---
.../concurrent/CustomEntryConcurrentHashMap.java | 174 +++++++++++----------
.../ParallelGatewaySenderOperationsDUnitTest.java | 139 ++++++++++++++++
2 files changed, 232 insertions(+), 81 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/CustomEntryConcurrentHashMap.java b/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/CustomEntryConcurrentHashMap.java
index 8b3c958..9edb789 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/CustomEntryConcurrentHashMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/CustomEntryConcurrentHashMap.java
@@ -60,6 +60,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.geode.CancelException;
import org.apache.geode.internal.cache.RegionEntry;
import org.apache.geode.internal.cache.entries.OffHeapRegionEntry;
+import org.apache.geode.internal.cache.tier.sockets.command.KeySet;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.offheap.OffHeapRegionEntryHelper;
import org.apache.geode.internal.size.SingleObjectSizer;
@@ -111,6 +112,10 @@ import org.apache.geode.internal.util.ArrayUtils;
* This class is a member of the <a href="{@docRoot} /../technotes/guides/collections/index.html">
* Java Collections Framework</a>.
*
+ * <p>
+ * Modifications made to the Java {@link java.util.Hashtable} implementation
+ * are indicated using comments to mark the beginning and end of Geode changes.
+ *
* @since Java 1.5
* @author Doug Lea
* @param <K> the type of keys maintained by this map
@@ -164,14 +169,14 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
*/
static final int RETRIES_BEFORE_LOCK = 2;
- // GemStone addition
+ // Geode addition
/**
* Token object to indicate that {@link #remove(Object)} does not need to compare against provided
* value before removing from segment.
*/
private static final Object NO_OBJECT_TOKEN = new Object();
- // End GemStone addition
+ // End Geode addition
/* ---------------- Fields -------------- */
@@ -204,7 +209,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
transient Set<K> keySet;
transient Set<Map.Entry<K, V>> entrySet;
- transient Set<Map.Entry<K, V>> reusableEntrySet; // GemStone addition
+ transient Set<Map.Entry<K, V>> reusableEntrySet; // Geode addition
transient Collection<V> values;
/* ---------------- Small Utilities -------------- */
@@ -242,8 +247,8 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
/* ---------------- Inner Classes -------------- */
- // GemStone addition
- // GemStone changed HashEntry to be an interface with original HashEntry
+ // Geode addition
+ // Geode changed HashEntry to be an interface with original HashEntry
// as the default implementation HashEntryImpl.
/**
@@ -391,7 +396,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
int keyHashCode(Object key, boolean compareValues);
}
- // End GemStone addition
+ // End Geode addition
/**
* Segments are specialized versions of hash tables. This subclasses from ReentrantLock
@@ -462,7 +467,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
*/
final float loadFactor;
- // GemStone addition
+ // Geode addition
/**
* {@link org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap.HashEntryCreator}
@@ -478,7 +483,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
*/
final ReentrantReadWriteLock listUpdateLock;
- // End GemStone addition
+ // End Geode addition
Segment(final int initialCapacity, final float lf, final HashEntryCreator<K, V> entryCreator) {
this.loadFactor = lf;
@@ -492,7 +497,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
return new Segment[i];
}
- // GemStone added the method below
+ // Geode added the method below
@SuppressWarnings("unchecked")
static <K, V> HashEntry<K, V>[] newEntryArray(final int size) {
return new HashEntry[size];
@@ -552,7 +557,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
V get(final Object key, final int hash) {
if (this.count != 0) { // read-volatile
- // GemStone change to acquire the read lock on list updates
+ // Geode change to acquire the read lock on list updates
final ReentrantReadWriteLock.ReadLock listLock = this.listUpdateLock.readLock();
listLock.lock();
boolean lockAcquired = true;
@@ -581,7 +586,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
V getNoLock(final Object key, final int hash, final boolean lockListForRead) {
if (this.count != 0) { // read-volatile
- // GemStone change to acquire the read lock on list updates
+ // Geode change to acquire the read lock on list updates
ReentrantReadWriteLock.ReadLock listLock = null;
if (lockListForRead) {
listLock = this.listUpdateLock.readLock();
@@ -606,7 +611,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
boolean containsKey(final Object key, final int hash) {
if (this.count != 0) { // read-volatile
- // GemStone change to acquire the read lock on list updates
+ // Geode change to acquire the read lock on list updates
final ReentrantReadWriteLock.ReadLock listLock = this.listUpdateLock.readLock();
listLock.lock();
HashEntry<K, V> e = getFirst(hash);
@@ -626,7 +631,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
boolean containsValue(final Object value) {
if (this.count != 0) { // read-volatile
- // GemStone change to acquire the read lock on list updates
+ // Geode change to acquire the read lock on list updates
ReentrantReadWriteLock.ReadLock readLock = this.listUpdateLock.readLock();
RETRYLOOP: for (;;) {
readLock.lock();
@@ -636,7 +641,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
for (HashEntry<K, V> e = tab[i]; e != null; e = e.getNextEntry()) {
V v = e.getMapValue();
if (v == null) {
- // GemStone changes BEGIN
+ // Geode changes BEGIN
// go back and retry from the very start with segment read lock
readLock.unlock();
readLock = super.readLock();
@@ -644,7 +649,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
/*
* (original code) v = readValueUnderLock(e);
*/
- // GemStone changes END
+ // Geode changes END
}
if (equalityCompare(value, v)) {
readLock.unlock();
@@ -733,7 +738,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
}
}
- // GemStone additions
+ // Geode additions
<C, P> V create(final K key, final int hash, final MapCallback<K, V, C, P> valueCreator,
final C context, final P createParams, final boolean lockForRead) {
@@ -826,7 +831,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
return null;
}
- // End GemStone additions
+ // End Geode additions
void rehash() {
final HashEntry<K, V>[] oldTable = this.table;
@@ -874,7 +879,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
newTable[lastIdx] = lastRun;
// Clone all remaining nodes
- // GemStone changes BEGIN
+ // Geode changes BEGIN
// update the next entry instead of cloning the nodes in newTable;
// this is primarily because we don't want to change
// the underlying RegionEntry that may be used elsewhere;
@@ -919,7 +924,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
* p.hash & sizeMask; final HashEntry<K, V> n = newTable[k]; newTable[k] =
* this.entryCreator.newEntry(p.key, p.hash, n, p.value); }
*/
- // GemStone changes END
+ // Geode changes END
}
}
}
@@ -929,11 +934,11 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
/**
* Remove; match on key only if value null, else match both.
*/
- // GemStone change
+ // Geode change
// added "condition" and "removeParams" parameters
<C, P> V remove(final Object key, final int hash, final Object value,
final MapCallback<K, V, C, P> condition, final C context, final P removeParams) {
- // End GemStone change
+ // End Geode change
final ReentrantReadWriteLock.WriteLock writeLock = super.writeLock();
writeLock.lock();
try {
@@ -942,7 +947,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
final int index = hash & (tab.length - 1);
final HashEntry<K, V> first = tab[index];
HashEntry<K, V> e = first;
- // GemStone change
+ // Geode change
// the entry previous to the matched one, if any
HashEntry<K, V> p = null;
while (e != null && (e.getEntryHash() != hash || !equalityKeyCompare(key, e))) {
@@ -957,17 +962,17 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
V oldValue = null;
if (e != null) {
final V v = e.getMapValue();
- // GemStone change
+ // Geode change
// allow for passing in a null object for comparison during remove;
// also invoke the provided condition to check for removal
if ((value == NO_OBJECT_TOKEN || equalityCompareWithNulls(v, value))
&& (condition == null || condition.doRemoveValue(v, context, removeParams))) {
- // End GemStone change
+ // End Geode change
oldValue = v;
// All entries following removed node can stay in list,
// but all preceding ones need to be cloned.
++this.modCount;
- // GemStone changes BEGIN
+ // Geode changes BEGIN
// update the next entry instead of cloning the nodes
// this is primarily because we don't want to change
// the underlying RegionEntry that may be used elsewhere
@@ -987,7 +992,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
* != e; p = p.next) { newFirst = this.entryCreator.newEntry(p.key, p.hash, newFirst,
* p.value); } tab[index] = newFirst;
*/
- // GemStone changes END
+ // Geode changes END
this.count = c; // write-volatile
}
}
@@ -998,7 +1003,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
}
/**
- * GemStone added the clearedEntries param and the result
+ * Geode added the clearedEntries param and the result
*/
ArrayList<HashEntry<?, ?>> clear(ArrayList<HashEntry<?, ?>> clearedEntries) {
if (this.count != 0) {
@@ -1006,25 +1011,26 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
writeLock.lock();
try {
final HashEntry<K, V>[] tab = this.table;
- // GemStone changes BEGIN
- boolean collectEntries = clearedEntries != null;
- if (!collectEntries) {
- // see if we have a map with off-heap region entries
- for (HashEntry<K, V> he : tab) {
- if (he != null) {
- collectEntries = he instanceof OffHeapRegionEntry;
- if (collectEntries) {
- clearedEntries = new ArrayList<HashEntry<?, ?>>();
+ // Geode changes BEGIN
+ if (clearedEntries == null) {
+ final boolean checkForGatewaySenderEvent =
+ OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap();
+ if (checkForGatewaySenderEvent) {
+ clearedEntries = new ArrayList<HashEntry<?, ?>>();
+ } else {
+ // see if we have a map with off-heap region entries
+ for (HashEntry<K, V> he : tab) {
+ if (he != null) {
+ if (he instanceof OffHeapRegionEntry) {
+ clearedEntries = new ArrayList<HashEntry<?, ?>>();
+ }
+ // after the first non-null entry we are done
+ break;
}
- // after the first non-null entry we are done
- break;
}
}
}
- final boolean checkForGatewaySenderEvent =
- OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap();
- final boolean skipProcessOffHeap = !collectEntries && !checkForGatewaySenderEvent;
- if (skipProcessOffHeap) {
+ if (clearedEntries == null) {
Arrays.fill(tab, null);
} else {
for (int i = 0; i < tab.length; i++) {
@@ -1032,20 +1038,9 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
if (he == null)
continue;
tab[i] = null;
- if (collectEntries) {
- clearedEntries.add(he);
- } else {
- for (HashEntry<K, V> p = he; p != null; p = p.getNextEntry()) {
- if (p instanceof RegionEntry) {
- // It is ok to call GatewaySenderEventImpl release without being synced
- // on the region entry. It will not create an orphan.
- GatewaySenderEventImpl.release(((RegionEntry) p).getValue()); // OFFHEAP
- // _getValue ok
- }
- }
- }
+ clearedEntries.add(he);
}
- // GemStone changes END
+ // Geode changes END
}
++this.modCount;
this.count = 0; // write-volatile
@@ -1053,7 +1048,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
writeLock.unlock();
}
}
- return clearedEntries; // GemStone change
+ return clearedEntries; // Geode change
}
}
@@ -1116,7 +1111,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
this(initialCapacity, loadFactor, concurrencyLevel, false, null);
}
- // GemStone addition
+ // Geode addition
/**
* Creates a new, empty map with the specified initial capacity, load factor and concurrency
@@ -1223,7 +1218,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
}
}
- // End GemStone addition
+ // End Geode addition
/**
* Creates a new, empty map with the specified initial capacity and load factor and with the
@@ -1519,7 +1514,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
return segmentFor(hash).put(key, hash, value, true);
}
- // GemStone addition
+ // Geode addition
/**
* Create a given key, value mapping if the key does not exist in the map else do nothing. The
@@ -1708,7 +1703,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
return segmentFor(hash).remove(key, hash, NO_OBJECT_TOKEN, condition, context, removeParams);
}
- // End GemStone addition
+ // End Geode addition
/**
* Copies all of the mappings from the specified map to this one. These mappings replace any
@@ -1783,7 +1778,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
return segmentFor(hash).replace(key, hash, value);
}
- // GemStone addition
+ // Geode addition
@Override
public void clearWithExecutor(Executor executor) {
ArrayList<HashEntry<?, ?>> entries = null;
@@ -1794,17 +1789,34 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
} finally {
if (entries != null) {
final ArrayList<HashEntry<?, ?>> clearedEntries = entries;
- final Runnable runnable = new Runnable() {
- public void run() {
- for (HashEntry<?, ?> he : clearedEntries) {
- for (HashEntry<?, ?> p = he; p != null; p = p.getNextEntry()) {
- synchronized (p) {
- ((OffHeapRegionEntry) p).release();
+ Runnable runnable;
+ if (OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap()) {
+ runnable = new Runnable() {
+ public void run() {
+ for (HashEntry<?, ?> he : clearedEntries) {
+ for (HashEntry<?, ?> p = he; p != null; p = p.getNextEntry()) {
+ if (p instanceof RegionEntry) {
+ synchronized (p) {
+ GatewaySenderEventImpl.release(((RegionEntry) p).getValue()); // OFFHEAP
+ }
+ }
}
}
}
- }
- };
+ };
+ } else {
+ runnable = new Runnable() {
+ public void run() {
+ for (HashEntry<?, ?> he : clearedEntries) {
+ for (HashEntry<?, ?> p = he; p != null; p = p.getNextEntry()) {
+ synchronized (p) {
+ ((OffHeapRegionEntry) p).release();
+ }
+ }
+ }
+ }
+ };
+ }
boolean submitted = false;
if (executor != null) {
try {
@@ -1835,7 +1847,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
public void clear() {
clearWithExecutor(null);
}
- // End GemStone addition
+ // End Geode addition
/**
* Returns a {@link Set} view of the keys contained in this map. The set is backed by the map, so
@@ -1895,7 +1907,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
return (es != null) ? es : (this.entrySet = new EntrySet(false));
}
- // GemStone addition
+ // Geode addition
@Override
public Set<Map.Entry<K, V>> entrySetWithReusableEntries() {
@@ -1903,7 +1915,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
return (es != null) ? es : (this.reusableEntrySet = new EntrySet(true));
}
- // End GemStone addition
+ // End Geode addition
/**
* Returns an enumeration of the keys in this table.
@@ -1933,7 +1945,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
int nextTableIndex;
- // GemStone changed HashEntry<K, V>[] currentTable to currentSegment
+ // Geode changed HashEntry<K, V>[] currentTable to currentSegment
HashEntry<K, V>[] currentTable;
HashEntry<K, V> nextEntry;
@@ -1957,7 +1969,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
}
void advance() {
- // GemStone changes BEGIN
+ // Geode changes BEGIN
if (this.currentListIndex < this.currentList.size()) {
this.nextEntry = this.currentList.get(this.currentListIndex++);
return;
@@ -1987,7 +1999,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
* while (this.nextTableIndex >= 0) { if ((this.nextEntry =
* this.currentTable[this.nextTableIndex--]) != null) { return; } }
*/
- // GemStone changes END
+ // Geode changes END
while (this.currentSegmentIndex > 0) {
final Segment<K, V> seg =
@@ -2011,7 +2023,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
}
}
- // GemStone added the method below
+ // Geode added the method below
/**
* Copy the tail of list of current matched entry ({@link #nextEntry}) to a temporary list, so
* that the read lock can be released after the copy.
@@ -2083,7 +2095,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
private static final long serialVersionUID = 1591026397367910439L;
- protected K key; // GemStone change; made non-final to enable reuse
+ protected K key; // Geode change; made non-final to enable reuse
private V value;
@@ -2243,7 +2255,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
class EntryIterator extends HashIterator implements Iterator<Map.Entry<K, V>> {
- // GemStone change
+ // Geode change
// added possibility to reuse a single Map.Entry for entire iteration
final WriteThroughEntry reusableEntry;
@@ -2260,7 +2272,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
}
return new WriteThroughEntry(e.getKey(), e.getMapValue());
}
- // End GemStone change
+ // End Geode change
}
class KeySet extends AbstractSet<K> {
@@ -2316,7 +2328,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
class EntrySet extends AbstractSet<Map.Entry<K, V>> {
- // GemStone change
+ // Geode change
// added possibility to reuse a single Map.Entry for entire iteration
final WriteThroughEntry reusableEntry;
@@ -2333,7 +2345,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
return new EntryIterator(this.reusableEntry);
}
- // End GemStone change
+ // End Geode change
@Override
public boolean contains(final Object o) {
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index b2a1971..df7bbb6 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -14,28 +14,53 @@
*/
package org.apache.geode.internal.cache.wan.parallel;
+import static org.apache.geode.distributed.internal.DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME;
import static org.apache.geode.internal.cache.tier.sockets.Message.MAX_MESSAGE_SIZE_PROPERTY;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import util.TestException;
import org.apache.geode.GemFireIOException;
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionDataStore;
import org.apache.geode.internal.cache.tier.sockets.MessageTooLargeException;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.internal.offheap.MemoryAllocatorImpl;
+import org.apache.geode.internal.offheap.OffHeapRegionEntryHelper;
import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.RMIException;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.categories.WanTest;
/**
@@ -46,6 +71,9 @@ import org.apache.geode.test.junit.categories.WanTest;
public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
@Rule
+ public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+ @Rule
public DistributedRestoreSystemProperties restoreSystemProperties =
new DistributedRestoreSystemProperties();
@@ -649,6 +677,117 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
});
}
+ @Test
+ public void testParallelGatewaySenderConcurrentPutClearNoOffheapOrphans()
+ throws Exception {
+ MemberVM locator = clusterStartupRule.startLocatorVM(1, new Properties());
+ Properties properties = new Properties();
+ properties.put(OFF_HEAP_MEMORY_SIZE_NAME, "100");
+ MemberVM server = clusterStartupRule.startServerVM(2, properties, locator.getPort());
+ final String regionName = "portfolios";
+ final String gatewaySenderId = "ln";
+
+ server.invoke(() -> {
+ IgnoredException ie = addIgnoredException("could not get remote locator");
+ InternalCache cache = ClusterStartupRule.getCache();
+ GatewaySender sender =
+ cache.createGatewaySenderFactory().setParallel(true).create(gatewaySenderId, 1);
+ Region userRegion = cache.createRegionFactory(RegionShortcut.PARTITION).setOffHeap(true)
+ .addGatewaySenderId("ln").create(regionName);
+ PartitionedRegion shadowRegion = (PartitionedRegion) ((AbstractGatewaySender) sender)
+ .getEventProcessor().getQueue().getRegion();
+ CacheWriter mockCacheWriter = mock(CacheWriter.class);
+ CountDownLatch cacheWriterLatch = new CountDownLatch(1);
+ CountDownLatch shadowRegionClearLatch = new CountDownLatch(1);
+
+ doAnswer(invocation -> {
+ // The cache writer is invoked between when the region entry is created with value of
+ // Token.REMOVED_PHASE_1 and when it is replaced with the actual GatewaySenderEvent.
+ // We use this hook to trigger the clear logic when the token is still in the
+ // region entry.
+ cacheWriterLatch.countDown();
+ // Wait until the clear is complete before putting the actual GatewaySenderEvent in the
+ // region entry.
+ shadowRegionClearLatch.await();
+ return null;
+ }).when(mockCacheWriter).beforeCreate(any());
+
+ shadowRegion.setCacheWriter(mockCacheWriter);
+
+ ExecutorService service = Executors.newFixedThreadPool(2);
+
+ List<Callable<Object>> callables = new ArrayList<>();
+
+ // In one thread, we are going to put some test data in the user region,
+ // which will eventually put the GatewaySenderEvent into the shadow region
+ callables.add(Executors.callable(() -> {
+ try {
+ userRegion.put("testKey", "testValue");
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }));
+
+ // In another thread, we are clear the shadow region's buckets. If the region entry is a
+ // Token.REMOVED_PHASE_1 at this time, a leak is possible. We can guarantee that the
+ // Token.REMOVED_PHASE_1 is present by using the mocked cache writer defined
+ // above.
+ callables.add(Executors.callable(() -> {
+ try {
+ OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() {
+ @Override
+ public void run() {
+ // Wait for the cache writer to be invoked to release this countdown latch.
+ // This guarantees that the region entry will contain a Token.REMOVED_PHASE_1.
+ try {
+ cacheWriterLatch.await();
+ } catch (InterruptedException e) {
+ throw new TestException(
+ "Thread was interrupted while waiting for mocked cache writer to be invoked");
+ }
+
+ clearShadowBucketRegions(shadowRegion);
+
+ // Signal to the cache writer that the clear is complete and the put can continue.
+ shadowRegionClearLatch.countDown();
+ }
+ });
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }));
+
+ List<Future<Object>> futures = service.invokeAll(callables, 10, TimeUnit.SECONDS);
+
+ for (Future<Object> future : futures) {
+ try {
+ future.get();
+ } catch (Exception ex) {
+ throw new TestException(
+ "Exception thrown while executing put and clear concurrently",
+ ex);
+ }
+ }
+
+ userRegion.close();
+
+ await("Waiting for off-heap to be freed").atMost(10, TimeUnit.SECONDS).until(
+ () -> 0 == ((MemoryAllocatorImpl) cache.getOffHeapStore()).getOrphans(cache).size());
+ });
+ }
+
+ private void clearShadowBucketRegions(PartitionedRegion shadowRegion) {
+ PartitionedRegionDataStore.BucketVisitor bucketVisitor =
+ new PartitionedRegionDataStore.BucketVisitor() {
+ @Override
+ public void visit(Integer bucketId, Region r) {
+ ((BucketRegion) r).clearEntries(null);
+ }
+ };
+
+ shadowRegion.getDataStore().visitBuckets(bucketVisitor);
+ }
+
private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort,
boolean createAccessors, boolean startSenders) {
createSendersAndReceivers(lnPort, nyPort);