You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2018/08/30 23:17:44 UTC

[geode] branch develop updated: GEODE-5653: Synchronize and free GatewaySenderEvents in background th… (#2407)

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1a62c5a  GEODE-5653: Synchronize and free GatewaySenderEvents in background th… (#2407)
1a62c5a is described below

commit 1a62c5a29f7d5a339803ea1125051e6a5d7d2c99
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Thu Aug 30 16:17:39 2018 -0700

    GEODE-5653: Synchronize and free GatewaySenderEvents in background th… (#2407)
    
       * Synchronize and free GatewaySenderEvents in background thread
       * Await off-heap release in test
---
 .../concurrent/CustomEntryConcurrentHashMap.java   | 174 +++++++++++----------
 .../ParallelGatewaySenderOperationsDUnitTest.java  | 139 ++++++++++++++++
 2 files changed, 232 insertions(+), 81 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/CustomEntryConcurrentHashMap.java b/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/CustomEntryConcurrentHashMap.java
index 8b3c958..9edb789 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/CustomEntryConcurrentHashMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/util/concurrent/CustomEntryConcurrentHashMap.java
@@ -60,6 +60,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.geode.CancelException;
 import org.apache.geode.internal.cache.RegionEntry;
 import org.apache.geode.internal.cache.entries.OffHeapRegionEntry;
+import org.apache.geode.internal.cache.tier.sockets.command.KeySet;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.offheap.OffHeapRegionEntryHelper;
 import org.apache.geode.internal.size.SingleObjectSizer;
@@ -111,6 +112,10 @@ import org.apache.geode.internal.util.ArrayUtils;
  * This class is a member of the <a href="{@docRoot} /../technotes/guides/collections/index.html">
  * Java Collections Framework</a>.
  *
+ * <p>
+ * Modifications made to the Java {@link java.util.Hashtable} implementation
+ * are indicated using comments to mark the beginning and end of Geode changes.
+ *
  * @since Java 1.5
  * @author Doug Lea
  * @param <K> the type of keys maintained by this map
@@ -164,14 +169,14 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
    */
   static final int RETRIES_BEFORE_LOCK = 2;
 
-  // GemStone addition
+  // Geode addition
   /**
    * Token object to indicate that {@link #remove(Object)} does not need to compare against provided
    * value before removing from segment.
    */
   private static final Object NO_OBJECT_TOKEN = new Object();
 
-  // End GemStone addition
+  // End Geode addition
 
   /* ---------------- Fields -------------- */
 
@@ -204,7 +209,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
 
   transient Set<K> keySet;
   transient Set<Map.Entry<K, V>> entrySet;
-  transient Set<Map.Entry<K, V>> reusableEntrySet; // GemStone addition
+  transient Set<Map.Entry<K, V>> reusableEntrySet; // Geode addition
   transient Collection<V> values;
 
   /* ---------------- Small Utilities -------------- */
@@ -242,8 +247,8 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
 
   /* ---------------- Inner Classes -------------- */
 
-  // GemStone addition
-  // GemStone changed HashEntry to be an interface with original HashEntry
+  // Geode addition
+  // Geode changed HashEntry to be an interface with original HashEntry
   // as the default implementation HashEntryImpl.
 
   /**
@@ -391,7 +396,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
     int keyHashCode(Object key, boolean compareValues);
   }
 
-  // End GemStone addition
+  // End Geode addition
 
   /**
    * Segments are specialized versions of hash tables. This subclasses from ReentrantLock
@@ -462,7 +467,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
      */
     final float loadFactor;
 
-    // GemStone addition
+    // Geode addition
 
     /**
      * {@link org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap.HashEntryCreator}
@@ -478,7 +483,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
      */
     final ReentrantReadWriteLock listUpdateLock;
 
-    // End GemStone addition
+    // End Geode addition
 
     Segment(final int initialCapacity, final float lf, final HashEntryCreator<K, V> entryCreator) {
       this.loadFactor = lf;
@@ -492,7 +497,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
       return new Segment[i];
     }
 
-    // GemStone added the method below
+    // Geode added the method below
     @SuppressWarnings("unchecked")
     static <K, V> HashEntry<K, V>[] newEntryArray(final int size) {
       return new HashEntry[size];
@@ -552,7 +557,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
 
     V get(final Object key, final int hash) {
       if (this.count != 0) { // read-volatile
-        // GemStone change to acquire the read lock on list updates
+        // Geode change to acquire the read lock on list updates
         final ReentrantReadWriteLock.ReadLock listLock = this.listUpdateLock.readLock();
         listLock.lock();
         boolean lockAcquired = true;
@@ -581,7 +586,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
 
     V getNoLock(final Object key, final int hash, final boolean lockListForRead) {
       if (this.count != 0) { // read-volatile
-        // GemStone change to acquire the read lock on list updates
+        // Geode change to acquire the read lock on list updates
         ReentrantReadWriteLock.ReadLock listLock = null;
         if (lockListForRead) {
           listLock = this.listUpdateLock.readLock();
@@ -606,7 +611,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
 
     boolean containsKey(final Object key, final int hash) {
       if (this.count != 0) { // read-volatile
-        // GemStone change to acquire the read lock on list updates
+        // Geode change to acquire the read lock on list updates
         final ReentrantReadWriteLock.ReadLock listLock = this.listUpdateLock.readLock();
         listLock.lock();
         HashEntry<K, V> e = getFirst(hash);
@@ -626,7 +631,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
 
     boolean containsValue(final Object value) {
       if (this.count != 0) { // read-volatile
-        // GemStone change to acquire the read lock on list updates
+        // Geode change to acquire the read lock on list updates
         ReentrantReadWriteLock.ReadLock readLock = this.listUpdateLock.readLock();
         RETRYLOOP: for (;;) {
           readLock.lock();
@@ -636,7 +641,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
             for (HashEntry<K, V> e = tab[i]; e != null; e = e.getNextEntry()) {
               V v = e.getMapValue();
               if (v == null) {
-                // GemStone changes BEGIN
+                // Geode changes BEGIN
                 // go back and retry from the very start with segment read lock
                 readLock.unlock();
                 readLock = super.readLock();
@@ -644,7 +649,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
                 /*
                  * (original code) v = readValueUnderLock(e);
                  */
-                // GemStone changes END
+                // Geode changes END
               }
               if (equalityCompare(value, v)) {
                 readLock.unlock();
@@ -733,7 +738,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
       }
     }
 
-    // GemStone additions
+    // Geode additions
 
     <C, P> V create(final K key, final int hash, final MapCallback<K, V, C, P> valueCreator,
         final C context, final P createParams, final boolean lockForRead) {
@@ -826,7 +831,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
       return null;
     }
 
-    // End GemStone additions
+    // End Geode additions
 
     void rehash() {
       final HashEntry<K, V>[] oldTable = this.table;
@@ -874,7 +879,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
             newTable[lastIdx] = lastRun;
 
             // Clone all remaining nodes
-            // GemStone changes BEGIN
+            // Geode changes BEGIN
             // update the next entry instead of cloning the nodes in newTable;
             // this is primarily because we don't want to change
             // the underlying RegionEntry that may be used elsewhere;
@@ -919,7 +924,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
              * p.hash & sizeMask; final HashEntry<K, V> n = newTable[k]; newTable[k] =
              * this.entryCreator.newEntry(p.key, p.hash, n, p.value); }
              */
-            // GemStone changes END
+            // Geode changes END
           }
         }
       }
@@ -929,11 +934,11 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
     /**
      * Remove; match on key only if value null, else match both.
      */
-    // GemStone change
+    // Geode change
     // added "condition" and "removeParams" parameters
     <C, P> V remove(final Object key, final int hash, final Object value,
         final MapCallback<K, V, C, P> condition, final C context, final P removeParams) {
-      // End GemStone change
+      // End Geode change
       final ReentrantReadWriteLock.WriteLock writeLock = super.writeLock();
       writeLock.lock();
       try {
@@ -942,7 +947,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
         final int index = hash & (tab.length - 1);
         final HashEntry<K, V> first = tab[index];
         HashEntry<K, V> e = first;
-        // GemStone change
+        // Geode change
         // the entry previous to the matched one, if any
         HashEntry<K, V> p = null;
         while (e != null && (e.getEntryHash() != hash || !equalityKeyCompare(key, e))) {
@@ -957,17 +962,17 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
         V oldValue = null;
         if (e != null) {
           final V v = e.getMapValue();
-          // GemStone change
+          // Geode change
           // allow for passing in a null object for comparison during remove;
           // also invoke the provided condition to check for removal
           if ((value == NO_OBJECT_TOKEN || equalityCompareWithNulls(v, value))
               && (condition == null || condition.doRemoveValue(v, context, removeParams))) {
-            // End GemStone change
+            // End Geode change
             oldValue = v;
             // All entries following removed node can stay in list,
             // but all preceding ones need to be cloned.
             ++this.modCount;
-            // GemStone changes BEGIN
+            // Geode changes BEGIN
             // update the next entry instead of cloning the nodes
             // this is primarily because we don't want to change
             // the underlying RegionEntry that may be used elsewhere
@@ -987,7 +992,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
              * != e; p = p.next) { newFirst = this.entryCreator.newEntry(p.key, p.hash, newFirst,
              * p.value); } tab[index] = newFirst;
              */
-            // GemStone changes END
+            // Geode changes END
             this.count = c; // write-volatile
           }
         }
@@ -998,7 +1003,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
     }
 
     /**
-     * GemStone added the clearedEntries param and the result
+     * Geode added the clearedEntries param and the result
      */
     ArrayList<HashEntry<?, ?>> clear(ArrayList<HashEntry<?, ?>> clearedEntries) {
       if (this.count != 0) {
@@ -1006,25 +1011,26 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
         writeLock.lock();
         try {
           final HashEntry<K, V>[] tab = this.table;
-          // GemStone changes BEGIN
-          boolean collectEntries = clearedEntries != null;
-          if (!collectEntries) {
-            // see if we have a map with off-heap region entries
-            for (HashEntry<K, V> he : tab) {
-              if (he != null) {
-                collectEntries = he instanceof OffHeapRegionEntry;
-                if (collectEntries) {
-                  clearedEntries = new ArrayList<HashEntry<?, ?>>();
+          // Geode changes BEGIN
+          if (clearedEntries == null) {
+            final boolean checkForGatewaySenderEvent =
+                OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap();
+            if (checkForGatewaySenderEvent) {
+              clearedEntries = new ArrayList<HashEntry<?, ?>>();
+            } else {
+              // see if we have a map with off-heap region entries
+              for (HashEntry<K, V> he : tab) {
+                if (he != null) {
+                  if (he instanceof OffHeapRegionEntry) {
+                    clearedEntries = new ArrayList<HashEntry<?, ?>>();
+                  }
+                  // after the first non-null entry we are done
+                  break;
                 }
-                // after the first non-null entry we are done
-                break;
               }
             }
           }
-          final boolean checkForGatewaySenderEvent =
-              OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap();
-          final boolean skipProcessOffHeap = !collectEntries && !checkForGatewaySenderEvent;
-          if (skipProcessOffHeap) {
+          if (clearedEntries == null) {
             Arrays.fill(tab, null);
           } else {
             for (int i = 0; i < tab.length; i++) {
@@ -1032,20 +1038,9 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
               if (he == null)
                 continue;
               tab[i] = null;
-              if (collectEntries) {
-                clearedEntries.add(he);
-              } else {
-                for (HashEntry<K, V> p = he; p != null; p = p.getNextEntry()) {
-                  if (p instanceof RegionEntry) {
-                    // It is ok to call GatewaySenderEventImpl release without being synced
-                    // on the region entry. It will not create an orphan.
-                    GatewaySenderEventImpl.release(((RegionEntry) p).getValue()); // OFFHEAP
-                                                                                  // _getValue ok
-                  }
-                }
-              }
+              clearedEntries.add(he);
             }
-            // GemStone changes END
+            // Geode changes END
           }
           ++this.modCount;
           this.count = 0; // write-volatile
@@ -1053,7 +1048,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
           writeLock.unlock();
         }
       }
-      return clearedEntries; // GemStone change
+      return clearedEntries; // Geode change
     }
   }
 
@@ -1116,7 +1111,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
     this(initialCapacity, loadFactor, concurrencyLevel, false, null);
   }
 
-  // GemStone addition
+  // Geode addition
 
   /**
    * Creates a new, empty map with the specified initial capacity, load factor and concurrency
@@ -1223,7 +1218,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
     }
   }
 
-  // End GemStone addition
+  // End Geode addition
 
   /**
    * Creates a new, empty map with the specified initial capacity and load factor and with the
@@ -1519,7 +1514,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
     return segmentFor(hash).put(key, hash, value, true);
   }
 
-  // GemStone addition
+  // Geode addition
 
   /**
    * Create a given key, value mapping if the key does not exist in the map else do nothing. The
@@ -1708,7 +1703,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
     return segmentFor(hash).remove(key, hash, NO_OBJECT_TOKEN, condition, context, removeParams);
   }
 
-  // End GemStone addition
+  // End Geode addition
 
   /**
    * Copies all of the mappings from the specified map to this one. These mappings replace any
@@ -1783,7 +1778,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
     return segmentFor(hash).replace(key, hash, value);
   }
 
-  // GemStone addition
+  // Geode addition
   @Override
   public void clearWithExecutor(Executor executor) {
     ArrayList<HashEntry<?, ?>> entries = null;
@@ -1794,17 +1789,34 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
     } finally {
       if (entries != null) {
         final ArrayList<HashEntry<?, ?>> clearedEntries = entries;
-        final Runnable runnable = new Runnable() {
-          public void run() {
-            for (HashEntry<?, ?> he : clearedEntries) {
-              for (HashEntry<?, ?> p = he; p != null; p = p.getNextEntry()) {
-                synchronized (p) {
-                  ((OffHeapRegionEntry) p).release();
+        Runnable runnable;
+        if (OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap()) {
+          runnable = new Runnable() {
+            public void run() {
+              for (HashEntry<?, ?> he : clearedEntries) {
+                for (HashEntry<?, ?> p = he; p != null; p = p.getNextEntry()) {
+                  if (p instanceof RegionEntry) {
+                    synchronized (p) {
+                      GatewaySenderEventImpl.release(((RegionEntry) p).getValue()); // OFFHEAP
+                    }
+                  }
                 }
               }
             }
-          }
-        };
+          };
+        } else {
+          runnable = new Runnable() {
+            public void run() {
+              for (HashEntry<?, ?> he : clearedEntries) {
+                for (HashEntry<?, ?> p = he; p != null; p = p.getNextEntry()) {
+                  synchronized (p) {
+                    ((OffHeapRegionEntry) p).release();
+                  }
+                }
+              }
+            }
+          };
+        }
         boolean submitted = false;
         if (executor != null) {
           try {
@@ -1835,7 +1847,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
   public void clear() {
     clearWithExecutor(null);
   }
-  // End GemStone addition
+  // End Geode addition
 
   /**
    * Returns a {@link Set} view of the keys contained in this map. The set is backed by the map, so
@@ -1895,7 +1907,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
     return (es != null) ? es : (this.entrySet = new EntrySet(false));
   }
 
-  // GemStone addition
+  // Geode addition
 
   @Override
   public Set<Map.Entry<K, V>> entrySetWithReusableEntries() {
@@ -1903,7 +1915,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
     return (es != null) ? es : (this.reusableEntrySet = new EntrySet(true));
   }
 
-  // End GemStone addition
+  // End Geode addition
 
   /**
    * Returns an enumeration of the keys in this table.
@@ -1933,7 +1945,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
 
     int nextTableIndex;
 
-    // GemStone changed HashEntry<K, V>[] currentTable to currentSegment
+    // Geode changed HashEntry<K, V>[] currentTable to currentSegment
     HashEntry<K, V>[] currentTable;
 
     HashEntry<K, V> nextEntry;
@@ -1957,7 +1969,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
     }
 
     void advance() {
-      // GemStone changes BEGIN
+      // Geode changes BEGIN
       if (this.currentListIndex < this.currentList.size()) {
         this.nextEntry = this.currentList.get(this.currentListIndex++);
         return;
@@ -1987,7 +1999,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
        * while (this.nextTableIndex >= 0) { if ((this.nextEntry =
        * this.currentTable[this.nextTableIndex--]) != null) { return; } }
        */
-      // GemStone changes END
+      // Geode changes END
 
       while (this.currentSegmentIndex > 0) {
         final Segment<K, V> seg =
@@ -2011,7 +2023,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
       }
     }
 
-    // GemStone added the method below
+    // Geode added the method below
     /**
      * Copy the tail of list of current matched entry ({@link #nextEntry}) to a temporary list, so
      * that the read lock can be released after the copy.
@@ -2083,7 +2095,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
 
     private static final long serialVersionUID = 1591026397367910439L;
 
-    protected K key; // GemStone change; made non-final to enable reuse
+    protected K key; // Geode change; made non-final to enable reuse
 
     private V value;
 
@@ -2243,7 +2255,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
 
   class EntryIterator extends HashIterator implements Iterator<Map.Entry<K, V>> {
 
-    // GemStone change
+    // Geode change
     // added possibility to reuse a single Map.Entry for entire iteration
     final WriteThroughEntry reusableEntry;
 
@@ -2260,7 +2272,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
       }
       return new WriteThroughEntry(e.getKey(), e.getMapValue());
     }
-    // End GemStone change
+    // End Geode change
   }
 
   class KeySet extends AbstractSet<K> {
@@ -2316,7 +2328,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
 
   class EntrySet extends AbstractSet<Map.Entry<K, V>> {
 
-    // GemStone change
+    // Geode change
     // added possibility to reuse a single Map.Entry for entire iteration
     final WriteThroughEntry reusableEntry;
 
@@ -2333,7 +2345,7 @@ public class CustomEntryConcurrentHashMap<K, V> extends AbstractMap<K, V>
       return new EntryIterator(this.reusableEntry);
     }
 
-    // End GemStone change
+    // End Geode change
 
     @Override
     public boolean contains(final Object o) {
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index b2a1971..df7bbb6 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -14,28 +14,53 @@
  */
 package org.apache.geode.internal.cache.wan.parallel;
 
+import static org.apache.geode.distributed.internal.DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME;
 import static org.apache.geode.internal.cache.tier.sockets.Message.MAX_MESSAGE_SIZE_PROPERTY;
 import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import util.TestException;
 
 import org.apache.geode.GemFireIOException;
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionDataStore;
 import org.apache.geode.internal.cache.tier.sockets.MessageTooLargeException;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.GatewaySenderException;
 import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.internal.offheap.MemoryAllocatorImpl;
+import org.apache.geode.internal.offheap.OffHeapRegionEntryHelper;
 import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.RMIException;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
 import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.MemberVM;
 import org.apache.geode.test.junit.categories.WanTest;
 
 /**
@@ -46,6 +71,9 @@ import org.apache.geode.test.junit.categories.WanTest;
 public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
 
   @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Rule
   public DistributedRestoreSystemProperties restoreSystemProperties =
       new DistributedRestoreSystemProperties();
 
@@ -649,6 +677,117 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
     });
   }
 
+  @Test
+  public void testParallelGatewaySenderConcurrentPutClearNoOffheapOrphans()
+      throws Exception {
+    MemberVM locator = clusterStartupRule.startLocatorVM(1, new Properties());
+    Properties properties = new Properties();
+    properties.put(OFF_HEAP_MEMORY_SIZE_NAME, "100");
+    MemberVM server = clusterStartupRule.startServerVM(2, properties, locator.getPort());
+    final String regionName = "portfolios";
+    final String gatewaySenderId = "ln";
+
+    server.invoke(() -> {
+      IgnoredException ie = addIgnoredException("could not get remote locator");
+      InternalCache cache = ClusterStartupRule.getCache();
+      GatewaySender sender =
+          cache.createGatewaySenderFactory().setParallel(true).create(gatewaySenderId, 1);
+      Region userRegion = cache.createRegionFactory(RegionShortcut.PARTITION).setOffHeap(true)
+          .addGatewaySenderId("ln").create(regionName);
+      PartitionedRegion shadowRegion = (PartitionedRegion) ((AbstractGatewaySender) sender)
+          .getEventProcessor().getQueue().getRegion();
+      CacheWriter mockCacheWriter = mock(CacheWriter.class);
+      CountDownLatch cacheWriterLatch = new CountDownLatch(1);
+      CountDownLatch shadowRegionClearLatch = new CountDownLatch(1);
+
+      doAnswer(invocation -> {
+        // The cache writer is invoked between when the region entry is created with value of
+        // Token.REMOVED_PHASE_1 and when it is replaced with the actual GatewaySenderEvent.
+        // We use this hook to trigger the clear logic when the token is still in the
+        // region entry.
+        cacheWriterLatch.countDown();
+        // Wait until the clear is complete before putting the actual GatewaySenderEvent in the
+        // region entry.
+        shadowRegionClearLatch.await();
+        return null;
+      }).when(mockCacheWriter).beforeCreate(any());
+
+      shadowRegion.setCacheWriter(mockCacheWriter);
+
+      ExecutorService service = Executors.newFixedThreadPool(2);
+
+      List<Callable<Object>> callables = new ArrayList<>();
+
+      // In one thread, we are going to put some test data in the user region,
+      // which will eventually put the GatewaySenderEvent into the shadow region
+      callables.add(Executors.callable(() -> {
+        try {
+          userRegion.put("testKey", "testValue");
+        } catch (Exception ex) {
+          throw new RuntimeException(ex);
+        }
+      }));
+
+      // In another thread, we are clear the shadow region's buckets. If the region entry is a
+      // Token.REMOVED_PHASE_1 at this time, a leak is possible. We can guarantee that the
+      // Token.REMOVED_PHASE_1 is present by using the mocked cache writer defined
+      // above.
+      callables.add(Executors.callable(() -> {
+        try {
+          OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() {
+            @Override
+            public void run() {
+              // Wait for the cache writer to be invoked to release this countdown latch.
+              // This guarantees that the region entry will contain a Token.REMOVED_PHASE_1.
+              try {
+                cacheWriterLatch.await();
+              } catch (InterruptedException e) {
+                throw new TestException(
+                    "Thread was interrupted while waiting for mocked cache writer to be invoked");
+              }
+
+              clearShadowBucketRegions(shadowRegion);
+
+              // Signal to the cache writer that the clear is complete and the put can continue.
+              shadowRegionClearLatch.countDown();
+            }
+          });
+        } catch (Exception ex) {
+          throw new RuntimeException(ex);
+        }
+      }));
+
+      List<Future<Object>> futures = service.invokeAll(callables, 10, TimeUnit.SECONDS);
+
+      for (Future<Object> future : futures) {
+        try {
+          future.get();
+        } catch (Exception ex) {
+          throw new TestException(
+              "Exception thrown while executing put and clear concurrently",
+              ex);
+        }
+      }
+
+      userRegion.close();
+
+      await("Waiting for off-heap to be freed").atMost(10, TimeUnit.SECONDS).until(
+          () -> 0 == ((MemoryAllocatorImpl) cache.getOffHeapStore()).getOrphans(cache).size());
+    });
+  }
+
+  private void clearShadowBucketRegions(PartitionedRegion shadowRegion) {
+    PartitionedRegionDataStore.BucketVisitor bucketVisitor =
+        new PartitionedRegionDataStore.BucketVisitor() {
+          @Override
+          public void visit(Integer bucketId, Region r) {
+            ((BucketRegion) r).clearEntries(null);
+          }
+        };
+
+    shadowRegion.getDataStore().visitBuckets(bucketVisitor);
+  }
+
   private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort,
       boolean createAccessors, boolean startSenders) {
     createSendersAndReceivers(lnPort, nyPort);